start up controller
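The Controller was introduced to reduce the load on ZooKeeper and to lower the complexity of the distributed system as a whole.
Registering listeners
Two listener instances are registered with ZK:
- SessionExpirationListener: if the session expires, ZK takes care of reconnecting, so nothing needs to be handled on the Kafka side
- LeaderChangeListener: watches for a change of the cluster leader (that is, the controller). The former leader has to resign (onControllerResignation) and hand over leadership:
  - de-register listeners: IsrChangeNotificationListener, ReassignedPartitionsListener, PreferredReplicaElectionListener, ReassignedPartitionsIsrChangeListeners
  - shut down: topic deletion, automatic rebalance management, the replica and partition state machines, and so on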
start Elector
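First, check whether the /controller node exists in ZooKeeper. This node (an ephemeral node, created during the election described next) stores the controller's information, for example: {"version":1,"brokerid":2,"timestamp":"1493345638688"}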
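Racing to become leader
- Read the controllerId from /controller; if that succeeds, return amILeader. This guarantees that once some broker has registered as controller (by creating the ephemeral node /controller in ZK), the other brokers stop trying to register
- Race to register as leader: use ZKCheckedEphemeral to create the ephemeral node. Below is the log output when the controller (broker) was shut down and broker2 registered as the controller successfully:
[2017-04-28 10:13:58,689] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)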
[2017-04-28 10:13:58,692] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-04-28 10:13:58,693] INFO 2 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2017-04-28 10:13:59,592] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
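The election race above can be sketched as follows. This is a minimal illustration in Python, not Kafka's code: FakeZk is a hypothetical in-memory stand-in for ZooKeeper, and elect and get_controller_id are illustrative names. It only shows the idea that the first broker to create /controller wins and everyone else backs off.

```python
import json
import time


class FakeZk:
    """Hypothetical in-memory stand-in for ZooKeeper, for illustration only."""

    def __init__(self):
        self.nodes = {}

    def create_ephemeral(self, path, data):
        # Like ZooKeeper, refuse to create a node that already exists.
        if path in self.nodes:
            raise FileExistsError(path)
        self.nodes[path] = data

    def read(self, path):
        return self.nodes.get(path)


def get_controller_id(zk):
    """Return the broker id stored in /controller, or -1 if the node is absent."""
    data = zk.read("/controller")
    return json.loads(data)["brokerid"] if data else -1


def elect(zk, broker_id):
    """Try to become controller; return True if this broker is the leader."""
    leader_id = get_controller_id(zk)
    if leader_id != -1:
        # Someone already holds /controller, so do not try to register.
        return leader_id == broker_id
    payload = json.dumps({"version": 1, "brokerid": broker_id,
                          "timestamp": str(int(time.time() * 1000))})
    try:
        zk.create_ephemeral("/controller", payload)
        return True
    except FileExistsError:
        # Lost the race: another broker created the node first.
        return get_controller_id(zk) == broker_id


zk = FakeZk()
results = {broker: elect(zk, broker) for broker in (2, 1, 3)}
print(results)  # {2: True, 1: False, 3: False}
```

In a real cluster the node disappears when the controller's session ends, which is what lets the remaining brokers race again.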
onControllerFailover
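After winning the race, the broker has to take on the leader's duties, which cover the following:
1. Start a new epoch
Read the value of /controller_epoch to get the current controller epoch, then increment it by 1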
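2. Register ZK listeners
The following ZK nodes are watched:
- /admin/reassign_partitions: watches for partition reassignment
- /isr_change_notification: this node is used to announce changes to a partition's ISR
- /admin/preferred_replica_election: watches for preferred replica election requests
All of the above are registered by the controller itself. The ones below are registered with ZK through specific state machines:
- PartitionStateMachine: the state machine that describes partitions
  - /brokers/topics: TopicChangeListener
  - /admin/delete_topics: DeleteTopicsListener
- ReplicaStateMachine: the state machine that describes replicas
  - /brokers/ids: BrokerChangeListener
Finally, a PartitionModificationsListener is added for every topic, on the ZK path /brokers/topics/topic_name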
3. Initialize the controller context
Collect information on brokers, topics, partition leaders, ISRs and so on; start ControllerChannelManager and partitionStateMachine
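4. Perform partition reassignment and partition leader election
See the partition reassign and preferred replica election sections below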
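5. Send metadata
Send the data to every broker in the cluster, both live ones and ones that are shutting down
6. Start the partition rebalance thread
Whether partition rebalancing is enabled is controlled by auto.leader.rebalance.enable, which is on by default; the interval at which the thread runs is controlled by leader.imbalance.check.interval.seconds, which defaults to 300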
All the cases that trigger an election
partition reassign
In Kafka-manager, manually reassign the partitions of the topic testReassign (3 partitions):
This is the partition layout before the reassignment:
Adjust the replicas of partitions 0 and 1:
Run Reassign Partitions and watch the logs:
PartitionsReassignedListener notices that the content of the ZK node /admin/reassign_partitions has changed:
[PartitionsReassignedListener on 2]: Partitions reassigned listener fired for path /admin/reassign_partitions. Record partitions to be reassigned {"version":1,"partitions":[{"topic":"testReassign","partition":2,"replicas":[1,2]},{"topic":"testReassign","partition":1,"replicas":[2,3]},{"topic":"testReassign","partition":0,"replicas":[3,1]}]}
Partition 2 is not being reassigned, so its existing replicas already match the requested ones and the reassign request for it is ignored:
kafka.common.KafkaException: Partition [testReassign,2] to be reassigned is already assigned to replicas 1,2. Ignoring request for partition reassignment
Next, partitions 0 and 1 are reassigned. Taking partition 1 as an example:
[Controller 2]: Handling reassignment of partition [testReassign,1] to new replicas 2,3
[Controller 2]: New replicas 2,3 for partition [testReassign,1] being reassigned not yet caught up with the leader
[Controller 2]: Updated path /brokers/topics/testReassign with {"version":1,"partitions":{"2":[1,2],"0":[2,3],"1":[2,3,1]}} for replica assignment
[Controller 2]: Updated assigned replicas for partition [testReassign,1] being reassigned to 2,3,1
[Controller 2]: Updating leader epoch for partition [testReassign,1].
[Controller 2]: Updated leader epoch for partition [testReassign,1] to 1
[Replica state machine on controller 2]: Invoking state change to NewReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
[Controller 2]: Waiting for new replicas 2,3 for partition [testReassign,1] being reassigned to catch up with the leader
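Question: why is [2,3,1] written to /brokers/topics/testReassign rather than [2,3]? I'll leave this open for now; the answer should be in the KafkaController.onPartitionReassignment method.
That method is called from several places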
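One plausible answer to the question above, offered as an assumption rather than a confirmed reading of the source: while a reassignment is in progress, the controller first records the union of the new replicas and the old replicas, and only drops the old ones after the new ones have caught up with the leader. With new replicas [2,3] and old replicas [3,1] for partition 1, that union is [2,3,1]. The helper name below is illustrative.

```python
def replicas_during_reassignment(new_replicas, old_replicas):
    """Union of new and old replicas, new ones first, without duplicates."""
    merged = list(new_replicas)
    for broker in old_replicas:
        if broker not in merged:
            merged.append(broker)
    return merged


# Partition 1 of testReassign: old replicas [3, 1], requested replicas [2, 3].
print(replicas_during_reassignment([2, 3], [3, 1]))  # [2, 3, 1]
```

Keeping the old replicas in the list until the new ones are in sync means the partition never has fewer usable replicas than before the move started.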
partition state machine
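The figure below gives a simple picture of the 4 partition states (NonExistentPartition, NewPartition, OnlinePartition and OfflinePartition):
The state machine keeps the current state of every partition in partitionState, a map.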
start up
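At this point the cluster already has a controller, which determines the current state of every partition by reading the partition and ISR node information from ZK.
Specifically, it looks at each partition's LeaderIsrAndControllerEpoch information and checks whether the recorded leader is still a live broker:
- if the information exists and the leader is alive, the partition is Online
- if it exists but the leader is not alive, the partition is Offline
- if no information exists, the partition is New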
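A rough sketch of this classification, reading the check as "is the recorded leader still among the live brokers". That reading is my understanding of how the partition state machine starts up in 0.10-era Kafka, so treat it as an assumption; the function name and the dict layout are illustrative.

```python
def initial_partition_state(leadership_info, live_broker_ids):
    """Classify one partition from its cached leader/ISR record (or None)."""
    if leadership_info is None:
        # No leader/ISR information in ZK yet.
        return "NewPartition"
    if leadership_info["leader"] in live_broker_ids:
        return "OnlinePartition"
    # A leader is recorded, but that broker is not alive.
    return "OfflinePartition"


live = {1, 2}
print(initial_partition_state({"leader": 1, "isr": [1, 2]}, live))  # OnlinePartition
print(initial_partition_state({"leader": 3, "isr": [3]}, live))     # OfflinePartition
print(initial_partition_state(None, live))                          # NewPartition
```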
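Once the current states have been sorted out, the New and Offline partitions have to be moved to Online.
How? By electing a leader, of course. The key code lives in the OfflinePartitionLeaderSelector class; the table below lists the cases it can run into during an election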
live ISR | live assigned replicas | result |
---|---|---|
(1,2) | (1,2) | leader = 1 |
none | (2,3) | unclean election enabled ? leader = 2 : NoReplicaOnlineException |
none | none | NoReplicaOnlineException |
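The three rows of the table can be written out as a small function. This is a sketch of the decision only, assuming the inputs have already been filtered down to live brokers; select_offline_leader and its parameters are illustrative names, not the signature of OfflinePartitionLeaderSelector.

```python
class NoReplicaOnlineException(Exception):
    """Raised when no eligible replica is alive (name borrowed from the table)."""


def select_offline_leader(live_isr, live_assigned, unclean_enabled):
    """Return (leader, new_isr) for an offline partition."""
    if live_isr:
        # Clean election: first live assigned replica that is also in the ISR.
        candidates = [r for r in live_assigned if r in live_isr]
        if candidates:
            return candidates[0], list(live_isr)
    if unclean_enabled and live_assigned:
        # Unclean election: fall back to a live replica outside the ISR,
        # accepting possible data loss.
        leader = live_assigned[0]
        return leader, [leader]
    raise NoReplicaOnlineException("no live replica can become leader")


print(select_offline_leader([1, 2], [1, 2], unclean_enabled=False))  # (1, [1, 2])
print(select_offline_leader([], [2, 3], unclean_enabled=True))       # (2, [2])
```

With unclean election disabled, or with no live replica at all, the function raises instead of picking a leader, which corresponds to the partition staying Offline.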
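For example, take a topic "jjhtest" whose replication factor is 1. Its current leader and ISR assignment is:
Next I restart broker3. What happens?
Because each partition's replicas are fixed (ignoring manual assignment), shutting down broker3 leaves partitions 1, 4 and 7 with no available broker among their replicas, so these 3 partitions cannot be placed on any broker and go Offline
If a partition has more than one replica, then in the shutdownBroker method the partitions led by that broker go online ==> online, and the leader selector used is ControlledShutdownPartitionLeaderSelector
After broker3 recovers, the 3 partitions above move from Offline to Online, which triggers leader election
The election process: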
partitionReplicaAssignment
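This variable is a mutable Map that holds the replicas of every partition. The information comes from two sources:
- TopicChangeListener watches for topic changes in ZK; for example, when a topic is added, the replica assignment of its partitions is fetched from ZK
- PartitionModificationsListener watches for partition changes; for example, when partitions are added, the replica information of the new partitions is fetched from ZK.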
electLeaderForPartition
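When a partition goes from offline or online ==> online, a leader is elected through one of several PartitionLeaderSelector implementations (LeaderSelector is abbreviated as LS):
Every time a leader selection succeeds, the leader cache (controllerContext.partitionLeadershipInfo) is updated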
preferred replica election
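During controller startup a preferredReplicaElection (PRE for short) is run once, and a scheduled task with an interval of leader.imbalance.check.interval.seconds (default: 300) is created to run the PartitionRebalance operation, which triggers PRE
Kafka's balancing strategy relies on the following:
- Ignoring manual partition assignment, the replicas assigned to each partition are fixed, including their order.
- When a broker goes down, re-electing the leader keeps messages from being lost (as long as no unclean election is triggered), while PRE (preferred replica election) ensures that once the broker recovers, partition leaders are again spread evenly across the cluster.
A small experiment
Take a topic "TheOne" with 6 partitions and 2 replicas. Its current assignment is:
After broker3 is shut down, the leaders of partitions 4 and 5 are re-elected:
After broker3 recovers, it rejoins the ISR of each topic (IsrChangeNotificationListener is triggered):
After a while, PRE is triggered: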
[Controller 1]: Starting preferred replica leader election for partitions [TheOne,5] (kafka.controller.KafkaController)
[Partition state machine on Controller 1]: Invoking state change to OnlinePartition for partitions [TheOne,5] (kafka.controller.PartitionStateMachine)
[PreferredReplicaPartitionLeaderSelector]: Current leader 1 for partition [TheOne,5] is not the preferred replica. Trigerring preferred replica leader election (kafka.controller.PreferredReplicaPartitionLeaderSelector)
[Controller 1]: Partition [TheOne,5] completed preferred replica leader election. New leader is 3 (kafka.controller.KafkaController)
The leader and ISR are back to their initial state, and balance is restored.
The PRE election is carried out by PreferredReplicaPartitionLeaderSelector, with the state transition online ==> online
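The preferred replica rule can be sketched like this, assuming the preferred replica is simply the first broker in the partition's assigned replica list and that it may only take over if it is alive and in the ISR. The function name and return convention are illustrative, not the selector's real interface.

```python
def preferred_replica_leader(assigned_replicas, current_leader, isr, live_brokers):
    """Return the new leader, or None when no election is needed."""
    preferred = assigned_replicas[0]
    if current_leader == preferred:
        # Already led by the preferred replica, nothing to do.
        return None
    if preferred in live_brokers and preferred in isr:
        return preferred
    raise RuntimeError(
        f"preferred replica {preferred} is not alive or not in the ISR")


# [TheOne,5] is assigned to (3, 1); after broker3 comes back the leader is still 1.
print(preferred_replica_leader([3, 1], current_leader=1, isr=[1, 3],
                               live_brokers={1, 2, 3}))  # 3
```

This also shows why broker3 has to rejoin the ISR before PRE can hand leadership back to it.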
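Something to think about: how is dynamic broker expansion implemented? Can adding brokers dynamically also spread partitions evenly onto the new brokers?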
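partition rebalance
The rebalance interval was mentioned above; it defaults to 5 minutes.
Whether a rebalance is triggered is decided per broker by the imbalance ratio (imbalanceRatio), where partitionsShouldLead are the partitions whose preferred replica is this broker and partitionsNotLed are those among them currently led by another broker:
$$ ratio = \frac{Sum(partitionsNotLed)}{Sum(partitionsShouldLead)} $$
A rebalance is triggered when ratio is greater than leader.imbalance.per.broker.percentage (default 10%)
DEBUG: topics not in preferred replica Map() (kafka.controller.KafkaController)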
TRACE: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
DEBUG: topics not in preferred replica Map() (kafka.controller.KafkaController)
TRACE: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
DEBUG: topics not in preferred replica Map([TheOne,4] -> List(3, 2), [TheOne,5] -> List(3, 1),...) (kafka.controller.KafkaController)
TRACE: leader imbalance ratio for broker 3 is 0.951613 (kafka.controller.KafkaController)
The rebalance itself mainly consists of running PRE
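A small sketch of the ratio computation, assuming a broker's preferred partitions are those whose first assigned replica is that broker. The data below covers only the six original partitions of TheOne, so it does not reproduce the 0.951613 in the log, which was computed over every topic in the cluster. Function names are illustrative.

```python
def imbalance_ratio(broker_id, assignment, leaders):
    """Share of the broker's preferred partitions that it does not currently lead."""
    preferred_here = [p for p, replicas in assignment.items()
                      if replicas[0] == broker_id]
    if not preferred_here:
        return 0.0
    not_led = [p for p in preferred_here if leaders[p] != broker_id]
    return len(not_led) / len(preferred_here)


def needs_rebalance(ratio, threshold_percent=10):
    """Compare against leader.imbalance.per.broker.percentage (a percentage)."""
    return ratio > threshold_percent / 100


# TheOne after broker3 was restarted: partitions 4 and 5 are still led elsewhere.
assignment = {0: [1, 2], 1: [1, 3], 2: [2, 3], 3: [2, 1], 4: [3, 2], 5: [3, 1]}
leaders = {0: 1, 1: 1, 2: 2, 3: 2, 4: 2, 5: 1}
for broker in (1, 2, 3):
    ratio = imbalance_ratio(broker, assignment, leaders)
    print(broker, ratio, needs_rebalance(ratio))
```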
Replica state machine
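Compared with partitions, replicas have 3 more states (ReplicaDeletionStarted, ReplicaDeletionSuccessful and ReplicaDeletionIneligible)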
create topic
Creating a topic changes the state of both partitions and replicas:
- NonExistent ==> New
Invoking state change to NewPartition for partitions [TT,6],...(kafka.controller.PartitionStateMachine)
Invoking state change to NewReplica for replicas [Topic=TT,Partition=6,Replica=2],[Topic=TT,Partition=6,Replica=3],... (kafka.controller.ReplicaStateMachine)
Once the topic has been created, replicas are assigned to it and recorded in ZK
- New ==> Online
#partition state machine
Invoking state change to OnlinePartition for partitions [TT,6],...
Live assigned replicas for partition [TT,6] are: [List(2, 3)]
Initializing leader and isr for partition [TT,6] to (Leader:2,ISR:2,3,LeaderEpoch:0,ControllerEpoch:30)
#replica state machine
Invoking state change to OnlineReplica for replicas [Topic=TT,Partition=6,Replica=2],[Topic=TT,Partition=6,Replica=3],...
ReplicaDeletionStarted
This state usually arises during manual partition assignment. Take partition 1 from the partition reassign experiment as an example, replicas: [3,1] ==> [2,3]
- broker2 becomes NewReplica:
Invoking state change to NewReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
- broker2 enters OnlineReplica:
Invoking state change to OnlineReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
- broker1 becomes OfflineReplica and is removed from the ISR:
Invoking state change to OfflineReplica for replicas [Topic=testReassign,Partition=1,Replica=1]
Removing replica 1 from ISR 3,1 for partition [testReassign,1]
- broker1 enters ReplicaDeletionStarted:
Invoking state change to ReplicaDeletionStarted for replicas [Topic=testReassign,Partition=1,Replica=1]
- broker1 enters ReplicaDeletionSuccessful:
Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=testReassign,Partition=1,Replica=1]
- broker1 enters NonExistentReplica:
Invoking state change to NonExistentReplica for replicas [Topic=testReassign,Partition=1,Replica=1]
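ZKListener in detail
This section goes through some of the listeners the Controller uses to watch ZK nodes and their children
listener | watched node | purpose | handling logic |
---|---|---|---|
BrokerChangeListener | children of /brokers/ids | used by ReplicaStateMachine | BCL |
IsrChangeNotificationListener | children of /isr_change_notification | used by the Controller to watch ISR changes of each partition | ICNL |
PartitionModificationsListener | /brokers/topics/topic_name | watches changes to the topic's partitions (only additions are allowed) | PML |
LeaderChangeListener | /controller | watches changes of the cluster leader | Registering listeners |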
Broker Change Listener
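Under /brokers/ids there are several child nodes named by broker_id, each storing information about that broker, for example: {"jmx_port":9999,"timestamp":"1493886226618","endpoints":["PLAINTEXT://ip:9092"],"host":"ip","version":3,"port":9092}
When these child nodes change, the controller notices and works out what changed: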
- broker2 shutdown:
Newly added brokers: , deleted brokers: 2, all live brokers: 1,3
- broker2 startup:
Newly added brokers: 2, deleted brokers: , all live brokers: 1,2,3
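The two log lines above are what a plain set difference between the current children of /brokers/ids and the previously cached live brokers would give. A minimal sketch, with an illustrative function name:

```python
def diff_brokers(current_children, cached_live_ids):
    """Return (newly_added, deleted, all_live) broker ids, each sorted."""
    current = set(current_children)
    cached = set(cached_live_ids)
    return sorted(current - cached), sorted(cached - current), sorted(current)


# broker2 shutdown
print(diff_brokers([1, 3], [1, 2, 3]))  # ([], [2], [1, 3])
# broker2 startup
print(diff_brokers([1, 2, 3], [1, 3]))  # ([2], [], [1, 2, 3])
```

The added list then feeds onBrokerStartup and the deleted list feeds onBrokerFailure, the two handlers discussed in this section.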
delete broker
shutdownBroker
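This method differs from the shutdown method in KafkaServer: the former handles ControlledShutdown requests and moves the broker's partition responsibilities elsewhere, while the latter shuts down the set of services that make up the broker server.
The partitions that need handling are:
$ \{\, partition \mid id \in Replicas(partition) \land replicaFactor(partition) > 1 \,\} $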
- For partitions the broker leads, re-elect the leader (ControlledShutdownLS)
- For partitions where the broker is an ordinary replica:
  - Stop the replica. For example, broker3 is a replica of TheOne-1, so when broker3 shuts down the controller sends it a StopReplica request:
    The stop replica request (delete = false) sent to broker 3 is [Topic=TheOne,Partition=1,Replica=3](kafka.controller.ControllerBrokerRequestBatch)
    - On receiving the StopReplica request, broker3 calls ReplicaManager's stopReplicas, which removes the fetcher for the replica:
      [ReplicaFetcherManager on broker 3] Removed fetcher for partitions TheOne-1
  - Set broker3's replica state to OfflineReplica
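The selection and the leader/replica split described above can be sketched as follows. The assignment and leader data are made up for illustration (in particular, jjhtest-1 is assumed to live only on broker3), and the function names are not Kafka's.

```python
def partitions_to_handle(broker_id, assignment):
    """Partitions that include the broker and have more than one replica."""
    return {p: replicas for p, replicas in assignment.items()
            if broker_id in replicas and len(replicas) > 1}


def split_by_role(broker_id, partitions, leaders):
    """Separate partitions led by the broker from those it merely replicates."""
    led = sorted(p for p in partitions if leaders[p] == broker_id)
    followed = sorted(p for p in partitions if leaders[p] != broker_id)
    return led, followed


assignment = {("TheOne", 0): [1, 2], ("TheOne", 1): [1, 3],
              ("TheOne", 4): [3, 2], ("jjhtest", 1): [3]}
leaders = {("TheOne", 0): 1, ("TheOne", 1): 1,
           ("TheOne", 4): 3, ("jjhtest", 1): 3}

handled = partitions_to_handle(3, assignment)
led, followed = split_by_role(3, handled, leaders)
print(led)       # [('TheOne', 4)]  -> leader re-election
print(followed)  # [('TheOne', 1)]  -> StopReplica, then OfflineReplica
```

The single-replica partition is filtered out here, which is why it has to be dealt with later in onBrokerFailure.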
onBrokerFailure
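The steps are:
- One requirement for the partitions handled in shutdownBroker is replicaFactor > 1. For a partition with replicaFactor = 1, if the broker hosting its leader shuts down, the partition's state is set to OfflinePartition. Partitions are selected by whether their leader is the broker that went down (the other kinds of partition either had their leader re-elected in the flow above or only had their ISR changed)
- Try to use OfflinePartitionLS to move offline and new partitions to Online
- Set the state of all of this broker's replicas to OfflineReplica. This looks like a repeated operation, because the shutdownBroker phase does it too; the difference is that shutdownBroker only handles the replica state of partitions with more than one replica
- If the first step did not run (no partition lost its leader), an UpdateMetadataRequest is sent to the other brokers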
onBrokerStartup
This again mainly deals with transitions of partitions, replicas, ISR and leaders:
- Send an UpdateMetadataRequest (the exact reason is unclear to me)
- Set the state of this broker's replicas to OnlineReplica
- Try to use OfflinePartitionLS to move offline and new partitions to Online. The main purpose of this step is to bring the partitions that were set to OfflinePartition during onBrokerFailure back to Online:
[OfflinePartitionLeaderSelector]: No broker in ISR is alive for [jjhtest,1]. Pick the leader from the alive assigned replicas: 3
[OfflinePartitionLeaderSelector]: No broker in ISR is alive for [jjhtest,1]. Elect leader 3 from live brokers 3. There is potential data loss.
[OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":3,"leader_epoch":23,"isr":[3]} for offline partition [jjhtest,1]
- Check whether a partition reassignment needs to be performed
ISR Change Notification Listener
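While the cluster is running normally, /isr_change_notification has no child nodes. When an ISR changes, for example when a broker is restarted, the partitions whose ISR used to contain that broker have to adjust their ISR, so child nodes are created for those partitions.
Below is the ISR of the topic TheOne after broker3 went down:
After broker3 recovers, ICNL notices that the ISR of some partitions has changed and carries out the following 3 steps:
- Update the leader and ISR cache of the corresponding partitions according to the child nodes' content (fetched from ZK)
- Send a MetadataRequest to every node in the cluster: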
DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(1, 2, 3) for TopicAndPartitions:Set([TheOne, 2], [TheOne, 4],...)
- Delete the child nodes
Partition Modifications Listener
The Controller sets a PartitionModificationsListener on every topic to watch for added partitions
Below is what PML observes when the partition count of TheOne is increased by 3:
[AddPartitionsListener on 1]: Partition modification trigered {"version":1,"partitions":{"8":[3,2],"4":[3,2],"5":[3,1],"6":[1,3],"1":[1,3],"0":[1,2],"2":[2,3],"7":[2,1],"3":[2,1]}} for path /brokers/topics/TheOne
[AddPartitionsListener on 1]: New partitions to be added Map([TheOne,7] -> List(2, 1), [TheOne,6] -> List(1, 3), [TheOne,8] -> List(3, 2))
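With the partitions added and their replicas already assigned in ZK, each partition of a topic stores information in ZK such as: {"controller_epoch":30,"leader":3,"version":1,"leader_epoch":0,"isr":[3,2]}
Once it has this information, the first thing the Controller does is add the new partitions' replica information to partitionReplicaAssignment; only then does it handle the new partitions' state, leader, and so on. This is the same as the handling for create topic, except that the former processes only the newly added partitions while the latter processes all partitions of the topic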
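Working out which partitions are new amounts to comparing the assignment read from ZK with the cached partitionReplicaAssignment. A minimal sketch using the data from the log above, assuming the cache still holds the six original partitions; the function name is illustrative.

```python
def new_partitions(zk_assignment, cached_assignment):
    """Partitions present in ZK but not yet in the controller's cache."""
    return {p: replicas for p, replicas in zk_assignment.items()
            if p not in cached_assignment}


zk_assignment = {8: [3, 2], 4: [3, 2], 5: [3, 1], 6: [1, 3], 1: [1, 3],
                 0: [1, 2], 2: [2, 3], 7: [2, 1], 3: [2, 1]}
cached = {p: r for p, r in zk_assignment.items() if p < 6}

added = new_partitions(zk_assignment, cached)
print(sorted(added.items()))  # [(6, [1, 3]), (7, [2, 1]), (8, [3, 2])]

# First step on the controller side: record the new replicas in the cache.
cached.update(added)
```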