Kafka Coordinator实现细节

GroupCoordinator

每个kafka server在启动的时候会创建一个GroupCoordinator用于管理group以及consumer的offset fetch/commit

在创建GroupCoordinator实例时不仅需要brokerId、group以及offset config,还需要传入replicaManager,其作用是

GroupMetadataManager

GroupMetadataManager是GroupCoordinator最重要的组成部分,其作用是管理group的元信息(状态、成员、提交的offset信息)以及作为coordinator的broker所分配到的分区信息,主要成员有4个:

  1. groupMetadataCache:存储group与GroupMetadata的cache
  2. loadingPartitions:”__consumer_offsets”中正在被当前coordinator加载的分区
  3. ownedPartitions:”__consumer_offsets”中分配到当前coordinator的分区(即该broker是这些分区的leader)
  4. scheduler:删除过期offset以及group元数据的定时任务(执行间隔由offsets.retention.check.interval.ms参数控制,默认为10分钟)

__consumer_offsets

__consumer_offsets是用于存储消费者消费信息的topic,存储的消息由两部分组成

  • 一部分是offset信息(kafka.coordinator.OffsetsMessageFormatter类型)的:

    [groupId,topic,partition]::[OffsetMetadata[offset,metadata],CommitTime ExprirationTime]
  • 另一部分是group信息(kafka.coordinator.GroupMetadataMessageFormatter类型):

    groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)]

group分配到哪个分区的策略在kafka cosumer中介绍过了。

作为一个特殊的topic,consumer_offsets也有replica的概念,并且其replica factor与其他topic保持一致consumer_offsets上每个分区都对应一个leader,作为leader的broker上的GroupCoordinator会记录着分区上记录着的group以及offset信息。当leader(__consumer_offsets分布的leader)发生变化时,新的leader需要加载对应分区上的group以及offset信息。

server在处理LeaderAndIsrRequest时会对__consumer_offsets的分区做出入境 操作:

def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
updatedLeaders.foreach { partition =>
if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupImmigration(partition.partitionId)
}
updatedFollowers.foreach { partition =>
if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupEmigration(partition.partitionId)
}
}

小实验

“G3”这个group根据hash映射到分区2上,当前的ISR为:

Topic: __consumer_offsets	Partition: 2	Leader: 1	Replicas: 1,3,2	Isr: 2,3,1

接下来,关闭broker1,导致broker3成为了新的leader。

broker3执行入境 操作,加载分区2上面的的offset以及group信息:

[Group Metadata Manager]: Loading offsets and group metadata from [__consumer_offsets,2]
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 1
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 2
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 3
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 4
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3448,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-0.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3441,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-3.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3257,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-5.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3382,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-2.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3397,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-4.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3163,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-1.
[GroupCoordinator 3]: Loading group metadata for G3 with generation 4
[Group Metadata Manager]: Finished loading offsets from [__consumer_offsets,2] in 21 milliseconds.

loadGroupsForPartition方法中通过使用Map确保加载每个group最新generation的信息

在执行入境操作之前,分区2被添加到loadingPartitions中,表示coordinator正在加载该分区里面的信息,这个阶段如果有groupId在loadingPartitions之内的消费请求进来,是无法响应的;
处理完后,分区2被添加到ownedPartitions中。

保存group元数据

方法定义如下:

def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Option[DelayedStore]

  • groupAssignment是group中member的分区分配
  • 返回值是DelayedStore,这并不是一个DO类型的延迟任务,只适用于存放消息的临时媒介,方便后续往replicas中写Log用的。

生成消息

第一步是生成能往Log(确切的说是写入__consumer_offsets中该group对应的partition中)中写入的ByteBufferMessageSet(消息写入磁盘及备份实现分析),key为groupId,value为member以及sessionTimeout,即上面所说的group信息。

设置appendLog回调函数

该回调函数作为参数传入到ReplicaManager.replicaMessages,对append操作的结果进行处理,最主要的就是状态的转换:

case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.GROUP_COORDINATOR_NOT_AVAILABLE

case Errors.NOT_LEADER_FOR_PARTITION =>
Errors.NOT_COORDINATOR_FOR_GROUP

case Errors.REQUEST_TIMED_OUT =>
Errors.REBALANCE_IN_PROGRESS

case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
Errors.UNKNOWN

执行storeGroup回调函数

doSyncGroup中定义了一个用于处理leader的SyncGroupRequest的回调函数:

group synchronized {
if (group.is(AwaitingSync) && generationId == group.generationId) {
if (error != Errors.NONE) {
resetAndPropagateAssignmentError(group, error)
maybePrepareRebalance(group)
} else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
}
}

因为在等待回调函数被执行的过程中,可能会有新的member加入,这样的话就无法保证group的状态以及generation不会变化的。

往replicas写group数据

调用GroupMetadataManager的store方法,将DelayedStore中的messageSet以及回调函数传入到replicaManager的appendMessages方法中

replicaManager.appendMessages(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
true, // allow appending to internal offset topic
delayedStore.messageSet,
delayedStore.callback)

保存offset commit

prepareStoreOffsets方法与prepareStoreGroup基本相像:

def prepareStoreOffsets(group: GroupMetadata,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore]

返回的依然是个DelayedStore
组装的MessageSet中,key为[groupId, topic, partition],value为OffsetMetadata,写入的partition由groupId确定。

offsetCommit回调函数

GroupMetadata为offset commit创建了两个Cacheoffsets以及pendingOffsetCommits,consumer提交的offset先存放到pending中,然后根据一定的状态来决定是否移到offsets中。

如果offsetCommit执行结束后group依旧存活那个根据是否有错误对cache执行不同的操作:

  1. 没有错误码,那么就将offset写入到offsets中,并从pendingOffsetCommits中移除;
  2. 有错误,那么仅仅将offset从pendingOffsetCommits中移除;

offset以及group元数据的清理工作

GroupMetadataManager在启动时会开启一个定时执行的清理线程:”delete-expired-group-metadata”,该线程的主要工作是清理__consumer_offsets中失效的offset以及可删除的group信息。

remove expire offset

每个offset commit提交到server时,都会根据配置的保存时间来设置其失效时间,超过该时间的将会被清除掉。

首先,从cache中筛选出可清楚的offset集合:

val expiredOffsets = offsets.filter {
case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
}
offsets --= expiredOffsets.keySet
expiredOffsets

执行完删除操作过后判断group是否可以判定为DEAD

  1. group不包含member
  2. group的两个offsetCache都为空

立墓碑

当offset被移除或者group进入DEAD状态,都会在__consumer_offsets中留下一个墓碑

对于group而言:

  • 如果当前有member存在,那么其存在于__consumer_offsets中的数据是这样的:

    G6::[G6,Some(consumer),Stable,Map(client1-1ee1d482-c9fa-4617-860a-98d3a3d5a836 -> [client1-1ee1d482-c9fa-4617-860a-98d3a3d5a836,client1,/10.45.48.129,120000])]
  • 如果member为空,但是状态不是DEAD

    G6::[G6,None,Empty,Map()]
  • 如果group被该清理线程认为DEAD,该group信息不仅从groupMetadataCache中移除,还会在__cosnumer_offsets中留下一座墓碑:

    G6::NULL

offset的立墓碑操作与group类似。

consumer与Coordinator的连接过程

确定coordinator

kafka cosumer中提到过consumer寻找coordinator的过程。

选择将哪个节点作为coordinator其实是由consumer client决定的,确切的说是向已连接的节点中随机选择一个最空闲的节点发送GroupCoordinatorRequest。(这里随机是指往一个空闲的随机broker发送请求,收到的response中分配到的coordinator是根据group找对__consumer_offsets对应分区的Leader)这里空闲的定义是:NetworkClient中处于inflight状态的请求数量少,下面是consumer寻找到coordinator的日志:

[AbstractCoordinator] Sending coordinator request for group G6 to broker 10.45.4.10:9092
[AbstractCoordinator] Received group coordinator response ClientResponse(receivedTimeMs=1495510047579, disconnected=false, request=ClientRequest(callback=..., request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=client111}, body={group_id=G6}), createdTimeMs=1495510047472, sendTimeMs=1495510047577), responseBody={error_code=0,coordinator={node_id=1,host=10.45.4.9,port=9092}})

handleGroupCoordinatorRequest

server在接收到GroupCoordinatorRequest后:

  • 根据groupId找到对应__consumer_offsets上的分区P
  • 找到P对应的leader作为coordinator

handleJoinGroup

校验

  • 简单校验:
  1. coordinator是否处于工作状态
  2. groupId是否有效
  3. coordinator是否负责该group
  4. sessionTimeoutMs是否合理
  5. groupId是否在loadingPartitions中,如果在的话表明正在Rebalance中。
  • member校验:
    客户端ConsumerCoordinator发送的JoinGroupRequest 中的memberId永远都是空的,也就是说memberId是由server端进行设置的。如果Request中group是已知(存在于GroupMetadataManager.groupMetadataCache)的并且memberId非空,那么Server将拒绝这个请求。

响应请求

group有不同的状态,在不同的状态下有相应的响应joinGroupRequest方法

Dead

group处于Dead状态表明该group中没有了成员,并且其GroupMetadata已被该coordinator移除,这个状态下对任何请求都是返回UnknownMemberIdException

PreparingRebalance

根据Request中memberId是否为空有两套处理逻辑:

  • memberId为空:执行addMember操作

    //新建memberId,格式为:clientId-UUID 
    val memberId = clientId + "-" + group.generateMemberIdSuffix
    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)
    member.awaitingJoinCallback = callback
    group.add(member.memberId, member)
    maybePrepareRebalance(group)
    member
  • memberId不为空:执行updateMember操作

    member.supportedProtocols = protocols
    member.awaitingJoinCallback = callback
    maybePrepareRebalance(group)

以上两个操作都不会触发PrepareRebalance操作,因为当前已经是PreparingReblance状态了。

AwaitingSync

这个状态表明coordinator已经发送了JoinGroupResponse了,正在等待leader发送分区分配的SyncGroupRequest.

这个时候如果收到一个memberId为空的JoinGroupRequest,表明group中有新的成员加入,除了要创建member信息添加到GroupMetadata中之外,还需要prepareRebalance,并将状态重新置为PreparingReblance

如果收到的memberId不为空,有两种情况:

  1. 该成员未收到之前发送过的JoinGroupResponse,这种情况就重新发送一个Response,leader和member分配不会改变的
  2. 这次的Request中改变了分区分配策略,因此需要执行updateMember操作,并且还需要执行PrepareRebalance操作将状态重新置为PreparingReblance

Empty or Stable

处理三种可能的joinGroup情形:

  1. 收到memberId为空的请求,group有新成员加入,执行addMember以及PrepareRebalce操作。
  2. 收到leader的joinGroup请求或者请求中的分配策略发生变化(何种场景下会leader会重发joinGroup请求嘞?分区变化时consumer是如何rejoin的)
  3. 其他情况(followers发送的没有内容变化的Join请求)说明followers可能没收到Response,因此重发Response。

handleSyncGroup

在校验阶段如果发现该coordinator并不负责该group,则会反馈NotCoordinatorForGroupException

能够正常响应SyncGroup请求的group状态为AwaitingSyncStable,其中AwaitingSync状态下就是执行保存group数据

handleLeaveGroup

主要有三步操作

1、从心跳DOP中移除

心跳DOP(heartbeatPurgatory)中保存的是DelayedHeartbeat,该DO操作用于侦测member是否存活,成员变量包括group、member已经超时时间。当member要离开group时需要将该member对应的
DO操作完成并移除掉。

2、从group中移除

