天外飞猪的博客


  • Home

  • Tags

  • Archives

producer的回调实现

Posted on 2017-03-07 | Visitors:

为了方便用户感知发送的情况,kafka producer中提供了callback机制,springkafka中实现了ProducerListener

遇到的问题

近期生产环境上由于版本BUG问题导致一段时间内某台broker出现假死的异常状态,在这异常期间内发送到该broker上的消息都失败了。在各个系统中我们实现了一个ProducerListener用于将发送失败的消息写入到Hbase中,方便后续处理。

然而,实际情况是Hbase中(与log中一致,排除写入Hbase的错误)的错误记录不全。

接下来以从上往下的层次顺序解析下Producer callback的运行机制,以及对Exception的处理

ProducerListener

首先看下KafkaTemplate的简化UML图

在doSend方法中为producer的send方法设置回调函数:

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
this.producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
if(KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
KafkaTemplate.this.producerListener.onSuccess(...);
}
} else {
if(KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(...,exception);
}
}
}
});
}

如果我们在创建KafkaTemplate时没有设置producerListener属性,那么就会使用默认的:LoggingProducerListener。

KafkaProducer

之前对KafkaProducer的异步发送机制实现进行过分析,有一个sender线程用于协调消息队列及网络通信的工作,在运行过程中可能发生多种异常需要统一进行处理:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
try {
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
this.sender.wakeup();
return result.future;
//API异常会调用callback的onCompletion方法
}catch (ApiException e) {
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
//其他异常直接抛出
} catch (Exception e){
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}

interceptCallback将KafkaTemplate注入的ProducerListener以及KafkaProducer配置的拦截器ProducerInterceptors组合在一起,统一对发送的执行结果进行拦截处理。

通过append方法将拦截器配置到每个RecordBatch中

public FutureRecordMetadata tryAppend( byte[] key, byte[] value, Callback callback) {
...
if (callback != null)
thunks.add(new Thunk(callback, future));
}

Thunk是内部类,将callback于FutureRecordMetadata捆绑在一起。在每个RecordBatch被处理完成的时候(3种情况:1.发送完成;2.失效;3.producer被强制关闭;)会执行绑定的callback方法:

public void done(long baseOffset, long timestamp, RuntimeException exception) {
for (int i = 0; i < this.thunks.size(); i++) {
Thunk thunk = this.thunks.get(i);
if (exception == null) {
RecordMetadata metadata = new RecordMetadata(...);
thunk.callback.onCompletion(metadata, null);
} else {
thunk.callback.onCompletion(null, exception);
}
}
}

按理来说,这里已经覆盖了所有的情况,为什么现实使用中会有发送失败未记录下来的情况呢?

Kafka consumer原理剖析

Posted on 2017-03-01 | Visitors:

Blockquote

REF

  • KafkaConsumer API doc
  • Kafka 之 Group 状态变化分析及 Rebalance 过程
  • Kafka conumser redesign since 0.9.0
  • 趣解reblance

consumer的新特性

在 0.9.0.0 之后的 Kafka,出现了几个新变动,一个是在 Server 端增加了 GroupCoordinator 这个角色,另一个较大的变动是将 topic 的 offset 信息由之前存储在 zookeeper 上改为存储到一个特殊的 topic 中(__consumer_offsets)。

__consumer_offsets

consumer_offsets 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 三副本,而具体 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions 来计算(其中,NumPartitions 是consumer_offsets 的 partition 数,默认是50个)的。

offset

  • Last Commiteed Offset:consumer上一次commit的位置;
  • Current Position:cosumer当前消费到的位置,last coomitted offset 到current position之间的就是当前正被consumer处理的消息。
  • High Watermark:被成功备份到所有replicas的最新位置,该位置之前的所消息都被认为是安全可消费的。
  • Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset;

coordinator机制

kafka server将partiton分配的工作转移到了Client上(Producer中也可以看到),server保留的是group的分配工作,这样的设计是为了方便client使用灵活的partition分配方案。

coordinator in server

server上的Coordinator 负责reblance、Offset提交、心跳,实现主要代码在kafka.coordinator.GroupCoordinator.scala

一个consumer group对应一个coordinator

coordinator 状态机

共有5种状态:

  1. Dead:group中没有成员,并且metadata已被移除,这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
  2. Empty:Group 没有任何成员,如果所有的 offsets 都过期的话就会变成 Dead,一般当 Group 新创建时是这个状态,也有可能这个 Group 仅仅用于 offset commits 并没有任何成员,该状态至响应JoinGroupRequest
  3. Stable:这种状态下,coordinator已经获得了激活的generation,或者目前没有成员,等待第一个joinGroup。该状态还会接受成员的heartbeats。
  4. PreparingRebalance:准备重平衡状态,例如member发生变化
  5. AwaitingSync:所有的joinGroup请求都接受到后,会选举产生一个leader,这个状态就是在等待leader发送partition的分配结果(SyncGroupRequest)。

状态机如下:

coordinator in client

根据KafkaConsumer主要方法pollOnce来跟踪client上的coordinator工作过程

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {

coordinator.ensureCoordinatorReady();

if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();

if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());

long now = time.milliseconds();
client.executeDelayedTasks(now);

Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;

fetcher.sendFetches();
client.poll(timeout, now);
return fetcher.fetchedRecords();
}

第一步、投石问路——确保server端有可用的coordinator

