天外飞猪的博客


  • Home

  • Tags

  • Archives

表连接方案

Posted on 2018-04-11 | Visitors:

表连接是集合运算

嵌套循环连接

最传统、普遍、重要的连接方式

根据执行计划的执行顺序,解析下该连接方式的执行步骤

  1. 在列account_num的索引中,从满足查询条件account_num>’A000000139’的范围中读取第一个索引行。
  2. 利用account_num中的rowid从表ge_balance_account中读取对应的数据行,此时,所读取的数据行中所有列都将获得常量值,并利用查询条件site_type=29对所读取结果进行检验,如果满足条件则执行下一阶段的操作;否则,返回步骤1重新开始处理下一个索引行。
  3. 利用表a的site_name列的常量值去表s中site_name列的索引中寻找对应的索引行,如果没找到对应的行,匹配失败,返回步骤1重新处理下一个索引行;成功,则返回一条匹配记录。

特征

  • 次序性。按次序处理驱动表查询范围中的每一行数据,按次序执行表连接。
  • 先行性。依据优先读取的表中查询范围的大小,决定所需要处理的数据量。
  • 选择性。即使为WHERE条件中所使用到的列都创建了索引,也不意味着这些索引全都会被使用。

应用准则

  • 适合用于范围扫描方式。可以实现快速响应。
  • 如果执行表连接的某一边表只有在获得了对方所提供的执行结果后才能缩减自身的查询范围,则必须选择嵌套循环表连接方式。
  • 驱动结果集数据量较大时,执行效率不高,主要是因为会发生大量的随机读取。

排序合并连接

##执行步骤

  1. 从表s的site_type列索引中读取满足查询条件s.site_type=29的数据行后,并按照site_name的值对数据进行排序。
  2. 从表a中读取满足查询条件的数据行后,也按照site_name的值进行排序。
  3. 对两个排序后的数据按照site_name相等的要求进行合并

特征

  • 并行性。根据自身的查询条件读取条件并排序。
  • 独立性。不需要从其他集合中获取处理结果。
  • 必须按照全局扫描的方式进行处理。
  • 与表的连接方向无关
  • 在很大程度上减少随机读取次数

应用准则

  1. 在只能按照全局扫描方式来处理的SQL中,可以使用该方法
  2. 在不需要获取对应表中任何常量值也可以充分地实现缩减查询范围的条件下,使用该方法很有效
  3. 不适合OLTP类型的系统,因为排序是非常昂贵的操作。

随机读取的代价:在最坏的情况下,为了读取某一行的数据,需要从磁盘将整个表格数据读入。
排序的代价:对内存造成了很大的负担

哈希连接

术语及基本概念

Hash Area
内存空间,存储包括:位图矢量(向量)、哈希表和分区表。
位图矢量:就是Build Input中的值的集合(集合的唯一性),主要用于过滤操作(如果在Probe Input过程中所读取的数据行不存在于位图向量中,则没必要为其在分期中分配空间)
哈希表中存储各个分区的位置信息。

分区(Partition)

聚簇(Cluster)
在哈希聚簇的时候,把具有相同哈希值的行存储在统一聚簇中。(那么跟分区又有什么区别呢? 书中有个比喻,如果将柜子比作分区,那么cluster就是柜子中的抽屉),好吧,这个意思就是:第一次哈希得到的是分区值(即映射到分区上),第二次哈希得到的值是映射到分区内的聚簇上。

Build Input
提前执行的读取准备操作

Probe Input
后来读取的操作

In-Memory哈希连接
能将Build Input全部加载到Hash Area的情况
指将Build Input全部存储在内存中并未其创建哈希表,在扫描Probe Input的同时实现连接。

  1. 根据统计信息选择结果集较小的作为Build Input
  2. 确定分区数fan-out
  3. 经过第一次hash运算,确定所在的分区
  4. 第二次hash运算,得到hash_value_2
  5. 根据hash_value_2创建哈希表,并将对应的列存入相应分区的聚簇中
  6. 根据连接列的值创建位图向量
  7. 按照上面的步骤对表中所有数据对象进行处理
  8. 从现在开始从Probe Input中读取满足查询条件的数据
  9. 第一次hash运算,并利用位图向量对Probe Input对象进行过滤,若没有通过过滤,则返回并重新读取下一个对象
  10. 对于通过的对象进行第二次hash运算,利用hash表读取相关分区和聚簇找到相应的行,找不到,重新读取下一个
  11. 执行连接操作,将结果发送到运输单位
  12. 反复8~11的操作
  13. 运输单位被填满后直接返回结果
  14. 按照以上步骤对Probe Input进行处理,直到结束为止。

疑问:位图向量中到底存储的是什么?是连接列的值呢,还是经过第一次哈希得到的值呢? OK,还是统一确定为连接列的唯一值好了

特征

  • 不需要使用索引
  • 允许实现局部范围扫描

延迟哈希连接

不能完全加载,需要将超出的部分存储在磁盘中

前6步基本一致

  1. 如果超过了Hash Area的范围,则将分区的地址信息存储在分区表中,并将超出的部分移到磁盘上对应的分区上,后面如果寻找到分区并利用地址信息,就再次将磁盘 上的数据加载到内存中实现连接操作
  2. 处理Bulid Input直到结束为止
  3. 从Probe Input的查询范围内读取数据,进行第一次哈希运算,并利用位图向量对结果进行过滤
  4. 对通过过滤的进行第二次哈希运算,如果对应的Build Input对象存在于内存中则读取哈希表进行连接操作,否则,将Probe Input对象存储在其所属的分区中
  5. 将无法实现连接的分区存储在磁盘上
  6. 按照以上方式对Probe Input的对象进行连续处理
  7. 内存中的连接操作处理完毕,接下来,利用分区表中的地址信息从磁盘上将没有被连接的分区对载入到内存中
  8. 从重新载入到内存中的各个分区中选择一个最小的集合为其创建哈希表。实现角色互换
  9. 对重新确定的Probe Input进行扫描,利用哈希表进行连接。按照这种方式对磁盘中剩余的所有对象进行处理,直到结束为止

特征

  • 延迟哈希连接主要被使用在需要处理大量数据的批处理应用程序中。
  • 哈希连接能弥补sort merge join最大的弱点(对海量数据执行排序操作所需要付出的代价过大)
  • 利用位图向量,对另一个集合进行过滤,这一点与嵌套循环连接很相似。
  • 延迟哈希无法实现局部范围扫描
  • 一般而言,指定的Hash Area大小基本上是排序区域大小的1.5倍
  • 在处理非海量数据的情况下,当额外要求对连接列的值进行排序操作时排序合并更有效

半连接

  • 在使用了子查询的时候为了实现子查询与主查询之间的连接而使用的一种广义表连接
  • 子查询可以无条件的继承主查询的所有属性,反之不成立。(主查询所具有的各个列可以被使用在子查询中)
  • 结果集合始终与主查询的集合类型相同

嵌套循环型半连接


为了维护主查询集合类型的完整性而附加性地增加了SORT(UNIQUE)操作

在这里,子查询被优先执行,因此可以将子查询定义为“提供者”,其执行结果将以常量值的形式提供给主查询Where条件中的连接列。

这个SQL的执行计划中,优先执行的是主查询,然后利用其执行结果与子查询进行了连接。因此在这里可以将自查询定义为“检验者”,对主查询的执行结果进行了检验。
实现这一转变的关键在于

  • 主查询中添加了查询条件t.site_type=30
  • 在子查询中添加了a.site_id = t.id这一句看上去重复的连接条件,这个连接条件可以从逻辑上确保子查询不可能被优先执行

过滤型半连接

哈希型半连接

限制条件:

  1. 在子查询中只能使用一个表
  2. 在子查询中再次嵌套使用子查询时无法使用哈希连接
  3. 连接条件只能是相等
  4. 在查询中不能使用Group BY,Connect by, rownum等限制条件

kafka消息

Posted on 2018-04-11 | Visitors:

带着以下两点疑问,进行kafka server的Log管理源码的分析:

  • producer遇到 NOT_LEADER_EXCEPTION 是在何时产生的
  • 消息是如何在ISR中备份的

下面以从上往下的方式对一条消息写入磁盘的全链路进行分析

KafkaApis

kafka apis反映出kafka broker server可以提供哪些服务,
broker server主要和producer,consumer,controller有交互,搞清这些api就清楚了broker server的所有行为

handleProducerRequest

  • 该方法用于处理Client的producer请求,ApiKeys = 0
  • 从 RequestChannel 中获取请求,然后根据acks规则进行反馈
  • 写入磁盘的动作在replicaManager.appendRecords中完成

ack规则

acks 规则
0 producer不需要等待ack,server收到消息后直接反馈
1 leader成功写入后反馈给producer
-1 (kafka-client中配置为"all") 在ISR中所有replicas都写入消息后才进行反馈

ReplicaManager.appendMessages

  • 先将消息写入leader的log中(正常情况就是当前这个broker)
  • 将消息写到ISR的其他备份中
  • 超时或者ack规则满足时进行反馈操作(执行回调函数:responseCallback)
//写到leader的log
val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)

if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
//acks=-1时需要创建 delayedProduce实现消息的备份
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

} else {
val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
responseCallback(produceResponseStatus)
}

ReplicaManager.appendToLocalLog

当topic为内部topic(即__consumer_offsets),并且不允许往内部类中写消息时,抛出 InvalidTopicException

val info = partitionOpt match {
case Some(partition) =>
partition.appendRecordsToLeader(records, requiredAcks)

case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
.format(topicPartition, localBrokerId))
}

Partition.appendRecordsToLeader

判断当前broker是否是parition的leader

def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(assignedReplicaMap.get(replicaId))

def leaderReplicaIfLocal: Option[Replica] =
leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
  • assignedReplicaMap:用于存放每个partition对应的leader已经replicas
  • 通过比较leaderId与localBrokerId,判断当前broker是否就是leader
  • 如果不满足,那么将抛出NotLeaderForPartitionException

备份数不满足条件的消息不会写入

 val log = leaderReplica.log.get
val minIsr = log.config.minInSyncReplicas
val inSyncSize = inSyncReplicas.size
// acks为all的时候才会进行判断
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
.format(topicPartition, inSyncSize, minIsr))
}

Log.append

  • 返回的格式为LogAppendInfo,包含第一条及最后一条offset信息
  • 找到该Partition在当前broker上面最新的segment,如果塞不进去 就新建一个segment
  • 将消息添加到segment中
  • 更新segment的LogEndOffset为最新添加消息的lastOffset + 1

    ByteBufferMessageSet

    append方法中传入的消息集的数据结构为ByteBufferMessageSet (注:在0.10.2.0版本后引入新的结构:MemoryRecords),父类为Messageset,其结构如下图所示:

    一个有效的MessageSet的最小长度为12字节

    Message的组成

    Message在magic不同的情况下有不同的结构:

    迭代器的实现

    internalIterator是ByteBufferMessageSet实现的迭代器,迭代单位是MessageAndOffset

magic的值是由server中Log的配置属性:message.format.version决定的,0.10.0之前的版本的magic值为0,之后的版本为1

迭代的过程也是对消息的有效性的检验过程:

  • ByteBuffer(对应MessageSet)的长度是否< 12
  • 消息体(对应Message)的长度是否 < 魔术为0时Message的头部长度(4+1+1+8+4 = 18)
  • ByteBuffer的长度是否<消息体的长度(否则就表明消息不完整)

满足以上条件的消息一定是无法继续迭代的

由于消息的载体实现的是ByteBuffer,那我们就从Buffer的操作的角度来看看message和offset是如何被迭代取出来的:

假设当前接收到一个新的ByteBuffer,下面进行迭代:

  1. 获取buffer的片段(第一次算是拷贝):topIter = buffer.slice()
  2. 获取offset(获取前八个字节): offset = topIter.getLong()
  3. 获取size(获取紧接着的四个字节):size = topIter.getInt()
  4. 获取message(当前position指向message的开始位置,截取后续size大小的就可以得到message):message = topIter.slice; message.limit(size)
  5. 将topIter的position指向下一条MessageSet: topIter.position(topIter.position + size)

LogAppendInfo的生成

LogAppendInfo主要包含四个属性,用于描述message set
: firstOffest:第一条消息的offset
: lastOffset:最后一条消息的offset
: maxTimestamp:消息里面包含的最大时间戳
: offsetOfMaxTimestamp:最大时间戳消息的offset

analyzeAndValidateMessageSet方法实现了LogAppendInfo的生成,根据上面提到的迭代器对MessageSet中的消息进行迭代处理,找出并记录offset和timestamp信息,此外也对每条消息进行检验:

  1. 每条消息的大小不能超过max.message.bytes所定义的
  2. 每条消息必须通过循环冗余校验

消息的进一步校验

对消息的进一步校验及转化是在validateMessagesAndAssignOffsets方法中完成的。该方法的参数中涉及到一些概念:

1.topic清理策略

log.cleanup.policy配置项控制着消息在segment中持久化的策略,目前有两种策略供选择:delete和compact,默认选项为delte。

  • delete的策略很好理解,就是当segment时间或者大小到期了就删除。
  • compact的策略是为了满足系统灾后恢复的需求,该选项是针对topic的,比如存在某个topic:email_topic用于存储用户变更的email信息,key=userId,value=emailAddress,compact操作就是在日志删除过程中保留每个userId最新的数据,如果系统崩溃了也能通过该topic获得用户最新修改的email地址。下面的图很好的诠释了这种操作:

    2. 版本与magicValue

  • 由于kafka的版本更新速度比较快,为了能让新的server版本兼容老的client版本以及server的滚动升级的实现,提供了message.format.version配置项定义consumer及Producer的API版本。

  • 不同的API版本对应不同的magicValue,其中0.9.0.X版本之前(包括该版本)的magicValue未0,之后的都为1

    当前我们生产环境使用的API版本为0.10.0.0,server的版本为0.10.1.1

3. 解析非压缩消息

producer可以选择是否对消息进行压缩

message.timestamp.type:时间戳类型
: CreateTime:消息的创建时间<默认值>
: LogAppendTime:添加到log的时间

message.timestamp.difference.max.ms:最大时间间隔,表示收到的消息中的时间戳与当前时间的差的最大容忍值。

对非压缩消息的进一步处理的过程依然是ByteBuffer的操作过程:

/*主要目的是获取maxTimestamp和offsetOfMaxTimestamp*/
var messagePosition = 0
var maxTimestamp = Message.NoTimestamp
var offsetOfMaxTimestamp = -1L
//将position位置存到mark中
buffer.mark()
while (messagePosition < sizeInBytes - MessageSet.LogOverhead) {
buffer.position(messagePosition)
//offsetCounter是server维护的下一个offset的值
buffer.putLong(offsetCounter.getAndIncrement())
val messageSize = buffer.getInt()
val messageBuffer = buffer.slice()
messageBuffer.limit(messageSize)
val message = new Message(messageBuffer)
//以上的操作能获取到MessageSet中的一条message
if (message.magic > Message.MagicValue_V0) {
validateTimestamp(message, now, timestampType, timestampDiffMaxMs)
if (message.timestamp > maxTimestamp) {
maxTimestamp = message.timestamp
offsetOfMaxTimestamp = offsetCounter.value - 1
}
}
//将buffer的position移到后一条消息的头部
messagePosition += MessageSet.LogOverhead + messageSize
}
//根据mark恢复position
buffer.reset()

经过上面的操作,MessageSet中的offset被server重新设置了,并且maxTimestamp之类的信息重新收集了。这些信息在append操作中会被使用:

validMessages = validateAndOffsetAssignResult.validatedMessages
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1

其中:
: validMessages就是处理完后的MessageSet
: offset是上面使用到的offsetCounter,即下一个offset值。为什么要-1?因为上面执行了getAndIncrement()操作,因此当前的offset指向的依然是下一个offset值

4.压缩消息的处理

producer的压缩策略必须与broker一致,如果不匹配那么将不会解压缩消息

以下几种情况不会对压缩消息进行解压处理:

  1. topic指定了压缩策略,但是发送的消息中没有key(会报错)
  2. 消息体与server的magic值不一致

找到合适的segment

  • 一个topic由多个parition组成,每个partition又存在多个segment
  • 每个segment的大小由log.segment.bytes控制,下面是segment大小设置为1024的broker上某个partiton的Log情况:
    [test-0]$ du -smh *
    0 00000000000000017699.index
    4.0K 00000000000000017699.log
    4.0K 00000000000000017699.timeindex
    0 00000000000000017724.index
    4.0K 00000000000000017724.log
    0 00000000000000017724.timeindex
  • 每个log的命名规则是取该log中 下一个 offset的值(logEndOffset);
  • 以index结尾的offset index文件的作用是将offset映射到物理文件中
  • timeindex文件将时间戳与segment中的逻辑offset联系起来

满足以下几个条件之一时会创建新的segment:

  1. 当前segment容不下最新的消息
  2. 当前segment非空,并且达到了log.roll.hours的时间
  3. offset 或者 time index满了

下面是写消息时创建出新的segment时server的输出日志:

[2017-04-05 17:14:29,776] DEBUG Rolling new log segment in test-11 (log_size = 1006/1024}, index_size = 0/1310720, time_index_size = 1/873813, inactive_time_ms = 184193/604800000). (kafka.log.Log)

将消息添加到LogSegment中

LogSegment的参数主要有:
: log:File类型的MessageSet,该Set中包含一个FileChannel,能够从ByteBuffer中读取数据
: index: OffsetIndex,逻辑Offset到物理文件位置的索引
: timeIndex:TimeIndex,时间戳到物理位置的索引
: baseOffset:第一个offset

写消息的操作:

将AppendInfo中的消息写入到LogSegment中的FileChannel中

def writeFullyTo(channel: GatheringByteChannel): Int = {
buffer.mark()
var written = 0
while (written < sizeInBytes)
written += channel.write(buffer)
buffer.reset()
written
}
_size.getAndAdd(written)

FileMessageSet的search操作

根据offset或者时间戳是从Log中读取消息的常见方法。FileMessageSet提供对应的两个方法:searchForOffsetWithSize和searchForTimestamp。前者是从FileMessageSet的给定位置往后搜索第一个大于等于目标offset的消息,返回的是Offset&Position,后者是从给定位置往后搜索第一个时间戳大于等于目标时间戳的消息,返回Timestamp&Offset。

下图描述的是从0开始搜索offset≥1003的消息:

写OffsetIndex

OffsetIndex中并不会记录所有Offset的映射关系,写入Index的时机由index.interval.bytes参数(default:4096)控制,当segment中积累的消息数量大于该参数时,会将此次写入segment中的MessageSet的第一个消息的offset写入OffsetIndex中:

if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(firstOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += messages.sizeInBytes

index.append的具体操作如下:

if (_entries == 0 || offset > _lastOffset) {
mmap.putInt((offset - baseOffset).toInt)
mmap.putInt(position)
_entries += 1
_lastOffset = offset
}

上面的操作是将offset相对这个Segmet的baseOffset的偏移值以及物理地址填入到OffsetIndex中定义的MappedByteBuffer。

之所以使用相对偏移值是出于节省存储空间的考虑,相对偏移值只需要4位空间就能存储,而MessageSet中的offset占8位。

写TimeIndex

在每次执行append操作时,TimeIndex记录的是最大时间戳及其对应的offset的索引。

if (timestamp > lastEntry.timestamp) {
mmap.putLong(timestamp)
mmap.putInt((offset - baseOffset).toInt)
_entries += 1
}

写入Buffer中的是时间戳以及相对偏移量

更新LogEndOffset

上面的分析中提到过每个Log都维护了一个记录下一个offset的变量——nextOffsetMetadata,该变量是LogOffsetMetadata类型的:
: messageOffset:绝对偏移值
: segmentBaseOffset:LogSegment的baseOffset值
: relativePositionInSegment:LogSegment的大小(字节数),根据字节数可以定位到这条消息在segment中的物理位置

每次append操作都会执行:
nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
其中:

  • messageOffset为 AppendInfo.lastOffset+1
  • activeSegment是当前可用的segment
  • activeSegment.size由_size.get(),这个_size正是在上面往segment中写消息时进行更新的,每次增加的值是写入channel中的字节数。

下面这张图生动描述了这个更新的操作:

  • 绿色的代表activeSegment,当前的nextOffsetMetadata为10(指的是messageOffset)
  • 写入10条消息后,nextOffsetMetadata更新为21
  • 再次写入10条消息后,更新为31
  • 再来10条消息,无法写入,执行roll操作新建一个segment,messageOffset值未改变,不过activeSegment变化了
  • 将10条消息写入新的segment中,更新为41

Kafka Coordinator实现细节

Posted on 2018-04-04 | Visitors:

GroupCoordinator

每个kafka server在启动的时候会创建一个GroupCoordinator用于管理group以及consumer的offset fetch/commit

在创建GroupCoordinator实例时不仅需要brokerId、group以及offset config,还需要传入replicaManager,其作用是

GroupMetadataManager

GroupMetadataManager是GroupCoordinator最重要的组成部分,其作用是管理group的元信息(状态、成员、提交的offset信息)以及作为coordinator的broker所分配到的分区信息,主要成员有4个:

  1. groupMetadataCache:存储group与GroupMetadata的cache
  2. loadingPartitions:”__consumer_offsets”中正在被当前coordinator加载的分区
  3. ownedPartitions:”__consumer_offsets”中分配到当前coordinator的分区(即该broker是这些分区的leader)
  4. scheduler:删除过期offset以及group元数据的定时任务(执行间隔由offsets.retention.check.interval.ms参数控制,默认为10分钟)

__consumer_offsets

__consumer_offsets是用于存储消费者消费信息的topic,存储的消息由两部分组成

  • 一部分是offset信息(kafka.coordinator.OffsetsMessageFormatter类型)的:

    [groupId,topic,partition]::[OffsetMetadata[offset,metadata],CommitTime ExprirationTime]
  • 另一部分是group信息(kafka.coordinator.GroupMetadataMessageFormatter类型):

    groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)]

group分配到哪个分区的策略在kafka cosumer中介绍过了。

作为一个特殊的topic,consumer_offsets也有replica的概念,并且其replica factor与其他topic保持一致。consumer_offsets上每个分区都对应一个leader,作为leader的broker上的GroupCoordinator会记录着分区上记录着的group以及offset信息。当leader(__consumer_offsets分布的leader)发生变化时,新的leader需要加载对应分区上的group以及offset信息。

server在处理LeaderAndIsrRequest时会对__consumer_offsets的分区做出入境 操作:

def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
updatedLeaders.foreach { partition =>
if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupImmigration(partition.partitionId)
}
updatedFollowers.foreach { partition =>
if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupEmigration(partition.partitionId)
}
}

小实验

“G3”这个group根据hash映射到分区2上,当前的ISR为:

Topic: __consumer_offsets	Partition: 2	Leader: 1	Replicas: 1,3,2	Isr: 2,3,1

接下来,关闭broker1,导致broker3成为了新的leader。

broker3执行入境 操作,加载分区2上面的的offset以及group信息:

