Metadata in Detail

A look at the data structure of Metadata in org.apache.kafka.clients, and at how it is updated and retrieved.


Metadata serves both the producer and the consumer. Each of them creates its own Metadata instance when it is constructed:

this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));

this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

Member variables

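  • refreshBackoffMs: the minimum interval between two metadata refreshes, which prevents overly frequent refresh polling; default 100 ms
  • metadataExpireMs: the maximum time metadata is kept before a refresh is forced; default 5 min
  • version: incremented by 1 on every metadata update
  • lastRefreshMs: time of the last refresh; failed updates are recorded here as well
  • lastSuccessfulRefreshMs: time of the last successful update
  • cluster: records the relationships among the cluster's nodes, partitionInfo entries and topics; it holds only a subset of the cluster, because it starts out empty when Metadata is initialized
  • listeners: instances of a custom interface that are notified of Metadata updates; used in ConsumerCoordinator
  • needMetadataForAllTopics: whether metadata is needed for every topic in the cluster; default false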

Methods

1. timeToNextUpdate

This method computes how long to wait (in ms) before the next metadata refresh.

public synchronized long timeToNextUpdate(long nowMs) {
    // time remaining until the current metadata expires (0 if an update has already been requested)
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    // time remaining until the next refresh is allowed (backoff)
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}

Metadata is thread-safe: the methods shown here are declared synchronized.
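To make the arithmetic in timeToNextUpdate concrete, here is a minimal sketch that replays the same formula with sample numbers. The class MetadataTimingSketch and its static method are hypothetical stand-ins written for this illustration; they are not part of Kafka, and the fields are passed in as parameters only to keep the example self-contained.

```java
// Hypothetical stand-alone class: it only mirrors the arithmetic of
// Metadata.timeToNextUpdate, with the fields passed in as parameters.
final class MetadataTimingSketch {

    static long timeToNextUpdate(boolean needUpdate,
                                 long lastRefreshMs,
                                 long lastSuccessfulRefreshMs,
                                 long refreshBackoffMs,
                                 long metadataExpireMs,
                                 long nowMs) {
        // time until the metadata expires; 0 when an update was requested
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // time until the backoff window after the last refresh has passed
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        long backoff = 100L;      // default refreshBackoffMs
        long expire = 300_000L;   // default metadataExpireMs (5 min)

        // Refreshed at t=1000, asked at t=1050, no update requested:
        // expiry dominates (1000 + 300000 - 1050).
        System.out.println(timeToNextUpdate(false, 1_000L, 1_000L, backoff, expire, 1_050L)); // 299950

        // Update requested at t=1050: only the backoff remains (1000 + 100 - 1050).
        System.out.println(timeToNextUpdate(true, 1_000L, 1_000L, backoff, expire, 1_050L)); // 50

        // Update requested at t=2000, backoff long over: refresh immediately.
        System.out.println(timeToNextUpdate(true, 1_000L, 1_000L, backoff, expire, 2_000L)); // 0
    }
}
```

The second case shows why the backoff matters: even when an update has been requested, the result never drops below the time left in the backoff window.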

2. awaitUpdate

public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    ...
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    /* loop until version exceeds lastVersion; give up once the wait time is used up */
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
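The timeout path of the loop above can be tried out in isolation. The following is a rough sketch under stated assumptions: AwaitTimeoutSketch is a hypothetical stand-in for Metadata that models only the version counter, java.util.concurrent.TimeoutException replaces Kafka's own TimeoutException, and the helper timesOut exists purely for the demonstration.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for Metadata: only the version counter and the
// wait loop are modelled here.
final class AwaitTimeoutSketch {
    private int version = 0;

    // Same loop shape as awaitUpdate; java.util.concurrent.TimeoutException
    // is used in place of Kafka's TimeoutException (an assumption of this sketch).
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (this.version <= lastVersion) {
            if (remainingWaitMs != 0)
                wait(remainingWaitMs); // releases the monitor while waiting
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }

    // Demo helper: returns true if waiting for a version above lastVersion
    // times out because nobody ever bumps the version.
    static boolean timesOut(int lastVersion, long maxWaitMs) {
        AwaitTimeoutSketch m = new AwaitTimeoutSketch();
        try {
            m.awaitUpdate(lastVersion, maxWaitMs);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(0, 50L));  // true: version stays at 0
        System.out.println(timesOut(-1, 50L)); // false: version 0 is already above -1
    }
}
```

Recomputing remainingWaitMs on every pass is what keeps the total wait bounded by maxWaitMs even if wait returns early, for example on a spurious wakeup.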

3. update

public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;
    // notify anyone who is listening for metadata updates
    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);

    // the new cluster replaces the old one
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

    notifyAll(); // wake up all blocked threads (those that called wait)
}
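The interplay between update and a waiting thread can be sketched as follows. This is an illustrative model only: UpdateNotifySketch, its nested Listener interface, awaitVersionAbove and demo are hypothetical names, and a plain String stands in for the Cluster object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the update/notify handshake; a String stands in
// for Kafka's Cluster object.
final class UpdateNotifySketch {
    interface Listener {
        void onMetadataUpdate(String cluster);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private int version = 0;
    private String cluster = "";

    synchronized void addListener(Listener listener) { listeners.add(listener); }
    synchronized int version() { return version; }
    synchronized String cluster() { return cluster; }

    // Same order of steps as update: bump the version, tell the listeners,
    // swap in the new cluster, then wake every waiting thread.
    synchronized void update(String newCluster) {
        version += 1;
        for (Listener listener : listeners)
            listener.onMetadataUpdate(newCluster);
        cluster = newCluster;
        notifyAll();
    }

    // Simplified waiter without a timeout: blocks until the version moves on.
    synchronized void awaitVersionAbove(int lastVersion) throws InterruptedException {
        while (version <= lastVersion)
            wait();
    }

    static String demo() {
        UpdateNotifySketch metadata = new UpdateNotifySketch();
        StringBuilder seen = new StringBuilder();
        metadata.addListener(c -> seen.append("listener saw ").append(c));

        int last = metadata.version();
        // A second thread plays the role of the component that delivers new metadata.
        Thread updater = new Thread(() -> metadata.update("cluster-v1"));
        updater.start();
        try {
            metadata.awaitVersionAbove(last); // returns once update() has run
            updater.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen + "; version=" + metadata.version() + "; cluster=" + metadata.cluster();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // listener saw cluster-v1; version=1; cluster=cluster-v1
    }
}
```

Because the waiter checks the version inside a loop, it does not matter whether update runs before or after the waiter starts waiting: the condition is re-evaluated under the same monitor either way.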

How the producer obtains metadata

The sequence diagram is as follows: