接上篇:《Kafka消费者加入group流程(上)》
发送加入组请求(Rebalance流程)
消费者首次加入group也可以认为是Rebalance的一种,其中包含了两类请求:JoinGroup 和 SyncGroup 请求。我们先看一下两次请求的流程:
当组内成员加入group时,它会向协调者发送一个JoinGroup请求。请求中会将自己要订阅的Topic 上报,这样协调者就可以收集到所有成员的订阅信息。收集完订阅信息之后,通常情况下,第一个发送JoinGroup 请求的成员将会自动成为Leader。这里的Leader 和分区的Leader 副本不是一个概念,这里的Leader 是消费者group 的 Leader,它将会负责具体的分区分配方案制定。下面我们看一下源代码的实现:
/**
 * Makes sure the group is active and that this member has joined it.
 *
 * <p>The method first checks that the coordinator is ready, then starts the
 * heartbeat thread if needed, and finally delegates to
 * {@link #joinGroupIfNeeded(Timer)}.
 *
 * @param timer timer bounding how long the call may take
 * @return {@code false} if the coordinator was not ready; otherwise whatever
 *         {@code joinGroupIfNeeded(timer)} returns
 */
boolean ensureActiveGroup(final Timer timer) {
    // Only proceed once the coordinator is known to be ready; a previously
    // established connection may have been dropped in the meantime.
    if (ensureCoordinatorReady(timer)) {
        // Start the heartbeat thread (it does not necessarily send heartbeats
        // right away; it only does so once its own conditions are met).
        startHeartbeatThreadIfNeeded();
        // Send the JoinGroup request and process the returned information.
        return joinGroupIfNeeded(timer);
    }
    return false;
}
JoinGroup 的请求发送是在 joinGroupIfNeeded() 中实现的:
/**
 * Joins the group (JoinGroup followed by SyncGroup) for as long as
 * {@code rejoinNeededOrPending()} reports that a (re)join is required.
 *
 * @param timer timer bounding how long the call may take
 * @return {@code true} once no rejoin is needed or pending; {@code false} if the
 *         coordinator was not ready or the join future was not done after polling
 * @throws RuntimeException the join failure, when it is neither one of the
 *         immediately-retried exception types nor retriable
 */
boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }

        // Run onJoinPrepare (offset commit and rebalance listener) once per join
        // attempt sequence; the flag is re-armed only after a successful join.
        if (needsJoinPrepare) {
            onJoinPrepare(generation.generationId, generation.memberId);
            needsJoinPrepare = false;
        }

        // Create (or reuse) the join future and drive the network client on it.
        final RequestFuture<ByteBuffer> joinResult = initiateJoinGroup();
        client.poll(joinResult, timer);
        if (!joinResult.isDone()) {
            // we ran out of time
            return false;
        }

        if (!joinResult.succeeded()) {
            resetJoinGroupFuture();
            final RuntimeException failure = joinResult.exception();
            final boolean rejoinImmediately = failure instanceof UnknownMemberIdException
                    || failure instanceof RebalanceInProgressException
                    || failure instanceof IllegalGenerationException
                    || failure instanceof MemberIdRequiredException;
            if (rejoinImmediately) {
                // These failures are retried right away, without backing off.
                continue;
            }
            if (!joinResult.isRetriable()) {
                throw failure;
            }
            // Retriable failure: back off before the next attempt.
            timer.sleep(rebalanceConfig.retryBackoffMs);
            continue;
        }

        // The join future only succeeds after the chained SyncGroup has
        // completed, so the assignment is available here.
        final ByteBuffer memberAssignment = joinResult.value().duplicate();
        onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
        // Clear the stored join future so a later rebalance creates a new one.
        resetJoinGroupFuture();
        needsJoinPrepare = true;
    }
    return true;
}
下面我们看下 initiateJoinGroup() 的实现:
/**
 * Returns the in-flight join future, sending a JoinGroup request and registering
 * a state-tracking listener first if no join is currently in flight.
 *
 * @return the current {@code joinFuture}
 */
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // A join is already in progress: hand back the same future.
    if (joinFuture != null) {
        return joinFuture;
    }

    // The heartbeat thread is paused for the duration of the rebalance.
    disableHeartbeatThread();

    // Mark this member as rebalancing before the request goes out.
    state = MemberState.REBALANCING;

    // Send the JoinGroup request.
    joinFuture = sendJoinGroupRequest();
    joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
        @Override
        public void onSuccess(ByteBuffer assignment) {
            synchronized (AbstractCoordinator.this) {
                log.info("Successfully joined group with generation {}", generation.generationId);
                // The member is now stable: no rejoin needed, heartbeats resume.
                state = MemberState.STABLE;
                rejoinNeeded = false;
                if (heartbeatThread != null) {
                    heartbeatThread.enable();
                }
            }
        }

        @Override
        public void onFailure(RuntimeException cause) {
            synchronized (AbstractCoordinator.this) {
                // The join failed: the member is back to the unjoined state.
                state = MemberState.UNJOINED;
            }
        }
    });
    return joinFuture;
}
下面我们看一下发送请求的方法 sendJoinGroupRequest():
/**
 * Builds and sends a JoinGroup request to the coordinator. The response is
 * processed by {@link JoinGroupResponseHandler}, which produces the value of
 * the returned future.
 *
 * @return a future for the join result, or an already-failed
 *         "coordinator not available" future when the coordinator is unknown
 */
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown()) {
        return RequestFuture.coordinatorNotAvailable();
    }

    // Send the JoinGroup request.
    log.info("(Re-)joining group");
    final JoinGroupRequestData joinData = new JoinGroupRequestData()
            .setGroupId(rebalanceConfig.groupId)
            .setSessionTimeoutMs(rebalanceConfig.sessionTimeoutMs)
            .setMemberId(generation.memberId)
            .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
            .setProtocolType(protocolType())
            .setProtocols(metadata())
            .setRebalanceTimeoutMs(rebalanceConfig.rebalanceTimeoutMs);
    final JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(joinData);
    log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);

    // The request timeout is the rebalance timeout plus 5000 ms. Math.max keeps
    // the plain rebalance timeout if that int addition overflows.
    final int rebalanceTimeoutMs = rebalanceConfig.rebalanceTimeoutMs;
    final int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
            .compose(new JoinGroupResponseHandler());
}
/**
* 处理 JoinGroup response 的 handler(同步 group 信息)
*/
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinLatency.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
/** 如果此时 Consumer 的状态不是 rebalacing,就引起异常 */
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName());
/** JoinGroup成功,下面需要进行SyncGroup,获取分配的主题分区 */
if (joinResponse.isLeader()) {
// Leader 将会执行分配方案,并发送SyncGroup请求
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
resetGeneration();
log.debug("Attempt to join group failed due to unknown member id.");
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID
|| error == Errors.GROUP_AUTHORIZATION_FAILED
|| error == Errors.GROUP_MAX_SIZE_REACHED) {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" + "to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, joinResponse.data().memberId(), null);
AbstractCoordinator.this.rejoinNeeded = true;
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
future.raise(error);
} else {
// unexpected error, throw the exception
log.error("Attempt to join group failed due to unexpected error: {}", error.message());
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
}
下面我们看一下 SyncGroup 的请求发送流程:
/**
 * Follower path after a successful JoinGroup: sends a SyncGroup request with an
 * empty assignment list in order to receive this member's assignment.
 *
 * @return the future returned by {@code sendSyncGroupRequest}
 */
