[TOC]
## Questions

- When does the leader update the high watermark?
- What causes a negative Lag value to appear in kafka-manager?
- When messages are deleted from the Log, how do the ISR members coordinate?
All of the discussion below assumes a Kafka cluster of 3 brokers.
## Replica
### leader vs followers
If every topic's `replicas` is set to 2 (except `__consumer_offsets`), then for each partition its Log exists on two brokers: one acts as the leader, the other as a follower.
The figure below shows the replica and leader distribution for a topic with 3 partitions:
### Responsibilities of the leader and the followers
The earlier article analyzing kafka-consumer introduced the concepts of high watermark (HW) and log end offset (LEO). The leader's main responsibilities are:

- write messages and their offsets into the local log
- update the HW once the followers have fully replicated the messages (the leader presumably receives a notification of this)
- the leader does not need to maintain a separate LEO value, because each Log's `nextOffsetMetadata` field already holds its latest offset information
The followers' responsibilities, by contrast, lie in their interaction with the leader:

- replicate messages, then notify the leader
- update their own LEO
### Creating a Replica
Since every partition has its own replicas, replica creation is performed inside `Partition`; the concrete method is `getOrCreateReplica`:

```scala
if (isReplicaLocal(replicaId)) {
  // create the Log file
  val config = LogConfig.fromProps(...)
  val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
  // read the HW checkpoint file
  val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
  val offsetMap = checkpoint.read
  val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
  val localReplica = new Replica(replicaId, this, time, offset, Some(log))
  addReplicaIfNotExists(localReplica)
} else {
  val remoteReplica = new Replica(replicaId, this, time)
  addReplicaIfNotExists(remoteReplica)
}
getReplica(replicaId).get
```
Where:

- `getReplica` and `addReplicaIfNotExists` both operate on `assignedReplicaMap`
- the checkpoint maintains the topic-partition to offset mapping for everything under that log.dir
- `log.dir` is the directory containing the Log files; `log.dir.getParentFile` is the `log.dirs` defined in server.properties
## ReplicaManager

The call path for creating replicas is `ReplicaManager.becomeLeaderOrFollower -> makeFollowers`. ReplicaManager is broker-scoped: it manages all partitions on the broker.
### OffsetCheckPoint
In a multi-disk environment, `log.dirs` can be defined across several disks, so partitions can be distributed over them.
Each disk's Log directory maintains 3 OffsetCheckPoint files:

: recovery-point-offset-checkpoint
: replication-offset-checkpoint: maintains partition and offset information
: cleaner-offset-checkpoint
The content of replication-offset-checkpoint looks like this:

```
0
21
__consumer_offsets 16 0
__consumer_offsets 49 124
admin.benchmark 9 24564634
test 13 16709
theOne 18 0
__consumer_offsets 4 6
...
```
Where:

- the first line is the `CurrentVersion` value, 0 by default
- the second line is the number of partitions under this Log directory
- the following 21 lines hold the partitions and their offsets, in the format `$topic $partition $offset`
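Given the format above, the file is straightforward to parse. The sketch below is a simplified stand-in for Kafka's actual `OffsetCheckpoint.read`, which additionally handles file I/O and malformed input; the function name here is illustrative:

```scala
// Parse a replication-offset-checkpoint body into a
// (topic, partition) -> offset map.
def parseCheckpoint(content: String): Map[(String, Int), Long] = {
  val lines = content.trim.split("\n").toList
  require(lines.head.toInt == 0, "unsupported CurrentVersion")  // line 1: version
  val expected = lines(1).toInt                                 // line 2: entry count
  val entries = lines.drop(2).map { line =>
    val Array(topic, partition, offset) = line.split(" ")       // "$topic $partition $offset"
    (topic, partition.toInt) -> offset.toLong
  }
  require(entries.size == expected, "entry count mismatch")
  entries.toMap
}

val sample = "0\n2\n__consumer_offsets 49 124\ntest 13 16709\n"
println(parseCheckpoint(sample)(("test", 13)))  // 16709
```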
In ReplicaManager, the replication-offset-checkpoint files of all log directories are assembled into a Map named `highWatermarkCheckpoints`:

```scala
val highWatermarkCheckpoints = config.logDirs.map(dir =>
  (new File(dir).getAbsolutePath,
   new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
```

The checkpoint files are updated by a scheduled task: every `replica.high.watermark.checkpoint.interval.ms` (default 5s) the HW values of all replicas in every log directory are written to the corresponding replication-offset-checkpoint file. The task is started inside the `becomeLeaderOrFollower` method.
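The flush direction mirrors the read: the scheduled task serializes each directory's HW map back into the version / count / entries layout. Below is a minimal sketch of just the formatting step (the real code also writes to a temp file and atomically renames it); `formatCheckpoint` is an illustrative name, not Kafka's API:

```scala
// Serialize a (topic, partition) -> HW map into checkpoint format: a version
// line, an entry-count line, then one "$topic $partition $offset" line each.
def formatCheckpoint(hws: Map[(String, Int), Long]): String = {
  val entries = hws.map { case ((topic, partition), offset) => s"$topic $partition $offset" }
  (Seq("0", hws.size.toString) ++ entries).mkString("\n") + "\n"
}

print(formatCheckpoint(Map(("test", 13) -> 16709L)))
// 0
// 1
// test 13 16709
```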
### appendMessages
The earlier Log analysis mentioned the `appendMessages` method: it first writes the messages locally (i.e. to the leader's Log), then checks whether acks = -1 to decide whether to create a delayed produce request (no immediate response; the request must wait for replication to finish and is managed by `delayedProducePurgatory`). The delayed request is held on the leader until the followers have replicated the data; it contains:

- delayedMs: the producer's configured request.timeout
- requiredOffset: the leader's nextOffsetMetadata value after the write (expressed as `LogAppendInfo.lastOffset + 1`)
Under the acks = -1 policy, the leader may respond to the producer only after all followers have replicated the message. Whether a message has been fully replicated is decided by `Partition.checkEnoughReplicasReachOffset`:

```scala
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  leaderReplicaIfLocal() match {
    case Some(leaderReplica) =>
      val curInSyncReplicas = inSyncReplicas
      // check whether every replica in the ISR (the leader included)
      // has caught up to requiredOffset, judged by its LEO
      def numAcks = curInSyncReplicas.count { r =>
        if (!r.isLocal)
          if (r.logEndOffset.messageOffset >= requiredOffset) {
            true
          }
          else
            false
        else
          true
      }
      val minIsr = leaderReplica.log.get.config.minInSyncReplicas
      if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        if (minIsr <= curInSyncReplicas.size)
          (true, Errors.NONE)
        else
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE)
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION)
  }
}
```
The first element of the returned tuple indicates whether enough replicas have reached `requiredOffset`:

- if HW < requiredOffset, the leader has not yet received responses from enough replicas
- HW >= requiredOffset means the HW has already been advanced; if the ISR size is then smaller than min.insync.replicas, a `NOT_ENOUGH_REPLICAS_AFTER_APPEND` error is returned
- if the ReplicaManager finds that the local replica is no longer the leader, something has gone wrong on this broker, and `NOT_LEADER_FOR_PARTITION` is returned
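The branch structure above can be condensed into a small decision function. This is a simplified model only: the `Outcome` names below are made up for illustration, not Kafka's `Errors` enum, and the ISR is reduced to its size while the leader is reduced to its HW:

```scala
// Condensed model of checkEnoughReplicasReachOffset.
sealed trait Outcome
case object Satisfied extends Outcome          // (true, NONE)
case object NotEnoughReplicas extends Outcome  // (true, NOT_ENOUGH_REPLICAS_AFTER_APPEND)
case object Pending extends Outcome            // (false, NONE): keep waiting

def checkEnough(leaderHw: Long, isrSize: Int, minIsr: Int, requiredOffset: Long): Outcome =
  if (leaderHw >= requiredOffset) {
    // HW already advanced past the produce request's last offset
    if (minIsr <= isrSize) Satisfied else NotEnoughReplicas
  } else Pending  // not enough follower responses yet

println(checkEnough(leaderHw = 100, isrSize = 2, minIsr = 2, requiredOffset = 90))  // Satisfied
```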
### Increment HW
The HW is maintained by the leader and is updated on two kinds of occasions:

- the ISR changes, including:
    - a new broker becomes leader
    - shrink ISR
    - expand ISR
- the replicas have finished replicating new messages, detected in either of two ways:
    - the minimum LEO among the ISR replicas is greater than the HW
    - the HW's LogOffsetMetadata points into an older log segment, which indicates that the previous produce request has completed (i.e. the messages have been fully replicated)
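The first detection rule (minimum ISR LEO above the current HW) amounts to a one-line computation. A minimal sketch, where `maybeIncrementHw` is an illustrative name rather than Kafka's actual `maybeIncrementLeaderHW`:

```scala
// The leader may advance the HW to the smallest LEO across the ISR
// (itself included); the HW never moves backwards.
def maybeIncrementHw(currentHw: Long, isrLeos: Seq[Long]): Long =
  math.max(currentHw, isrLeos.min)

// leader LEO 17315, follower LEO 17315 -> HW moves from 17314 to 17315
println(maybeIncrementHw(17314L, Seq(17315L, 17315L)))  // 17315
```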
### ShrinkIsr

### becomeLeaderOrFollower

### ReplicaFetcher
### ReplicaManager-related log records during startup

Take broker3's startup log as an example.
After the Logs are loaded and processed, the first step is an unblock operation:

```
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
```

This presumably unblocks requests that were left blocked during the shutdown phase.
Next, the ReplicaFetcherThreads are started:

```
[2017-04-25 13:35:57,367] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,384] INFO [ReplicaFetcherThread-3-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,384] INFO [ReplicaFetcherThread-1-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,398] INFO [ReplicaFetcherThread-2-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,406] INFO [ReplicaFetcherThread-1-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,414] INFO [ReplicaFetcherThread-0-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,423] INFO [ReplicaFetcherThread-2-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,435] INFO [ReplicaFetcherThread-3-2], Starting (kafka.server.ReplicaFetcherThread)
```

What do the numeric suffixes in the thread names mean?
Next, fetcher requests are added: the broker requests, from remote brokers, the data of those partitions for which it is not the leader. An excerpt:

```
INFO [ReplicaFetcherManager on broker 3] Added fetcher for partitions List([topic1-8, initOffset 6538 to broker BrokerEndPoint(1,10.45.4.9,9092)] ,[topic2-9, initOffset 0 to broker BrokerEndPoint(1,10.45.4.9,9092)]...)
```
## ReplicaFetcherThread
Each ReplicaManager maintains not only a producer purgatory (delayedProducePurgatory) but also a fetch purgatory (delayedFetchPurgatory). Fetching is not only triggered by consumers: followers also replicate messages by fetching from the leader.
### ReplicaFetcherManager

- ReplicaFetcherManager manages the creation and shutdown of ReplicaFetcherThreads.
- The manager is named in the format "ReplicaFetcherManager on broker $ID"
### thread
- The number of fetchers is controlled by `num.replica.fetchers`; a partition is assigned to a fetcher via `(31 * topic.hashCode() + partitionId) % numFetchers`. With the parameter set to 4, $fetcherId \in [0,1,2,3]$; for a 3-node cluster with replicas = 2, each broker needs up to $4*2=8$ fetcher threads:

```scala
val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
  BrokerAndFetcherId(brokerAndInitialOffset.broker,
    getFetcherId(topicAndPartition.topic, topicAndPartition.partition)) }
```

The code above groups the partition-to-broker assignments by (broker, fetcherId) pairs, so that each group is served by one fetcher thread.
- fetcher threads are named `ReplicaFetcherThread-fetcherId-brokerId`
- ReplicaFetcherThread runs periodically; the interval is controlled by `replica.fetch.backoff.ms`, 1s by default
- each ReplicaFetcherThread fetches a set of topic-partitions from a single broker
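The fetcher-id formula and the grouping step can be reproduced on plain tuples. A sketch with simplified stand-ins for Kafka's `BrokerAndFetcherId` and related types; `math.abs` here approximates the sign handling in the real code:

```scala
// Mirror of the fetcher-id formula quoted above.
def getFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
  math.abs(31 * topic.hashCode + partitionId) % numFetchers

// Group partition -> source-broker assignments so that each
// (sourceBroker, fetcherId) pair maps to one ReplicaFetcherThread.
def groupByFetcher(assignments: Map[(String, Int), Int],
                   numFetchers: Int): Map[(Int, Int), Map[(String, Int), Int]] =
  assignments.groupBy { case ((topic, partition), broker) =>
    (broker, getFetcherId(topic, partition, numFetchers))
  }

val groups = groupByFetcher(
  Map(("test", 0) -> 1, ("test", 1) -> 2, ("test", 2) -> 1), numFetchers = 4)
// consecutive partitions of one topic land on consecutive fetcher ids
println(getFetcherId("test", 1, 4) == (getFetcherId("test", 0, 4) + 1) % 4)  // true
```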
### Workflow

#### processPartitionData
Current state:

: broker2 is a follower of test-1
: its LEO is 17314
: the leader's HW = 17314
: the fetchOffset in the FetchRequest = 17314
A message is fetched:

```
[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17314 for partition test-1. Received 45 messages and leader hw 17314
```

The messages are appended to the log:

```
[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17315 after appending 45 bytes of messages for partition test-1
```

Unlike `appendMessagesToLeader`, the follower's append does not assign offsets again; it uses the offsets passed along by the leader.

The follower's HW is then set:

```
[ReplicaFetcherThread-3-1], Follower 2 set replica high watermark for partition [test,1] to 17314
```
1s later, the next fetch returns messages again:

```
[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17315 for partition test-1. Received 45 messages and leader hw 17315
[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17316 after appending 45 bytes of messages for partition test-1
[ReplicaFetcherThread-3-1], Follower 2 set replica high watermark for partition [test,1] to 17315
```
Note from the logs above:

- the LEO went from 17314 to 17315, because the preceding Log.append advanced nextOffsetMetadata by 1
- the leader's HW became 17315, because the leader detected that the minimum LEO among its followers (broker2 here) had reached 17315 and updated its HW; the mechanism is described in the Increment HW section
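The follower-side HW seen in these logs (set to 17314 while the local LEO is already 17315) follows a simple rule. A minimal sketch, with an illustrative function name:

```scala
// After appending a fetched batch, a follower sets its own HW to the
// smaller of the leader's HW (carried in the fetch response) and its
// own LEO: it may learn the leader's HW before it holds all the data.
def followerHw(leaderHw: Long, followerLeo: Long): Long =
  math.min(leaderHw, followerLeo)

println(followerHw(17314L, 17315L))  // 17314, matching the first log excerpt
```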
#### handleOffsetOutOfRange
When the ReplicaFetcher's fetchOffset falls outside the leader's offset range (greater than the maximum or smaller than the minimum), it receives an `OffsetOutOfRangeException`.

Suppose the current leader of partition test-1 is broker1, and broker2 is added as a follower. While broker2 is still catching up, broker1 goes down and broker2 is elected leader (such an unclean election is only allowed when `unclean.leader.election.enable` is enabled).
**Case 1: replica fetchOffset > leader LEO**

After broker1 recovers it becomes a follower and sends a ReplicaFetcherRequest with fetchOffset = 7. That value is larger than the leader's LEO, so broker1 must truncate its local Log for this partition so that its LEO matches the leader's.

Note that after such a truncation the follower's and the leader's messages are still not fully identical; in the example above, the messages at offsets 3 and 4 differ.
**Case 2: replica fetchOffset < leader start offset**

Before broker1 recovers, broker2 appends many messages and also deletes some, so its smallest offset becomes larger than broker1's LEO. In this case none of broker1's messages are meaningful any more (the ISR leader no longer maintains them), so broker1 deletes all of its segments and then fetches starting from the leader's first message.
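Both recovery paths reduce to picking a new fetch offset from the leader's range. A toy model of the decision only, not the actual implementation of `handleOffsetOutOfRange`; the function name and signature are illustrative:

```scala
// Toy model of the two OffsetOutOfRange recovery cases: the follower's
// log is summarized by its LEO, the leader's by [leaderStart, leaderLeo).
def resolveOutOfRange(followerLeo: Long, leaderLeo: Long, leaderStart: Long): Long =
  if (followerLeo > leaderLeo)
    leaderLeo    // case 1: truncate the local log down to the leader's LEO
  else if (followerLeo < leaderStart)
    leaderStart  // case 2: drop all local segments, refetch from the leader's start
  else
    followerLeo  // in between: no special handling in 0.10.1.1

// case 1: follower ahead of the new leader -> truncate to 5
println(resolveOutOfRange(followerLeo = 7, leaderLeo = 5, leaderStart = 0))   // 5
// case 2: follower far behind -> restart from leader start offset 10
println(resolveOutOfRange(followerLeo = 3, leaderLeo = 20, leaderStart = 10)) // 10
```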
Kafka 0.10.1.1 has no strategy for the case where, on an OffsetOutOfRangeException, the follower's LEO lies between the leader's start offset and the leader's LEO.