为了方便用户感知发送的情况,kafka producer中提供了
callback
机制,springkafka中实现了ProducerListener
遇到的问题
近期生产环境上由于版本BUG问题导致一段时间内某台broker出现假死的异常状态,在这异常期间内发送到该broker上的消息都失败了。在各个系统中我们实现了一个ProducerListener用于将发送失败的消息写入到Hbase中,方便后续处理。
然而,实际情况是Hbase中(与log中一致,排除写入Hbase的错误)的错误记录不全。
接下来以从上往下的层次顺序解析下Producer callback的运行机制,以及对Exception的处理
ProducerListener
首先看下KafkaTemplate的简化UML图
在doSend方法中为producer的send方法设置回调函数:protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
this.producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
if(KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
KafkaTemplate.this.producerListener.onSuccess(...);
}
} else {
if(KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(...,exception);
}
}
}
});
}
如果我们在创建KafkaTemplate时没有设置producerListener属性,那么就会使用默认的:LoggingProducerListener
。
KafkaProducer
之前对KafkaProducer的异步发送机制实现进行过分析,有一个sender线程用于协调消息队列及网络通信的工作,在运行过程中可能发生多种异常需要统一进行处理:private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
try {
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
this.sender.wakeup();
return result.future;
//API异常会调用callback的onCompletion方法
}catch (ApiException e) {
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
//其他异常直接抛出
} catch (Exception e){
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
interceptCallback
将KafkaTemplate注入的ProducerListener以及KafkaProducer配置的拦截器ProducerInterceptors
组合在一起,统一对发送的执行结果进行拦截处理。
通过append方法将拦截器配置到每个RecordBatch中public FutureRecordMetadata tryAppend( byte[] key, byte[] value, Callback callback) {
...
if (callback != null)
thunks.add(new Thunk(callback, future));
}
Thunk是内部类,将callback于FutureRecordMetadata捆绑在一起。在每个RecordBatch被处理完成的时候(3种情况:1.发送完成;2.失效;3.producer被强制关闭;
)会执行绑定的callback方法:public void done(long baseOffset, long timestamp, RuntimeException exception) {
for (int i = 0; i < this.thunks.size(); i++) {
Thunk thunk = this.thunks.get(i);
if (exception == null) {
RecordMetadata metadata = new RecordMetadata(...);
thunk.callback.onCompletion(metadata, null);
} else {
thunk.callback.onCompletion(null, exception);
}
}
}
按理来说,这里已经覆盖了所有的情况,为什么现实使用中会有发送失败未记录下来的情况呢?