前面我们提到了,服务端存储日志的分区单元是日志分段(LogSegment)。同一个日志分段内,所有消息的偏移量都是递增的,因此可以保证整个分区的偏移量都是递增的。
Kafka 服务端节点每个 TopicPartition 都对应一个 Log,每个 Log 有多个 LogSegment,该 Log 管理分区上面的所有 LogSegment。下面看一下 Log 源码中的核心参数实现:
下面我们简单看一下上面简化后的代码片段。
activeSegment 被定义为方法,它会获取 segments 的最后一个元素,作为最新的活动 LogSegment。此时如果有新的 LogSegment 被创建,该方法返回的也是最新的日志分段。
nextOffsetMetadata 被定义为一个变量,它的数据依赖于 activeSegment,比如活动分片的下一个偏移量(nextOffset)、活动分段的基准偏移量(baseOffset)、活动分段的大小(size)。
logEndOffset (日志最新偏移量)表示下一条消息的偏移量,它取自 nextOffsetMetadata 的 nextOffset,实际上是活动日志分段的 下一个偏移量值。
对于 nextOffsetMetadata 对象下面详细介绍一下。
(1)追加消息前,使用 nextOffsetMetadata 的消息偏移量,作为这一批消息的起始偏移量。
(2)如果滚动创建了 LogSegment,当前活动的 LogSegment 会指向新创建的日志分段。
(3)追加消息后,更新 nextOffsetMetadata 的消息偏移量,作为下一批消息的起始偏移量。
日志偏移量元数据
日志偏移量元数据是 Log 的一个重要特征,客户端对消息的读写,都会用到这个数据结构。下面我们看一下这个数据结构产生的核心变量。
上面展示的 3 个变量当中,其中 nextOffsetMetadata 和 logEndOffsetMetadata 这两个变量可以认为是指到的偏移量是相同的位置,但是因为其针对的客户端不同(一个是生产者、一个是消费者),所以区分开来了。
从上面图示可以看出, highWatermark(高水位)会将日志分段分割为“已提交”和“未提交”部分。“已提交”部分对消费者端是可见的,而“未提交”部分则是不可见的,高水位指针指的是 已提交消息+1 的位置。对于 nextOffset 和 logEndOffset 指针则指的是相同的位置,即下一条消息插入的位置。两者的区别是一个是针对于生产者的,另一个是针对于分区备份副本的。
滚动创建日志分段
当为消息分配偏移量之后,消息将会追加到最新的日志分段中。如果当前日志分段放不下新消息,则会采用“滚动”的方式创建一个新的日志分段,并添加其中。
新创建日志分段的基准偏移量取自 logEndOffset,实际上是 nextOffsetMetadata的消息偏移量值(messageOffset),也是当前活动日志分段的下一个偏移量值(nextOffset)。如上图所示:
(1)追加3条消息到日志分段 3,下一个偏移量改为23,基准偏移量20.
(2)追加5条消息到日志分段 3,下一个偏移量改为28,基准偏移量20.
(3)追加2条消息到日志分段 3,下一个偏移量改为30,基准偏移量20.
(4)追加3条消息,此时日志分段 3 已经满了,滚动创建日志分段 4。新日志分段创建后,加入到 segments 后,当前活动的值日分段会指向日志分段 4。下一个偏移量为33,基准偏移量30.
数据文件
LogSegment 包含数据文件和索引文件,基准偏移量是每个日志分段的标识。追加一批消息到日志分段中,每次都会写到对应的数据文件中,同时间隔 indexIntervalBytes 大小才写入一条索引条目到索引文件中。假如一条消息占用 10 字节,每隔 100 字节会写入一个索引条目,也就代表着每 10 条消息才会写入 1 个索引条目。下面看一下 LogSegment.append() 方法的实现摘要。
索引文件
消息存储到日志分段中,只是简单的追加。此时如果要查询消息,则从日志分段的开始遍历,则就比较低效了。此时建立索引文件就非常有必要了。
写入数据文件的每个消息集,都带有绝对偏移量、消息大小和消息内容。Kafka 创建的索引文件具有以下特性:
(1)索引文件映射偏移量到文件的物理位置,不会对每条消息建立索引,所以是稀疏的。
(2)索引条目的偏移量存储的是相对于“基准偏移量”的“相对偏移量”,不是消息的绝对偏移量(可以减少存储开销)。
(3)索引条目的“相对偏移量”和物理位置各自占用4 个字节,也就是 1 个索引占用 8 字节。消息集的“绝对偏移量”占用的是 8 字节,此时可以减少 4 字节的内存开销。
(4)偏移量是有序的,查询指定偏移量时,可以使用二分查找快读定位偏移量的位置。
(5)当“相对偏移量”在索引中不存在时,找出索引中小于偏移量的最大值,然后定位到物理位置,顺序查找目标偏移量的消息。
(6)使用稀疏索引可以将整个索引文件都放入内存,加快偏移量的查询。
简述一下上面的查找过程,当要拉取绝对偏移量 00000028 的消息时,首先定位到索引文件为 00000000.index。然后转化为相对偏移量 28,之后找到小于28 的最大偏移量 20,找到其物理地址为39,然后定位到具体的物理地址。然后顺序遍历,直到找到绝对偏移量 00000028 的消息。
参考:《Kafka技术内幕》、《极客时间:Kafka核心技术与实战》、Kafka源码