为了方便用户感知发送的情况,kafka producer中提供了
callback
机制,springkafka中实现了ProducerListener
遇到的问题
近期生产环境上由于版本BUG问题导致一段时间内某台broker出现假死的异常状态,在这异常期间内发送到该broker上的消息都失败了。在各个系统中我们实现了一个ProducerListener用于将发送失败的消息写入到Hbase中,方便后续处理。
然而,实际情况是Hbase中(与log中一致,排除写入Hbase的错误)的错误记录不全。
接下来以从上往下的层次顺序解析下Producer callback的运行机制,以及对Exception的处理
ProducerListener
首先看下KafkaTemplate的简化UML图
在doSend方法中为producer的send方法设置回调函数:
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { |
如果我们在创建KafkaTemplate时没有设置producerListener属性,那么就会使用默认的:LoggingProducerListener
。
KafkaProducer
之前对KafkaProducer的异步发送机制实现进行过分析,有一个sender线程用于协调消息队列及网络通信的工作,在运行过程中可能发生多种异常需要统一进行处理:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { |
interceptCallback
将KafkaTemplate注入的ProducerListener以及KafkaProducer配置的拦截器ProducerInterceptors
组合在一起,统一对发送的执行结果进行拦截处理。
通过append方法将拦截器配置到每个RecordBatch中
public FutureRecordMetadata tryAppend( byte[] key, byte[] value, Callback callback) { |
Thunk是内部类,将callback于FutureRecordMetadata捆绑在一起。在每个RecordBatch被处理完成的时候(3种情况:1.发送完成;2.失效;3.producer被强制关闭;
)会执行绑定的callback方法:
public void done(long baseOffset, long timestamp, RuntimeException exception) { |
按理来说,这里已经覆盖了所有的情况,为什么现实使用中会有发送失败未记录下来的情况呢?