kafka的socket server是基于java NIO,使用
Reactor
模式开发的。socketserver主要用于处理kafka server对外提交网络请求的操作,用于检查连接数,把请求添加到请求的队列中,对KafkaApis提供操作支持.
其线程模型为:
- 一个Acceptor线程接受/处理所有的新连接
- N个Processor线程,每个Processor都有自己的selector,从每个连接中读取请求。数量由
num.network.threads
控制,默认为3 - M个Handler线程处理请求,并将产生的请求返回给Processor线程用于写回客户端。数量由
queued.max.requests
控制,默认为500
Acceptor
每个broker只有一个acceptor进程每个endPoint
(由server.properties中的listeners
定义)对应一个Acceptor,采用轮询的方式来处理新的连接,步骤如下:
1. 新建ServerSocketChannel
使用endPoint的ip:port
strong text作为channel监听的socket地址:val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
serverChannel.socket.bind(socketAddress)
recvBufferSize的大小由socket.receive.buffer.bytes
控制,默认是100KB,目前生产环境上配置的是1MB
2. selector绑定channel,开始工作
首先注册一个OP_ACCEPT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
等待客户端的连接
val ready = nioSelector.select(500)
采用Round Robin的方式将连接分配给processor进行处理
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
key = iter.next
iter.remove()
if(key.isAcceptable)
accept(key, processors(currentProcessor))
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
}
}接受客户端的连接,并将获得的SocketChannel交给processor
def accept(key: SelectionKey, processor: Processor) {
//根据SelectionKey获取对应的serverSocketChannel
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
//调用accept方法建立与客户端的连接
val socketChannel = serverSocketChannel.accept()
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)
processor.accept(socketChannel)
}
sendBufferSize由socket.send.buffer.bytes
控制
至此,Acceptor的工作就完成了,接下来处理下一个连接请求。
Processor
Processor有两个重要的概念:
- newConnections:Processor在新收到一个从Acceptor转发过来的SocketChannel时先存放到该链表中,在运行时从链表中拿出一个Channel与客户端的ID(这个ID由本地ip:port+远程ip:port构造)进行绑定,并注册
OP_READ
- inflightResponses:用于存放正在发送或者等待发送的服务器响应报文,如果成功发送了就移除掉
processor的运行模式跟客户端的NetworkClient相似
while(isRunning){
configureNewConnections()
processNewResponses()
poll()
processCompletedReceives()
processCompletedSends()
processDisconnected()
}
其中Selector的poll方法是跟客户端共用的。
processCompletedReceives
private def processCompletedReceives() { |
completedReceives
中存放的是客户端的请求(NetworkReceive)- receive.source指的是客户端ID
- selector的mute操作会将取消对应channel的
OP_READ
注册,这一点与客户端不同,client在与server建立连接后,channel上的OP_READ
状态会一直保持着,因为client无法预知server的数据会在什么时候到来。而server则是在建立连接后,注册OP_READ
,在收到client的请求后取消读。 - server根据收到的请求,组建了一个响应的包,然后放到了
requestChannel
中。
processCompletedSends
private def processCompletedSends() { |
completedSends
存放的是已经成功发送的响应,因此需要将其从inflightResponses中移除- selector.unmute方法能将channel注册
OP_READ
那么问题来了:放到requestChannel中的request和response是如何被处理的呢?
我们先来看下requestChannel
:
每个服务端只存在一个RequestChannel,用于连接Processor与Handler,他定义了两个阻塞队列:
- requestQueue:缓存服务端发送的request,大小为
queued.max.requests
- responseQueues:每个processor都有一个队列
KafkaRequestHandler
SocketServer在初始化阶段会创建一个Handler的线程池:requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
KafkaRequestHandler的数量由num.io.threads
控制,默认为8。val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
}
他的run方法也非常简单,就是从RequestChannel中获取request,然后调用KafkaApis对请求进行处理:req = requestChannel.receiveRequest(300)
apis.handle(req)
KafkaApis在处理完成后,会调用sendResponse
方法。
ChannelBuilders
SocketServer中的selector
初始化过程:private val selector = new KSelector(
maxRequestSize,
connectionsMaxIdleMs,
metrics,
time,
"socket-server",
metricTags,
false,
ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))
其中ChannelBuilders是根据安全策略来创建KafkaChannel
的
create channel using protocole
首先来看下第一个参数:protocol
,该参数是由socketServer初始化时创建Processor的时候根据EndPoint
中声明的protocolType决定的:endpoints.values.foreach { endpoint =>
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, protocol)
如这个listeners=PLAINTEXT://10.45.4.9:9093,SASL_PLAINTEXT://10.45.4.9:9094
定义了两个EndPoint:第一个endpoint的protocol是PLAINTEXT即明文无安全验证,第二个是使用SASL_PLAINTEXT安全验证的。每个endpoint都会创建一定数量(由num.network.threads
控制)的processor。
下面再来看看创建Channel的主要逻辑:switch (securityProtocol) {
case SSL:
...
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
if (loginType == null)
throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
if (mode == Mode.CLIENT && clientSaslMechanism == null)
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder();
break;
default:
throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}
这里主要关心的是SASL_PLAINTEXT以及PLAINTEXT的处理。
当security.protocol=SASL_PLAINTEXT时Client端创建的Channel中的saslMechanism不能为空。可以看到上面socketServer创建Channel时传过来的saslMechanism字段为空。我们再来看看producer/consumer创建Channel时的代码:String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);
客户端sasl.mechanism
默认值为GSSAPI。KafkaChannel构造函数包含四个属性:
private final String id; |
当security.protocol=SASL_PLAINTEXT || PLAINTEXT时,用的都是PlaintextTransportLayer
,该类中包含一个Principal:ANONYMOUS
,如果kafka开启了权限控制,那么当不安全(security.protocol=PLAINTEXT)的client连接不安全的endpoint时需要为ANONYMOUS
用户分配访问权限。
Channel的验证工具有两种对应于client和server
Note left of SaslClient: SEND_HANDSHAKE_REQUEST |