private RequestFuture<ByteBuffer> onJoinFollower() {
    // A follower contributes no assignments; it only identifies itself.
    final SyncGroupRequestData syncData = new SyncGroupRequestData()
            .setGroupId(rebalanceConfig.groupId)
            .setMemberId(generation.memberId)
            .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
            .setGenerationId(generation.generationId)
            .setAssignments(Collections.emptyList());
    final SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(syncData);

    log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}
/**
 * Leader path after a successful JoinGroup: performs the assignment for the
 * members listed in the response and sends the result to the coordinator in a
 * SyncGroup request.
 *
 * @param joinResponse the successful JoinGroup response received by the leader
 * @return the future returned by {@code sendSyncGroupRequest}, or a failed future
 *         wrapping any {@link RuntimeException} thrown while building or sending
 *         the request
 */
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // Perform the leader synchronization and compute the group's assignment.
        final Map<String, ByteBuffer> groupAssignment = performAssignment(
                joinResponse.data().leader(),
                joinResponse.data().protocolName(),
                joinResponse.data().members());

        // Convert each (member id -> assignment buffer) pair to its request form.
        final List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
        groupAssignment.forEach((memberId, assignment) ->
                groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                        .setMemberId(memberId)
                        .setAssignment(Utils.toArray(assignment))));

        final SyncGroupRequestData syncData = new SyncGroupRequestData()
                .setGroupId(rebalanceConfig.groupId)
                .setMemberId(generation.memberId)
                .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
                .setGenerationId(generation.generationId)
                .setAssignments(groupAssignmentList);
        final SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(syncData);

        log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
        // Send the SyncGroup request.
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
/**
 * Sends the given SyncGroup request to the coordinator; the response is
 * processed by {@link SyncGroupResponseHandler}.
 *
 * @param requestBuilder the SyncGroup request to send
 * @return a future for the sync result, or an already-failed
 *         "coordinator not available" future when the coordinator is unknown
 */
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
    if (!coordinatorUnknown()) {
        return client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
    }
    return RequestFuture.coordinatorNotAvailable();
}
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
} else {
// join的标志位设置为true
requestRejoin();
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// group正在rebalance,任务失败
log.debug("SyncGroup failed because the group began another rebalance");
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
log.debug("SyncGroup failed: {}", error.message());
resetGeneration();
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
log.debug("SyncGroup failed: {}", error.message());
markCoordinatorUnknown();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
}
消费者Rebalance的几种场景
我们先看一下Rebalance触发的几个条件: 1、组成员数量发生变化;2、订阅主题数量发生变化;3、订阅主题的分区数发生变化。对于一个运行中的应用,上面3 种场景中,第一种场景触发Rebalance的可能性比较大。下面我们看一下Rebalance的各种场景:
新成员入组
组成员主动离组
组成员崩溃离组
Rebalance时组内成员需要提交offset
参考:《Apache Kafka 源码剖析》、《Kafka技术内幕》、《极客时间:Kafka核心技术与实战》、http://matt33.com/2017/10/22/consumer-join-group/