状态机一般用在事件处理中,并且事件会有多种状态。当事件发生变化时,会触发对应的事件处理动作。Kafka控制启动状态机时有下面特点:
1、分区状态机和副本状态机需要获取集群中所有分区和副本,因此需要先初始化上下文后,才能启动状态机。
2、分区包含了多个副本,只有当集群中所有的副本初始化好之后,才可以初始化分区状态机。
ReplicaStateMachine 记录着集群所有副本的状态信息,决定者副本处于什么样的状态,以及可以进行什么样的状态流转。
Kafka副本的状态可以有以下7种类型:
1、NewReplica:当分区重分配时,控制器可以创建一个新副本。这种状态下该副本只能作为follower,它可以是 Replica 删除后的一个临时状态,有效前置状态是 NonExistentReplica;
2、OnlineReplica:当副本被分配到指定的Partition上,并且副本完成创建,那么它将会被置为这个状态。在这个状态下,分区既可以作为Leader也可以作为Follower,有效前置状态是 NewReplica、OnlineReplica 或 OfflineReplica;
3、OfflineReplica:如果副本所在的Broker挂掉,副本将会置为这个状态。有效前置状态是 NewReplica、OfflineReplica 或 OnlineReplica;
4、ReplicaDeletionStarted:副本开始删除时被置为的状态,有效前置状态是 OfflineReplica;
5、ReplicaDeletionSuccessful:如果部分在删除时没有错误信息,它将被置为这个状态。表示该副本的数据已经从Broker清除了,有效前置状态是 ReplicaDeletionStarted;
6、ReplicaDeletionIneligible:如果副本删除失败,会转移到这个状态。表示非法删除,也就是删除不成功,有效前置状态是 ReplicaDeletionStarted;
7、NonExistentReplica:如果副本删除成功,将被转移到这个状态。有效前置状态是:ReplicaDeletionSuccessful。
ReplicaStateMachine初始化
副本状态机启动入口如下:
上面的启动方法,首先会从ZK中恢复所有副本的状态。然后调用handleStateChanges(),将存活的副本转化为OnlineReplica状态。下面我们先看一下从ZK中恢复所有分区副本的状态:
后面紧跟着的就是处理副本状态处理,分别对OnlineReplica和OfflineReplica做上线和线下处理。
副本状态转换
状态转换为 NewReplica
简述一下上面的流程如下:
1、校验副本的前置状态,只有处于 NonExistentReplica 状态的副本才能转移到 NewReplica 状态;
2、从ZK中获取该分区的 LeaderIsrAndControllerEpoch 信息;
3、如果获取不到上述信息,直接将该副本的状态设置为 NewReplica,然后结束流程(新建分区时,副本可能处于这个状态,该分区的所有副本是没有 LeaderAndIsr 信息的);
4、获取到分区的 LeaderIsrAndControllerEpoch 信息,如果发现该分区的 leader 是当前副本,那么就抛出 StateChangeFailedException 异常,因为处在这个状态的副本是不能被选举为 leader 的;
5、获取到了分区的 LeaderIsrAndControllerEpoch 信息,并且分区的 leader 不是当前副本,那么向该分区的所有副本添加一个 LeaderAndIsr 请求(添加 LeaderAndIsr 请求时,同时也会向所有的 Broker 都添加一个 UpdateMetadata 请求);
6、最后将该副本的状态转移成 NewReplica,然后结束流程。
状态转换为 OnlineReplica
OnlineReplica是副本正常工作时的状态,此时的副本既可以是 leader 也可以是 follower,转换到这种状态的处理实现如下:
从前面的状态转换可以看出,当副本处在 NewReplica、OnlineReplica、OfflineReplica 状态时,是可以转移到 OnlineReplica 状态的。代码中的实现可以分为如下2种情况:
A、NewReplica –> OnlineReplica
1) 从上下文中的 partitionReplicaAssignment 中获取分区的副本列表;
2) 如果副本不在列表中,那么将其添加到分区副本列表中;
3) 将副本的状态变更为 OnlineReplica 状态。
B、OnlineReplica | OfflineReplica –> OnlineReplica
1) 从上下文中的 partitionLeadershipInfo 获取分区的 LeaderAndIsr 信息;
2) 如果该信息存在,那么就向这个副本所在的 broker 添加这个分区的 LeaderAndIsr 请求,并将副本的状态设置为 OnlineReplica;
3) 如果信息不存在,不做任何处理;
4) 更新副本的状态为 OnlineReplica。
状态转换为 OfflineReplica
1) 校验前置状态,只有副本在 NewReplica、OnlineReplica、OfflineReplica 状态时,才可以转换到这种状态;
2) 向该副本所在Broker发送 StopReplica 请求(deletePartition = false);
3) 将副本列表拆分为:有LeadershipInfo和无LeadershipInfo两部分
4) 有LeadershipInfo的,调用 removeReplicaFromIsr(),将该副本从分区的 isr 移除。然后向该分区其他副本发送 LeaderAndIsr 请求;
5) 无LeadershipInfo的,向所有存活Broker发送 UpdateMetadata请求
6) 更新副本的状态为 OfflineReplica。
状态转换为 ReplicaDeletionStarted
该状态是副本删除过程的开始状态,简述一下上面的逻辑:
1)校验前置状态,副本前置状态只能是 OfflineReplica;
2)更新该副本的状态为 ReplicaDeletionStarted;
3)向该副本发送 StopReplica 请求(deletePartition = true),收到这请求后,broker 会从物理存储上删除这个副本的数据内容;
状态转换为 ReplicaDeletionIneligible
该状态是副本删除失败的状态,简述一下上面的逻辑:
1)校验前置状态,副本的前置状态只能是 ReplicaDeletionStarted;
2)更新该副本的状态为 ReplicaDeletionIneligible。
状态转换为 ReplicaDeletionSuccessful
该状态是副本删除成功的状态,简述一下上面的逻辑:
1)检验前置状态,副本的前置状态只能是 ReplicaDeletionStarted;
2)更新该副本的状态为 ReplicaDeletionSuccessful。
状态转换为 ReplicaDeletionIneligible
该状态是副本已经被完全删除,不存在的状态,简述一下上面的逻辑:
1)检验前置状态,副本的前置状态只能是 ReplicaDeletionSuccessful;
2)在控制器的 partitionReplicaAssignment 删除分区对应的副本信息;
3)从控制器上下文和副本状态机中将这个副本删除。
参考:《Kafka技术内幕》、《Apache Kafka 源码剖析》、Kafka源码