Kafka控制器- 副本状态机(ReplicaStateMachine)

状态机一般用在事件处理中,并且事件会有多种状态。当事件发生变化时,会触发对应的事件处理动作。Kafka控制启动状态机时有下面特点:

1、分区状态机和副本状态机需要获取集群中所有分区和副本,因此需要先初始化上下文后,才能启动状态机。

2、分区包含了多个副本,只有当集群中所有的副本初始化好之后,才可以初始化分区状态机。

ReplicaStateMachine 记录着集群所有副本的状态信息,决定者副本处于什么样的状态,以及可以进行什么样的状态流转。

Kafka副本的状态可以有以下7种类型:

1、NewReplica:当分区重分配时,控制器可以创建一个新副本。这种状态下该副本只能作为follower,它可以是 Replica 删除后的一个临时状态,有效前置状态是 NonExistentReplica;

2、OnlineReplica:当副本被分配到指定的Partition上,并且副本完成创建,那么它将会被置为这个状态。在这个状态下,分区既可以作为Leader也可以作为Follower,有效前置状态是 NewReplica、OnlineReplica 或 OfflineReplica;

3、OfflineReplica:如果副本所在的Broker挂掉,副本将会置为这个状态。有效前置状态是 NewReplica、OfflineReplica 或 OnlineReplica;

4、ReplicaDeletionStarted:副本开始删除时被置为的状态,有效前置状态是 OfflineReplica;

5、ReplicaDeletionSuccessful:如果部分在删除时没有错误信息,它将被置为这个状态。表示该副本的数据已经从Broker清除了,有效前置状态是 ReplicaDeletionStarted;

6、ReplicaDeletionIneligible:如果副本删除失败,会转移到这个状态。表示非法删除,也就是删除不成功,有效前置状态是 ReplicaDeletionStarted;

7、NonExistentReplica:如果副本删除成功,将被转移到这个状态。有效前置状态是:ReplicaDeletionSuccessful。

ReplicaStateMachine初始化

副本状态机启动入口如下:

// Controller重新选举后触发
def startup() {
  // 初始化ZK上所有的副本状态信息(副本存活设置为Online,不存活的设置为ReplicaDeletionIneligible)
  initializeReplicaState()
  val (onlineReplicas, offlineReplicas) = controllerContext.onlineAndOfflineReplicas
  // 将存活的副本转化为OnlineReplica
  handleStateChanges(onlineReplicas.toSeq, OnlineReplica)
  // 将不存活的副本转化为OfflineReplica
  handleStateChanges(offlineReplicas.toSeq, OfflineReplica)  
}

上面的启动方法,首先会从ZK中恢复所有副本的状态。然后调用handleStateChanges(),将存活的副本转化为OnlineReplica状态。下面我们先看一下从ZK中恢复所有分区副本的状态:

/**
 * 初始化所有分区副本的状态
 */
private def initializeReplicaState() {
	// 循环所有分区
  controllerContext.allPartitions.foreach { partition =>
    val replicas = controllerContext.partitionReplicaAssignment(partition)
    replicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(partition, replicaId)
      // 如果副本存活,将状态设置为OnlineReplica
      if (controllerContext.isReplicaOnline(replicaId, partition)) {
        controllerContext.putReplicaState(partitionAndReplica, OnlineReplica)
      } else {
        // 不存活的副本设置为ReplicaDeletionIneligible,
        controllerContext.putReplicaState(partitionAndReplica, ReplicaDeletionIneligible)
      }
    }
  }
}

后面紧跟着的就是处理副本状态处理,分别对OnlineReplica和OfflineReplica做上线和线下处理。

/**
  * 副本状态机变化处理方法, 对多个副本的状态改变, 以批量请求的方式发送给多个Broker
  */
override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
  if (replicas.nonEmpty) {
    try {
      controllerBrokerRequestBatch.newBatch()
      // 处理状态请求
      replicas.groupBy(_.replica).foreach { case (replicaId, replicas) =>
        doHandleStateChanges(replicaId, replicas, targetState)
      }
      // 向Broker发送响应请求
      controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
    } catch {
      case e: ControllerMovedException =>
        error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
        throw e
      case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
    }
  }
}

副本状态转换

状态转换为 NewReplica

case NewReplica =>
    validReplicas.foreach { replica =>
      val partition = replica.topicPartition
      val currentState = controllerContext.replicaState(replica)

      controllerContext.partitionLeadershipInfo.get(partition) match {
        /** 从ZK获取分区的 leaderAndIsr 信息 */
        case Some(leaderIsrAndControllerEpoch) =>
          if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
            /** NewReplica 状态的副本不能作为分区Leader */
            val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
            logFailedStateChange(replica, currentState, OfflineReplica, exception)
          } else {
            /** 向replicaId的副本发送 LeaderAndIsr请求,并同时向所有Broker发送UpdateMetadata请求 */
            controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
              replica.topicPartition,
              leaderIsrAndControllerEpoch,
              controllerContext.partitionReplicaAssignment(replica.topicPartition),
              isNew = true)
            logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
            /** 修改ControllerContext中的副本状态 */
            controllerContext.putReplicaState(replica, NewReplica)
          }
        case None =>
          /** 如副本没有LeaderAndIsr信息,则等待分区Leader选举完成 */
          logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
          controllerContext.putReplicaState(replica, NewReplica)
      }
    }

简述一下上面的流程如下:

1、校验副本的前置状态,只有处于 NonExistentReplica 状态的副本才能转移到 NewReplica 状态;

2、从ZK中获取该分区的 LeaderIsrAndControllerEpoch 信息;

3、如果获取不到上述信息,直接将该副本的状态设置为 NewReplica,然后结束流程(新建分区时,副本可能处于这个状态,该分区的所有副本是没有 LeaderAndIsr 信息的);

4、获取到分区的 LeaderIsrAndControllerEpoch 信息,如果发现该分区的 leader 是当前副本,那么就抛出 StateChangeFailedException 异常,因为处在这个状态的副本是不能被选举为 leader 的;

5、获取到了分区的 LeaderIsrAndControllerEpoch 信息,并且分区的 leader 不是当前副本,那么向该分区的所有副本添加一个 LeaderAndIsr 请求(添加 LeaderAndIsr 请求时,同时也会向所有的 Broker 都添加一个 UpdateMetadata 请求);

6、最后将该副本的状态转移成 NewReplica,然后结束流程。

状态转换为 OnlineReplica

OnlineReplica是副本正常工作时的状态,此时的副本既可以是 leader 也可以是 follower,转换到这种状态的处理实现如下:

case OnlineReplica =>
    validReplicas.foreach { replica =>
      val partition = replica.topicPartition
      val currentState = controllerContext.replicaState(replica)

      currentState match {
        case NewReplica =>
          /** NewReplica --> OnlineReplica */
          val assignment = controllerContext.partitionReplicaAssignment(partition)
          /** 如副本不在分区副本集合中,添加进集合(正常情况下不会出现) */
          if (!assignment.contains(replicaId)) {
            controllerContext.updatePartitionReplicaAssignment(partition, assignment :+ replicaId)
          }
        case _ =>
          /** OnlineReplica | OfflineReplica --> OnlineReplica */
          controllerContext.partitionLeadershipInfo.get(partition) match {
            case Some(leaderIsrAndControllerEpoch) =>
              /** 如果该副本的 LeaderIsrAndControllerEpoch 信息存在,那么就更新副本的状态,并发送相应的请求 */
              controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                replica.topicPartition,
                leaderIsrAndControllerEpoch,
                controllerContext.partitionReplicaAssignment(partition), isNew = false)
            case None =>
              /** 表示分区不是OnlinePartition状态,也就是Broker没有为分区启动log,并且没有分区高水位的值 */
          }
      }
      logSuccessfulTransition(replicaId, partition, currentState, OnlineReplica)
      controllerContext.putReplicaState(replica, OnlineReplica)
    }

从前面的状态转换可以看出,当副本处在 NewReplica、OnlineReplica、OfflineReplica 状态时,是可以转移到 OnlineReplica 状态的。代码中的实现可以分为如下2种情况:

A、NewReplica –> OnlineReplica

1) 从上下文中的 partitionReplicaAssignment 中获取分区的副本列表;

2) 如果副本不在列表中,那么将其添加到分区副本列表中;

3) 将副本的状态变更为 OnlineReplica 状态。

B、OnlineReplica | OfflineReplica –> OnlineReplica

1) 从上下文中的 partitionLeadershipInfo 获取分区的 LeaderAndIsr 信息;

2) 如果该信息存在,那么就向这个副本所在的 broker 添加这个分区的 LeaderAndIsr 请求,并将副本的状态设置为 OnlineReplica;

3) 如果信息不存在,不做任何处理;

4) 更新副本的状态为 OnlineReplica。

状态转换为 OfflineReplica

case OfflineReplica =>
    validReplicas.foreach { replica =>
      /** 发送 StopReplica 请求给该副本,先停止副本同步 */
      controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
    }
    /** 将副本列表拆分为:有LeadershipInfo和无LeadershipInfo两部分 */
    val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
      controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
    }
    /** 有LeadershipInfo的,控制器将副本从ISR中移除 */
    val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
    updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
      if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
        val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
        /** 向该分区其他副本发送 LeaderAndIsr 请求 */
        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
          partition,
          leaderIsrAndControllerEpoch,
          controllerContext.partitionReplicaAssignment(partition), isNew = false)
      }
      val replica = PartitionAndReplica(partition, replicaId)
      val currentState = controllerContext.replicaState(replica)
      logSuccessfulTransition(replicaId, partition, currentState, OfflineReplica)
      controllerContext.putReplicaState(replica, OfflineReplica)
    }
    /** 无LeadershipInfo的,向所有存活Broker发送 UpdateMetadata请求 */
    replicasWithoutLeadershipInfo.foreach { replica =>
      val currentState = controllerContext.replicaState(replica)
      logSuccessfulTransition(replicaId, replica.topicPartition, currentState, OfflineReplica)
      controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))
      controllerContext.putReplicaState(replica, OfflineReplica)
    }

1) 校验前置状态,只有副本在 NewReplica、OnlineReplica、OfflineReplica 状态时,才可以转换到这种状态;

2) 向该副本所在Broker发送 StopReplica 请求(deletePartition = false);

3) 将副本列表拆分为:有LeadershipInfo和无LeadershipInfo两部分

4) 有LeadershipInfo的,调用 removeReplicaFromIsr(),将该副本从分区的 isr 移除。然后向该分区其他副本发送 LeaderAndIsr 请求;

5) 无LeadershipInfo的,向所有存活Broker发送 UpdateMetadata请求

6) 更新副本的状态为 OfflineReplica。

状态转换为 ReplicaDeletionStarted

case ReplicaDeletionStarted =>
    validReplicas.foreach { replica =>
      val currentState = controllerContext.replicaState(replica)
      logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)
      controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
      /** 发送 StopReplica 请求给该副本,并设置 deletePartition=true */
      controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)
    }

该状态是副本删除过程的开始状态,简述一下上面的逻辑:

1)校验前置状态,副本前置状态只能是 OfflineReplica;

2)更新该副本的状态为 ReplicaDeletionStarted;

3)向该副本发送 StopReplica 请求(deletePartition = true),收到这请求后,broker 会从物理存储上删除这个副本的数据内容;

状态转换为 ReplicaDeletionIneligible

case ReplicaDeletionIneligible =>
  validReplicas.foreach { replica =>
    val currentState = controllerContext.replicaState(replica)
    logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionIneligible)
    controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
  }

该状态是副本删除失败的状态,简述一下上面的逻辑:

1)校验前置状态,副本的前置状态只能是 ReplicaDeletionStarted;

2)更新该副本的状态为 ReplicaDeletionIneligible。

状态转换为 ReplicaDeletionSuccessful

case ReplicaDeletionSuccessful =>
  validReplicas.foreach { replica =>
    val currentState = controllerContext.replicaState(replica)
    logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionSuccessful)
    controllerContext.putReplicaState(replica, ReplicaDeletionSuccessful)
  }

该状态是副本删除成功的状态,简述一下上面的逻辑:

1)检验前置状态,副本的前置状态只能是 ReplicaDeletionStarted;

2)更新该副本的状态为 ReplicaDeletionSuccessful。

状态转换为 ReplicaDeletionIneligible

case NonExistentReplica =>
  validReplicas.foreach { replica =>
    val currentState = controllerContext.replicaState(replica)
    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicPartition)
    // 从控制器上下文和副本状态机中清除这个副本的信息
    controllerContext.updatePartitionReplicaAssignment(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
    logSuccessfulTransition(replicaId, replica.topicPartition, currentState, NonExistentReplica)
    controllerContext.removeReplicaState(replica)
  }

该状态是副本已经被完全删除,不存在的状态,简述一下上面的逻辑:

1)检验前置状态,副本的前置状态只能是 ReplicaDeletionSuccessful;

2)在控制器的 partitionReplicaAssignment 删除分区对应的副本信息;

3)从控制器上下文和副本状态机中将这个副本删除。


参考:《Kafka技术内幕》、《Apache Kafka 源码剖析》、Kafka源码