How the Controller Works

[TOC]

start up controller

The Controller was introduced to reduce the pressure on ZK and to lower the complexity of the whole distributed system.

Register listeners

Two listeners are registered with ZK:

  • SessionExpirationListener: if the session expires, ZK takes care of reconnecting, so Kafka does not need to handle it here
  • LeaderChangeListener: watches for a change of the cluster leader (i.e. the controller); the previous leader must resign (onControllerResignation) and hand over its leadership:
    • de-register listeners: IsrChangeNotificationListener, ReassignedPartitionsListener, PreferredReplicaElectionListener, ReassignedPartitionsIsrChangeListeners
    • shut down its duties: topic deletion, automatic rebalance management, the replica and partition state machines, etc.

start Elector

First, check whether the ephemeral znode /controller already exists in ZooKeeper; it stores the current controller's information, e.g.: {"version":1,"brokerid":2,"timestamp":"1493345638688"}

Race to become leader

  • Read the controllerId from /controller; if it can be read, return amILeader. This guarantees that once some broker has successfully registered as controller (by creating an ephemeral node under /controller), the other brokers stop trying to register
  • Race to register as leader: use ZKCheckedEphemeral to create the ephemeral node. Below is the log output of broker 2 successfully registering as controller after the previous controller (broker) was shut down:
    [2017-04-28 10:13:58,689] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    [2017-04-28 10:13:58,692] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    [2017-04-28 10:13:58,693] INFO 2 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2017-04-28 10:13:59,592] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
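
The election boils down to "whoever creates the ephemeral /controller node first wins". Below is a minimal sketch of that idea using the plain org.apache.zookeeper client rather than Kafka's own ZKCheckedEphemeral; the connect string and payload format are assumptions for illustration only:

    import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

    val ControllerPath = "/controller"

    // Connect to ZK; a no-op watcher is enough for this sketch.
    val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
      override def process(event: WatchedEvent): Unit = ()
    })

    // Try to become controller by creating the ephemeral /controller node.
    // Returns true if this broker won the election, false if another broker got there first.
    def tryElect(brokerId: Int): Boolean = {
      val payload = s"""{"version":1,"brokerid":$brokerId,"timestamp":"${System.currentTimeMillis}"}"""
      try {
        zk.create(ControllerPath, payload.getBytes("UTF-8"),
          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
        true  // znode created: this broker is now the controller
      } catch {
        case _: KeeperException.NodeExistsException =>
          false // someone else registered first; keep watching /controller instead
      }
    }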

onControllerFailover

After winning the election, this broker has to take on the leader's responsibilities, which cover the following:

1. Bump the controller epoch

Read the value of /controller_epoch to get the current controller epoch ("dynasty"), then increment it by 1.
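
A minimal sketch of that epoch bump, reusing the zk handle from the election sketch above. Kafka performs this as a conditional write (using the znode version read together with the data) so that two competing controllers cannot both succeed; this helper is an illustration, not the real KafkaController code:

    import org.apache.zookeeper.data.Stat

    def incrementControllerEpoch(): Int = {
      val stat = new Stat()
      // read the current epoch together with the znode version
      val current = new String(zk.getData("/controller_epoch", false, stat), "UTF-8").toInt
      val next = current + 1
      // conditional write: fails with BadVersionException if another controller bumped it first
      zk.setData("/controller_epoch", next.toString.getBytes("UTF-8"), stat.getVersion)
      next
    }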

2. Register ZK listeners

Watch the following ZK nodes:

  • /admin/reassign_partitions: watches for partition reassignment requests
  • /isr_change_notification: used to notify the controller of ISR changes for partitions
  • /admin/preferred_replica_election: watches for manually triggered preferred replica elections

The listeners above are registered by the controller itself; the ones below are registered with ZK by the corresponding state machines:

  • PartitionStateMachine: the state machine for partitions
    • /brokers/topics: TopicChangeListener
    • /admin/delete_topics: DeleteTopicsListener
  • ReplicaStateMachine: the state machine for replicas
    • /brokers/ids: BrokerChangeListener

Finally, a PartitionModificationsListener is added for every topic, at the ZK path /brokers/topics/topic_name.

3. Initialize the controller context

Collect information about brokers, topics, partition leaders, ISRs, etc.; start the ControllerChannelManager and the partitionStateMachine.

4. Perform partition reassignment and partition leader election

Details are covered in the sections below.

5. Send metadata

Send metadata to all brokers in the cluster, including both live and shutting-down ones.

6. Start the partition rebalance thread

Whether partition rebalancing is enabled is controlled by auto.leader.rebalance.enable (enabled by default); the thread's execution interval is controlled by leader.imbalance.check.interval.seconds (default: 300 seconds).

All situations that trigger an election

partition reassign

In Kafka-manager, manually reassign the partitions of the topic testReassign (which has 3 partitions):

This is the partition assignment before the reassignment:

Adjust the replicas of partitions 0 and 1:

Execute Reassign Partitions and observe the logs:

PartitionsReassignedListener notices that the content of the /admin/reassign_partitions node in ZK has changed:

[PartitionsReassignedListener on 2]: Partitions reassigned listener fired for path /admin/reassign_partitions. Record partitions to be reassigned {"version":1,"partitions":[{"topic":"testReassign","partition":2,"replicas":[1,2]},{"topic":"testReassign","partition":1,"replicas":[2,3]},{"topic":"testReassign","partition":0,"replicas":[3,1]}]}

Since partition 2 is not actually reassigned (its existing replicas are the same as the requested ones), the reassignment request for it is ignored:

kafka.common.KafkaException: Partition [testReassign,2] to be reassigned is already assigned to replicas 1,2. Ignoring request for partition reassignment

Next, partitions 0 and 1 are reassigned; taking partition 1 as an example:

[Controller 2]: Handling reassignment of partition [testReassign,1] to new replicas 2,3 
[Controller 2]: New replicas 2,3 for partition [testReassign,1] being reassigned not yet caught up with the leader
[Controller 2]: Updated path /brokers/topics/testReassign with {"version":1,"partitions":{"2":[1,2],"0":[2,3],"1":[2,3,1]}} for replica assignment
[Controller 2]: Updated assigned replicas for partition [testReassign,1] being reassigned to 2,3,1
[Controller 2]: Updating leader epoch for partition [testReassign,1].
[Controller 2]: Updated leader epoch for partition [testReassign,1] to 1
[Replica state machine on controller 2]: Invoking state change to NewReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
[Controller 2]: Waiting for new replicas 2,3 for partition [testReassign,1] being reassigned to catch up with the leader

Question: why is [2,3,1] written to /brokers/topics/testReassign instead of [2,3]? I'll leave this as an open question: look for the answer in KafkaController.onPartitionReassignment.
That method is called from several places.

partition state machine

The following diagram briefly describes the 4 states a partition can be in:

The state machine stores the current state of every partition in a map named partitionState.
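
A minimal sketch of that bookkeeping. The state names mirror the 0.10.x controller code, but the types here are simplified stand-ins that the remaining sketches in this article reuse:

    import scala.collection.mutable

    // The 4 partition states.
    sealed trait PartitionState
    case object NonExistentPartition extends PartitionState
    case object NewPartition         extends PartitionState
    case object OnlinePartition      extends PartitionState
    case object OfflinePartition     extends PartitionState

    // Simplified stand-in for Kafka's TopicAndPartition.
    case class TopicAndPartition(topic: String, partition: Int)

    // partitionState: partition -> current state.
    val partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]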

start up

At this point the cluster already has a controller. The controller determines each partition's current state by reading the partition and ISR nodes from ZK.

Concretely, it looks at each partition's LeaderIsrAndControllerEpoch information in ZK and whether the recorded leader is still alive (sketched below):

  • if the information exists and the leader is alive, the partition is Online
  • if the information exists but the leader is not alive, the partition is Offline
  • if no such information exists, the partition is New
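
A small sketch of that classification, using the simplified types defined above (leaderFromZk stands in for the leader recorded in the partition's LeaderIsrAndControllerEpoch):

    def initialState(leaderFromZk: Option[Int], liveBrokerIds: Set[Int]): PartitionState =
      leaderFromZk match {
        case None                                  => NewPartition     // nothing recorded in ZK yet
        case Some(leader) if liveBrokerIds(leader) => OnlinePartition  // recorded leader is alive
        case Some(_)                               => OfflinePartition // recorded leader is gone
      }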

Once the current states have been assigned, the next step is to move the New and Offline partitions to Online.

How? By electing a leader, of course. The key code is in the OfflinePartitionLeaderSelector class; the table below lists the cases encountered during election (a sketch of the logic follows the table):

| ISR | replicas | result |
| --- | --- | --- |
| (1,2) | (1,2) | leader = 1 |
| null | (2,3) | unclean enabled ? leader = 2 : NoReplicaOnlineException |
| null | null | NoReplicaOnlineException |
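
A minimal sketch of that decision, matching the table above; the parameter names are mine, not the real OfflinePartitionLeaderSelector API:

    class NoReplicaOnlineException(msg: String) extends RuntimeException(msg)

    /** Returns (new leader, new ISR). */
    def selectLeader(isr: Seq[Int],
                     liveAssignedReplicas: Seq[Int],
                     uncleanLeaderElectionEnabled: Boolean): (Int, Seq[Int]) = {
      val liveIsr = isr.filter(liveAssignedReplicas.contains)
      if (liveIsr.nonEmpty)
        (liveIsr.head, liveIsr)                                     // normal case: first live ISR member
      else if (liveAssignedReplicas.nonEmpty && uncleanLeaderElectionEnabled)
        (liveAssignedReplicas.head, Seq(liveAssignedReplicas.head)) // unclean election: potential data loss
      else
        throw new NoReplicaOnlineException("no replica available for election")
    }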

For example, there is a topic "jjhtest" whose replication factor is 1; the current leader and ISR assignment is:

Now I restart broker 3. What happens?

  1. Since every partition's replicas are fixed (ignoring manual reassignment), shutting down broker 3 leaves partitions 1, 4 and 7 with no available broker in their replica lists; these 3 partitions cannot be assigned to any broker and go Offline

    If a partition's replication factor is > 1, then in the shutdownBroker method the partitions led by this broker go through an online ==> online transition, and the leader election selector is ControlledShutdownPartitionLeaderSelector

  2. After broker 3 recovers, those 3 partitions move from Offline to Online, which triggers leader election

  3. The election process:

    partitionReplicaAssignment

    This variable is a mutable Map that stores the replica assignment of every partition. The information has two sources:

  • TopicChangeListener watches for topic changes in ZK; for example, when a topic is added, the replica assignments of its partitions are fetched from ZK

  • PartitionModificationsListener watches for partition changes; for example, when partitions are added, the replica assignments of the new partitions are fetched from ZK.
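
A tiny sketch of what this cache looks like, reusing the simplified TopicAndPartition from the earlier sketch (the example entry is made up):

    import scala.collection.mutable

    // Simplified stand-in for controllerContext.partitionReplicaAssignment:
    // partition -> ordered replica list (the first entry is the preferred replica).
    val partitionReplicaAssignment = mutable.Map.empty[TopicAndPartition, Seq[Int]]

    // e.g. what TopicChangeListener might record after reading /brokers/topics/jjhtest
    partitionReplicaAssignment += TopicAndPartition("jjhtest", 1) -> Seq(3)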

electLeaderForPartition

When a partition's state goes from Offline or Online ==> Online, a leader is elected through one of several PartitionLeaderSelector implementations (LeaderSelector is abbreviated as LS below):

Every time a leader selection succeeds, the leader cache (controllerContext.partitionLeadershipInfo) is updated.

preferred replica election

During controller startup, a preferredReplicaElection (PRE for short) is executed once, and a scheduled task with an interval of leader.imbalance.check.interval.seconds (default: 300) is created to perform the PartitionRebalance operation, which in turn triggers PRE.

The implementation of Kafka's balancing strategy relies on the following:

  1. Ignoring manually assigned partitions, the replicas assigned to each partition are hard-coded, and their order is fixed too.
  2. When a broker goes down, leader re-election guarantees that no messages are lost (as long as no unclean election is triggered), while PRE (preferred replica election) guarantees that after the broker recovers, partition leaders end up evenly distributed across the cluster (see the sketch after this list).
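
The "preferred" replica is simply the first entry of a partition's assigned replica list, so the per-partition check PRE performs can be sketched as follows (simplified names, not the real selector API):

    // Does this partition need a preferred replica election?
    def needsPreferredReplicaElection(assignedReplicas: Seq[Int], currentLeader: Int): Boolean =
      assignedReplicas.headOption.exists(_ != currentLeader)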

A small experiment

There is a topic "TheOne" with 6 partitions and a replication factor of 2; the current assignment is:

  • After broker 3 is shut down, the leaders of partitions 4 and 5 are re-elected:

  • After broker 3 recovers, it rejoins the ISR of each topic (IsrChangeNotificationListener is triggered):

  • After a while, PRE is triggered:

    [Controller 1]: Starting preferred replica leader election for partitions [TheOne,5] (kafka.controller.KafkaController)
    [Partition state machine on Controller 1]: Invoking state change to OnlinePartition for partitions [TheOne,5] (kafka.controller.PartitionStateMachine)
    [PreferredReplicaPartitionLeaderSelector]: Current leader 1 for partition [TheOne,5] is not the preferred replica. Trigerring preferred replica leader election (kafka.controller.PreferredReplicaPartitionLeaderSelector)
    [Controller 1]: Partition [TheOne,5] completed preferred replica leader election. New leader is 3 (kafka.controller.KafkaController)

The leader & ISR are back to their initial state; balance is restored.

The PRE election is performed by preferredReplicaPartitionLeaderSelector, with a state transition of online ==> online.

Food for thought: how is dynamically adding brokers implemented? Can dynamically added brokers lead to balanced partition expansion?

partition rebalance

  • The rebalance interval was mentioned above; it defaults to 5 minutes.
  • Whether a rebalance is triggered is decided by the imbalance ratio:
    $$ ratio = \frac{Count(partitionsNotLed)}{Count(partitionsShouldLead)} $$
    When the ratio exceeds leader.imbalance.per.broker.percentage (default 10%), a rebalance is triggered

    DEBUG: topics not in preferred replica Map() (kafka.controller.KafkaController)
    TRACE: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
    DEBUG: topics not in preferred replica Map() (kafka.controller.KafkaController)
    TRACE: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
    DEBUG: topics not in preferred replica Map([TheOne,4] -> List(3, 2), [TheOne,5] -> List(3, 1),...) (kafka.controller.KafkaController)
    TRACE: leader imbalance ratio for broker 3 is 0.951613 (kafka.controller.KafkaController)
  • The rebalance process mainly consists of executing PRE (a sketch of the ratio computation follows)
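
A minimal sketch of the per-broker imbalance check, building on the simplified partitionReplicaAssignment map from earlier; leaderOf is an assumed lookup, not the real controller API:

    def imbalanceRatio(brokerId: Int,
                       assignment: Map[TopicAndPartition, Seq[Int]],
                       leaderOf: TopicAndPartition => Int): Double = {
      // partitions whose preferred (first) replica is this broker
      val shouldLead = assignment.filter { case (_, replicas) => replicas.headOption.contains(brokerId) }
      if (shouldLead.isEmpty) 0.0
      else {
        val notLed = shouldLead.count { case (tp, _) => leaderOf(tp) != brokerId }
        notLed.toDouble / shouldLead.size
      }
    }

    // A rebalance (PRE for the affected partitions) is triggered when
    // imbalanceRatio(...) > leader.imbalance.per.broker.percentage / 100.0 (default 0.10).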

Replica state machine

Compared with the partition state machine, it has 3 more states (listed in the sketch below).
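
For reference, a sketch of the 7 replica states; the names mirror the 0.10.x ReplicaStateMachine, while the Scala types are again simplified stand-ins:

    sealed trait ReplicaState
    case object NewReplica                extends ReplicaState
    case object OnlineReplica             extends ReplicaState
    case object OfflineReplica            extends ReplicaState
    case object ReplicaDeletionStarted    extends ReplicaState
    case object ReplicaDeletionSuccessful extends ReplicaState
    case object ReplicaDeletionIneligible extends ReplicaState
    case object NonExistentReplica        extends ReplicaState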

create topic

Creating a new topic involves state changes for both partitions and replicas:

  • NonExistent ==> New
    Invoking state change to NewPartition for partitions [TT,6],...(kafka.controller.PartitionStateMachine)
    Invoking state change to NewReplica for replicas [Topic=TT,Partition=6,Replica=2],[Topic=TT,Partition=6,Replica=3],... (kafka.controller.ReplicaStateMachine)

After a new topic is created, replicas are assigned to it in ZK

  • New ==> Online
    #partition state machine
    Invoking state change to OnlinePartition for partitions [TT,6],...
    Live assigned replicas for partition [TT,6] are: [List(2, 3)]
    Initializing leader and isr for partition [TT,6] to (Leader:2,ISR:2,3,LeaderEpoch:0,ControllerEpoch:30)
    #replica state machine
    Invoking state change to OnlineReplica for replicas [Topic=TT,Partition=6,Replica=2],[Topic=TT,Partition=6,Replica=3],...

ReplicaDeletionStarted

This state usually appears during manual partition reassignment. Take partition 1 from the partition reassign experiment above as an example: replicas [3,1] ==> [2,3]

  • broker2 becomes NewReplica: Invoking state change to NewReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
  • broker2 moves to OnlineReplica: Invoking state change to OnlineReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
  • broker1 becomes OfflineReplica and is removed from the ISR:

    Invoking state change to OfflineReplica for replicas [Topic=testReassign,Partition=1,Replica=1]
    Removing replica 1 from ISR 3,1 for partition [testReassign,1]
  • broker1 enters ReplicaDeletionStarted: Invoking state change to ReplicaDeletionStarted for replicas [Topic=testReassign,Partition=1,Replica=1]

  • broker1 enters ReplicaDeletionSuccessful: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=testReassign,Partition=1,Replica=1]
  • broker1 enters NonExistentReplica: Invoking state change to NonExistentReplica for replicas [Topic=testReassign,Partition=1,Replica=1]

ZK listeners in detail

Below is a walkthrough of the listeners the Controller uses to watch ZK nodes and their children.

| listener | watched node | purpose | handling logic |
| --- | --- | --- | --- |
| BrokerChangeListener | /brokers/ids (children) | used by the ReplicaStateMachine | BCL |
| ISRChangeNotificationListener | /isr_change_notification (children) | lets the Controller track ISR changes of partitions | ICNL |
| PartitionModificationsListener | /brokers/topics/topic_name | watches partition changes for that topic (only additions are allowed) | PML |
| LeaderChangeListener | /controller | watches for changes of the cluster leader | Register listeners |

Broker Change Listener

Under /brokers/ids there are child nodes named after broker ids; each one stores information about its broker, e.g.:

{"jmx_port":9999,"timestamp":"1493886226618","endpoints":["PLAINTEXT://ip:9092"],"host":"ip","version":3,"port":9092}

When the content of these child nodes changes, the controller notices and works out what changed:

  • broker2 shutdown: Newly added brokers: , deleted brokers: 2, all live brokers: 1,3
  • broker2 startup: Newly added brokers: 2, deleted brokers: , all live brokers: 1,2,3

delete broker

shutdownBroker

This method differs from KafkaServer's shutdown method: the former handles the ControlledShutdown request and moves the partitions' responsibilities elsewhere, while the latter shuts down the broker server's collection of services.

The partitions that need to be handled are:
$$ \{\, partition \mid id \in Replicas(partition) \ \wedge\ replicaFactor(partition) > 1 \,\} $$
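
A sketch of this filter over the simplified partitionReplicaAssignment map from earlier (not the real shutdownBroker code):

    // Partitions that shutdownBroker has to handle for broker `id`:
    // the broker is in the replica list and the replication factor is greater than 1.
    def partitionsToHandle(id: Int,
                           assignment: Map[TopicAndPartition, Seq[Int]]): Set[TopicAndPartition] =
      assignment.collect {
        case (tp, replicas) if replicas.contains(id) && replicas.size > 1 => tp
      }.toSet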

  1. For partitions where it is the leader, re-elect the leader (ControlledShutdownLS)
  2. For partitions where it is an ordinary replica:
    • Send a stop-replica request. For example, broker3 is a replica of TheOne-1; when broker3 is shut down, the controller sends it a stop-replica request: The stop replica request (delete = false) sent to broker 3 is [Topic=TheOne,Partition=1,Replica=3] (kafka.controller.ControllerBrokerRequestBatch)
      • After receiving the StopReplica request, broker3 calls ReplicaManager.stopReplicas, which removes the fetcher of the affected replica: [ReplicaFetcherManager on broker 3] Removed fetcher for partitions TheOne-1
    • Set broker3's replica state to OfflineReplica

onBrokerFailure

The steps are as follows:

  1. One requirement for the partitions handled by shutdownBroker is replicaFactor > 1; for partitions with replicaFactor = 1, if the broker hosting their leader goes down, their state is set to OfflinePartition. The filter criterion is whether a partition's leader is the failed broker (the other partitions have either already had their leader re-elected in the flow above, or only had their ISR changed)
  2. Try to move Offline and New partitions to Online using the OfflinePartitionLS
  3. Collectively set the state of this broker's replicas to OfflineReplica

    This feels like a duplicate, because the shutdownBroker phase does the same thing. The difference is that shutdownBroker only touches the replica state of partitions with a replication factor > 1.

  4. If step 1 did nothing, send an UpdateMetadataRequest to the other brokers

onBrokerStartup

What it handles is, again, the transitions of partitions, replicas, ISRs and leaders:

  1. Send an UpdateMetadataRequest (the exact reason is unclear)

  2. Set this broker's replica state to OnlineReplica
  3. Try to move Offline and New partitions to Online using the OfflinePartitionLS; the main purpose of this step is to bring the partitions that were set to OfflinePartition during onBrokerFailure back to Online:

    [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [jjhtest,1]. Pick the leader from the alive assigned replicas: 3
    [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [jjhtest,1]. Elect leader 3 from live brokers 3. There is potential data loss.
    [OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":3,"leader_epoch":23,"isr":[3]} for offline partition [jjhtest,1]
  4. Check whether a partition reassignment needs to be executed

ISR Change Notification Listener

When the cluster is running normally, /isr_change_notification has no child nodes. When an ISR changes, e.g. because a broker is restarted, the partitions whose ISR originally contained that broker have to adjust their ISR, so child nodes are created for those partitions.

Below is the ISR of the topic TheOne after broker 3 goes down:

After broker 3 recovers, the ICNL notices that the ISR of some partitions has changed and performs the following 3 steps:

  1. Update the leader and ISR cache of the corresponding partitions based on the child nodes' content (fetched from ZK)
  2. Send a MetadataRequest to every broker in the cluster: DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(1, 2, 3) for TopicAndPartitions:Set([TheOne, 2], [TheOne, 4],...)
  3. Delete the child nodes
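
A minimal sketch of those three steps, reusing the zk handle from the election sketch; updateLeaderAndIsrCache and sendUpdateMetadataRequest are stand-in stubs for the real controller logic:

    import scala.collection.JavaConverters._

    def updateLeaderAndIsrCache(notifications: Seq[String]): Unit = ()   // stand-in: refresh cache from ZK
    def sendUpdateMetadataRequest(notifications: Seq[String]): Unit = () // stand-in: notify all brokers

    def handleIsrChangeNotification(): Unit = {
      val children = zk.getChildren("/isr_change_notification", false).asScala
      if (children.nonEmpty) {
        updateLeaderAndIsrCache(children)           // 1. update the leader/ISR cache
        sendUpdateMetadataRequest(children)         // 2. propagate metadata to the cluster
        children.foreach { child =>                 // 3. delete the notification nodes
          zk.delete(s"/isr_change_notification/$child", -1)
        }
      }
    }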

Partition Modifications Listener

The Controller sets up a PartitionModificationsListener for every topic, used to watch for partition additions.

Below is what the PML sees when TheOne's partition count is increased by 3:

[AddPartitionsListener on 1]: Partition modification trigered {"version":1,"partitions":{"8":[3,2],"4":[3,2],"5":[3,1],"6":[1,3],"1":[1,3],"0":[1,2],"2":[2,3],"7":[2,1],"3":[2,1]}} for path /brokers/topics/TheOne
[AddPartitionsListener on 1]: New partitions to be added Map([TheOne,7] -> List(2, 1), [TheOne,6] -> List(1, 3), [TheOne,8] -> List(3, 2))

Partitions have been added, and ZK already holds the replica assignment for them; for each partition of a topic, ZK stores information like: {"controller_epoch":30,"leader":3,"version":1,"leader_epoch":0,"isr":[3,2]}

After receiving this information, the first thing the Controller does is add the new partitions' replica assignments to partitionReplicaAssignment; only then does it handle the new partitions' state, leader, and so on. This is essentially the same as the create-topic handling; the difference is that the former only processes the newly added partitions, while the latter processes all partitions of the topic.