将member从其对应的GroupMetadata中的members中移除,然后根据当前group状态进行对应的处理:

  • Stable or Empty:触发PrepareRebalance
  • PreparingRebalance:complete掉joinPurgatory中该group的DJ操作

3、组装反馈信息

其实consumer对LeaveGroupResponse不是很关心,因为不会重发。

handleFetchOffsets

consumer是从coordinator维护的offset cache :offsets中获取group已提交的offset信息的。

FetchRequest中包含group以及topic-partition信息,据此coordinator进行反馈:

  • 如果group不存在 or group状态为DEAD,则返回的PartitionData中offset为-1(代表InvalidOffset)
  • 如果未指定topic以及partition,那么就将offsets中所有的[topic-partition, commit offset]数据都反馈给consunmer
  • 如果指定topic-partition,就将offsets中对应的数据反馈,找不到就反馈-1

Helper

分区变化时consumer是如何rejoin的?

消费者在长连接的时候,增加了分区的topic,client是如何感知到的呢,下面是实验过程记录:

  • 10:33:08 开启consumer

    [AbstractCoordinator] Sending coordinator request for group G6 to broker 10.45.4.10:9092 (id: -2 rack: null)
    [AbstractCoordinator] Discovered coordinator 10.45.4.9:9092 (id: 2147483646 rack: null) for group G6.
    [ConsumerCoordinator] Revoking previously assigned partitions [] for group G6
    [ConcurrentMessageListenerContainer] partitions revoked:[]
    [AbstractCoordinator] (Re-)joining group G6
    [AbstractCoordinator] Sending JoinGroup ({group_id=G6,session_timeout=120000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}) to coordinator 10.45.4.9:9092 (id: 2147483646 rack: null)
    [AbstractCoordinator] Received successful join group response for group G6:
    [AbstractCoordinator] Successfully joined group G6 with generation 5
    [ConcurrentMessageListenerContainer] partitions assigned:[newOne-8, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3]
  • 10:34:17 controller检测到partitoin变化:

    [AddPartitionsListener on 2]: Partition modification triggered {"version":1,"partitions":{"8":[2,1],"4":[1,3],"11":[2,1],"9":[3,2],"5":[2,1],"10":[1,3],"6":[3,2],"1":[1,2],"0":[3,1],"2":[2,3],"7":[1,3],"3":[3,2]}} for path /brokers/topics/newOne (kafka.controller.PartitionStateMachine$PartitionModificationsListener)
  • 10:38:10 client rejoin

    [ConsumerCoordinator] Revoking previously assigned partitions [newOne-8, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3] for group G6
    [ConcurrentMessageListenerContainer] partitions revoked:[newOne-8, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3]
    [AbstractCoordinator] "(Re-)joining group G6"
    [AbstractCoordinator] Sending JoinGroup ({group_id=G6,session_timeout=120000,member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}) to coordinator 10.45.4.9:9092 (id: 2147483646 rack: null)
    [AbstractCoordinator] Received successful join group response for group G6: {error_code=0,generation_id=6,group_protocol=range,leader_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,members=[{member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}
    [AbstractCoordinator] Sending leader SyncGroup for group G6 to coordinator 10.45.4.9:9092 (id: 2147483646 rack: null): {group_id=G6,generation_id=6,member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,group_assignment=[{member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=70 cap=70]}]}
    [AbstractCoordinator] Successfully joined group G6 with generation 6
    [ConsumerCoordinator] Setting newly assigned partitions [newOne-8, newOne-9, newOne-10, newOne-11, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3] for group G6
    [ConcurrentMessageListenerContainer] partitions assigned:[newOne-8, newOne-9, newOne-10, newOne-11, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3]

那么consumer到底是如何触发rejoin的呢,这个时间间隔有何讲究?

一开始自己关注的点是rejoinNeeded什么时候被置为true

有三种情况下会被置为true:

  1. SyncGroupResponse中存在ERROR
  2. HeartbeatResponse中存在REBALANCE_IN_PROGRESS、ILLEGAL_GENERATION或UNKNOWN_MEMBER_ID的ERROR
  3. consumer leave group

然而从日志来看并没有出现错误,所以注意力转移到needRejoin方法中的其他判断条件:

return subscriptions.partitionsAutoAssigned() &&
(super.needRejoin() || subscriptions.partitionAssignmentNeeded());

partitionsAutoAssigned的条件是一直满足的,partitionAssignmentNeeded被置为true的场景有点多,排查起来比较费时费力。这时有个新的现象进入眼帘:经过多次试验后发现rejoin距离分区调整的时间间隔最长不超过5分钟!。而metadata.max.age.ms这个配置参数的默认值刚好是5分钟:

该配置项是client端强制刷新metadata的最长时间间隔。

因为kafka集群出现broker或者partition变化的时候是不会通知客户端的,因此客户端需要定期的去获取metadata的值。

客户端判断是否需要刷新metadata的方法:

public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}

