ISR Message Management

[TOC]

Open Questions

  • When does the leader update the high watermark?
  • What causes the negative Lag values seen in kafka-manager?
  • When messages are deleted from the Log, how do the replicas in the ISR coordinate?

All of the discussion below assumes a Kafka cluster of 3 brokers.

Replica

leader vs followers

If replicas is set to 2 for every topic (except __consumer_offsets), then each partition's Log exists on two brokers: one acts as the leader and the other as a follower.

The figure below shows the replica and leader distribution for a topic with 3 partitions:

Responsibilities of the leader and followers

The concepts of high watermark (HW) and log end offset (LEO) were introduced earlier in the kafka-consumer analysis article. The leader's main responsibilities are:

  • Write the messages and offsets into the local log & offset
  • Update the HW once the followers have fully replicated the messages (the leader learns this from the offsets carried in the followers' fetch requests)
  • The leader does not need to maintain a separate LEO value, because each Log's nextOffsetMetadata field already tracks that Log's latest offset

The followers' responsibilities, by contrast, lie in their interaction with the leader:

  • Replicate the messages, then notify the leader (via the next fetch request)
  • Update their LEO values

Creating a Replica

Since every partition has its own replicas, creating a replica is carried out inside Partition, specifically in the getOrCreateReplica method:

if (isReplicaLocal(replicaId)) {
  // create the Log file
  val config = LogConfig.fromProps(...)
  val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
  // look up the HW checkpoint file for this log directory
  val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
  val offsetMap = checkpoint.read
  val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
  val localReplica = new Replica(replicaId, this, time, offset, Some(log))
  addReplicaIfNotExists(localReplica)
} else {
  val remoteReplica = new Replica(replicaId, this, time)
  addReplicaIfNotExists(remoteReplica)
}
getReplica(replicaId).get

Where:

  • getReplica and addReplicaIfNotExists both operate on assignedReplicaMap
  • checkpoint holds the topic-partition => offset mapping for every partition under that log.dir
  • log.dir is the directory containing the partition's Log files; log.dir.getParentFile is one of the directories defined by log.dirs in server.properties

ReplicaManager

The call path for creating replicas is ReplicaManager.becomeLeaderOrFollower -> makeFollowers. ReplicaManager is broker-scoped and manages all partitions on the broker.

OffsetCheckPoint

In a multi-disk environment, log.dirs can list directories on multiple disks, which spreads partitions across those disks.

The log directory on each disk maintains 3 OffsetCheckPoint files:
: recovery-point-offset-checkpoint
: replication-offset-checkpoint: maintains the partition => offset mapping
: cleaner-offset-checkpoint

The content of replication-offset-checkpoint looks like this:

0
21
__consumer_offsets 16 0
__consumer_offsets 49 124
admin.benchmark 9 24564634
test 13 16709
theOne 18 0
__consumer_offsets 4 6
...

Where:

  • The first line is the CurrentVersion value, 0 by default
  • The second line is the number of partitions under this log directory
  • The 21 lines that follow list each partition and its offset, in the format: $topic $partition $offset
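As a sanity check of the format described above, here is a small Python sketch (a hypothetical helper, not Kafka code) that parses a replication-offset-checkpoint file:

```python
def parse_checkpoint(text):
    """Parse the checkpoint format: line 1 = version, line 2 = entry
    count, then one "$topic $partition $offset" entry per line."""
    lines = text.strip().splitlines()
    version = int(lines[0])
    expected = int(lines[1])
    offsets = {}
    for line in lines[2:]:
        # rsplit tolerates topic names that could contain spaces
        topic, partition, offset = line.rsplit(" ", 2)
        offsets[(topic, int(partition))] = int(offset)
    assert len(offsets) == expected, "line 2 must match the entry count"
    return version, offsets
```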

ReplicaManager assembles the replication-offset-checkpoint files of all log directories into a Map named highWatermarkCheckpoints:

val highWatermarkCheckpoints = config.logDirs.map(dir =>
  (new File(dir).getAbsolutePath,
   new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap

These files are updated by a scheduled task: every replica.high.watermark.checkpoint.interval.ms (5s by default), the HW values of all replicas under each log directory are written to the corresponding replication-offset-checkpoint file. The task is started in the becomeLeaderOrFollower method.

appendMessages

The appendMessages method was covered in the Log analysis article. It first writes the messages locally (i.e. to the leader's Log), then checks whether acks == -1 to decide whether a delayed produce request must be created (no immediate response; replication must complete first, managed by delayedProducePurgatory). The delayed request is held on the leader until the followers have caught up, and it carries:

  • delayedMs: the producer's request.timeout.ms setting
  • requiredOffset: the leader's nextOffsetMetadata value after the append (expressed as LogAppendInfo.lastOffset + 1)

Under the acks = -1 policy, the leader sends a response to the producer only after all in-sync followers have replicated the messages. Whether the messages are fully replicated is decided by Partition.checkEnoughReplicasReachOffset:

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  leaderReplicaIfLocal() match {
    case Some(leaderReplica) =>
      val curInSyncReplicas = inSyncReplicas
      // check whether the LEO of every replica in the ISR (leader included)
      // has caught up to requiredOffset
      def numAcks = curInSyncReplicas.count { r =>
        if (!r.isLocal)
          if (r.logEndOffset.messageOffset >= requiredOffset)
            true
          else
            false
        else
          true
      }

      val minIsr = leaderReplica.log.get.config.minInSyncReplicas

      if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        if (minIsr <= curInSyncReplicas.size)
          (true, Errors.NONE)
        else
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE)
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION)
  }
}

The first element of the returned tuple indicates whether enough replicas have reached requiredOffset:

  • If HW < requiredOffset, the leader has not yet seen enough replicas respond
  • If HW >= requiredOffset, the HW has already been advanced; in that case, if the ISR size is smaller than minIsr (min.insync.replicas), NOT_ENOUGH_REPLICAS_AFTER_APPEND is reported
  • If the ReplicaManager finds the local replica is no longer the leader, leadership has moved to another broker, and NOT_LEADER_FOR_PARTITION is reported
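The decision tree above can be condensed into a Python sketch (illustrative only, not Kafka's Scala; the string error names stand in for the Errors enum):

```python
def check_enough_replicas(leader_hw, isr_size, min_isr, required_offset,
                          is_leader=True):
    """Return (satisfied, error), mirroring the tuple returned by
    checkEnoughReplicasReachOffset."""
    if not is_leader:
        # local replica is no longer the leader for this partition
        return (False, "NOT_LEADER_FOR_PARTITION")
    if leader_hw >= required_offset:
        # HW already advanced past requiredOffset; still verify ISR size
        if min_isr <= isr_size:
            return (True, "NONE")
        return (True, "NOT_ENOUGH_REPLICAS_AFTER_APPEND")
    # not enough replicas have caught up yet; keep waiting
    return (False, "NONE")
```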

Increment HW

The HW is maintained by the leader, and it is advanced in two main situations:

  1. The ISR changes, including:
  • a new broker becoming the leader
  • shrinking the ISR
  • expanding the ISR
  2. The replicas have finished replicating new messages, detected in one of two ways:
    1. the minimum LEO among the ISR replicas is greater than the HW
    2. the HW's LogOffsetMetadata points into an old log segment, indicating that the previous produce request has been fully processed (i.e. its messages have been replicated)
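The second trigger reduces to a simple rule: the new HW is the minimum LEO across the ISR, and it only ever moves forward. A Python sketch (illustrative, not Kafka's Scala):

```python
def maybe_increment_hw(current_hw, isr_leos):
    """Advance the HW to the minimum LEO across the ISR (leader
    included), but never move it backwards."""
    candidate = min(isr_leos)
    return candidate if candidate > current_hw else current_hw
```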

ShrinkIsr

becomeLeaderOrFollower

ReplicaFetcher

ReplicaManager-related log entries during startup

Taking broker3's startup logs as an example:
After the Logs have been loaded and processed, the first entries are the unblock operations:

[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 fetch requests. (kafka.server.ReplicaManager)

These presumably unblock requests that were blocked during the previous shutdown phase.

Next, the ReplicaFetcherThreads are started:

[2017-04-25 13:35:57,367] INFO [ReplicaFetcherThread-0-1], Starting  (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,384] INFO [ReplicaFetcherThread-3-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,384] INFO [ReplicaFetcherThread-1-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,398] INFO [ReplicaFetcherThread-2-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,406] INFO [ReplicaFetcherThread-1-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,414] INFO [ReplicaFetcherThread-0-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,423] INFO [ReplicaFetcherThread-2-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,435] INFO [ReplicaFetcherThread-3-2], Starting (kafka.server.ReplicaFetcherThread)

What do the numbers in the thread names mean? (As the thread section below explains, the format is ReplicaFetcherThread-fetcherId-brokerId.)

Next, fetchers are added, which request data from remote brokers for the partitions that are not led locally. An excerpt:

INFO [ReplicaFetcherManager on broker 3] Added fetcher for partitions List([topic1-8, initOffset 6538 to broker BrokerEndPoint(1,10.45.4.9,9092)] ,[topic2-9, initOffset 0 to broker BrokerEndPoint(1,10.45.4.9,9092)]...)

ReplicaFetcherThread

Each ReplicaManager maintains not only a producer purgatory (delayedProducePurgatory) but also a fetch purgatory (delayedFetchPurgatory). Fetching is not only triggered by consumers: followers also replicate messages by fetching from the leader.

ReplicaFetcherManager

  • ReplicaFetcherManager manages the creation and shutdown of ReplicaFetcherThreads.
  • The manager is named in the format: "ReplicaFetcherManager on broker $ID"

thread

  • The number of fetchers is controlled by num.replica.fetchers. A partition is mapped to a fetcher by:
    (31 * topic.hashCode() + partitionId) % numFetchers
    With the parameter set to 4, $fetcherId ∈ \{0,1,2,3\}$; for a 3-broker cluster with replicas=2, each broker fetches from the 2 other brokers, so it needs up to $4*2=8$ fetcher threads:
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition)) }

The code above groups the partition => broker map by its (broker, fetcherId) pair.

  • Fetcher threads are named in the format: ReplicaFetcherThread-fetcherId-brokerId
  • ReplicaFetcherThread runs on a schedule; the interval is controlled by replica.fetch.backoff.ms, 1s by default
  • Each ReplicaFetcherThread fetches a set of topic-partitions from a single broker
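The fetcher assignment can be approximated in Python (hypothetical helper names; Kafka's Scala does this arithmetic in 32-bit JVM integers and takes the absolute value, which this sketch only approximates for extremely long topic names):

```python
def java_string_hash(s):
    """Java String.hashCode with signed 32-bit overflow semantics."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def get_fetcher_id(topic, partition_id, num_fetchers):
    # abs() keeps the id non-negative even when the hash code is negative
    return abs(31 * java_string_hash(topic) + partition_id) % num_fetchers
```

The mapping is deterministic, so every broker computes the same fetcher id for a given topic-partition.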

Workflow

processPartitionData

Current state:
: broker2 is a follower of test-1
: its LEO is 17314
: the leader's HW = 17314
: fetchOffset in the FetchRequest = 17314

  • A message is received:

    [ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17314 for partition test-1. Received 45 messages and leader hw 17314
  • An append is executed on the log:

    [ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17315 after appending 45 bytes of messages for partition test-1

Unlike appendMessagesToLeader, the follower's append does not assign offsets again; it simply uses the offsets passed along by the leader.

  • The follower's HW is set:
    [ReplicaFetcherThread-3-1], Follower 2 set replica high watermark for partition [test,1] to 17314

1s later, the next fetch returns messages again:

[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17315 for partition test-1. Received 45 messages and leader hw 17315
[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17316 after appending 45 bytes of messages for partition test-1
[ReplicaFetcherThread-3-1], Follower 2 set replica high watermark for partition [test,1] to 17315

The logs above show that:

  1. The LEO goes from 17314 => 17315, because the preceding Log.append advanced nextOffsetMetadata by 1
  2. The leader's HW became 17315, because the leader observed that the minimum LEO among its followers (here, broker2) had reached 17315 and advanced the HW accordingly; the mechanics were covered under Increment HW above
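The offsets in those log lines follow a simple invariant, sketched here in Python (illustrative only): the append advances the follower's LEO, and the follower's HW is then set to min(new LEO, leader HW):

```python
def process_partition_data(follower_leo, leader_hw, num_messages):
    """Follower-side append step: advance the LEO by the number of
    appended messages, then cap the local HW at the leader's HW."""
    new_leo = follower_leo + num_messages
    return new_leo, min(new_leo, leader_hw)
```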

handleOffsetOutOfRange

When a ReplicaFetcher's fetchOffset falls outside the leader's offset range (i.e. it is greater than the leader's LEO or smaller than its start offset), the fetch receives an OffsetOutOfRangeException.

Suppose the current leader of partition test-1 is broker1, and broker2 is newly added as a follower.

While broker2 is catching up, broker1 dies and broker2 is elected leader. (Such an unclean election is only allowed when unclean.leader.election.enable is set to true.)

Case 1: Replica fetchOffset > leader LEO


After broker1 recovers, it becomes a follower and sends a ReplicaFetcherRequest to the leader with fetchOffset = 7. This is greater than the leader's LEO, so the partition's Log on broker1 must be truncated to bring its LEO in line with the leader's.

Note that after such a truncation, the follower's and leader's messages are still not fully identical: in the example above, the messages at offsets 3 and 4 differ.

Case 2: Replica fetchOffset < leader start offset

Before broker1 recovers, broker2 appends many new messages and also deletes old ones (via log retention), so that its smallest offset becomes larger than broker1's LEO. In this case none of the messages on broker1 are meaningful anymore (the ISR leader no longer maintains them), so broker1 deletes all of its segments and then fetches from the leader's first message.

Kafka 0.10.1.1 has no dedicated handling for the remaining case, where a follower that hits OffsetOutOfRangeException has an LEO lying between the leader's start offset and its LEO.
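The two recovery paths described above can be sketched as follows (illustrative Python with hypothetical names, not Kafka's actual handleOffsetOutOfRange):

```python
def handle_offset_out_of_range(fetch_offset, leader_start, leader_leo):
    """Decide the follower's recovery action after an
    OffsetOutOfRangeException."""
    if fetch_offset > leader_leo:
        # case 1: truncate the local log back to the leader's LEO
        return ("truncate", leader_leo)
    if fetch_offset < leader_start:
        # case 2: drop all local segments, refetch from the leader's start
        return ("rebuild", leader_start)
    # the in-between case noted above: no dedicated handling in 0.10.1.1
    return ("none", fetch_offset)
```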