[Group Metadata Manager]: Loading offsets and group metadata from [__consumer_offsets,2]
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 1
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 2
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 3
[Group Metadata Manager]: Loaded group metadata for group G3 with generation 4
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3448,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-0.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3441,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-3.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3257,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-5.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3382,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-2.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3397,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-4.
[Group Metadata Manager]: Loaded offset [OffsetMetadata[3163,NO_METADATA],CommitTime 1495503059311,ExpirationTime 1495506659311] for newOne-1.
[GroupCoordinator 3]: Loading group metadata for G3 with generation 4
[Group Metadata Manager]: Finished loading offsets from [__consumer_offsets,2] in 21 milliseconds.

在loadGroupsForPartition方法中通过使用Map确保加载每个group最新generation的信息

在执行入境操作之前,分区2被添加到loadingPartitions中,表示coordinator正在加载该分区里面的信息,这个阶段如果有groupId在loadingPartitions之内的消费请求进来,是无法响应的;
处理完后,分区2被添加到ownedPartitions中。

保存group元数据

方法定义如下:

def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Option[DelayedStore]

  • groupAssignment是group中member的分区分配
  • 返回值是DelayedStore,这并不是一个DO类型的延迟任务,只适用于存放消息的临时媒介,方便后续往replicas中写Log用的。

生成消息

第一步是生成能往Log(确切的说是写入__consumer_offsets中该group对应的partition中)中写入的ByteBufferMessageSet(消息写入磁盘及备份实现分析),key为groupId,value为member以及sessionTimeout,即上面所说的group信息。

设置appendLog回调函数

该回调函数作为参数传入到ReplicaManager.replicaMessages,对append操作的结果进行处理,最主要的就是状态的转换:

case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.GROUP_COORDINATOR_NOT_AVAILABLE

case Errors.NOT_LEADER_FOR_PARTITION =>
Errors.NOT_COORDINATOR_FOR_GROUP

case Errors.REQUEST_TIMED_OUT =>
Errors.REBALANCE_IN_PROGRESS

case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
Errors.UNKNOWN

执行storeGroup回调函数

在doSyncGroup中定义了一个用于处理leader的SyncGroupRequest的回调函数:

group synchronized {
if (group.is(AwaitingSync) && generationId == group.generationId) {
if (error != Errors.NONE) {
resetAndPropagateAssignmentError(group, error)
maybePrepareRebalance(group)
} else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
}
}

因为在等待回调函数被执行的过程中,可能会有新的member加入,这样的话就无法保证group的状态以及generation不会变化的。

往replicas写group数据

调用GroupMetadataManager的store方法,将DelayedStore中的messageSet以及回调函数传入到replicaManager的appendMessages方法中

replicaManager.appendMessages(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
true, // allow appending to internal offset topic
delayedStore.messageSet,
delayedStore.callback)

保存offset commit

prepareStoreOffsets方法与prepareStoreGroup基本相像:

def prepareStoreOffsets(group: GroupMetadata,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore]

返回的依然是个DelayedStore。
组装的MessageSet中,key为[groupId, topic, partition],value为OffsetMetadata,写入的partition由groupId确定。

offsetCommit回调函数

GroupMetadata为offset commit创建了两个Cache:offsets以及pendingOffsetCommits,consumer提交的offset先存放到pending中,然后根据一定的状态来决定是否移到offsets中。

如果offsetCommit执行结束后group依旧存活那个根据是否有错误对cache执行不同的操作:

  1. 没有错误码,那么就将offset写入到offsets中,并从pendingOffsetCommits中移除;
  2. 有错误,那么仅仅将offset从pendingOffsetCommits中移除;

offset以及group元数据的清理工作

GroupMetadataManager在启动时会开启一个定时执行的清理线程:”delete-expired-group-metadata”,该线程的主要工作是清理__consumer_offsets中失效的offset以及可删除的group信息。

remove expire offset

每个offset commit提交到server时,都会根据配置的保存时间来设置其失效时间,超过该时间的将会被清除掉。

首先,从cache中筛选出可清楚的offset集合:

val expiredOffsets = offsets.filter {
case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
}
offsets --= expiredOffsets.keySet
expiredOffsets

执行完删除操作过后判断group是否可以判定为DEAD:

  1. group不包含member
  2. group的两个offsetCache都为空

立墓碑

当offset被移除或者group进入DEAD状态,都会在__consumer_offsets中留下一个墓碑。

对于group而言:

  • 如果当前有member存在,那么其存在于__consumer_offsets中的数据是这样的:

    G6::[G6,Some(consumer),Stable,Map(client1-1ee1d482-c9fa-4617-860a-98d3a3d5a836 -> [client1-1ee1d482-c9fa-4617-860a-98d3a3d5a836,client1,/10.45.48.129,120000])]
  • 如果member为空,但是状态不是DEAD:

    G6::[G6,None,Empty,Map()]
  • 如果group被该清理线程认为DEAD,该group信息不仅从groupMetadataCache中移除,还会在__cosnumer_offsets中留下一座墓碑:

    G6::NULL

offset的立墓碑操作与group类似。

consumer与Coordinator的连接过程

确定coordinator

在kafka cosumer中提到过consumer寻找coordinator的过程。

选择将哪个节点作为coordinator其实是由consumer client决定的,确切的说是向已连接的节点中随机选择一个最空闲的节点发送GroupCoordinatorRequest。(这里随机是指往一个空闲的随机broker发送请求,收到的response中分配到的coordinator是根据group找对__consumer_offsets对应分区的Leader)这里空闲的定义是:NetworkClient中处于inflight状态的请求数量少,下面是consumer寻找到coordinator的日志:

[AbstractCoordinator] Sending coordinator request for group G6 to broker 10.45.4.10:9092
[AbstractCoordinator] Received group coordinator response ClientResponse(receivedTimeMs=1495510047579, disconnected=false, request=ClientRequest(callback=..., request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=client111}, body={group_id=G6}), createdTimeMs=1495510047472, sendTimeMs=1495510047577), responseBody={error_code=0,coordinator={node_id=1,host=10.45.4.9,port=9092}})

handleGroupCoordinatorRequest

server在接收到GroupCoordinatorRequest后:

  • 根据groupId找到对应__consumer_offsets上的分区P
  • 找到P对应的leader作为coordinator

handleJoinGroup

校验

  • 简单校验:
  1. coordinator是否处于工作状态
  2. groupId是否有效
  3. coordinator是否负责该group
  4. sessionTimeoutMs是否合理
  5. groupId是否在loadingPartitions中,如果在的话表明正在Rebalance中。
  • member校验:
    客户端ConsumerCoordinator发送的JoinGroupRequest 中的memberId永远都是空的,也就是说memberId是由server端进行设置的。如果Request中group是已知(存在于GroupMetadataManager.groupMetadataCache)的并且memberId非空,那么Server将拒绝这个请求。

响应请求

group有不同的状态,在不同的状态下有相应的响应joinGroupRequest方法

Dead

group处于Dead状态表明该group中没有了成员,并且其GroupMetadata已被该coordinator移除,这个状态下对任何请求都是返回UnknownMemberIdException

PreparingRebalance

根据Request中memberId是否为空有两套处理逻辑:

  • memberId为空:执行addMember操作

    //新建memberId,格式为:clientId-UUID 
    val memberId = clientId + "-" + group.generateMemberIdSuffix
    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)
    member.awaitingJoinCallback = callback
    group.add(member.memberId, member)
    maybePrepareRebalance(group)
    member
  • memberId不为空:执行updateMember操作

    member.supportedProtocols = protocols
    member.awaitingJoinCallback = callback
    maybePrepareRebalance(group)

以上两个操作都不会触发PrepareRebalance操作,因为当前已经是PreparingReblance状态了。

AwaitingSync

这个状态表明coordinator已经发送了JoinGroupResponse了,正在等待leader发送分区分配的SyncGroupRequest.

这个时候如果收到一个memberId为空的JoinGroupRequest,表明group中有新的成员加入,除了要创建member信息添加到GroupMetadata中之外,还需要prepareRebalance,并将状态重新置为PreparingReblance

如果收到的memberId不为空,有两种情况:

  1. 该成员未收到之前发送过的JoinGroupResponse,这种情况就重新发送一个Response,leader和member分配不会改变的
  2. 这次的Request中改变了分区分配策略,因此需要执行updateMember操作,并且还需要执行PrepareRebalance操作将状态重新置为PreparingReblance。

Empty or Stable

处理三种可能的joinGroup情形:

  1. 收到memberId为空的请求,group有新成员加入,执行addMember以及PrepareRebalce操作。
  2. 收到leader的joinGroup请求或者请求中的分配策略发生变化(何种场景下会leader会重发joinGroup请求嘞?分区变化时consumer是如何rejoin的)
  3. 其他情况(followers发送的没有内容变化的Join请求)说明followers可能没收到Response,因此重发Response。

handleSyncGroup

在校验阶段如果发现该coordinator并不负责该group,则会反馈NotCoordinatorForGroupException。

能够正常响应SyncGroup请求的group状态为AwaitingSync和Stable,其中AwaitingSync状态下就是执行保存group数据

handleLeaveGroup

主要有三步操作

1、从心跳DOP中移除

心跳DOP(heartbeatPurgatory)中保存的是DelayedHeartbeat,该DO操作用于侦测member是否存活,成员变量包括group、member已经超时时间。当member要离开group时需要将该member对应的
DO操作完成并移除掉。

2、从group中移除

将member从其对应的GroupMetadata中的members中移除,然后根据当前group状态进行对应的处理:

  • Stable or Empty:触发PrepareRebalance
  • PreparingRebalance:complete掉joinPurgatory中该group的DJ操作

3、组装反馈信息

其实consumer对LeaveGroupResponse不是很关心,因为不会重发。

handleFetchOffsets

consumer是从coordinator维护的offset cache :offsets中获取group已提交的offset信息的。

FetchRequest中包含group以及topic-partition信息,据此coordinator进行反馈:

  • 如果group不存在 or group状态为DEAD,则返回的PartitionData中offset为-1(代表InvalidOffset)
  • 如果未指定topic以及partition,那么就将offsets中所有的[topic-partition, commit offset]数据都反馈给consunmer
  • 如果指定topic-partition,就将offsets中对应的数据反馈,找不到就反馈-1

Helper

分区变化时consumer是如何rejoin的?

消费者在长连接的时候,增加了分区的topic,client是如何感知到的呢,下面是实验过程记录:

  • 10:33:08 开启consumer

    [AbstractCoordinator] Sending coordinator request for group G6 to broker 10.45.4.10:9092 (id: -2 rack: null)
    [AbstractCoordinator] Discovered coordinator 10.45.4.9:9092 (id: 2147483646 rack: null) for group G6.
    [ConsumerCoordinator] Revoking previously assigned partitions [] for group G6
    [ConcurrentMessageListenerContainer] partitions revoked:[]
    [AbstractCoordinator] (Re-)joining group G6
    [AbstractCoordinator] Sending JoinGroup ({group_id=G6,session_timeout=120000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}) to coordinator 10.45.4.9:9092 (id: 2147483646 rack: null)
    [AbstractCoordinator] Received successful join group response for group G6:
    [AbstractCoordinator] Successfully joined group G6 with generation 5
    [ConcurrentMessageListenerContainer] partitions assigned:[newOne-8, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3]
  • 10:34:17 controller检测到partitoin变化:

    [AddPartitionsListener on 2]: Partition modification triggered {"version":1,"partitions":{"8":[2,1],"4":[1,3],"11":[2,1],"9":[3,2],"5":[2,1],"10":[1,3],"6":[3,2],"1":[1,2],"0":[3,1],"2":[2,3],"7":[1,3],"3":[3,2]}} for path /brokers/topics/newOne (kafka.controller.PartitionStateMachine$PartitionModificationsListener)
  • 10:38:10 client rejoin

    [ConsumerCoordinator] Revoking previously assigned partitions [newOne-8, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3] for group G6
    [ConcurrentMessageListenerContainer] partitions revoked:[newOne-8, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3]
    [AbstractCoordinator] "(Re-)joining group G6"
    [AbstractCoordinator] Sending JoinGroup ({group_id=G6,session_timeout=120000,member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}) to coordinator 10.45.4.9:9092 (id: 2147483646 rack: null)
    [AbstractCoordinator] Received successful join group response for group G6: {error_code=0,generation_id=6,group_protocol=range,leader_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,members=[{member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}
    [AbstractCoordinator] Sending leader SyncGroup for group G6 to coordinator 10.45.4.9:9092 (id: 2147483646 rack: null): {group_id=G6,generation_id=6,member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,group_assignment=[{member_id=client111-39aa3f5d-cdf2-492f-bf41-4affe1a00421,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=70 cap=70]}]}
    [AbstractCoordinator] Successfully joined group G6 with generation 6
    [ConsumerCoordinator] Setting newly assigned partitions [newOne-8, newOne-9, newOne-10, newOne-11, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3] for group G6
    [ConcurrentMessageListenerContainer] partitions assigned:[newOne-8, newOne-9, newOne-10, newOne-11, newOne-4, newOne-5, newOne-6, newOne-7, newOne-0, newOne-1, newOne-2, newOne-3]

那么consumer到底是如何触发rejoin的呢,这个时间间隔有何讲究?

一开始自己关注的点是rejoinNeeded什么时候被置为true

有三种情况下会被置为true:

  1. SyncGroupResponse中存在ERROR
  2. HeartbeatResponse中存在REBALANCE_IN_PROGRESS、ILLEGAL_GENERATION或UNKNOWN_MEMBER_ID的ERROR
  3. consumer leave group

然而从日志来看并没有出现错误,所以注意力转移到needRejoin方法中的其他判断条件:

return subscriptions.partitionsAutoAssigned() &&
(super.needRejoin() || subscriptions.partitionAssignmentNeeded());

partitionsAutoAssigned的条件是一直满足的,partitionAssignmentNeeded被置为true的场景有点多,排查起来比较费时费力。这时有个新的现象进入眼帘:经过多次试验后发现rejoin距离分区调整的时间间隔最长不超过5分钟!。而metadata.max.age.ms这个配置参数的默认值刚好是5分钟:

该配置项是client端强制刷新metadata的最长时间间隔。

因为kafka集群出现broker或者partition变化的时候是不会通知客户端的,因此客户端需要定期的去获取metadata的值。

客户端判断是否需要刷新metadata的方法:

public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}

其中metadataExpireMs就是5分钟【默认】。

ConsumerCoordinator为metadata添加了一个Listener监听其更新操作:

private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
...
// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
if (!snapshot.equals(metadataSnapshot)) {
metadataSnapshot = snapshot;
subscriptions.needReassignment();
}
}

}
});
}

needReassignment()方法将needsPartitionAssignment置为true,这正是partitionAssignmentNeeded()方法所需要的。

how to prepare rebalance

在kafka cosumer中做个一个实验:consumer多次关闭重连后,partition将会在较长时间后才能分配到。GroupCoordinator做了什么才导致这样的现象发生嘞?

prepareReblance的代码不长,但是要搞懂到底做了些什么着实不易:

private def prepareRebalance(group: GroupMetadata) {
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(AwaitingSync))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

group.transitionTo(PreparingRebalance)
info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))

val rebalanceTimeout = group.rebalanceTimeoutMs
val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
val groupKey = GroupKey(group.groupId)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}

DelayedOperation And DelayedOperationPurgatory

purgatory wiki

DelayedJoin(简称DJ)是DelayedOperation(简称DO)的子类,DelayedOperationPurgatory(简称DOP)用于记录DO,并将超时的DO执行expired操作?

DO用于执行延迟任务,参数只有一个超时时间,目前已有的实现类有:

比如DelayedFetch的操作允许Fetch等待一定数量的消息或者达到超时时间后再返回。

  • 当DO完成给定的操作后,会调用onComplete方法(该方法需要子类实现)并且只会被调用一次, isComplete方法将返回true(通过原子变量AtomicBoolean实现);
  • forceComplete和tryComplete方法等能触发onComplete,其中前者已经在DO中实现了:
    def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
    // cancel the timeout timer
    cancel()
    onComplete()
    true
    } else {
    false
    }
    }

该方法将原子变量强行换成true(如果当前为false的话),然后调用onComplete操作;后者需要子类实现:在判断是否达到complete条件后再调用forceComplete。

  • safeTryComplete是TryComplete的线程安全版:

    def safeTryComplete(): Boolean = {
    synchronized {
    tryComplete()
    }
    }
  • 如果DO超时,将调用onExpiration操作(0.10.1.1版本中,DJ仍未实现该方法)

Watchers

DOP形象的定义了Watchers这个类用于存放和“观察”DO,使用到的数据结构是ConcurrentLinkedQueue。

  • watch:该方法将DO添加到队列中
  • tryComplteWatched:遍历队列,将已完成的DO从队列中移除,调用未完成DO的safeTryComplte方法,并记录在该方法中完成的DO数量。
  • purgeCompleted:该方法只是将已完成的移除掉,并返回移除的数量。
  • 每个Watchers都与一些key关联(HashMap),定义为watchersForKey
    • Key的类型没有限制,
    • 当key对应的DO全部完成后,key以及对应的Watchers一起从Map中移除。

其他参数及属性

  • timeoutTimer
  • brokerId
  • purgeInterval:清理基准线,当DOP中已完成的DO数量达到该基准线后开始清理操作
  • reaperenbaled:是否允许清除DO
  • watchersForKey:存放watcher与对应的key
  • expirationReaper:超时DO清道夫

tryCompleteElseWatch

该方法将一个DO塞进Watchers中并与多个Key进行关联。如果每次与Key进行关联时都执行一次tryComplete操作成本很大。kafka选择了一个折中的策略,保证在该方法内最多调用两次tryComplete方法:

  1. 执行第一次tryComplete操作,成功就返回
  2. 遍历keys,如果DO未完成就将key与Watchers进行绑定,添加到watchersForKey
  3. 执行第二次tryComplete,成功就返回
  4. 依然未成功的话就添加到timeoutTimer中
    (进入这里面的DO将怎么处理?怎么才能再次触发onComplete操作呢?)
    • 放到Timer中的DO在超时的时候会执行TimerTask#run()方法
    • DelayedOperation基础了TimerTask类,覆盖了父类的run方法:
        override def run(): Unit = {
      if (forceComplete())
      onExpiration()
      }

DelayedJoin

GroupCoordinator中实现了DJ作为DO的各个方法,在分析这些方法前需要关注的是GroupMetadata 与 MemberMetadata的两个属性:

notYetRejoinedMembers AND awaitingJoinCallback

  • awaitingJoinCallback是Mebmer的属性,初始为null;当member所在的group处于PreparingReblance状态下,member向coordinator发送了JoinGroup请求,那么该字段用于存放将JoinGroupResult反馈的回调方法,在重平衡结束后或者coordinator执行出境操作等会将该字段重新置为Null
  • notYetRejoinedMembers存放该group中没有awaitingJoinCallback的member,在Rebalance期间,该集合中存储的是那些没有发送JoinGroup请求的member。

tryComplete

如果group中所有的成员都发送了JoinGroupRequest 就调用forceComplete方法(DO中的方法):

def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group synchronized {
if (group.notYetRejoinedMembers.isEmpty)
forceComplete()
else false
}
}

留个问题 如果某个member迟迟不发送JoinGroup请求的话,那总不能永久等待吧?member在什么情况下会被移出group呢?(关键点在于DelayedHeartbeat的onExpireHeartbeat方法)

onExpireHeartbeat

如果coordinator听不到member的心跳:

member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

heartbeat的超时时间由consumer的session.timeout.ms控制【默认为100s】
那么就会将member从group中移除:

private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
group.remove(member.memberId)
group.currentState match {
case Dead | Empty =>
case Stable | AwaitingSync => maybePrepareRebalance(group)
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}

如果当前group处于PreparingRebalance状态,那么将会将查下是否可以complete join操作。那么前面两个问题已经明朗了:

  1. 添加到DOP的timeoutTimer中的DO只是放入一个定时器内,超时后移除,DO能否在超时前执行onComplete操作完全靠外部触发;
  2. 如果某个group存在多个member,如果其中存在member非正常退出(即没有执行unsubscribe操作),那么coordinator必须依赖心跳超时来检查该member是否dead,在此期间内的joinGroup请求无法立即得到响应。

onCompleteJoin

疑惑 该方法的第一步看不懂:

// remove any members who haven't joined the group yet
group.notYetRejoinedMembers.foreach { failedMember =>
group.remove(failedMember.memberId)
// TODO: cut the socket connection to the client
}

能够执行onCompleteJoin说明notYetRejoinedMembers已经是空的了,这里的移除操作感觉多余了????

JoinGroupResult:

val joinResult = JoinGroupResult(
members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
memberId=member.memberId,
generationId=group.generationId,
subProtocol=group.protocol,
leaderId=group.leaderId,
errorCode=Errors.NONE.code)

group的leaderId采取先来后到的原则,新来的memberId作为leaderId。

对于非正常重启一个consumer所遇到的长时间等待可以用下图加深理解:

test_blog

Posted on 2018-04-04 | Visitors:

this is title

this is the second title

//this is code
public static void main(String[] args) {
System.out.println("hello world");
}

Hello World

Posted on 2018-04-04 | Visitors:

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

$ hexo new "My New Post"

More info: Writing

Run server

$ hexo server

More info: Server

Generate static files

$ hexo generate

More info: Generating

Deploy to remote sites

$ hexo deploy

More info: Deployment

《Effective Java》读书笔记

Posted on 2017-09-30 | Visitors:

构造函数 VS 静态工厂方法

静态工厂方法与设计模式无关,只是一种创建实例的方式,如:

public static Boolean valueOf(boolean b) {
return (b ? TRUE : FALSE);
}

与传统的构造函数相比,使用静态工厂方法的优势有三点

  1. 静态工厂方法的命名更灵活,使用者不需要查看API也能通过静态工厂方法创建想要的实例
  2. 静态工厂方法创建的对象是单例的:
    Boolean fa = Boolean.valueOf(false);
    Boolean fb = Boolean.valueOf(false);
    assertTrue(fa == fb);

    Boolean ca = new Boolean(false);
    Boolean cb = new Boolean(false);
    assertTrue(ca != cb);
    assertTur(fa.equals(ca));

如果比较的两个对象都是单例的,那么通过 ==而非equals方法来判断俩对象是否相同的效率更高

  1. 静态工厂方法能返回员返回类型的任意子类型的对象:

    EnumSet是一个抽象类,RegularEnumSet以及JumboEnumSet是其实现类,使用者可以通过以下静态工厂方法创建一个存放枚举的集合,而这个集合的实现类是什么,使用者并不关心。使用者关心的是EnumSet提供的API方法能否正常使用就可以了。

    public static <E extends Enum<E>> EnumSet<E> noneOf(Class<E> elementType) {
    Enum<?>[] universe = getUniverse(elementType);
    if (universe == null)
    throw new ClassCastException(elementType + " not an enum");

    if (universe.length <= 64)
    return new RegularEnumSet<>(elementType, universe);
    else
    return new JumboEnumSet<>(elementType, universe);
    }

仅有的劣势:如果某个类只能通过静态工厂方法实例化,而缺乏public或者protected的构造方法因而无法扩展。

如何构造多属性的对象

当我们需要构建一个包含最多6个,最少1个属性值的对象时,常用的方式有以下几种

1. 重构构造函数

提供多个包含不同属性值的构造函数

这种方式有很明显的缺陷:

  • 随着属性值数量的增大构造函数将变得非常庞大、臃肿,可读性较差
  • 在给构造函数传递参数时很容易搞错(虽然现在强大如IDEA这样的编辑器会有提示参数名的功能)。比如一个Human类中身高以及体重参数的类型都是Double,传递参数时如果将身高体重搞混了编译器并不会报错,但是这样的结果难免就是很奇葩了。

2. 使用JavaBeans模式

这种模式就是先通过无参构造函数创建一个空的对象,然后使用setter方法对属性进行赋值,这种方式虽然提高了可读性,但是存在较为严重的缺陷:无法保证状态一致性,并且无法创建不可变的对象,安全性较差。

3. builder模式【推荐】

builder模式不直接生成对象,而是通过调用构造器传入必要的参数获得一个内部builder对象,通过调用该对象的build方法获得一个不可变的对象。

  • 使用builder模式创建对象提高了可读性以及可扩展性,
  • 一般只有在参数可数大于4的情况下才会使用builder模式,尤其适用于参数可选的情况下,springKakfa在生成Container时我们就采用的builder模式。

类与接口

封装是软件设计的基本原则之一,设计良好的模块会隐藏所有的实现细节,将API与实现隔离开来

尽可能地使每个类或者成员不被外界访问

  • 如果在一个发行的版本中某个类或者接口是共有的,那么你就有责任永远支持它已保证其兼容性!

  • 成员的4种访问级别:

    • public:在任何地方都可以访问该成员
    • protected:声明该成员的类的子类可以访问这个成员
    • package private:声明该成员的包内部的类可以访问这个成员【default】
    • private:声明该成员的顶层类内部可以访问
  • 如果子类覆盖了父类的某个方法,那么子类中对应方法的访问权限不能低于父类的。(如果覆盖了父类的protected方法,那么子类中该方法必须声明为protected或者public)

  • 因为接口中所有方法都隐含着public访问级别,因此实现接口的方法必须声明为public。

通过对象引用实现策略模式

在C语言中可以通过函数指针来实现策略模式,比如qsort函数要求用一个指向comparator函数的指针作为参数。在Java中没有函数指针的概念,但是可以通过对象引用来实现同样的功能。

下面的代码展示了最简单的策略使用:
首先定义一个具体的策略类,考虑到策略类会被频繁使用,并且该类是无状态的,所以使用单例模式来导出策略类的实例比较好,这样能减少不必要的对象创建开销

public class StringLengthComparator {
private StringLengthComparator() {}

public static final StringLengthComparator SLC = new StringLengthComparator();

public int compare(String s1, String s2) {
return s1.length() - s2.length();
}
}

然后在一个客户端方法中将策略类当做参数传入:

public class StringArrayUtil {

public void sortStringList(List<String> list, StringLengthComparator comparator) {
Assert.assertTrue(list.size() > 2);
if (comparator.compare(list.get(0), list.get(1)) > 0) {
String tmp = list.get(0);
list.set(0, list.get(1));
list.set(1, tmp);
}
}
}

这里传入的参数限定为具体策略类不方便扩展,所以可以定义一个接口:

public interface Comparator<String> {
public int compare(String s1, String s2);
}

客户端的声明可以改为:public void sortStringList(List<String> list, Comparator comparator)

具体的策略类往往使用匿名类声明,比如最常见的:

Arrays.sort(array, new Comparator<String>() {
public int compare(String s1, String s2) {
return s1.length() -s2.length();
}
});

以这种方式使用匿名类时,每次执行调用的时候都会创建一个新的实例,可以考虑将函数对象存储到一个私有的静态final域里。

泛型

泛型相关的术语

参数化类型是不可变的

对于不同的类型Type1和Type2,List<Type1>既不是List<Type2>的子类型也不是其超类型。
我们可以将任意对象放到List<Object>中,但是却只能将字符串对象放到List<String>中

以Stack为例:

public class Stack<E> extends Vector<E> {
...
public E push(E item) {
addElement(item);
return item;
}
}

由于子类型的对象可以转化成父类型,所以以下操作可行的:

Stack<Number> stack = new Stack<>();
stack.push(new Integer(1));

但是如果我在自己的Stack中要实现一个pushAll的功能,将一组数据push到栈中:

public class MyStack<E> extends Stack<E>{
public void pushAll(Iterable<E> src) {
for (E e : src) {
push(e);
}
}
}

当传入数据的类型与Stack的类型不一致时,即使传入对象是Stack类型的子类型,编译期会提示错误

有限制通配符的妙用

前面的pushAll方法其参数称为:E的Iterable接口 ,通过有限制通配符将参数改为:E的某个子类型的Iterable接口:public void pushAll(Iterable<? extends E> src)

接下来,需要实现一个popAll的方法将Stack中的数据pop到一个集合中,首先想到的实现是这样的:

public void popAll(Collection<E> dest) {
while (!this.isEmpty()) {
dest.add(pop());
}
}

这样的实现有个限制,就是只能导出到一个类似于List<Number>的集合中,如果尝试导出到List<Object>则会报错:

同样的借助于有限制通配符将popAll的参数由E的集合类型转化成E的某种超类的集合:
public void popAll(Collection<? super E> dest)

类型安全的异构容器

泛型常用于容器,参数化的容器一般限制了参数类型的数量,如一个List只有一个类型参数,一个Map有两个类型参数。

如果想要获得更多的灵活性,就需要对参数进行泛型化,而不是对容器进行泛型化。考虑下面这个类:

public class Favorites {
private static Map<Class<?>, Object> map = new HashMap<>();

public static <T> void put(Class<T> type, T instance) {
map.put(type, instance);
}

public static <T> T get(Class<T> type) {
return (T) map.get(type);
}
}

  • 在Java 1.5之后对Class进行了泛型化处理
  • 对map的Key进行了泛型化,这样的话我们就能将不同类型的对象存到容器中,这就是异构
  • Map的value类型是Object,因此容器并不能保证键值对之间的类型关系,如果传入原生态的Class,那么就可恶意的将一个String对象映射到其他类型,进而破坏Favorites的内部结构
    Class c = Integer.class;
    Favorites.put(c, "da");
    System.out.println(Favorites.get(Integer.class));

为了避免出现不可预料的运行时异常,在put过程应该严格把关,确保传入的类型与对象类型是一致的,这可以借助于Class的cast方法:

public T cast(Object obj) {
if (obj != null && !isInstance(obj))
throw new ClassCastException(cannotCastMsg(obj));
return (T) obj;
}

其中isInstance方法能判断对象是否为指定类的对象。

异常与并发

ConcurrentModificationException

这是Java提供的一种标准异常,适用于的场景为:在禁止并发修改的情况下检测到了对象的并发修改

看下面一个简单的例子:

public void testConcurrentModificationException() {
Set<Integer> set = new HashSet<>(Arrays.asList(1,2,3));
for (Integer i : set) {
System.out.println(i);
if (i == 2) {
set.remove(3);
}
}
}

输出的结果:

1
2

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
at java.util.HashMap$KeyIterator.next(HashMap.java:1461)
at Test.testConcurrentModificationException(Test.java:549)

只要i<=2就会报这个错,因为企图在遍历列表的过程中,将一个元素从列表中删除是非法的

如何愉快的使用线程池

Posted on 2017-09-07 | Visitors:

近来,在工作中多次使用了多线程编程,其中使用最多的就是ExecutorService了,在使用中遇到了诸多问题,最主要的问题是——何时关闭线程池?

何时关闭线程池的问题可以转化为如何获知所有线程执行完毕。关于这个问题,stackoverflow上有一篇很有意思的讨论

可以总结出以下几种方式:

1. shutdown + awaitTermination的方式

模式:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}

调用了shutdown方法后,线程池不再接受新的任务,等待当前所有任务执行完毕后就会退出。这种方式适用于所有线程是一次性执行的,也就是说线程池中的线程只会被调用一次,如果需要执行的任务次数大于线程池的数量,调用shutdown方法会导致后续任务无法分配到线程执行。

2. 使用countDownLatch

CountDownLatch是一个常用于线程同步的闭锁,这个适用于已知任务总执行次数的情况。
模式:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}

try {
latch.await();
} catch (InterruptedException E) {
// handle
}

在每个线程中执行一次countDown操作。

3. 使用Future.get()

因为future.get()方法会等待线程执行完成,并获取返回值,所以当所有task对应的future都有返回值了,就可以关闭线程池了。
模式:

ExecutorService pool = Executors.newFixedThreadPool(concurrent);
List<Future> futures = new ArrayList<>();
for (Map<String, Date> dateSplit : dateSplits) {
futures.add(pool.submit(new MyCallableTask());
}
for (Future future : futures) {
future.get();
}
pool.shutdown();

4. 使用Future + CompletionService

CompletionService也是java.util.concurrent提供的。
模式:

ExecutorService threadPool = Executors.newFixedThreadPool(N_THREADS);
CompletionService<List<Object>> pool = new
ExecutorCompletionService<~>(threadPoolpool);
List<Future<List<Object>>> futures = new ArrayList<>();
while(...) {
futures.add(pool.submit(new CallableTask()));
}

for (future in futures) {
future.get();
}
treadPool.shutdown();

com.best.oasis.express.util.kafka.utils.KafkaConsumerUtils中就是使用这种模式来获取每个分区上对应时间段内的消息的。该模式适用于需要处理task返回值的情况

5. 使用CompletableFuture (需要JDK1.8)

用的不多,直接看模式:

CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, pool))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
pool.shutdown();

6. 使用ListenableFuture (需要Guava)

上面几种模式都是等待线程执行完,而这个则是通过回调,在所有任务执行完后通知用户。
模式:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
.newFixedThreadPool(concurrent));
List<ListenableFuture<Object>> futures = new ArrayList<>();
for (Map<String, Date> dateSplit : dateSplits) {
ListenableFuture<Object> lf = service.submit(new MyCallableTask());
futures.add(lf);
}
ListenableFuture<List<Object>> lf = Futures.successfulAsList(futures);
Futures.addCallback(lf, new FutureCallback<List<Object>>() {
@Override
public void onSuccess(List<Object> result) {
logger.info("所有线程处理完毕");
service.shutdownNow();
}

@Override
public void onFailure(Throwable t) {
logger.info("某个线程出错了" + t);
}
});

successfulAsList在所有Future都执行成功后返回一个由这些Future返回值组成的List,而且错误或者取消的Future都是在onSuccess中处理的,只是返回值为null。顺便一提:使用这个Future扩展形式的话,task中的异常是会被拦截的

Controller工作原理

Posted on 2017-04-28 | Visitors:

[TOC]

start up controller

引入Controller的目的是为了减小ZK的压力以及降低整个分布式系统的复杂度。

注册监听

向zk注册监听两个实例:

  • SessionExpirationListener:如果session时效了,zk会负责重连的,kafka这边不需要处理
  • LeaderChangeListener:监听集群的leader(即controller)发生改变,对于原leader需要执行resign的操作(onControllerResignation)将Leadership上交:
    • de-register listeners:IsrChangeNotificationListener、ReassignedPartitionsListener、PreferredReplicaElectionListener、ReassignedPartitionsIsrChangeListeners
    • 关闭的功能有:删除topic、自动重平衡管理、replica与partition的状态机等

start Elector

首先要判断下zookeeper下面是否存在永久节点:/controller,该节点存储着controller的信息,如:{"version":1,"brokerid":2,"timestamp":"1493345638688"}

抢注leader

  • 从/controller目录下获取controllerId,成功获取到就返回amILeader。这样就能确保如果有broker注册controller(向zk的/controller目录下面创建临时节点)成功,其他broker就不再尝试注册
  • 抢注leader:使用ZKCheckedEphemeral注册临时节点,下面是controller(broker)关闭的情况下,broker2注册controller成功的输出日志:
    [2017-04-28 10:13:58,689] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    [2017-04-28 10:13:58,692] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    [2017-04-28 10:13:58,693] INFO 2 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2017-04-28 10:13:59,592] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

onControllerFailover

在抢注成功后,该broker需要担负起Leader的职责,包括以下几个方面:

1. 改朝换代

读取 /controller_epoch 的值,获取当前controller的纪元(朝代),然后增加1

2. 注册ZK监听程序

对ZK上的节点进行监听:

  • /admin/reassign_partitions:监听partition重分配的动作
  • isr_change_notification:该节点用于通知parition的ISR变化
  • /admin/preferred_replica_election:

以上都是通过controller注册的,下面是通过特定的状态机向ZK注册监听的:

  • PartitionStateMachine:描述parition的状态机
    • /brokers/topics:TopicChangeListener
    • /admin/delete_topics:DeleteTopicsListener
  • replicaStateMachine:描述replicas的状态机
    • /brokers/ids:BrokerChangeListener

最后,对每个topic添加PartitionModificationsListener,zk路径为:/brokers/topics/topic_name

3. 初始化controller上下文

收集:brokers、topics、partitions-leader、ISR等信息;开启ControllerChannelManager和partitionStateMachine

4. 进行分区重分配和分区leader选举操作

详见

  • partition reassign
  • PreferredReplicaElection

5. 发送metadata数据

发送数据到集群上所有的broker,包括live和shutdown的

6. 开启分区重平衡线程

分区的重平衡是否开启由参数auto.leader.rebalance.enable控制,默认开启;线程的执行间隔由leader.imbalance.check.interval.seconds控制,默认为300

所有触发Elect的情况

partition reassign

在Kafka-manager上对topic——testReassign(包含3个分区)进行手动partition分配:

这是重分配之前的分区情况:

调整0和1的分区Replicas:

执行Reassign Partitions,观察日志:

PartitionsReassignedListener感知到了ZK上/admin/reassign_partitions节点上内容发生了变化

[PartitionsReassignedListener on 2]: Partitions reassigned listener fired for path /admin/reassign_partitions. Record partitions to be reassigned {"version":1,"partitions":[{"topic":"testReassign","partition":2,"replicas":[1,2]},{"topic":"testReassign","partition":1,"replicas":[2,3]},{"topic":"testReassign","partition":0,"replicas":[3,1]}]}

由于分区2未进行重分配,其原来的replicas与现在的一致,因此无视该Reassign请求:

kafka.common.KafkaException: Partition [testReassign,2] to be reassigned is already assigned to replicas 1,2. Ignoring request for partition reassignment

接下来对分区0和1进行重分配,就拿1来说:

[Controller 2]: Handling reassignment of partition [testReassign,1] to new replicas 2,3 
[Controller 2]: New replicas 2,3 for partition [testReassign,1] being reassigned not yet caught up with the leader
[Controller 2]: Updated path /brokers/topics/testReassign with {"version":1,"partitions":{"2":[1,2],"0":[2,3],"1":[2,3,1]}} for replica assignment
[Controller 2]: Updated assigned replicas for partition [testReassign,1] being reassigned to 2,3,1
[[Controller 2]: Updating leader epoch for partition [testReassign,1].
[Controller 2]: Updated leader epoch for partition [testReassign,1] to 1
[Replica state machine on controller 2]: Invoking state change to NewReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
[Controller 2]: Waiting for new replicas 2,3 for partition [testReassign,1] being reassigned to catch up with the leader

疑问:往/brokers/topics/testReassign中写入的为啥是[2,3,1]而不是[2,3]呢?留个坑:在KafkaController.onPartitionReassignment方法中找答案吧~
该方法在多处被调用

partition state machine

下图简单描述了partition的4种状态:

状态机通过一个map类型的partitionState来存放所有分区的当前状态。

start up

此时cluster的controller已经产生,controller通过读取ZK的partition和ISR节点信息判断每个partition的当前状态。

具体是判断每个分区对应的LeaderIsrAndControllerEpoch信息与当前的Epoch是否吻合:

  • 如果吻合,则是OnLine状态
  • 存在但不吻合,则是OffLine状态
  • 不存在信息,则是New状态

划分好了当前的状态后,下面要将new和offline的转化成Online

如何转化呢?当然是选举leader咯,关键代码在OfflinePartitionLeaderSelector这个类里,下面这个表是选举时遇到的几种情况

ISR replicas result
(1,2) (1,2) leader = 1
null (2,3) unclean enabled ? leader=2 : NoReplicaOnlineException
null null NoReplicaOnlineException

如当前有个repilca设置为1的topic:”jjhtest”,当前leader与isr的分配为:

接下来我重启broker3,会发生什么呢?

  1. 由于每个分区的replicas都是固定的(不考虑手动分配),关闭broker3将导致分区1,4,7的replicas中无可用的broker,这3个分区将无法分配到broker,进入offline状态

    如果分区的replica>1,在shutDownBroker方法中:broker作为leader的分区状态转移是online ==> online,leader选举的selector为:ControlledShutdownPartitionLeaderSelector

  2. broker3恢复后,以上3个分区由offline状态向online转变,触发leader的选举

  3. 选举的过程:

    partitionReplicaAssignment

    该变量是个Mutable Map,存放的是每个分区对应的replicas信息。这个信息有两个来源:

  4. TopicChangeListener监听ZK上的topic变化,比如新增了topic,那么就会从ZK获取分区的replicas分配信息

  5. PartitionModificationsListener监听parition的变化,比如新增了分区,会从ZK获取新增分区的replicas信息。

electLeaderForPartition

当partition的状态由offline或者online ==> online时,会通过不同的PartitionLeaderSelector选举leader(LeaderSelector缩写为LS):

每次成功执行了leader select,都会更新leader cache(controllerContext.partitionLeadershipInfo)

preferred replica election

在controller启动的过程中会执行一次preferredReplicaElection(简称PRE),并创建一个间隔为leader.imbalance.check.interval.seconds(default:300)的定时任务执行PartitionRebalance操作,该操作会触发PRE

kafka平衡策略的实现依赖于以下几方面的因素:

  1. 在不考虑手动分配分区的情况下,每个分区分配的Replicas是写死的,包括顺序都是固定的。
  2. 在出现broker挂掉的情况下,leader的重新选举能保证消息不丢失(未触发脏选举),而PRE(首选备份选举)能保证broker恢复后分区的leader能均衡分布在集群上。

小实验

有一个分区数为6,备份数为2的topic:”TheOne”,当前的分配状态为:

  • 关闭broker3后分区4和5的leader重新选举:

  • broker3恢复后,又重新加入到了各个topic的ISR中(IsrChangeNotificationListener被触发):

  • 一段时间后,触发PRE:

    [Controller 1]: Starting preferred replica leader election for partitions [TheOne,5] (kafka.controller.KafkaController)
    [Partition state machine on Controller 1]: Invoking state change to OnlinePartition for partitions [TheOne,5] (kafka.controller.PartitionStateMachine)
    [PreferredReplicaPartitionLeaderSelector]: Current leader 1 for partition [TheOne,5] is not the preferred replica. Trigerring preferred replica leader election (kafka.controller.PreferredReplicaPartitionLeaderSelector)
    [Controller 1]: Partition [TheOne,5] completed preferred replica leader election. New leader is 3 (kafka.controller.KafkaController)

leader & ISR又恢复了初始状态,达到了均衡。

PRE的选举操作由preferredReplicaPartitionLeaderSelector完成,状态为online ==> online

思考:动态扩展broker如何实现?动态扩展broker能否实现分区的均衡扩展呢?

partition reblance

  • 分区重平衡的间隔上面提到过了,默认为5分钟。
  • 触发重平衡由失衡率(imbalanceRatio)决定:
    $$ ratio = Sum(partitonsNotLeaded)/Sum(partitonsShouldLeaded)$$
    当ratio大于leader.imbalance.per.broker.percentage(默认10%)时会触发重平衡

    DEBUG: topics not in preferred replica Map() (kafka.controller.KafkaController)
    TRACE: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
    DEBUG: topics not in preferred replica Map() (kafka.controller.KafkaController)
    TRACE: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
    DEBUG: topics not in preferred replica Map([TheOne,4] -> List(3, 2), [TheOne,5] -> List(3, 1),...) (kafka.controller.KafkaController)
    TRACE: leader imbalance ratio for broker 3 is 0.951613 (kafka.controller.KafkaController)
  • 重平衡的过程主要是执行PRE

Replica state machine

与partition相比多了3种状态

create topic

新建topic涉及到partition以及replica状态的改变:

  • NoExistent ==> New
    Invoking state change to NewPartition for partitions [TT,6],...(kafka.controller.PartitionStateMachine)
    Invoking state change to NewReplica for replicas [Topic=TT,Partition=6,Replica=2],[Topic=TT,Partition=6,Replica=3],... (kafka.controller.ReplicaStateMachine)

在新建topic后,ZK就会为该topic分配replicas

  • New ==> Online
    #partition state machine
    Invoking state change to OnlinePartition for partitions [TT,6],...
    Live assigned replicas for partition [TT,6] are: [List(2, 3)]
    Initializing leader and isr for partition [TT,6] to (Leader:2,ISR:2,3,LeaderEpoch:0,ControllerEpoch:30)
    #replica state machine
    Invoking state change to OnlineReplica for replicas [Topic=TT,Partition=6,Replica=2],[Topic=TT,Partition=6,Replica=3],...

ReplicaDeletionStarted

该状态一般在进行手动分区分配时产生的,以parition reassign实验试验中的分区1为例:replicas:[3,1] ==> [2,3]

  • broker2成为NewReplica:Invoking state change to NewReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
  • broker2进入OnlineReplica:Invoking state change to OnlineReplica for replicas [Topic=testReassign,Partition=1,Replica=2]
  • broker1成为OfflineReplica,从ISR中移除:

    Invoking state change to OfflineReplica for replicas [Topic=testReassign,Partition=1,Replica=1]
    Removing replica 1 from ISR 3,1 for partition [testReassign,1]
  • broker1进入ReplicaDeletionStarted状态:Invoking state change to ReplicaDeletionStarted for replicas [Topic=testReassign,Partition=1,Replica=1]

  • broker1进入ReplicaDeletionSuccessful :nvoking state change to ReplicaDeletionSuccessful for replicas [Topic=testReassign,Partition=1,Replica=1]
  • broker1进入NonExistentReplica:Invoking state change to NonExistentReplica for replicas [Topic=testReassign,Partition=1,Replica=1]

ZKListener 详解

下面对Controller中使用到的一些监听ZK节点及子节点变化的listener进行梳理

listener 监听的节点 用处 处理逻辑
BrokerChangeListener /brokers/ids 子节点 在ReplicaStateMachine中使用 BCL
ISRChangeNotificationListener /isr_change_notification 子节点 Controller用于监听各个partition的ISR变化 ICNL
PartitionModificationsListener /brokers/topics/topic_name 监听对应topic分区的变化(只允许增加) PML
LeaderChangeListener /controller 监听集群leader的变化 注册监听

Broker Change Listener

/brokers/ids目录下有多个以broker_id命名的子节点,每个子节点上存储着与broker相关的信息,如:

{"jmx_port":9999,"timestamp":"1493886226618","endpoints":["PLAINTEXT://ip:9092"],"host":"ip","version":3,"port":9092}

当这些子节点的内容发生变化时,controller感知到后会统计出变化的情况:

  • broker2 shutdown:Newly added brokers: , deleted brokers: 2, all live brokers: 1,3
  • broker2 startup: Newly added brokers: 2, deleted brokers: , all live brokers: 1,2,3

delete broker

shutdownBroker

该方法与KafkaServer中的shutdown方法的区别在于前者用于处理ControlledShutdown请求,将分区的职责转移出去,而后者是关闭brokerServer的一堆服务。

需要处理的分区为:
$ List(partition) = { partition | id ∈ Replicas(partition) && replicaFactor(partition) > 1 } $

  1. 对于作为leader的分区,执行leader的重选举(ControlledShutdownLS)
  2. 对于作为普通replica的分区:
    • 关闭ReplicaRequest,如broker3是TheOne-1的replica,在关闭broker3时controller会向3发送关闭replica请求的请求:The stop replica request (delete = false) sent to broker 3 is [Topic=TheOne,Partition=1,Replica=3](kafka.controller.ControllerBrokerRequestBatch)
      • broker3收到StopReplica请求后调用ReplicaManager的stopReplicas移除了相关的replica的Fetcher:[ReplicaFetcherManager on broker 3] Removed fetcher for partitions TheOne-1
    • 将broker3的Replica状态置为OfflineReplica

onBrokerFailure

步骤如下:

  1. 在shutdownBroker的处理的分区中有一项要求是:replicaFactor > 1,而对于replicaFactor = 1的分区来说,如果其leader所在的broker关闭了,那么该分区的状态将置为OfflinePartition。筛选的依据是其leader是否为关闭的broker(其他类型的分区在上面的流程中要么leader重新选举过了,要么只是改变ISR)
  2. 尝试使用OfflinePartitionLS将offline和new类型的partition转化为Online
  3. 集体性的将该broker的replica的状态置为OfflineReplica

    感觉重复操作了,因为shutdownBroker阶段也有这操作。不同点在于shutdownBroker只处理replica>1分区的replica状态。

  4. 如果第一步操作未执行,那么将向其他broker发送一个UpdateMetadataRequest

    onBrokerStartup

    主要处理的依然是分区、replica、ISR以及leader的转化:

  5. 发送UpdateMetadataRequest(具体原因不明)

  6. 将该broker的replica状态置为OnlineReplica
  7. 尝试使用OfflinePartitionLS将offline和new类型的partition转化为Online,这一步的主要目的是为了将onBrokerFailue阶段呗置为OffLinePartition的分区恢复成Online:[OfflinePartitionLeaderSelector]: No broker in ISR is alive for [jjhtest,1]. Pick the leader from the alive assigned replicas: 3 [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [jjhtest,1]. Elect leader 3 from live brokers 3. There is potential data loss. [OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":3,"leader_epoch":23,"isr":[3]} for offline partition [jjhtest,1]
  8. 判断是否需要执行分区重分配的操作

ISR Change Notification Listener

集群正常运行时/isr_change_notification节点下面是没有子节点的,当ISR发生变化的时候,如重启某个broker,那些ISR原来包含该broker的分区需要调整ISR,因此会创建这些partition的子节点。

下面是broker3挂掉后,TheOne这个topic的ISR:

在broker3恢复后,ISNL感知到了某些partition的ISR发生了变化,进行下面3步操作:

  1. 根据子节点的内容更新对应parition的leader以及ISR cache (从ZK上获取)
  2. 向集群所有节点发送MetadataRequest:DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(1, 2, 3) for TopicAndPartitions:Set([TheOne, 2], [TheOne, 4],...)
  3. 删除子节点

Partition Modifications Listener

Contorller为每个topic都设置了一个PartitionModificationsListener,用于监听新增分区的操作

下面是将TheOne的分区数增加3,PML所感知到的信息:

[AddPartitionsListener on 1]: Partition modification trigered {"version":1,"partitions":{"8":[3,2],"4":[3,2],"5":[3,1],"6":[1,3],"1":[1,3],"0":[1,2],"2":[2,3],"7":[2,1],"3":[2,1]}} for path /brokers/topics/TheOne
[AddPartitionsListener on 1]: New partitions to be added Map([TheOne,7] -> List(2, 1), [TheOne,6] -> List(1, 3), [TheOne,8] -> List(3, 2))

新增了Partition,ZK已经为这些partition分配了replicas,在ZK上每个topic的分区上存储着的信息如:{"controller_epoch":30,"leader":3,"version":1,"leader_epoch":0,"isr":[3,2]}

拿到这些消息后,Controller要做的第一件事就是将新增的分区的replicas信息添加到partitionReplicaAssignment中,接下来才是处理这些新建的分区的状态、leader等,这个操作与create topic的处理如出一辙,区别在于前者是对新增的分区进行处理,后者是对这个topic的所有分区进行处理

ISR消息管理

Posted on 2017-04-24 | Visitors:

[TOC]

疑问

  • leader是在何时更新Highwater的?
  • kafka-manager上出现Lag为负值是什么原因造成的?
  • Log中的消息被删除时,ISR之间是如何协调的?

下面所有的讨论都是基于一个包含3个broker的kafka集群而言的

Replica

leader vs followers

如果将所有的topic的replicas设置为2(_consumer_offsets除外),那么对于每个partition而言其Log存在于两个broker上,其中一个作为leader,另一个作为followers

下图是一个包含3个分区的topic的replicas以及leader分布情况:

leader与followers的职责

之前在kafka-consumer剖析的文章中介绍过high watermark(简称HW)以及log end offset(简称LEO)的概念。作为leader,其主要职责是:

  • 将消息及offset写入Local log & offset
  • 在followers完全备份了消息后(leader应该是会收到通知),更新HW
  • leader并不需要维护LEO的值,因为在Log中有一个nextOffsetMetadata属性就是每个Log维护的最新offset的信息

反观followers,其职责则体现在与leader的交互上:

  • 备份消息,然后通知leader
  • 更新LEO的值

创建Replica

因为每个partition都有对应的replicas,所以创建replica的操作是在Partition中执行的,具体的方法是getOrCreateReplica:

if (isReplicaLocal(replicaId)) {
//创建Log文件
val config = LogConfig.fromProps(...)
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
//获取checkPoint文件
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read
val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
val localReplica = new Replica(replicaId, this, time, offset, Some(log))
addReplicaIfNotExists(localReplica)
} else {
val remoteReplica = new Replica(replicaId, this, time)
addReplicaIfNotExists(remoteReplica)
}
getReplica(replicaId).get

其中:

  • getReplica与addReplicaIfNotExists都对assignedReplicaMap进行操作
  • checkpoint里维护着该log.dir下面所有topic-parition与offset的对应关系
  • log.dir是Log文件所在目录,log.dir.getParentFile就是server.properties中定义的log.dirs

ReplicaManager

创建Replica的调用路径是ReplicaManager.becomeLeaderOrFollower->makeFollowers。ReplicaManager是broker范畴的,管理着broker上面所有的partition

OffsetCheckPoint

在多磁盘的环境中,log.dirs会定义在多个磁盘上,这样就能将Partiton分布在不同的磁盘上。

每个磁盘中的Log目录下都维护着3个OffsetCheckPoint文件:
: recovery-point-offset-checkpoint
: replication-offset-checkpoint:维护parition与offset信息
: cleaner-offset-checkpoint

replication-offset-checkpoint的内容如下:

0
21
__consumer_offsets 16 0
__consumer_offsets 49 124
admin.benchmark 9 24564634
test 13 16709
theOne 18 0
__consumer_offsets 4 6
...

其中:

  • 第一行是CurrentVersion的值,默认是0
  • 第二行是当前Log目录下面分区的数量
  • 后面紧跟着的21行是分区以及对应的offset信息,格式为:$topic $partition $offset

在ReplicaManager中会将所有log目录下面的replication-offset-checkpoint组成一个名为highWatermarkCheckpoints的Map:

val highWatermarkCheckpoints = 
config.logDirs.map(dir => (new File(dir).getAbsolutePath,
new OffsetCheckpoint(new File(dir,ReplicaManager.HighWatermarkFilename)))).toMap

其更新操作是由一个定时任务控制的,每隔replica.high.watermark.checkpoint.interval.ms(默认5s)的时间会将所有log目录下的raplica的HW值写入到对应的replication-offset-checkpoint文件中。该定时任务是在becomeLeaderOrFollower方法中被开启的。

appendMessages

在Log分析曾提到过appendMessages方法,该方法内先将消息写入local(即Leader的Log),然后判断ack是否=-1来决定是否需要创建延迟的producer请求(不立即反馈,需要等待备份完成,由delayedProducePurgatory进行管理)。该请求是从leader发往followers的,内容包含:

  • delayedMs:即producer配置的request.timeout
  • requiredOffset: Leader在写入消息后的InextOffsetMetadata值(用LogAppendInfo.lastOffset + 1表示)

在ack = -1的策略下,leader必须在所有的followers都将消息备份的情况下,才会向producer发送反馈,而判断消息是否已完整备份的方法是Partition.checkEnoughReplicasReachOffset:

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val curInSyncReplicas = inSyncReplicas
//对ISR中所有replicas的LEO校验是否已跟上leader的步伐(包括leader)
def numAcks = curInSyncReplicas.count { r =>
if (!r.isLocal)
if (r.logEndOffset.messageOffset >= requiredOffset) {
true
}
else
false
else
true
}

val minIsr = leaderReplica.log.get.config.minInSyncReplicas

if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
if (minIsr <= curInSyncReplicas.size)
(true, Errors.NONE)
else
(true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
} else
(false, Errors.NONE)
case None =>
(false, Errors.NOT_LEADER_FOR_PARTITION)
}
}

第一个返回参数表示是否有足够多的replicas达到了requiredOffset

  • 如果HW < requiredOffset,表明leader还未收到足够多的replicas的response
  • HW >= requiredOffset表明HW已经更新过了,此时如果ISR的数量小于misISR,那么就报NOT_ENOUGH_REPLICAS_AFTER_APPEND错
  • 如果当前ReplicaManger发现local不再是leader了,说明这个broker出现问题了,报NOT_LEADER_FOR_PARTITION错

Increment HW

HW是由leader维护的,其更新时机主要有两种:

  1. ISR发生变化,包括:
  • 新的broker成为leader
  • shrink ISR
  • expand ISR
  1. Replicas已备份完新的消息,判别的方法有两种:
    1. ISR中Replicas中的最小LEO值大于HW
    2. HW的LogOffsetMetadata存在一个老的log segment中,表明上一次Producer请求已处理完成(即消息已经在完成备份)

ShrinkIsr

becomeLeaderOrFollower

ReplcaFetcher

启动过程ReplicaManger相关日志记录

以broker3启动日志为例:
在Log加载、处理完后,首先是unblock操作:

[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 producer requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2017-04-25 13:35:57,270] DEBUG [Replica Manager on Broker 3]: Request key __consumer_offsets-25 unblocked 0 fetch requests. (kafka.server.ReplicaManager)

应该是解封在关闭阶段被阻塞的一些request

下面开启ReplicaFetcherThread:

[2017-04-25 13:35:57,367] INFO [ReplicaFetcherThread-0-1], Starting  (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,384] INFO [ReplicaFetcherThread-3-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,384] INFO [ReplicaFetcherThread-1-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,398] INFO [ReplicaFetcherThread-2-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,406] INFO [ReplicaFetcherThread-1-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,414] INFO [ReplicaFetcherThread-0-2], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,423] INFO [ReplicaFetcherThread-2-1], Starting (kafka.server.ReplicaFetcherThread)
[2017-04-25 13:35:57,435] INFO [ReplicaFetcherThread-3-2], Starting (kafka.server.ReplicaFetcherThread)

数字编号的含义是什么呢??

接下来是添加Fetcher请求,向Remote Broker请求那些本地不是leader的分区的数据。摘取片段如下:

INFO [ReplicaFetcherManager on broker 3] Added fetcher for partitions List([topic1-8, initOffset 6538 to broker BrokerEndPoint(1,10.45.4.9,9092)] ,[topic2-9, initOffset 0 to broker BrokerEndPoint(1,10.45.4.9,9092)]...)

ReplicaFetcherThread

每个ReplicaManager不仅要维护一个producer的炼狱(delayedProducePurgatory),还维护了一个fetcher的炼狱(delayedFetchPurgatory)。fetch这个操作不仅仅是由consumer触发的,followers备份消息也是通过向leader fetch实现的。

ReplicaFetcherManager

  • ReplicaFetcherManager管理着ReplicaFetcherThread的创建和关闭工作。
  • 该manager的命名格式为:“ReplicaFetcherManager on broker $ID”

thread

  • fetcher的数量由num.replica.fetchers控制:
    (31 * topic.hashCode() + partitionId) % numFetchers,当该参数设置为4,那么 $fetcherId∈[0,1,2,3]$,对于3节点replicas=2的集群,每个broker需要$4*2=8$个fetcherThread:
    val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
    BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}

上面的代码中将partition-broker这个map按照broker->fetcherId的对应关系进行分组。

  • fetcherThread的命名方式为:ReplicaFetcherThread-fetcherId-brokerId
  • ReplicaFetcherThread是个定时执行的线程,执行间隔由replica.fetch.backoff.ms控制,默认为1s
  • 每个ReplicaFethcerThread的fetch目标是一个broker上的若干topic-partition

工作流程

processPartitionData

当前:
: broker2作为test-1的follower
: 其LEO值为17314
: leader的HW=17314
: FetchRequest中的fetchOffset = 17314

  • 收到一条消息:

    [ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17314 for partition test-1. Received 45 messages and leader hw 17314
  • 对log执行append操作

    [ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17315 after appending 45 bytes of messages for partition test-1

与appendMessagesToLeader时不相同的是,follower执行append操作不会再进行offset配置操作,用得就是leader传过来的offset

  • 设置follower的HW值
    [ReplicaFetcherThread-3-1], Follower 2 set replica high watermark for partition [test,1] to 17314

1s后再次fetch到消息:

[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17315 for partition test-1. Received 45 messages and leader hw 17315
[ReplicaFetcherThread-3-1], Follower 2 has replica log end offset 17316 after appending 45 bytes of messages for partition test-1
[ReplicaFetcherThread-3-1], Follower 2 set replica high watermark for partition [test,1] to 17315

由上面的日志可以注意到:

  1. LEO的值由17314 => 17315,这是因为前面执行的Log.append操作使得nextOffsetMetadata的值+1
  2. leader的HW值变成了17315,这是由于leader检测到了followers(就是broker2)LEO的最小值已经变成了17315,所以更新了HW值,具体实现在Increment HW中有介绍

handleOffsetOutOfRange

当ReplicaFether的fetchOffset不在leader的offset之内(即大于最大值或者小于最小值),那么就会收到OffsetOutOfRangeException

假设当前test-1这个partition的leader为1,新增broker2作为follower

broker2在catch up的过程中broker1挂了,broker2被选为leader。(需要unclean.leader.election.enable配置为1,才能允许这样的脏选举)

情况一、Replica fetchOffset > leader LEO


broker1在恢复后,成为了follower,向leader发送ReplicaFetcherRequest,其中fetchOffset=7,该值大于leader的LEO,所以需要将broker1上该分区的Log执行truncate操作使得LEO值与leader保持一致。

值得注意的是,这样的truncate操作后,follower与leader的消息并不完全一致的,上例中,offset为3和4的消息就是不一致的

情况二、Replica fetchOffset < leader start offset

broker1在恢复之前,broker2添加了很多消息,并且也删除了一个消息,导致其最小的offset大于broker1中的LEO,这种情况下,broker1中的所有消息都没有意义了(因为ISR中的leader不再维护这些消息),所以就删除掉所有的segment,然后fetch leader的第一条消息

Kafka_0.10.1.1版本对于收到OffsetOutOfRangeException时follower的LEO处于leader开始与LEO之间的情况没有对策。

SocketServer工作原理

Posted on 2017-04-01 | Visitors:

kafka的socket server是基于java NIO,使用Reactor模式开发的。socketserver主要用于处理kafka server对外提交网络请求的操作,用于检查连接数,把请求添加到请求的队列中,对KafkaApis提供操作支持.

其线程模型为:

  • 一个Acceptor线程接受/处理所有的新连接
  • N个Processor线程,每个Processor都有自己的selector,从每个连接中读取请求。数量由num.network.threads控制,默认为3
  • M个Handler线程处理请求,并将产生的请求返回给Processor线程用于写回客户端。数量由queued.max.requests控制,默认为500

Acceptor

每个broker只有一个acceptor进程每个endPoint(由server.properties中的listeners定义)对应一个Acceptor,采用轮询的方式来处理新的连接,步骤如下:

1. 新建ServerSocketChannel

使用endPoint的ip:portstrong text作为channel监听的socket地址:

val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
serverChannel.socket.bind(socketAddress)

recvBufferSize的大小由socket.receive.buffer.bytes控制,默认是100KB,目前生产环境上配置的是1MB

2. selector绑定channel,开始工作

  • 首先注册一个OP_ACCEPT事件
    serverChannel.register(selector, SelectionKey.OP_ACCEPT)

  • 等待客户端的连接
    val ready = nioSelector.select(500)

  • 采用Round Robin的方式将连接分配给processor进行处理

    if(ready > 0) {
    val keys = selector.selectedKeys()
    val iter = keys.iterator()
    while(iter.hasNext && isRunning) {
    var key: SelectionKey = null
    key = iter.next
    iter.remove()
    if(key.isAcceptable)
    accept(key, processors(currentProcessor))

    // round robin to the next processor thread
    currentProcessor = (currentProcessor + 1) % processors.length
    }
    }
  • 接受客户端的连接,并将获得的SocketChannel交给processor

    def accept(key: SelectionKey, processor: Processor) {
    //根据SelectionKey获取对应的serverSocketChannel
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    //调用accept方法建立与客户端的连接
    val socketChannel = serverSocketChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    processor.accept(socketChannel)
    }

sendBufferSize由socket.send.buffer.bytes控制

至此,Acceptor的工作就完成了,接下来处理下一个连接请求。

Processor

Processor有两个重要的概念:

  • newConnections:Processor在新收到一个从Acceptor转发过来的SocketChannel时先存放到该链表中,在运行时从链表中拿出一个Channel与客户端的ID(这个ID由本地ip:port+远程ip:port构造)进行绑定,并注册OP_READ
  • inflightResponses:用于存放正在发送或者等待发送的服务器响应报文,如果成功发送了就移除掉

processor的运行模式跟客户端的NetworkClient相似

while(isRunning){
configureNewConnections()
processNewResponses()
poll()
processCompletedReceives()
processCompletedSends()
processDisconnected()
}

其中Selector的poll方法是跟客户端共用的。

processCompletedReceives

private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
val openChannel = selector.channel(receive.source)
val session = {
val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
}
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
securityProtocol = securityProtocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
}
}
  • completedReceives中存放的是客户端的请求(NetworkReceive)
  • receive.source指的是客户端ID
  • selector的mute操作会将取消对应channel的OP_READ注册,这一点与客户端不同,client在与server建立连接后,channel上的OP_READ状态会一直保持着,因为client无法预知server的数据会在什么时候到来。而server则是在建立连接后,注册OP_READ,在收到client的请求后取消读。
  • server根据收到的请求,组建了一个响应的包,然后放到了requestChannel中。

processCompletedSends

private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
resp.request.updateRequestMetrics()
selector.unmute(send.destination)
}
}
  • completedSends存放的是已经成功发送的响应,因此需要将其从inflightResponses中移除
  • selector.unmute方法能将channel注册OP_READ

那么问题来了:放到requestChannel中的request和response是如何被处理的呢?

我们先来看下requestChannel:
每个服务端只存在一个RequestChannel,用于连接Processor与Handler,他定义了两个阻塞队列:

  • requestQueue:缓存服务端发送的request,大小为queued.max.requests
  • responseQueues:每个processor都有一个队列

KafkaRequestHandler

SocketServer在初始化阶段会创建一个Handler的线程池:

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)

KafkaRequestHandler的数量由num.io.threads控制,默认为8。

val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
}

他的run方法也非常简单,就是从RequestChannel中获取request,然后调用KafkaApis对请求进行处理:

req = requestChannel.receiveRequest(300)
apis.handle(req)

KafkaApis在处理完成后,会调用sendResponse方法。

ChannelBuilders

SocketServer中的selector初始化过程:

private val selector = new KSelector(
maxRequestSize,
connectionsMaxIdleMs,
metrics,
time,
"socket-server",
metricTags,
false,
ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))

其中ChannelBuilders是根据安全策略来创建KafkaChannel的

create channel using protocole

首先来看下第一个参数:protocol,该参数是由socketServer初始化时创建Processor的时候根据EndPoint中声明的protocolType决定的:

endpoints.values.foreach { endpoint =>
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads

for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, protocol)

如这个listeners=PLAINTEXT://10.45.4.9:9093,SASL_PLAINTEXT://10.45.4.9:9094定义了两个EndPoint:第一个endpoint的protocol是PLAINTEXT即明文无安全验证,第二个是使用SASL_PLAINTEXT安全验证的。每个endpoint都会创建一定数量(由num.network.threads控制)的processor。

下面再来看看创建Channel的主要逻辑:

switch (securityProtocol) {
case SSL:
...
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
if (loginType == null)
throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
if (mode == Mode.CLIENT && clientSaslMechanism == null)
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder();
break;
default:
throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}

这里主要关心的是SASL_PLAINTEXT以及PLAINTEXT的处理。

当security.protocol=SASL_PLAINTEXT时Client端创建的Channel中的saslMechanism不能为空。可以看到上面socketServer创建Channel时传过来的saslMechanism字段为空。我们再来看看producer/consumer创建Channel时的代码:

String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);

客户端sasl.mechanism默认值为GSSAPI。KafkaChannel构造函数包含四个属性:

private final String id;
private final TransportLayer transportLayer;
private final Authenticator authenticator;
private final int maxReceiveSize;

当security.protocol=SASL_PLAINTEXT || PLAINTEXT时,用的都是PlaintextTransportLayer,该类中包含一个Principal:ANONYMOUS,如果kafka开启了权限控制,那么当不安全(security.protocol=PLAINTEXT)的client连接不安全的endpoint时需要为ANONYMOUS用户分配访问权限。
Channel的验证工具有两种对应于client和server

Note left of SaslClient: SEND_HANDSHAKE_REQUEST
Note right of SaslServer: HANDSHAKE_REQUEST
SaslClient->SaslServer: SaslHandshakeRequest (ApiVersion=*,correlationId=*,clientMechanism=*)
Note left of SaslClient: RECEIVE_HANDSHAKE_RESPONSE
Note left of SaslServer: check clientMechanism is enabled
SaslServer->SaslClient: SaslHandshakeResponse
Note right of SaslClient: check error code in response
Note left of SaslClient: FAILED or INITAL
Note right of SaslServer: AUTHENICATE
12
fbZhu

fbZhu

人为什么越长大越孤单? 答:内心中有秘密,无法诉说

16 posts
7 tags
© 2018 fbZhu
Theme — NexT.Mist v5.1.4