Kafka 的主副本主要负责消息的读写操作,消费者或者备份副本都会向主副本同步数据。客户端读取主副本的过程又叫做“拉取”,拉取消息时肯定会指定拉取的偏移量。比如前面介绍过的,消费者会根据提交偏移量,或者客户端设置的重置策略,从指定位置拉取消息。客户端还会指定拉取的数据量大小(fetchSize),这个值默认是 max.partition.fetch.bytes 配置项,大小为 1MB。
一个分区对应的日志管理了所有的日志分段,日志保存了基准偏移量和日志分段的映射关系。给定一个偏移量,要读取从指定位置开始的消息集,最多读取 fetchSize 字节。因为 fetchSize 默认只有 1M,而日志分段对应的数据文件大小默认有 1GB,所以通常来说服务端读取日志时,没有必要读取所有日志分段,只需要选择其中的一个分段即可。
日志分段只是一个逻辑的概念,它管理了物理概念的一个数据文件和一个索引文件。读取日志分段时,首先要读取索引文件再读取数据文件。下面是Kafka 服务端的日志文件示意图:
从上图可以看出,日志分段的起始偏移量会作为 数据文件和索引文件的文件名称。下面我们再回顾一下,读取日志的流程。
读取日志分段
我们首先看一下 Kafka 服务端,读取日志分段的代码片段。
上面 translateOffset() 方法将起始偏移量(startOffset)转换成LogOffsetPosition 对象,其包含起始偏移量和起始物理位置。物理位置的差值(最大物理位置-起始物理位置)和拉取大小(maxSize)比较取最小值。然后对log 做切片,返回 FileRecords 对象,并最终返回 FetchDataInfo 对象返回。下面我们看一下 FileRecords 对象的生成。
从上面代码中我们也可以看出,在生成 FileRecords 对象时,并没有读取日志中的内容,仅仅只是对分区日志做了一个“切片视图”,如下图所示。
查找索引文件
每个日志分段都有一个索引文件和一个数据文件。给定起始偏移量,先调用索引文件的 lookup() 获取小于目标偏移量的最大偏移量的物理位置。然后再调用 searchForOffsetWithSize(),从指定物理位置开始读取每条消息,直到找到目标位置的消息。
首先我们看一下查询索引文件,最后定位到索引中最接近目标值的偏移量和物理位置。索引文件使用内存映射(mmap)的方式加载到内存中。因为在查询的过程中,可能会有新的索引条目添加到索引文件,导致内存映射发生变化,因此要复制出一个字节缓冲区(idx),然后在 idx 上面查询,不需要和底层索引文件发生磁盘读取操作。
参考:《Kafka技术内幕》、《极客时间:Kafka核心技术与实战》、Kafka源码