Kafka 的日志管理负责日志的创建、检索、清理,和日志相关的读写操作则交给日志实例去处理。每个 TopicPartition 都对应一个物理层面上的 log 实例,LogManager 使用了logs 管理了分区对应的日志实例。简化后的代码结构如下所示:
Kafka 服务端实例的数据目录配置项(log.dirs)可以设置多个目录。如果 log.dirs = /data/kafka_log/logs1,/data/kafka_log/logs2,表示该服务端实例有 2 个数据目录。如下图所示,每个 TopicPartition 都对应一个文件夹,在其同级目录下面还有一些检查点文件,下图是经过删减之后的目录和文件列表。
Kafka 启动时会创建一个 LogManager 对象,并且执行 loadLogs() 方法加载所有的 log,而每一个 log 也会调用 loadSegments() 加载所有的 LogSegment。由于这个过程比较慢,所以 LogManager 用异步线程池的方式为每一个 Log 的加载都创建了一个单独的线程,相关代码如下。
LogManager 是采用线程池提交任务,但是每个任务本身的执行是阻塞的。也就是只有在 log 加载完成之后,才会被加到 logs 映射表中。如果没有加载完所有日志,loadLogs() 也就不能返回。LogManager 加载完成后,KafkaServer 会调用LogManager.startup() 启动下面 4 个后台的管理线程。前面 2 个可以看做日志刷新策略,后面 2 个可以看做日志清理策略。相关代码如下:
我们下面看一下 Kafka 服务端配置文件(server.properties)对 LogManager 后台线程不同的配置项:
检查点文件
Kafka 服务端实例可以用多个数据目录存储所有分区日志,每个数据目录都有一个全局检查点文件(恢复检查点文件),该文件会存储这个数据目录下所有 log 的检查点信息。检查点表示日志已经刷新到磁盘的位置,其主要用于故障恢复,下面看一下相关代码:
检查点文件在LogManager 和 Log 实例的运行过程中起到了重要的作用,具体步骤如下:
(1)Kafka 启动时创建 LogManager,读取检查点文件,并把每个分区对应的检查点作为对日志的恢复点(recoveryPoint),最后创建分区对应的Log实例。
(2)消息追加到分区对应的日志,在刷新日志时,将最新的偏移量作为日志的检查点。
(3)LogManager 会启动一个定时任务,读取所有日志的检查点,并写入全局的检查点文件。
刷新日志
LogManager 启动时会定时调用 flushDirtyLogs() ,定期将页面缓存中的数据真正刷写到磁盘文件中。日志在未刷写之前,数据保存在操作系统的页面缓存中,这比直接将数据写到磁盘快得多。这种做法同时也意味着:如果数据还没来得及刷写到磁盘上,服务端实例崩溃了,这就会造成数据丢失。前面我们提到,在Kafka 中有两种刷盘策略:
1、时间策略(log.flush.interval.ms): 配置调度周期,默认无限大,也就是默认选择大小策略。
2、大小策略(log.flush.interval.messages): 配置当未刷新的消息数超过该值时,进行刷新。
通过上面的代码可以看出,刷盘只是按照时间策略去刷盘的。那么什么时候是按照大小去判断呢?大家应该能猜到肯定是追加消息的时候。那么我们看一下Log.append() 方法的简化实现:
消息追加到日志中,有下面两种场景会发生刷新日志的动作。
1、新创建一个日志分段,立即刷新旧的日志分段。
2、日志中未刷新的消息数量超过 log.flush.interval.messages 配置项的值。
刷新日志的参数是日志最新偏移量 (logEndOffset),它要和日志中现有的检查点位置 (recoveryPoint) 比较,只有最新偏移量比检查点位置大才需要刷新。因为一个日志包含多个日志分段,在刷新日志时,会刷新从检查点位置到最新偏移量的所有日志分段,最后更新检查点位置。下面我们看一下,创建新的日志分段和达到 log.flush.interval.messages 阈值的两种情况刷盘。
日志清理
为了控制日志的总大小不超过阈值 ( log.retention.bytes ),日志管理器会定时清理旧的日志分段。日志清理有下面 2 种策略:
1、删除:超过日志的阈值,直接物理删除整个日志分段。
2、压缩:不直接删除日志分段,而是采用合并压缩的方式。
不过通常情况下,我们都是通过 log.retention.hours 来配置 Segment 的保存时间。因为不同的 Topic 对应的分区,数据量大小也不一样。因此如果限制的大小,其保存时间不固定,不利于管理和控制消息。下面我们看一下日志清理的相关代码摘要:
从上面可以看出,清除日志的具体实现是在 Log.deleteOldSegments() 方法中完成的,下面具体看一下:
通过上面的代码可以看出,日志的清理工作,会分别从 存活时间、日志大小和不符合起始偏移量 3 个角度去清理。
参考:《Kafka技术内幕》、Kafka源码