Kafka producer internals

Starting with Kafka 0.8.2, a new set of Java client APIs, KafkaProducer and KafkaConsumer, was released to replace the earlier Scala APIs.


Key configuration options

  • linger.ms: an artificial delay inserted into the send path so that as many records as possible are packed into one batch, cutting down the number of requests sent;
  • max.block.ms: if the buffer is full or metadata cannot be fetched, KafkaProducer.send() and partitionsFor() will block. To avoid unbounded waiting, this caps how long they may block (default 60s); a record that exceeds the limit fails instead of being sent;
  • request.timeout.ms: how long the client waits for a broker to respond to a request; defaults to 30s;
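
For context, a producer using these settings might be constructed like this (a minimal sketch; the broker address and serializers are placeholders, not part of the original analysis):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", "5");                // trade a little latency for fuller batches
props.put("max.block.ms", "60000");         // cap blocking in send()/partitionsFor()
props.put("request.timeout.ms", "30000");   // max wait for a broker response

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);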

send

[Sequence diagram: KafkaProducer's send operation]

Sending is non-blocking: you pass a callback, and after calling send you can keep sending records without waiting for the result. When a result comes back, the callback is invoked automatically.
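
On the caller's side that looks roughly like this (a sketch; the topic name is a placeholder):

producer.send(new ProducerRecord<String, String>("demo-topic", "key", "value"), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            exception.printStackTrace();   // the send failed
        else
            System.out.println("stored at offset " + metadata.offset());
    }
});
// send() has already returned here; the next record can be sent immediately

Internally, send() delegates to doSend():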

public Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // First make sure the metadata for the topic is available
    long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
    long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);

    // Serialize the key and value
    byte[] serializedKey = keySerializer.serialize(record.topic(), record.key());
    byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());

    // Choose a partition based on the key
    int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
    TopicPartition tp = new TopicPartition(record.topic(), partition);
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);

    // After every append, check whether the batch is full; if so, wake the
    // Sender so this batch gets shipped to Kafka
    if (result.batchIsFull || result.newBatchCreated)
        this.sender.wakeup();
    return result.future;
}

RecordAccumulator

The producer's asynchronous sending is built on a message buffer, and RecordAccumulator plays exactly that role: it maintains a cache of type ConcurrentMap<TopicPartition, Deque<RecordBatch>>, and every record is appended to the double-ended queue of its target partition.
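
The per-partition deques are created lazily; getOrCreateDeque, used by append below, is essentially a putIfAbsent over that map (a condensed rendering of the client source):

private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<RecordBatch>();
    // another thread may have created the deque concurrently; keep the winner
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    return previous == null ? d : previous;
}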

append

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // Track the number of appends currently in progress
    appendsInProgress.incrementAndGet();
    try {
        // If a deque is available, try to append to its last batch
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null)
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }

        // The record did not fit into the existing batch, so allocate a buffer and create a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
  • To stay safe under concurrent access, every operation on a deque is wrapped in a synchronized block
  • Whenever a new RecordBatch has been created, or the current RecordBatch's buffer is full, the batch is ready to depart: the Sender is woken up

ready

Find the leader of each topic-partition and decide whether the conditions for sending have been met:

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<Node>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    boolean unknownLeadersExist = false;

    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    ...
                    // sendable: the batch is full, its linger time has expired,
                    // memory is exhausted, or the accumulator is closing/flushing
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}

drain

The ready step has identified all the ready nodes; drain now regroups the RecordBatches in batches by node, which is the layout the subsequent send step works with.
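
Conceptually the regrouping looks like the sketch below (simplified: the real drain also enforces a maximum request size and rotates its starting partition for fairness; getDeque is the accumulator's internal per-partition lookup):

Map<Integer, List<RecordBatch>> batchesByNode = new HashMap<Integer, List<RecordBatch>>();
for (Node node : readyNodes) {
    List<RecordBatch> ready = new ArrayList<RecordBatch>();
    // pull the first (oldest) batch from every partition this node leads
    for (PartitionInfo part : cluster.partitionsForNode(node.id())) {
        Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
        if (deque != null) {
            synchronized (deque) {
                RecordBatch batch = deque.pollFirst();
                if (batch != null)
                    ready.add(batch);
            }
        }
    }
    batchesByNode.put(node.id(), ready);
}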

Request && Response

Next, let's look at how RecordBatches are turned into requests the server can understand, and at what happens once the server's response comes back.

  • First, the RecordBatches headed for the same broker are grouped, per topic-partition, into a single ProduceRequest
  • The ProduceRequest is then wrapped into a RequestSend, the object that actually goes out on the wire
  • handleProduceResponse implements the handling of the ClientResponse; this is where each batch gets its done() call (the details are covered in the producer-callback write-up)
  • That handler becomes the request's callback, which completes the assembly of the ClientRequest:
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
RequestSend send = new RequestSend(Integer.toString(destination),
                                   this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                   request.toStruct());
RequestCompletionHandler callback = new RequestCompletionHandler() {
    public void onComplete(ClientResponse response) {
        handleProduceResponse(response, recordsByPartition, time.milliseconds());
    }
};

return new ClientRequest(now, acks != 0, send, callback);


The callback is invoked at the NetworkClient layer, specifically inside client.poll():

for (ClientResponse response : responses) {
    if (response.request().hasCallback()) {
        try {
            response.request().callback().onComplete(response);
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}

NetworkClient

NetworkClient implements the Kafka client's network connections and data I/O. It is built on Java NIO and provides its own Selector and KafkaChannel implementations.

Connecting

In Sender's run method, before ClientRequests are created, the Sender first filters out the nodes it can connect to; this goes through NetworkClient's ready method:

if (connectionStates.canConnect(node.idString(), now))
    initiateConnect(node, now);

which in turn calls the Selector's connect method:

  • Open a SocketChannel:

    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
  • Register the channel with the nioSelector, listening for OP_CONNECT

  • Create a KafkaChannel and bind it to the broker id:
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    this.channels.put(id, channel);

Because everything runs in non-blocking mode, connect() returns before the connection is actually established. To confirm that the connection succeeded, KafkaChannel's finishConnect() has to be called:

public boolean finishConnect() throws IOException {
    boolean connected = socketChannel.finishConnect();
    if (connected)
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}

Once the connection succeeds, OP_READ is registered. Why not register it when a request is sent instead? For one thing, we cannot predict when a response will arrive; for another, some requests do not expect a response at all.

Sending

After a ClientRequest has been assembled, the Selector's send operation is invoked; the thing being sent is the RequestSend:

public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    channel.setSend(send);
}

private KafkaChannel channelOrFail(String id) {
    KafkaChannel channel = this.channels.get(id);
    if (channel == null)
        throw new IllegalStateException("Attempt to send to a channel without an open connection: " + id);
    return channel;
}

Note that:

  • Each KafkaChannel carries at most one RequestSend at a time; there is no queueing, and the next send can only board once the previous one has been written out
  • setSend registers OP_WRITE, and OP_WRITE is deregistered again once the send completes:
// Set the payload to be sent
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// Once the send completes, deregister OP_WRITE
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}

poll

How do the setSend, write, and send operations above fit together? That is poll's job.

In short, the nioSelector runs a select, then the selected keys are iterated; whenever an event we are interested in has fired, the corresponding channel is read from or written to accordingly.

Iterating over the channels that have events

Iterator<SelectionKey> iterator = selectionKeys.iterator();
// Walk the selected keys; each key has a KafkaChannel attached
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();
    KafkaChannel channel = channel(key);

    // Complete any connection whose handshake has finished;
    // if it has not finished yet, skip this channel for now
    if (channel.finishConnect())
        this.connected.add(channel.id());
    else
        continue;
}

read from KafkaChannel

if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive;
    while ((networkReceive = channel.read()) != null)
        addToStagedReceives(channel, networkReceive);
}
  • key.isReadable() holds only when the channel is registered for OP_READ and there is data to read
  • Once the read completes, the NetworkReceive objects are moved into the completedReceives queue

write to KafkaChannel

if (channel.ready() && key.isWritable()) {
    Send send = channel.write();
    if (send != null) {
        this.completedSends.add(send);
        this.sensors.recordBytesSent(channel.id(), send.size());
    }
}

Unlike the read path, which loops until everything available has been consumed, a write is attempted once per poll. If the send completes, OP_WRITE is deregistered and the Send is added to the completedSends queue; otherwise OP_WRITE stays registered and the write is resumed on later poll iterations until it finishes.
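
This retry behavior hinges on KafkaChannel.write(): it hands the Send back only once the payload has been fully written, and leaves it set on the channel otherwise (condensed from the client source):

public Send write() throws IOException {
    Send result = null;
    // send(...) is the private method shown earlier; it returns true
    // only when the whole payload has reached the transport layer
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}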

NetworkClient.poll

Now let's pull back to NetworkClient and see how things proceed after the selector's poll has run.

When a ClientRequest is sent by the NetworkClient layer, it is added to inFlightRequests. The name is apt: it is the collection of requests "in flight", i.e. currently being sent or awaiting a response. Inside it, ClientRequests are grouped by destination node, with all requests for the same node kept in one double-ended queue.
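
A minimal sketch of that structure (method names follow the real InFlightRequests, but this is an illustration rather than the full class):

private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

// a newly sent request is pushed onto the head of its node's deque
public void add(ClientRequest request) {
    Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
    if (reqs == null) {
        reqs = new ArrayDeque<ClientRequest>();
        this.requests.put(request.request().destination(), reqs);
    }
    reqs.addFirst(request);
}

// the oldest request completes first: per node, responses arrive in send order
public ClientRequest completeNext(String node) {
    return this.requests.get(node).pollLast();
}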

handleCompletedSends

for (Send send : this.selector.completedSends()) {
    ClientRequest request = this.inFlightRequests.lastSent(send.destination());
    if (!request.expectResponse()) {
        this.inFlightRequests.completeLastSent(send.destination());
        responses.add(new ClientResponse(request, now, false, null));
    }
}

For a request that does not expect a response, once its RequestSend has been written out, the corresponding ClientRequest is removed from inFlightRequests and a response with an empty body is fabricated on the spot.

handleCompletedReceives

for (NetworkReceive receive : this.selector.completedReceives()) {
    String source = receive.source();
    ClientRequest req = inFlightRequests.completeNext(source);
    Struct body = parseResponse(receive.payload(), req.request().header());
    if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
        responses.add(new ClientResponse(req, now, false, body));
}

Using the source of each received payload, the matching ClientRequest is removed from inFlightRequests, and a response is assembled from the bytes received.

[Diagram: the producer's end-to-end message send flow]