public void ensureCoordinatorReady() {
//通过与节点建立连接判断coordinator是否存活
while (coordinatorUnknown()) {
//发送GroupCoordinatorRequest请求
RequestFuture<Void> future = sendGroupCoordinatorRequest();
client.poll(future);

if (future.failed()) {
if (future.isRetriable())
client.awaitMetadataUpdate();
else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
coordinatorDead();
time.sleep(retryBackoffMs);
}

}
}

如果请求有broker响应了,那么将将该节点做为coordinator:

this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());

第二步、确保group是可用的

首先,对group需要reJoin的情况进行梳理:

  • 有consumer离开当前group,client会发送一个LeaveGroupRequest如:
  • 不再订阅某个topic
  • ConsumerCoordinator执行关闭操作
  • 发送SyncGroupRequest后收到的response异常
  • 发送HeartbeatRequest后收到的response异常,包括:REBALANCE_IN_PROGRESS(正在重平衡),ILLEGAL_GENERATION(generation值不合法),UNKNOWN_MEMBER_ID(未知的成员)
    public void ensureActiveGroup() {
    if (!needRejoin()) return;

    //如果设置了auto commit,那么在rebalance之前先提交,再准备reJoin
    if (needsJoinPrepare) {
    onJoinPrepare(generation, memberId);
    needsJoinPrepare = false;
    }

    while (needRejoin()) {
    ensureCoordinatorReady();

    //在reblance执行之前,需要确保所有JoinGroup的请求都被处理掉了,避免频繁的reblance
    if (client.pendingRequestCount(this.coordinator) > 0) {
    client.awaitPendingRequests(this.coordinator);
    continue;
    }

    RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
    future.addListener(new RequestFutureListener<ByteBuffer>() {
    @Override
    public void onSuccess(ByteBuffer value) {
    onJoinComplete(generation, memberId, protocol, value);
    needsJoinPrepare = true;
    heartbeatTask.reset();
    }
    });
    client.poll(future);
    }
    }

JoinGroupRequest中包含的信息有:

  • groupId
  • memberId
  • subscriptions && PartitionAssignor(默认:RangeAssignor)

第三步、处理JoinGroupResponse

这是通过回调函数实现的,具体的是JoinGroupResponseHandler的handle方法:

public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
//记录新的generation
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;
//leader与follower区别对待
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
} else if {
...
}
}

server的coordinator在收到joinGroupRequest后,会为每个group组选择一个member任命为leader。

leader在收到response后,会进行partition的分配,并且将分配结果发送给server的coordinator

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
return sendSyncGroupRequest(request);
}

分区分配的逻辑:

protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
//获取分配规则(默认range)
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
//获取订阅的topic
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}

this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());

//对每个订阅的topic进行分区分配
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}

return groupAssignment;
}

follower只需要发送一个不包含分区结果的SyncGroupRequest

private RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
memberId, Collections.<String, ByteBuffer>emptyMap());
return sendSyncGroupRequest(request);
}

关闭KafkaListnerContainer时发生了些什么

一共三个操作:

if(this.listenerInvokerFuture != null) {
this.stopInvokerAndCommitManualAcks();
}

try {
this.consumer.unsubscribe();
} catch (WakeupException var8) {
;
}

this.consumer.close();

主要看unsubscribe方法:

this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);

  • 首先取消了所有订阅
  • 然后发送一个LeaveGroupRequest,并且将memberId设为UNKNOWN,needRejoin设为true

当所有member离开时,server的coordinator进入Empty状态

小实验

创建两个ListenerContainer,订阅同一个topic(“test” with partitions=6),并且在同一个group内:

ConcurrentMessageListenerContainer testContainer = new ContainerBuilder()
// .setTopic("kafka.scanAllBackup")
.setTopic("test")
.setGroupId("g1")
.setListenerName("testListener")
.setBeanName("testContainer")
...
ConcurrentMessageListenerContainer secondContainer = new ContainerBuilder()
// .setTopic("kafka.scanAllBackup")
.setTopic("test")
.setGroupId("g1")
.setListenerName("testListener")
.setBeanName("secondContainer")

实验结果

开启第一个Container

此时的generation = 1,分配的分区数为6
开启第二个Container

因为有新的member加入,因此触发了Rebalance,根据Range分配规则,每个consumer获得3个分区

简述下Range Assignor规则:假如topic有N个分区(按number排序),group组内有M个consumer(按字典序排列)订阅,那么就现将分区分成M份,每份N/M个,如果不能整除,就将余数(N%M)分配给前N%M个consumer

关闭第一个Container

有member离开group,再次触发Reblance,第二个container独享6个分区

实验二

在使用kafka的时候有个现象:如果将正在消费的consumer关闭、重启,那么在短时间内他是无法接收到消息的,从日志上看得话就是server coordinator没有为这个consumer分配分区,为了详解这种机制,我将发送JoinGroup的debug信息输出。测试用例同上
开启第一个container

在6分09秒发送了一个joinGroup请求,但是并没有得到反馈。

g1这个group在server中的generation=1,因此新的joinGroup请求进来后,server进入PreparingRebalance状态。

开启第二个Container,关闭第一个container

发送了第二个joinGroup请求,并没有马上收到反馈,在6分29秒关闭了第一个container,经过了96s后,收到了反馈,并且指定leader为client2(也就是当前的consumer),client1是member,成功分配到了3个分区。

在10分06秒的时候,server再次执行了重平衡,client2再次发送了joinGroup请求,马上得到了反馈,并且这次独享6个分区。

产生这次重平衡的原因是:8分05秒server反馈了joinGroup请求,session_timeout设置的是两分钟,在10分05秒的时候,server依然未收到client1的heartbeat,因此触发了重平衡

server.log

[2017-03-02 19:06:03,197] INFO [GroupCoordinator 2]: Stabilized group g1 generation 1 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:06:03,201] INFO [GroupCoordinator 2]: Assignment received from leader for group g1 for generation 1 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:06:10,351] INFO [GroupCoordinator 2]: Preparing to restabilize group g1 with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:08:06,504] INFO [GroupCoordinator 2]: Stabilized group g1 generation 2 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:08:06,511] INFO [GroupCoordinator 2]: Assignment received from leader for group g1 for generation 2 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:06,506] INFO [GroupCoordinator 2]: Preparing to restabilize group g1 with old generation 2 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:06,966] INFO [GroupCoordinator 2]: Stabilized group g1 generation 3 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:06,970] INFO [GroupCoordinator 2]: Assignment received from leader for group g1 for generation 3 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:51,173] INFO [GroupCoordinator 2]: Preparing to restabilize group g1 with old generation 3 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:51,175] INFO [GroupCoordinator 2]: Group g1 generation 3 is dead and removed (kafka.coordinator.GroupCoordinator)
  • 6分03秒,server收到来自leader的syncGroup请求,coordinator进入Stable状态。
  • 随后,非正常关闭container,所有member离开了group,coordinator进入Empty状态 coordinator无法收到members的心跳
  • 6分10秒,client重启,并发送了joinGroup请求,memberId为UNKNOWN,server进入PreparingRebalance状态

疑点: coordinator在收到client1的leaveGroup请求后为啥还会响应其joinGroup请求嘞? coordinator因为没有感知到client1的离开,所以才会长时间等待

Metadata详解

Posted on 2017-02-28 | Visitors:

探究org.apache.kafka.clients中Metadata的数据结构、更新及获取原理

ref

Metadata是为producer及consumer服务的,在producer和consumer创建的时候都会创metadata:

this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));

this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

成员变量

  • refreshBackoffMs:metadata的最短刷新间隔,避免频繁的Poll操作,默认100ms
  • metadataExpireMs:在下次更新前,metadata保存的最长时间,默认5min
  • version:每次更新一次metadata都会将version值增加1
  • lastRefreshMs:上次更新时间,也记录更新失败时间
  • lastSuccessfulRefreshMs:上次成功更新时间
  • cluster:记录集群中节点、partitionInfo、topic之间的关系,只是个子集,因为Metadata初始化的时候,cluster也是初始化为空的
  • listeners:自定义的接口,用于监听Metadata更新,在ConsumerCoordinator有使用到
  • needMetadataForAllTopics:是否需要cluster中所有topic的metadata,默认为false

方法

1. timeToNextUpdate

该方法用于计算下一次刷新metadata的的时间间隔(ms)

public synchronized long timeToNextUpdate(long nowMs) {
//距离时效还有多长时间
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
//距离下一次允许刷新的时间
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}

Metadata的使用是线程安全的。

2. awaitUpdate

public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
...
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
/*循环等待version值超过当前version值,时间耗尽就退出*/
while (this.version <= lastVersion) {
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}

3. update

public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
//如果有人监听了metadata的更新,通知他们
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);

//新的cluster覆盖旧的cluster
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

notifyAll(); //通知所有的阻塞的线程(调用了wait的线程)
}

producer获取metadata

时效图如下:

kafka producer原理梳理

Posted on 2017-02-28 | Visitors:

从Kafka 0.8.2开始,发布了一套新的Java版的client api,KafkaProducer/KafkaConsumer,替代之前的scala版的api。

REF

Kafka: Producer (0.10.0.0)
Kafka源码分析 Producer客户端(超详细)

重要配置项说明

  • linger.ms:这是在send的过程中人为设置的一个delay,目的是将尽可能多的消息放到一个batch中,这样能减少发送的次数;
  • max.block.ms:如果buffer满了或者无法获取到metadata,那么将阻塞KafkaProducer.send()与partitionFor()这两个方法,为了防止无限等待,这里设置了一个最长等待时间,默认为60s,超出这个时间的消息就不会被发送了;
  • request.timeout.ms:连接超时时间,设置为30s;

send

下面是KafkaProducer执行send操作的时序图:

采用的是一种Non-block的工作方式,提供一个callback,调用send后,可以继续发送消息而不用等待。当有结果返回时,callback会被自动通知执行。

public Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// first make sure the metadata for the topic is available
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);

// 序列化key和value
byte[] serializedKey= keySerializer.serialize(record.topic(), record.key());
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());

// 根据key选择Partition
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
TopicPartition tp = new TopicPartition(record.topic(), partition);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);

// 在每次追加一条消息到收集器之后,都要判断是否满了.如果满了,就执行一次Sender操作,通知Sender将这批数据发送到Kafka
if (result.batchIsFull || result.newBatchCreated) this.sender.wakeup();
return result.future;
}

RecordAccumulator

kafka消息的异步发送是依靠消息缓存实现的,RecordAccumulator扮演的就是这个角色,它创建了一个ConcurrentMap<TopicPartition, Deque<RecordBatch>>类型的缓存结构,每条消息会被放到对应的双端队列中。

append

public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
//用于记录当前等待处理的batch数量
appendsInProgress.incrementAndGet();
try {
// 有可用的dq就试着将消息入队
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
}

// 无法放到之前的dq,那么就分配空间然后创建dq
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
  • 为了实现并发,对每个队列的操作都放在同步块中操作
  • 新创建了一个RecordBatch,或者当前的RecordBatch的buffer被填满了,那么就可以发车 (唤醒Sender)了

ready

找出每个topic-partition对应的leader,并判断发送条件是否成熟

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<Node>();

for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();

Node leader = cluster.leaderFor(part);
if (leader == null) {
unknownLeadersExist = true;
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
synchronized (deque) {
RecordBatch batch = deque.peekFirst();
if (batch != null) {
...
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}

return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}

drain

在ready环节已经找出所有的ready nodes,那么接下来就将batchs中的RecordBatch按照node进行分组,方便后续的发送

Request && Response

下面要探究的是如何将RecordBatch转换成server端能理解的请求,以及收到server的响应后如何进行处理。

  • 首先,将发往相同broker、topic-partition的RecordBatch组成一个ProduceRequest
  • 根据ProduceRequest进而组成一个用于发送的RequestSend
  • 在handleProduceResponse方法中实现了对ClientResponse的处理,在这里会对batch做done操作,细节在producer回调中有阐述。
  • 根据上面的处理方法实现了回调方法,进而完成了ClientRequest的组建工作
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
RequestSend send = new RequestSend(Integer.toString(destination),
this.client.nextRequestHeader(ApiKeys.PRODUCE),
request.toStruct());
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};

return new ClientRequest(now, acks != 0, send, callback);


回调函数是在NetworkClient层被调用的,具体的是client.poll()中:

for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}

NetworkClient

NetworkClient是kafka客户端实现网络连接、数据读写的。是通过Java NIO实现的,并且实现了自己的Selector与KafkaChannel

连接

在Sender的run方法中创建ClientRequest之前需要先筛选出可连接的节点,这里调用的是NetworkClient的ready方法:

if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now);

然后调用Selector的connect方法:

  • 开启一个SocketChannel

    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
  • 将创建的Channel注册在nioSelector上,监听事件为:OP_CONNECT

  • 创建KafkaChannel,并与brokerId绑定
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    this.channels.put(id, channel);

因为实现的是非阻塞模式的,因此connect方法在连接成功建立之前就会返回的。为了确保连接成功,需要调用KafkaChannel的finishConnect方法

public boolean finishConnect() throws IOException {
boolean connected = socketChannel.finishConnect();
if (connected)
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
return connected;
}

在连接成功后就注册了OP_READ。为什么不在发送请求的时候注册嘞,因为一来响应什么时候到我们是无法预料的,二来有些请求并不要求响应。

发送

在组建完ClientRequest后,要调用Selector的send操作,被发送的主体是RequestSend:

public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination());
channel.setSend(send);
}

private KafkaChannel channelOrFail(String id) {
KafkaChannel channel = this.channels.get(id);
}

需要注意的是:

  • 每个KafkaChannel上只有一个RequestSend,无法加塞,必须在之前的被发送掉后才能上车
  • setSend的操作会注册OP_WRITE,然后在发送完成后,会取消掉OP_WRITE
//设置发送的主体
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

//send完成后取消OP_WRITE
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

return send.completed();
}

poll轮询

上面的setSend、write、send的操作如何组织起来呢,这就是poll的工作了。

简单来说就是nioSelector执行一个select操作,然后遍历SelectionKey,如果有我们感兴趣的操作,就对相应的Channel进行read、write之类的操作。

遍历获取有事件发生的Channel

Iterator<SelectionKey> iterator = selectionKeys.iterator();
//遍历获取每个事件上的Channel
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key);
if (channel.finishConnect()) {
this.connected.add(channel.id());
} else
continue;
}

read from KafkaChannel

if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
  • key.isReadable()只有在Channel监听Read事件,并且Channel有数据写入时才成立
  • 在完成读操作后,会将NetworkReceive添加到队列:completedReceives中

    write to KafkaChannel

    if (channel.ready() && key.isWritable()) {
    Send send = channel.write();
    if (send != null) {
    this.completedSends.add(send);
    this.sensors.recordBytesSent(channel.id(), send.size());
    }
    }

不同于read操作时使用了while循环来确保读取完毕,write操作是一次性的,如果发送完毕,就去除OP_WRITE事件,并且将Send添加到队列completedSends中;否则就将不取消,然后在后面的循环中继续执行写操作,直至写完为止

NetworkClient.poll

下面把目光拉回到NetworkClient中,来看看执行完selector的poll操作之后,是如何进一步处理的。

ClientRequest在被NetworkClient层执行发送操作的时候,会被添加到
inFlightRequests中,这个名字很形象——正在飞行中的请求,指代正在发送或者等待响应的请求的集合。该集合中将ClientRequest按照目的节点进行分组,相同目的节点的保存在同一个双端队列中。

handleCompletedSends

for (Send send : this.selector.completedSends()) {
ClientRequest request = this.inFlightRequests.lastSent(send.destination());
if (!request.expectResponse()) {
this.inFlightRequests.completeLastSent(send.destination());
responses.add(new ClientResponse(request, now, false, null));
}
}

对于不要求响应的请求,将发送完成的RequestSend对应的ClientRequest从inFlightRequests中移除,直接组建一个body为空的response。

handleCompletedReceives

for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
Struct body = parseResponse(receive.payload(), req.request().header());
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}

根据接收到的源地址,移除inFlightRequests中对应的ClientRequest,并且根据收到的内容组建response

producer消息发送过程

《深入理解JVM》读书笔记

Posted on 2017-02-24 | Visitors:

《Understanding the JVM》这本书拿到手后刷了一遍,没过多久就忘得差不多了,为了加深印象和记录所想于此篇笔记当中

参考文档

java虚拟机规范 SE7
Memory management in the Java HotSpot Virtual Machine 白皮书的中文翻译


显式内存管理容易引发的两个问题:

  1. 引用挂起:将A对象引用的内存释放掉并分配给B对象,当A再次访问引用的内存时,结果无法预测
  2. 内存泄漏:以单向链表为例,在释放一个链表所占用的内存时如果在处理表头时系统出现了异常,那么整个链表的元素都无法被引用。

    第二章 java内存区域与内存溢出异常

    运行时数据区

    区域名称 | 用处 | 访问性 | 生命周期 |抛出的异常
    —|—|—|—|—|
    PC寄存器 | 指令字节码行号指示器 | 线程私有 | 与线程相同 | 无
    java虚拟机栈(也可以称作java栈) | 存储栈帧(包括局部变量表、动态链接、方法出口)|私有|有线程相同|StackOverflowError:线程请求分配空间过大; OutOfmemoryError:栈动态扩展时无法申请到足够空间
    本地方法栈(也可以称作”C Stacks”)|为执行native方法服务|私有|线程分配|同上
    java堆(也可叫做”GC堆”)|存放所有的对象实例以及数组,垃圾收集器管理的主要区域|共享|虚拟机启动时创建|OOM:所需堆超过了系统能提供的最大值
    方法区(hotspot使用永久代实现方法区)|存放每一个类的结构信息,包括常量池、静态变量、构造函数和不同方法的字节码内容|共享|虚拟机启动时创建|OOM:无法满足内存分配需求
  • 直接内存虽然不是JVM定义的运行时内存,但是在NIO中会使用到
  • 对象在内存中分配到空间后虚拟机会将这块空间初始化为零值,这样保证了对象的属性在java代码中可以不赋初始值就可以直接使用。
  • 粗粒度来说,一台机器上单个线程可分配到的内存(指代虚拟机栈+本地方法栈) = nG(可用该内存上限) - maxHeapSize - maxPermSize(最大方法区容量)。考虑到内存资源紧张的多线程场景,如果想要获取更多的线程的话就需要通过减少堆以及栈空间(-Xss)容量

    本地线程分配缓冲(Thread Local Allocation Buffer),在创建对象时,虚拟机会为该对象分配内存空间,确切的说是从java堆中分配空间,而这一操作在多线程并发的情况下是非线程安全的,除了对内存分配动作进行同步的方案外,还有一种方法就是为每个线程在堆上预分配一块内存(TLAB),这块堆内存空间就暂时变成线程私有的了,如此就将内存分配的动作分配到每个线程私有的空间上进行了,进而避免了并发的问题。

HotSpot代的划分

在HotSpot中,内存被分为3代:年轻代、老年代和永久代(即方法区)。

  • 对象初始分配在年轻代(大对象除外)
  • 对象在年轻代经历几次年轻代的垃圾回收(young generation collections)后升至老年代

年轻代包含3个区域:一个Eden区和两个Survivor区

  • 大部分对象初始分配在Eden区
  • Survivor中保存至少经过一次Young GC的对象
  • 如果To是空的,那么From会一直保留这些对象

    第三章 GC与内存分配策略

    方法论

    判断对象是否可回收的方法

  1. 引用计数算法
    该算法为每个对象添加一个引用计数器,引用了就+1,引用失效了就-1,变为0的对象就可以回收了。
    优点:实现简单,效率高;
    缺点:无法解决相互引用的问题
  2. 可达性分析算法(GC Roots)

    以HotSpot实现的算法为主

  • 该算法从GC Roots出发,向下搜索,遍历的路径为引用链,能遍历到的节点代表对象可达,不必回收;无法遍历到的对象不可达,可回收。
  • 可作为GC Roots的包括全局性的引用(如常量和类静态属性)和执行上下文(如栈帧中的本地变量表)
  • 数据一致性问题:GC检查好比集合点名,需要对象停止当前的活动,否则在检查期间出现引用变化,那么久导致检查结果不准确了。为了解决这个问题,引入了安全点和安全区域的概念

GC算法

算法 大致过程 特点 适用区域
标记-清除算法 标记出需要回收的对象;GC时回收对象内存区域 会产生大量不连续的内存碎片 基础算法
复制算法 将内存分成(一般情况)8:1:1的空间,新生代使用8+1区域,GC时将存活对象移到剩下的1空间内,原区域完全回收 不会产生内存碎片 存活率低的新生代
标记-整理算法 标记、清除后将内存空间整理一下,保证有序 适用于回收频率低的老年代

HotSpot的垃圾收集器

收集器 说明 算法 适用环境
Serial 收集期间停止一切线程,单线程、停顿时间长、专心收集 复制算法 单CPU的Client、新生代
ParNew Serial的多线程版本,能与CMS配合使用 复制算法 Server模式下的首选新生代收集器
Parallel Scavenge 关注点为吞吐率,即提高CPU的使用率 复制 吞吐量优先的新生代收集器
Serial Old Serial的老年代版本 标记-整理 单线程Client老年代收集器
Parallel Old Parallel Scavenge的老年代版本 标记-整理 吞吐量优先的老年代收集器
CMS 以获取最短回收停顿时间为目标、或产生空间碎片,CPU敏感 标记-清除 老年代收集器
G1 将内存分区域,计算每个区域的回收价值,高者优先收集 标记-整理 +复值 面向服务器端的收集器
  • Minor GC:新生代被填满时,GC暂停应用程序回收新生代空间的操作。Eden空间的对象要么被转移、要么被回收,转移的目的地是另一块Survior空间或者老年代
  • Full GC:老年代被填满时,暂停所有应用线程,回收不再使用的对象,整理空间。(CMS和G1的老年代回收算法通过并发等手段减少停顿的时间,尽量达到在应用线程运行的同时进行垃圾回收)

内存分配与回收策略

####1. 对象优先分配在Eden区
如果Eden区无法存放新的对象,那么将触发一次Minor GC,采用复制算法将Eden区的对象转移到Survivor区或者老年代(tenured),看下面的例子:

/**
*
* VM: -XX:+UseSerialGC -Xms40M -Xmx40M - Xmn20M -XX:+PrintGCDetails -XX:SurvivorRatio=8
*/
public class TestInvokeMinorGC {
private static final int _1MB = 1024 * 1024;

public static void main(String[] args) {
byte[] a1 = new byte[1 * _1MB];
byte[] a2 = new byte[12 * _1MB];
byte[] a3 = new byte[4 * _1MB];
}
}

这里的JVM参数限定了固定堆大小为40M,其中新生代20M,Eden与survior的比例为8:1,即eden=16M,survior0=2M,survior1=2M。
首先分配到的a1以及a2对象都能存储在eden区,在分配a3时触发了GC,将a1转移到了s0区,将a2转移到了老年代(分配担保机制)

下面的GC日志中也能看出几个区所占的空间:

####2. 大对象直接进入老年代

大对象指需要大量连续内存空间的对象,典型的是长字符串或数组

可以通过-XX:PretenureSizeThreshold来界定 大 的下限,该参数只对serial和ParNew两类收集器有效。

####3. 长期存活的对象进入老年代
对象如果进入Survior区那么他的年龄+1,当他的年龄足够大的时候就会进入老年代。看看下面的例子:

/**
*
* VM: -XX:+UseSerialGC -Xms40M -Xmx40M - Xmn20M -XX:+PrintGCDetails -XX:SurvivorRatio=8
* -XX:MaxTenuringThreshold=1
*/
public class TestInvokeMinorGC {
private static final int _1MB = 1024 * 1024;

public static void main(String[] args) {
byte[] a1 = new byte[_1MB / 2];
byte[] a2 = new byte[15 * _1MB]; // 第一次GC
byte[] a3 = new byte[2 * _1MB]; // 第二次GC
}
}

JVM参数与上例基本一致,只是定义了进入老年代的年龄下限为1.
在这个例子中,创建a2时触发了一次GC,a1进入S0区,创建a3时再次触发GC:

  • a2由于分配担保机制进入了老年代
  • a1由于年龄=1,也进入了老年代


从GC日志也可以发现只有a3留在了eden区,a1,a2都进入了老年代。

####4. 动态对象年龄判断
survior升迁到老年代并不仅仅依靠年龄判断,当survior的使用空间超过了一半,无论年龄是否达标,都会进入老年代。

承接上面的例子,将a1设置为1MB的对象,并且MaxTenuringThreshold设置为10,结果如下:

在survior中年龄为1的对象也进入了老年代。

第六章 类文件结构

JVM之所以被称为平台、语言无关性的平台,究其原因在于它处理的是字节码(Class类文件),这个字节码是由java或者scala之类的语言编译而来的,因此语言无关;字节码如果解析成0、1在不同的操作系统+体系结构中执行依赖于不同的JVM类型,因此是平台无关的。

第七章 类加载机制

类加载的过程主要包含3步:

  1. 加载:
    根据类的全限定名获取字节流,将其所代表的静态存储结构转化为方法区的运行时数据结构,最后在内存中生成一个代表该类的Class对象
  2. 连接:
    • 验证:对Class文件的字节流进行验证,包含:
      • 文件格式
      • 元数据
      • 字节码
      • 符号引用
    • 准备:为类变量(被static修饰的变量)分配内存并设置初始值
    • 解析
  3. 初始化
    该阶段就是执行类构造器<client>()方法的过程,类构造器有以下特点:
  • 由类变量赋值语句以及static代码块合并而成,如果这些都没有的话就不会产生类构造器
  • <client>()与类的构造函数(也称为实例构造器<init>()方法)不同,不需要显式调用父类构造器。父类的<client>()先执行。
  • 多线程环境下初始化一个类时,只会有一个线程执行类的<client>()方法。

第八章 字节码执行引擎

栈帧

栈帧是支持虚拟机进行方法调用和方法执行的数据结构,包含:

  1. 局部变量表:用于存放方法参数以及方法内部的局部变量
    • 该表中的第0号索引指向的是方法所属对象实例的引用,可以通过this来访问这个隐含的参数
    • 局部变量表中的空间是可重用的,如果执行到的位置已经超出某个局部变量的作用域,那么他所在的空间很可能会被其他变量重用。
    • <<effecive Java>>一书中也给出一个编码规则:应该及时清除废弃的引用。
  2. 操作栈
  3. 动态链接
  4. 返回地址

方法调用

  • Class文件的编译过程不包含连接过程,因此Class文件中方法调用的目标方法都是符号引用,只有在运行时才能得到直接引用(存在内存中的方法入口地址)。
  • 可以在不同的阶段获取目标方法的直接引用,包括:类加载的解析阶段以及运行阶段

