探究
org.apache.kafka.clients
中Metadata的数据结构、更新及获取原理
Metadata是为producer及consumer服务的,在producer和consumer创建的时候都会创metadata:this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
成员变量
- refreshBackoffMs:metadata的最短刷新间隔,避免频繁的Poll操作,默认100ms
- metadataExpireMs:在下次更新前,metadata保存的最长时间,默认5min
- version:每次更新一次metadata都会将version值增加1
- lastRefreshMs:上次更新时间,也记录更新失败时间
- lastSuccessfulRefreshMs:上次成功更新时间
- cluster:记录集群中节点、partitionInfo、topic之间的关系,只是个子集,因为Metadata初始化的时候,cluster也是初始化为空的
- listeners:自定义的接口,用于监听Metadata更新,在
ConsumerCoordinator
有使用到 - needMetadataForAllTopics:是否需要cluster中所有topic的metadata,默认为false
方法
1. timeToNextUpdate
该方法用于计算下一次刷新metadata的的时间间隔(ms)public synchronized long timeToNextUpdate(long nowMs) {
//距离时效还有多长时间
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
//距离下一次允许刷新的时间
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}
Metadata的使用是线程安全的。
2. awaitUpdate
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { |
3. update
public synchronized void update(Cluster cluster, long now) { |
producer获取metadata
时效图如下: