How SocketServer Works

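Kafka's SocketServer is built on Java NIO and follows the Reactor pattern. It handles the network requests that the Kafka server receives: it checks connection counts, adds incoming requests to the request queue, and provides the network layer that KafkaApis relies on.

Its threading model is: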

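  • One Acceptor thread (per endPoint) accepts and handles all new connections.
  • N Processor threads, each with its own selector, read requests from the connections. N is set by num.network.threads (default 3).
  • M Handler threads process the requests and hand the resulting responses back to the Processor threads, which write them to the clients. M is set by num.io.threads (default 8).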

Acceptor

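Each endPoint (defined by listeners in server.properties) has exactly one Acceptor thread, which hands new connections to the Processors in round-robin order. The steps are:

1. Create the ServerSocketChannel

The endPoint's ip:port is used as the socket address the channel listens on: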

val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
  serverChannel.socket().setReceiveBufferSize(recvBufferSize)
serverChannel.socket.bind(socketAddress)

recvBufferSize is set by socket.receive.buffer.bytes. The default is 100 KB; our production environment currently uses 1 MB.

2. Bind the channel to the selector and start working

  • First, register for the OP_ACCEPT event
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

  • Wait for client connections
    val ready = nioSelector.select(500)

  • Assign the connections to the processors in round-robin order

    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        val key = iter.next
        iter.remove()
        if (key.isAcceptable)
          accept(key, processors(currentProcessor))

        // round robin to the next processor thread
        currentProcessor = (currentProcessor + 1) % processors.length
      }
    }

  • Accept the client connection and hand the resulting SocketChannel to the processor

    def accept(key: SelectionKey, processor: Processor) {
      // Look up the ServerSocketChannel that belongs to this SelectionKey
      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
      // Call accept to establish the connection with the client
      val socketChannel = serverSocketChannel.accept()
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    }

sendBufferSize is set by socket.send.buffer.bytes.

At this point the Acceptor is done with this connection and moves on to the next connection request.
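
The accept loop above can be sketched in plain Java NIO. This is a minimal illustration rather than Kafka's implementation: the class AcceptorSketch, its per-processor queues and the acceptOnce method are hypothetical names, and the channel binds to a free loopback port instead of a configured endPoint.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class AcceptorSketch {
    // Hypothetical stand-in for the Processors: one queue of accepted channels each.
    final List<Queue<SocketChannel>> processorQueues = new ArrayList<>();
    private final Selector nioSelector;
    private final ServerSocketChannel serverChannel;
    private int currentProcessor = 0;

    AcceptorSketch(int numProcessors) throws IOException {
        for (int i = 0; i < numProcessors; i++)
            processorQueues.add(new ConcurrentLinkedQueue<>());
        nioSelector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        // Port 0 lets the OS pick a free port; a broker would bind the endPoint's ip:port.
        serverChannel.socket().bind(new InetSocketAddress("127.0.0.1", 0));
        serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return serverChannel.socket().getLocalPort();
    }

    // One pass of the accept loop; returns the number of connections handed over.
    int acceptOnce(long timeoutMs) throws IOException {
        int accepted = 0;
        if (nioSelector.select(timeoutMs) > 0) {
            Iterator<SelectionKey> iter = nioSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (!key.isAcceptable())
                    continue;
                SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                if (client == null)
                    continue; // nothing was pending after all
                client.configureBlocking(false);
                client.socket().setTcpNoDelay(true);
                processorQueues.get(currentProcessor).add(client);
                // Round robin to the next processor.
                currentProcessor = (currentProcessor + 1) % processorQueues.size();
                accepted++;
            }
        }
        return accepted;
    }

    void close() throws IOException {
        for (Queue<SocketChannel> q : processorQueues)
            for (SocketChannel c : q)
                c.close();
        serverChannel.close();
        nioSelector.close();
    }

    public static void main(String[] args) throws IOException {
        AcceptorSketch acceptor = new AcceptorSketch(2);
        List<SocketChannel> clients = new ArrayList<>();
        for (int i = 0; i < 4; i++)
            clients.add(SocketChannel.open(new InetSocketAddress("127.0.0.1", acceptor.port())));
        int total = 0;
        for (int tries = 0; tries < 50 && total < 4; tries++)
            total += acceptor.acceptOnce(500);
        System.out.println("accepted=" + total
            + " p0=" + acceptor.processorQueues.get(0).size()
            + " p1=" + acceptor.processorQueues.get(1).size());
        for (SocketChannel c : clients)
            c.close();
        acceptor.close();
    }
}
```

Because the selector is level-triggered, a connection that is still waiting in the backlog makes the next select call return immediately, so accepting one connection per pass is enough.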

Processor

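A Processor maintains two important data structures:

  • newConnections: when the Processor receives a SocketChannel forwarded by the Acceptor, it first stores it in this queue. In its run loop it takes the channels out of the queue, binds each one to a connection ID (built from local ip:port + remote ip:port), and registers it for OP_READ.
  • inflightResponses: holds the server responses that are being sent or are waiting to be sent; a response is removed once it has been sent successfully.

The Processor's run loop resembles that of the client-side NetworkClient: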
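
A minimal sketch of this bookkeeping follows. It is not Kafka's code: the class ProcessorStateSketch, the PendingConnection type and the exact "local-remote" ID format are assumptions made for illustration, and strings stand in for real channels and responses.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

class ProcessorStateSketch {
    // Hypothetical stand-in for an accepted SocketChannel: just its two addresses.
    static final class PendingConnection {
        final String localHost; final int localPort;
        final String remoteHost; final int remotePort;
        PendingConnection(String localHost, int localPort, String remoteHost, int remotePort) {
            this.localHost = localHost; this.localPort = localPort;
            this.remoteHost = remoteHost; this.remotePort = remotePort;
        }
    }

    // Filled by the Acceptor thread, drained by the Processor thread, hence a concurrent queue.
    final Queue<PendingConnection> newConnections = new ConcurrentLinkedQueue<>();
    // Connection IDs that are currently registered for OP_READ.
    final Set<String> registeredForRead = new HashSet<>();
    // Connection ID -> response that is being sent or waiting to be sent.
    final Map<String, String> inflightResponses = new HashMap<>();

    // Assumed ID format: local ip:port, a dash, remote ip:port.
    static String connectionId(PendingConnection c) {
        return c.localHost + ":" + c.localPort + "-" + c.remoteHost + ":" + c.remotePort;
    }

    // Drain the queue and "register" each connection under its ID.
    void configureNewConnections() {
        PendingConnection c;
        while ((c = newConnections.poll()) != null)
            registeredForRead.add(connectionId(c));
    }

    void sendResponse(String connectionId, String response) {
        inflightResponses.put(connectionId, response);
    }

    // Called once the response has been written out completely.
    String completeSend(String connectionId) {
        String response = inflightResponses.remove(connectionId);
        if (response == null)
            throw new IllegalStateException("Send for " + connectionId + " completed, but not in inflightResponses");
        return response;
    }

    public static void main(String[] args) {
        ProcessorStateSketch p = new ProcessorStateSketch();
        // The client address 10.45.4.20:51234 is made up for this example.
        p.newConnections.add(new PendingConnection("10.45.4.9", 9093, "10.45.4.20", 51234));
        p.configureNewConnections();
        System.out.println(p.registeredForRead);
    }
}
```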

while (isRunning) {
  configureNewConnections()
  processNewResponses()
  poll()
  processCompletedReceives()
  processCompletedSends()
  processDisconnected()
}

The Selector's poll method is the same one the clients use.

processCompletedReceives

private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    val openChannel = selector.channel(receive.source)
    val session = {
      val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
      RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
    }
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
      buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
      securityProtocol = securityProtocol)
    requestChannel.sendRequest(req)
    selector.mute(receive.source)
  }
}

  • completedReceives holds the requests received from clients (NetworkReceive).
  • receive.source is the connection ID of the client.
  • selector.mute cancels the OP_READ registration on the corresponding channel, so no further request is read from that connection until the current one has been answered. This differs from the client: once a client has connected to the server, OP_READ stays registered on its channel, because the client cannot predict when data from the server will arrive. The server registers OP_READ after the connection is established and cancels it once a request from the client has been received.
  • The server wraps the received request in a RequestChannel.Request and puts it into the requestChannel.

processCompletedSends

private def processCompletedSends() {
  selector.completedSends.asScala.foreach { send =>
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    resp.request.updateRequestMetrics()
    selector.unmute(send.destination)
  }
}

  • completedSends holds the responses that have been sent successfully, so they must be removed from inflightResponses.
  • selector.unmute registers OP_READ on the channel again.
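
At the NIO level, muting and unmuting come down to clearing and setting the OP_READ bit in a key's interest set. The sketch below shows only that bit manipulation; MuteSketch and its two methods are hypothetical helpers, not part of Kafka's Selector.

```java
import java.nio.channels.SelectionKey;

class MuteSketch {
    // Clear OP_READ and leave every other interest bit untouched.
    static int mute(int interestOps) {
        return interestOps & ~SelectionKey.OP_READ;
    }

    // Set OP_READ again.
    static int unmute(int interestOps) {
        return interestOps | SelectionKey.OP_READ;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        int muted = mute(ops);
        System.out.println("read interest after mute: " + ((muted & SelectionKey.OP_READ) != 0));
        System.out.println("read interest after unmute: " + ((unmute(muted) & SelectionKey.OP_READ) != 0));
    }
}
```

On a real SelectionKey the same idea would read key.interestOps(mute(key.interestOps())).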

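So how are the requests and responses that pass through the requestChannel actually handled?

Let's look at RequestChannel first.
Each broker has exactly one RequestChannel, which connects the Processors with the Handlers. It defines two kinds of blocking queues:

  • requestQueue: buffers the requests that the Processors have read from clients; its capacity is queued.max.requests (default 500).
  • responseQueues: one response queue per Processor.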
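
The two kinds of queues can be sketched as follows. This is an illustration under assumptions, not Kafka's RequestChannel: the class RequestChannelSketch and its simplified Request and Response types are hypothetical, and payloads are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RequestChannelSketch {
    static final class Request {
        final int processor; final String connectionId; final String payload;
        Request(int processor, String connectionId, String payload) {
            this.processor = processor; this.connectionId = connectionId; this.payload = payload;
        }
    }

    static final class Response {
        final Request request; final String payload;
        Response(Request request, String payload) {
            this.request = request; this.payload = payload;
        }
    }

    // One bounded queue shared by all Processors and Handlers.
    private final BlockingQueue<Request> requestQueue;
    // One unbounded queue per Processor.
    private final List<BlockingQueue<Response>> responseQueues = new ArrayList<>();

    RequestChannelSketch(int numProcessors, int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
        for (int i = 0; i < numProcessors; i++)
            responseQueues.add(new LinkedBlockingQueue<>());
    }

    // Called by a Processor; blocks while the request queue is full.
    void sendRequest(Request request) throws InterruptedException {
        requestQueue.put(request);
    }

    // Called by a Handler; returns null if nothing arrives within the timeout.
    Request receiveRequest(long timeoutMs) throws InterruptedException {
        return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Called by a Handler; routes the response to the Processor that read the request.
    void sendResponse(Response response) {
        responseQueues.get(response.request.processor).add(response);
    }

    // Called by a Processor; returns null if no response is waiting.
    Response receiveResponse(int processor) {
        return responseQueues.get(processor).poll();
    }

    public static void main(String[] args) throws InterruptedException {
        RequestChannelSketch channel = new RequestChannelSketch(3, 500);
        channel.sendRequest(new Request(1, "conn-1", "fetch"));
        Request req = channel.receiveRequest(300);
        channel.sendResponse(new Response(req, "fetch-result"));
        System.out.println(channel.receiveResponse(1).payload);
    }
}
```

Bounding only the request queue means that a slow Handler pool eventually blocks the Processors in sendRequest, which in turn stops them from reading more requests off the network.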

KafkaRequestHandler

During startup, KafkaServer creates a pool of Handler threads on top of the SocketServer's requestChannel:

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
  config.numIoThreads)

The number of KafkaRequestHandler threads is set by num.io.threads (default 8).

val runnables = new Array[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
  runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
}

Its run method is also very simple: it takes a request from the RequestChannel and calls KafkaApis to handle it:

req = requestChannel.receiveRequest(300)
apis.handle(req)

When KafkaApis has finished processing a request, it calls the sendResponse method to hand the response back to the Processor.
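
The handler loop can be sketched as a small thread pool that polls a shared queue with a timeout. This is a simplified illustration: HandlerPoolSketch, the string-typed requests and the Consumer that stands in for KafkaApis are all hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class HandlerPoolSketch {
    private final Thread[] threads;
    private volatile boolean running = true;

    // apis stands in for KafkaApis.handle; requests are plain strings here.
    HandlerPoolSketch(int numThreads, BlockingQueue<String> requestQueue, Consumer<String> apis) {
        threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                while (running) {
                    try {
                        // Wait up to 300 ms so the loop can notice a shutdown.
                        String req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
                        if (req != null)
                            apis.accept(req);
                    } catch (InterruptedException e) {
                        return; // interrupted during shutdown
                    }
                }
            }, "handler-" + i);
            threads[i].setDaemon(true);
            threads[i].start();
        }
    }

    void shutdown() throws InterruptedException {
        running = false;
        for (Thread t : threads)
            t.interrupt();
        for (Thread t : threads)
            t.join();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(500);
        CountDownLatch done = new CountDownLatch(20);
        HandlerPoolSketch pool = new HandlerPoolSketch(8, queue, req -> done.countDown());
        for (int i = 0; i < 20; i++)
            queue.put("request-" + i);
        System.out.println("all handled: " + done.await(5, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```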

ChannelBuilders

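The selector used in SocketServer is initialized as follows:

private val selector = new KSelector(
  maxRequestSize,
  connectionsMaxIdleMs,
  metrics,
  time,
  "socket-server",
  metricTags,
  false,
  ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))

ChannelBuilders picks, according to the security protocol, the builder that creates the KafkaChannel instances.

Creating a channel according to the protocol

Let's start with the first argument, protocol. It comes from the protocolType declared in the EndPoint, and is passed in when SocketServer creates the Processors during initialization: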

endpoints.values.foreach { endpoint =>
  val protocol = endpoint.protocolType
  val processorEndIndex = processorBeginIndex + numProcessorThreads

  for (i <- processorBeginIndex until processorEndIndex)
    processors(i) = newProcessor(i, connectionQuotas, protocol)
  // ... (the rest of the loop body is omitted in this excerpt)
}

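For example, listeners=PLAINTEXT://10.45.4.9:9093,SASL_PLAINTEXT://10.45.4.9:9094 defines two EndPoints: the protocol of the first is PLAINTEXT, that is, clear text without authentication, and the second uses SASL_PLAINTEXT authentication. A fixed number of processors (set by num.network.threads) is created for each endpoint.

Now let's look at the main logic for creating the Channel builder: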
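
How a listeners string maps to per-endpoint processor ranges can be sketched like this. It is a simplified illustration: EndpointSketch and assignProcessors are hypothetical names, and the parsing only extracts the protocol part before "://".

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

class EndpointSketch {
    // Returns protocol -> {beginIndex, endIndex} (end exclusive) of the processors serving it.
    static Map<String, int[]> assignProcessors(String listeners, int numProcessorThreads) {
        Map<String, int[]> ranges = new LinkedHashMap<>();
        int processorBeginIndex = 0;
        for (String listener : listeners.split(",")) {
            String trimmed = listener.trim();
            int sep = trimmed.indexOf("://");
            if (sep < 0)
                throw new IllegalArgumentException("Malformed listener: " + trimmed);
            String protocol = trimmed.substring(0, sep);
            int processorEndIndex = processorBeginIndex + numProcessorThreads;
            ranges.put(protocol, new int[] {processorBeginIndex, processorEndIndex});
            // The next endpoint gets the next block of processors.
            processorBeginIndex = processorEndIndex;
        }
        return ranges;
    }

    public static void main(String[] args) {
        String listeners = "PLAINTEXT://10.45.4.9:9093,SASL_PLAINTEXT://10.45.4.9:9094";
        for (Map.Entry<String, int[]> e : assignProcessors(listeners, 3).entrySet())
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
    }
}
```

With two endpoints and num.network.threads=3, the broker in this sketch ends up with six processors in total, three per endpoint.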

switch (securityProtocol) {
    case SSL:
        ...
    case SASL_SSL:
    case SASL_PLAINTEXT:
        requireNonNullMode(mode, securityProtocol);
        if (loginType == null)
            throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
        if (mode == Mode.CLIENT && clientSaslMechanism == null)
            throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
        channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
        break;
    case PLAINTEXT:
        channelBuilder = new PlaintextChannelBuilder();
        break;
    default:
        throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}

We are mainly interested in how SASL_PLAINTEXT and PLAINTEXT are handled.

When security.protocol=SASL_PLAINTEXT, the clientSaslMechanism of a Channel created on the client side must not be null. As shown above, SocketServer passes null for this argument when it creates its Channel builder. Now let's look at the code that a producer or consumer uses to create the Channel builder:

String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);

On the client, sasl.mechanism defaults to GSSAPI. KafkaChannel has four fields, all set through its constructor:

private final String id;
private final TransportLayer transportLayer;
private final Authenticator authenticator;
private final int maxReceiveSize;

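When security.protocol is SASL_PLAINTEXT or PLAINTEXT, the channel uses PlaintextTransportLayer, which carries a single Principal: ANONYMOUS. If authorization is enabled in Kafka, the ANONYMOUS user must be granted access permissions before unauthenticated clients (security.protocol=PLAINTEXT) can use an unsecured endpoint.
There are two kinds of Channel authenticators, one for the client side and one for the server side. Their handshake proceeds as follows: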

Note left of SaslClient: SEND_HANDSHAKE_REQUEST
Note right of SaslServer: HANDSHAKE_REQUEST
SaslClient->SaslServer: SaslHandshakeRequest (ApiVersion=*,correlationId=*,clientMechanism=*)
Note left of SaslClient: RECEIVE_HANDSHAKE_RESPONSE
Note left of SaslServer: check clientMechanism is enabled
SaslServer->SaslClient: SaslHandshakeResponse
Note right of SaslClient: check error code in response
Note left of SaslClient: FAILED or INITIAL
Note right of SaslServer: AUTHENTICATE
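
The state transitions in the diagram can be sketched as a pair of small functions. This is an illustration only: the class SaslHandshakeSketch, the HandshakeError enum and the assumption that the server moves to FAILED when the mechanism is not enabled are mine and are not taken from Kafka's authenticators.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class SaslHandshakeSketch {
    enum ClientState { SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, FAILED }
    enum ServerState { HANDSHAKE_REQUEST, AUTHENTICATE, FAILED }
    // Hypothetical error codes carried in the handshake response.
    enum HandshakeError { NONE, UNSUPPORTED_SASL_MECHANISM }

    // Server side: check that the client's mechanism is enabled.
    static HandshakeError checkMechanism(Set<String> enabledMechanisms, String clientMechanism) {
        return enabledMechanisms.contains(clientMechanism)
            ? HandshakeError.NONE
            : HandshakeError.UNSUPPORTED_SASL_MECHANISM;
    }

    // Server side: next state after answering the handshake request (FAILED is an assumption).
    static ServerState serverNext(HandshakeError error) {
        return error == HandshakeError.NONE ? ServerState.AUTHENTICATE : ServerState.FAILED;
    }

    // Client side: next state after checking the error code in the response.
    static ClientState clientNext(HandshakeError error) {
        return error == HandshakeError.NONE ? ClientState.INITIAL : ClientState.FAILED;
    }

    public static void main(String[] args) {
        Set<String> enabled = new HashSet<>(Arrays.asList("GSSAPI", "PLAIN"));
        for (String mechanism : new String[] {"PLAIN", "SCRAM-SHA-256"}) {
            HandshakeError error = checkMechanism(enabled, mechanism);
            System.out.println(mechanism + ": server=" + serverNext(error) + ", client=" + clientNext(error));
        }
    }
}
```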