解析

能够在解析阶段确定方法直接引用具备以下特点:

  • 在编译器可知,运行期不变
  • 静态方法、私有方法以及final修饰的方法在编译期都能确定他们的直接引用,因为这些方法不可能通过继承或别的方式重写。这种类型的方法统称为非虚方法

重载与静态分派

如下定义中:

Fruit apple = new Apple();  //Fruit是Apple的父类

  • Fruit是变量的静态类型,这个是在编译期可知的;
  • Apple是变量的实际类型,这个只有在运行时才可确定;
  • 在重载时,编译器根据传入参数的静态类型决定使用哪个重载版本。
  • 依赖静态类型来决定方法的执行版本的分派方式称为静态分派:
    • 静态分派发生在编译阶段
    • 在选择重载版本时有一定的优先级,如传入参数类型为char,那么在选择执行版本的时候的优先级为:char > int > long > double > Character > Object > char... 查找父类时按照从下往上的方式

重写与动态分派

public class DynamicDispatch {
static abstract class Human {
protected abstract void say();
}
static class Man extends Human {
@Override
protected void say() {
System.out.println("I'm a Man");
}
}

static class Woman extends Human {
@Override
protected void say() {
System.out.println("I'm an Woman");
}
}

public static void main(String[] args) {
Human woman = new Woman();
Human man = new Man();
man.say(); // I'm a Man
woman.say(); //I'm an Woman
}
}

观察上面main函数对应的字节码:

其中#6对应常量池的引用依然是Human.say:

#6 = Methodref          #12.#33        //DynamicDispatch$Human.say:()V

但是执行结果却是定位到Man.say(),这是如何做到的呢?

在执行man.say()时:

  • 获取man变量对应的实际类型C
  • 调用invokevirtual指令,该指令根据实际类型将方法调用的符号引用解析到了确定类型C的直接引用上:
    • 如果在C中能找到对应的方法,并且有权限访问,那么久直接返回这个方法的直接引用
    • 否则,按照继承关系从下往上查找父类来确定方法的直接引用。

以上就是重写的本质,在运行期间根据实际类型确定方法执行版本的分配过程称为动态分派

内存模型与线程

为了与硬件中的主内存、高速缓存、处理器的交互关系一致,java内存模型也定义了:主内存,工作内存以及线程

  • 所有的变量都存储在主内存中(包括实例字段、静态字段和构成数组对象的元素,不包括局部变量以及方法参数。后者存放在线程的私有栈中不存在共享的问题)
  • 每个线程都有自己的工作内存,工作内存中存放着主内存中变量的副本拷贝
  • 线程对变量的读写操作只能在工作内存中进行,换而言之,线程操作只是主内存中变量的一个影分身

为了实现主内存与工作内存之间的交互,Java内存模型定义了8种原子性的操作,其中:

  • 从内存中复制变量到工作内存需要执行:read -> load
  • 将工作内存中的变量同步回主内存:store -> write
    以上两步只是顺序执行,中间可允许穿插其他指令,这就是共享变量在多线程运行下可能出现结果不确定性的根源。

volatile变量与可见性

可见性:一个线程修改了共享变量的值,其他线程能够立即得知这个修改。

volatile是一个特殊变量类型,它通过以下的规则提供了弱同步机制:

  • 在使用一个变量之前,必须执行load操作。确保使用变量时能看到主内存中最新的值
  • 对变量执行了赋值操作后,必须连续执行store以及write操作。确保每次修改了变量的值立刻同步回主内存

volatile通过以上的机制保证了变量的可见性,其通常用作某个操作完成、发生中断或者状态的标记,下面是典型的例子:

volatile boolean shouldShutDown;

public void shutDown() {
shouldShutDown = true;
}

while (shouldShutDown) {
...
}

在多线程情况下,任意的线程调用shutDown方法都能正常关闭。这是因为对shutDown的赋值不依赖于当前值,而volatile的可见性确保该变量变为true时其他线程都能得知变化。

之所以说volatile实现的是弱同步,在于其未实现原子性,多线程读写volatile变量的结果是未知的。

原子性与CAS

synchronized代码块之间的操作是具有原子性的,除此以外,java.util.concurrent.atomic包提供了诸如AtomicInteger之类具有原子性操作的类,这些类的原子性并不是通过加锁实现的,而是通过CAS(Compare And Swap)实现的,在JDK 1.5之前,AtomicInteger的incrementAndGet()方法的实现是:

while(true) {
next = current + 1;
if (compareAndSet(current, next)) {
return next;
}
}

假如有两个线程同时调用了同一个AtomicInteger对象的incrementAndGet方法,那么无论如何结果都是++2。

Jmock探究

Posted on 2017-02-24 | Visitors:

参考资料:

官方文档:

  1. 详细说明文档
  2. quick start
  3. API doc

背景

今天再设计跟单拆分工程的单元测试时遇到了这么一个问题。

单元测试的目标方法是:BillTraceRuleHelper.operateBillTraceRule,该方法用于将跟单规则请求 BillTraceRuleRequest 转化为跟单规则 BillTraceRule,该方法中会有一个检查请求有效性的方法:checkRuleRequest,该方法要求请求中需要包含详尽的参数,为了 偷懒 使用了jmock的mockUp方法对上面的方法进行mock,使其始终返回true代码如下:

private void mockCheckRuleRequest(){
new MockUp<BillTraceRuleHelper>(){
@Mock
public ClientMessage checkRuleRequest(BillTraceRuleRequest req){
return new ClientMessage();
}
};
}

由此,产生了两个问题:

问题一、业务执行结果不符合预期

使用mock的测试

@Test
public void testOperateBillTraceRule(){
mockCheckRuleRequest();
BillTraceRuleRequest request = new BillTraceRuleRequest();
request.setOperateType(OperateType.CREATE);
request.setRuleKey("fakeRuleKey");
request.setRuleType("fakeRuleType");
request.setRuleSubscribeSite("白牛一部");
billTraceRuleService. //新建规则
operateBillTraceRule(Collections.singletonList(request));

request.setOperateType(OperateType.UPDATE);
request.setRuleSubscribeSite("黑牛二部");
billTraceRuleService. //更新规则
operateBillTraceRule(Collections.singletonList(request));

request.setOperateType(OperateType.DELETE);
billTraceRuleService. //删除规则
operateBillTraceRule(Collections.singletonList(request));
}

预期结果是update操作能修改站点信息,delete操作删除的信息中包含更改后的黑牛二部,然而实际结果却不是这样的。

以下是截取的部分update SQL语句:

set RULE_SUBSCRIBE_SITE ='黑牛二部'
where RULE_KEY = 'fakeRuleKey'
and CUSTOMER_DATA_SOURCE = NULL
and RULE_TYPE = 'fakeRuleType'

这句update并未成功,gt_bill_trace_rule未发生变化。

不使用Mock的测试

相比较使用mock的情况,本次测试的区别在于添加了以下两个参数

request.setCusstomerDataSource("天网");
request.setRspMessageType("自定义反馈类型");

这样测试的结果就达到了预期。

为了找出造成这种差别的原因,我在PL/SQL中执行第一个测试的SQL脚本,发现

select * from gt_bill_trace_rule t 
where t.rule_key = 'fakeRuleKey'
and customer_data_source = null;

语句结果为空!这表明跟mock机制无关,其实这是一个SQL语法问题

在数据库中如果不填入数据,那么那一项就为null,而null并不是一个字符串,因此我们无法通过 =、!=、<> 等来判断是否为null。null作为一个bool类型,应该用is 或is not来判断是否为null。

问题二、在mock时进行调试无法跳转到被测试的方法中

在operateBillTraceRule()中添加断点,调试时无法进入。

跟踪调用,发现进入的位置为JdkDynamicAopProxy.invoke方法,如果mockUp方法是定义AOP的切面的话,那么进入的位置应该是checkRuleRequest()这个方法,那么为什么operateBillTraceRule()也被动态代理了呢?

通过查阅API doc发现,mockUp方法进行造假的对象是其泛型参数T(即BillTraceRuleHelper),而检查的方法只是指定造假的方法,动态代理的对象依然是这个helper

T - ++specifies the type (class, interface, etc.) to be faked++;
multiple interfaces can be faked by defining a type variable in the test class or test method, and using it as the type argument;
if a type variable is used and it extends a single type, then all implementation classes extending or implementing that base type are also faked;
if the type argument itself is a parameterized type, then only its raw type is considered

MockUp的两种方式

mock-up 的类继承了虚类mockit.MockUp<T>,T是被模拟的类或接口。在运行时,模拟的方法或者构造函数会被拦截和重定向到相应的方法中(代理的机制),执行结束后将结果返回给调用者,一般而言,调用者是测试类,模拟类是一个依赖。

模拟类通过会在Junit中被定义成一个static、内部类或者匿名类。

如果定义完模拟类后,修改了实类某个被模拟方法的名字,将会导致IllegalArgumentExecption

java.lang.IllegalArgumentException: Matching real methods not found for the following mocks:

匿名类

I have a base class named TheBaseClass:

public class TheBaseClass {
public void doSomething(int i){
System.out.println(i);
}

public void doSomethingElse(){
System.out.println("do something else...");
}
}

在测试类中,定义了一个内部类,该内部类对doSomething方法进行了模拟

private void mocked(){
new MockUp<TheBaseClass>(){
@Mock
public void doSomething(int i){
System.out.println(++i);
}
};
}

然后在单元测试中使用实类:

@Test
public void testMock(){
mocked();
TheBaseClass bb = new TheBaseClass();
bb.doSomething(1);
bb.doSomethingElse();
}

执行结果为:

2
do something else...

非匿名类

这次,我在测试类中定义了一个内部类MockedBase:

class MockedBase extends MockUp<TheBaseClass>{
@Mock public void doSomething(int i){
System.out.println(--i);
}
}

在单元测试中,我需要新建一个MockedBase(否则无法触发代理):

@Test
public void testMock(){
new MockedBase();
TheBaseClass mb = new TheBaseClass();
mb.doSomething(1);
mb.doSomethingElse();
}
//output:
0
do something else

小结

  • 我们不管将模拟类定义成内部类还是匿名类,其定义的模拟方法会被拦截,没有模拟的方法就还是原生的
  • 如果从层级角度来看的话,每次新建一个模拟类相当于将其中声明的模拟方法置为可调用的最高层,相当于屏蔽了对原生方法。
  • 模拟类需要新建才能生效

思考

如果MockUp的对象是一个接口,而这个接口有一个实现类,那么最终执行的依然是实现类,而不是@Mock的方法。这是为啥嘞?

12
fbZhu

fbZhu

人为什么越长大越孤单? 答:内心中有秘密,无法诉说

16 posts
7 tags
© 2018 fbZhu
Theme — NexT.Mist v5.1.4