其中metadataExpireMs就是5分钟【默认】。

ConsumerCoordinator为metadata添加了一个Listener监听其更新操作:

private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
...
// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
if (!snapshot.equals(metadataSnapshot)) {
metadataSnapshot = snapshot;
subscriptions.needReassignment();
}
}

}
});
}

needReassignment()方法将needsPartitionAssignment置为true,这正是partitionAssignmentNeeded()方法所需要的。

how to prepare rebalance

kafka cosumer中做个一个实验:consumer多次关闭重连后,partition将会在较长时间后才能分配到。GroupCoordinator做了什么才导致这样的现象发生嘞?

prepareReblance的代码不长,但是要搞懂到底做了些什么着实不易:

private def prepareRebalance(group: GroupMetadata) {
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(AwaitingSync))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

group.transitionTo(PreparingRebalance)
info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))

val rebalanceTimeout = group.rebalanceTimeoutMs
val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
val groupKey = GroupKey(group.groupId)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}

DelayedOperation And DelayedOperationPurgatory

purgatory wiki

DelayedJoin(简称DJ)是DelayedOperation(简称DO)的子类,DelayedOperationPurgatory(简称DOP)用于记录DO,并将超时的DO执行expired操作?

DO用于执行延迟任务,参数只有一个超时时间,目前已有的实现类有:

比如DelayedFetch的操作允许Fetch等待一定数量的消息或者达到超时时间后再返回。

  • DO完成给定的操作后,会调用onComplete方法(该方法需要子类实现)并且只会被调用一次, isComplete方法将返回true(通过原子变量AtomicBoolean实现);
  • forceCompletetryComplete方法等能触发onComplete,其中前者已经在DO中实现了:
    def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
    // cancel the timeout timer
    cancel()
    onComplete()
    true
    } else {
    false
    }
    }

该方法将原子变量强行换成true(如果当前为false的话),然后调用onComplete操作;后者需要子类实现:在判断是否达到complete条件后再调用forceComplete。

  • safeTryCompleteTryComplete的线程安全版:

    def safeTryComplete(): Boolean = {
    synchronized {
    tryComplete()
    }
    }
  • 如果DO超时,将调用onExpiration操作(0.10.1.1版本中,DJ仍未实现该方法)

Watchers

DOP形象的定义了Watchers这个类用于存放和“观察”DO,使用到的数据结构是ConcurrentLinkedQueue

  • watch:该方法将DO添加到队列中
  • tryComplteWatched:遍历队列,将已完成的DO从队列中移除,调用未完成DOsafeTryComplte方法,并记录在该方法中完成的DO数量。
  • purgeCompleted:该方法只是将已完成的移除掉,并返回移除的数量。
  • 每个Watchers都与一些key关联(HashMap),定义为watchersForKey
    • Key的类型没有限制,
    • 当key对应的DO全部完成后,key以及对应的Watchers一起从Map中移除。

其他参数及属性

  • timeoutTimer
  • brokerId
  • purgeInterval:清理基准线,当DOP中已完成的DO数量达到该基准线后开始清理操作
  • reaperenbaled:是否允许清除DO
  • watchersForKey:存放watcher与对应的key
  • expirationReaper:超时DO清道夫

tryCompleteElseWatch

该方法将一个DO塞进Watchers中并与多个Key进行关联。如果每次与Key进行关联时都执行一次tryComplete操作成本很大。kafka选择了一个折中的策略,保证在该方法内最多调用两次tryComplete方法:

  1. 执行第一次tryComplete操作,成功就返回
  2. 遍历keys,如果DO未完成就将key与Watchers进行绑定,添加到watchersForKey
  3. 执行第二次tryComplete,成功就返回
  4. 依然未成功的话就添加到timeoutTimer
    (进入这里面的DO将怎么处理?怎么才能再次触发onComplete操作呢?)
    • 放到Timer中的DO在超时的时候会执行TimerTask#run()方法
    • DelayedOperation基础了TimerTask类,覆盖了父类的run方法:
        override def run(): Unit = {
      if (forceComplete())
      onExpiration()
      }

DelayedJoin

GroupCoordinator中实现了DJ作为DO的各个方法,在分析这些方法前需要关注的是GroupMetadataMemberMetadata的两个属性:

notYetRejoinedMembers AND awaitingJoinCallback

  • awaitingJoinCallback是Mebmer的属性,初始为null;当member所在的group处于PreparingReblance状态下,member向coordinator发送了JoinGroup请求,那么该字段用于存放将JoinGroupResult反馈的回调方法,在重平衡结束后或者coordinator执行出境操作等会将该字段重新置为Null
  • notYetRejoinedMembers存放该group中没有awaitingJoinCallback的member,在Rebalance期间,该集合中存储的是那些没有发送JoinGroup请求的member。

tryComplete

如果group中所有的成员都发送了JoinGroupRequest 就调用forceComplete方法(DO中的方法):

def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group synchronized {
if (group.notYetRejoinedMembers.isEmpty)
forceComplete()
else false
}
}

留个问题 如果某个member迟迟不发送JoinGroup请求的话,那总不能永久等待吧?member在什么情况下会被移出group呢?(关键点在于DelayedHeartbeat的onExpireHeartbeat方法)

onExpireHeartbeat

如果coordinator听不到member的心跳:

member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

heartbeat的超时时间由consumer的session.timeout.ms控制【默认为100s】
那么就会将member从group中移除:

private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
group.remove(member.memberId)
group.currentState match {
case Dead | Empty =>
case Stable | AwaitingSync => maybePrepareRebalance(group)
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}

如果当前group处于PreparingRebalance状态,那么将会将查下是否可以complete join操作。那么前面两个问题已经明朗了:

  1. 添加到DOP的timeoutTimer中的DO只是放入一个定时器内,超时后移除,DO能否在超时前执行onComplete操作完全靠外部触发;
  2. 如果某个group存在多个member,如果其中存在member非正常退出(即没有执行unsubscribe操作),那么coordinator必须依赖心跳超时来检查该member是否dead,在此期间内的joinGroup请求无法立即得到响应。

onCompleteJoin

疑惑 该方法的第一步看不懂:

// remove any members who haven't joined the group yet
group.notYetRejoinedMembers.foreach { failedMember =>
group.remove(failedMember.memberId)
// TODO: cut the socket connection to the client
}

能够执行onCompleteJoin说明notYetRejoinedMembers已经是空的了,这里的移除操作感觉多余了????

JoinGroupResult

val joinResult = JoinGroupResult(
members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
memberId=member.memberId,
generationId=group.generationId,
subProtocol=group.protocol,
leaderId=group.leaderId,
errorCode=Errors.NONE.code)

group的leaderId采取先来后到的原则,新来的memberId作为leaderId。

对于非正常重启一个consumer所遇到的长时间等待可以用下图加深理解: