从Kafka 0.8.2开始,发布了一套新的Java版的client api,KafkaProducer/KafkaConsumer,替代之前的scala版的api。
REF
Kafka: Producer (0.10.0.0)
Kafka源码分析 Producer客户端(超详细)
重要配置项说明
- linger.ms:这是在send的过程中人为设置的一个delay,目的是将尽可能多的消息放到一个batch中,这样能减少发送的次数;
- max.block.ms:如果buffer满了或者无法获取到metadata,那么将阻塞KafkaProducer.send()与partitionFor()这两个方法,为了防止无限等待,这里设置了一个最长等待时间,默认为60s,超出这个时间的消息就不会被发送了;
- request.timeout.ms:连接超时时间,设置为30s;
send
下面是KafkaProducer执行send操作的时序图:
采用的是一种Non-block的工作方式,提供一个callback,调用send后,可以继续发送消息而不用等待。当有结果返回时,callback会被自动通知执行。public Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// first make sure the metadata for the topic is available
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
// 序列化key和value
byte[] serializedKey= keySerializer.serialize(record.topic(), record.key());
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());
// 根据key选择Partition
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
TopicPartition tp = new TopicPartition(record.topic(), partition);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
// 在每次追加一条消息到收集器之后,都要判断是否满了.如果满了,就执行一次Sender操作,通知Sender将这批数据发送到Kafka
if (result.batchIsFull || result.newBatchCreated) this.sender.wakeup();
return result.future;
}
RecordAccumulator
kafka消息的异步发送是依靠消息缓存实现的,RecordAccumulator扮演的就是这个角色,它创建了一个ConcurrentMap<TopicPartition, Deque<RecordBatch>>
类型的缓存结构,每条消息会被放到对应的双端队列中。
append
public RecordAppendResult append(TopicPartition tp, |
- 为了实现并发,对每个队列的操作都放在同步块中操作
- 新创建了一个RecordBatch,或者当前的RecordBatch的buffer被填满了,那么就可以发车 (唤醒Sender)了
ready
找出每个topic-partition对应的leader,并判断发送条件是否成熟
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<Node>();
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();
Node leader = cluster.leaderFor(part);
if (leader == null) {
unknownLeadersExist = true;
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
synchronized (deque) {
RecordBatch batch = deque.peekFirst();
if (batch != null) {
...
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
drain
在ready环节已经找出所有的ready nodes,那么接下来就将batchs中的RecordBatch按照node进行分组,方便后续的发送
Request && Response
下面要探究的是如何将RecordBatch转换成server端能理解的请求,以及收到server的响应后如何进行处理。
- 首先,将发往相同broker、topic-partition的RecordBatch组成一个ProduceRequest
- 根据ProduceRequest进而组成一个用于发送的RequestSend
- 在
handleProduceResponse
方法中实现了对ClientResponse的处理,在这里会对batch做done操作,细节在producer回调中有阐述。 - 根据上面的处理方法实现了回调方法,进而完成了
ClientRequest
的组建工作
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); |
回调函数是在NetworkClient层被调用的,具体的是client.poll()
中:for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
NetworkClient
NetworkClient是kafka客户端实现网络连接、数据读写的。是通过Java NIO实现的,并且实现了自己的Selector与KafkaChannel
连接
在Sender的run方法中创建ClientRequest之前需要先筛选出可连接的节点,这里调用的是NetworkClient的ready方法:if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now);
然后调用Selector的connect方法:
开启一个SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);将创建的Channel注册在nioSelector上,监听事件为:
OP_CONNECT
- 创建KafkaChannel,并与brokerId绑定
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
key.attach(channel);
this.channels.put(id, channel);
因为实现的是非阻塞模式的,因此connect方法在连接成功建立之前就会返回的。为了确保连接成功,需要调用KafkaChannel的finishConnect方法
public boolean finishConnect() throws IOException { |
在连接成功后就注册了OP_READ
。为什么不在发送请求的时候注册嘞,因为一来响应什么时候到我们是无法预料的,二来有些请求并不要求响应。
发送
在组建完ClientRequest后,要调用Selector的send操作,被发送的主体是RequestSend
:public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination());
channel.setSend(send);
}
private KafkaChannel channelOrFail(String id) {
KafkaChannel channel = this.channels.get(id);
}
需要注意的是:
- 每个KafkaChannel上只有一个RequestSend,无法加塞,必须在之前的被发送掉后才能上车
- setSend的操作会注册
OP_WRITE
,然后在发送完成后,会取消掉OP_WRITE
//设置发送的主体 |
poll轮询
上面的setSend、write、send的操作如何组织起来呢,这就是poll的工作了。
简单来说就是nioSelector执行一个select操作,然后遍历SelectionKey,如果有我们感兴趣的操作,就对相应的Channel进行read、write之类的操作。
遍历获取有事件发生的Channel
Iterator<SelectionKey> iterator = selectionKeys.iterator(); |
read from KafkaChannel
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { |
key.isReadable()
只有在Channel监听Read事件,并且Channel有数据写入时才成立- 在完成读操作后,会将NetworkReceive添加到队列:
completedReceives
中write to KafkaChannel
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
不同于read操作时使用了while循环来确保读取完毕,write操作是一次性的,如果发送完毕,就去除
OP_WRITE
事件,并且将Send添加到队列completedSends
中;否则就将不取消,然后在后面的循环中继续执行写操作,直至写完为止
NetworkClient.poll
下面把目光拉回到NetworkClient中,来看看执行完selector的poll操作之后,是如何进一步处理的。
ClientRequest在被NetworkClient层执行发送操作的时候,会被添加到inFlightRequests
中,这个名字很形象——正在飞行中的请求,指代正在发送或者等待响应的请求的集合。该集合中将ClientRequest按照目的节点进行分组,相同目的节点的保存在同一个双端队列中。
handleCompletedSends
for (Send send : this.selector.completedSends()) { |
对于不要求响应的请求,将发送完成的RequestSend对应的ClientRequest从inFlightRequests中移除,直接组建一个body为空的response。
handleCompletedReceives
for (NetworkReceive receive : this.selector.completedReceives()) { |
根据接收到的源地址,移除inFlightRequests中对应的ClientRequest,并且根据收到的内容组建response