功能与特性
核心功能
- 消息引擎 - Kafka 可以作为一个消息引擎系统。
- 流处理 - Kafka 可以作为一个分布式流处理平台。
- 存储 - Kafka 可以作为一个安全的分布式存储。
特性
高性能
- 分区、分段、索引:基于分区机制提供并发处理能力。分段、索引提升了数据读写的查询效率。
- 顺序读写:使用顺序读写提升磁盘 IO 性能。
- 零拷贝:利用零拷贝技术,提升网络 I/O 效率。
- 页缓存:利用操作系统的 PageCache 来缓存数据(典型的利用空间换时间)
- 批量读写:批量读写可以有效提升网络 I/O 效率。
- 数据压缩:Kafka 支持数据压缩,可以有效提升网络 I/O 效率。
- pull 模式:Kafka 架构基于 pull 模式,可以自主控制消费策略,提升传输效率。
高可用
- 持久化:Kafka 所有的消息都存储在磁盘,天然支持持久化。
- 副本机制:Kafka 的 Broker 集群支持副本机制,可以通过冗余,来保证其整体的可用性。
- 选举 Leader:Kafka 基于 ZooKeeper 支持选举 Leader,实现了故障转移能力。
伸缩性
- 分区:Kafka 的分区机制使得其具有良好的伸缩性。
概念
- 消息:Kafka 的数据单元被称为消息。消息由字节数组组成。
- 批次:批次就是一组消息,这些消息属于同一个主题和分区。
- 主题(Topic):Kafka 消息通过主题进行分类。主题就类似数据库的表。
- 不同主题的消息是物理隔离的;
- 同一个主题的消息保存在一个或多个 Broker 上。但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。
- 主题有一个或多个分区。
- 分区(Partition):分区是一个有序不变的消息序列,消息以追加的方式写入分区,然后以先入先出的顺序读取。Kafka 通过分区来实现数据冗余和伸缩性。
- 消息偏移量(Offset):表示分区中每条消息的位置信息,是一个单调递增且不变的值。
- 生产者(Producer):生产者是向主题发布新消息的 Kafka 客户端。生产者可以将数据发布到所选择的主题中。生产者负责将记录分配到主题中的哪一个分区中。
- 消费者(Consumer):消费者是从主题订阅新消息的 Kafka 客户端。消费者通过检查消息的偏移量来区分消息是否已读。
- 消费者群组(Consumer Group):多个消费者共同构成的一个群组,同时消费多个分区以实现高并发。
- 每个消费者属于一个特定的消费者群组(可以为每个消费者指定消费者群组,若不指定,则属于默认的群组)。
- 群组中,一个消费者可以消费多个分区
- 群组中,每个分区只能被指定给一个消费
- 再均衡(Rebalance):消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。分区再均衡是 Kafka 消费者端实现高可用的重要手段。
- Broker - 一个独立的 Kafka 服务器被称为 Broker。Broker 接受来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存;消费者向 Broker 请求消息,Broker 负责返回已提交的消息。
- 副本(Replica):Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用
生产者
生产者定义
Kafka Producer 发送的数据对象叫做 ProducerRecord ,它有 4 个关键参数:
Topic- 主题Partition- 分区(非必填)Key- 键(非必填)Value- 值
Kafka 生产者发送消息流程:
- 序列化 - 发送前,生产者要先把键和值序列化。
- 分区 - 数据被传给分区器。如果在
ProducerRecord中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据ProducerRecord的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。 - 批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。
- 批次,就是一组消息,这些消息属于同一个主题和分区。
- 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。
- 响应 - 服务器收到消息会返回一个响应。
- 如果成功,则返回一个
RecordMetaData对象,它包含了主题、分区、偏移量; - 如果失败,则返回一个错误。生产者在收到错误后,可以进行重试,重试次数可以在配置中指定。失败一定次数后,就返回错误消息。
- 如果成功,则返回一个

生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?
- 生产者会向任意 broker 发送一个元数据请求(
MetadataRequest),获取到每一个分区对应的 Leader 信息,并缓存到本地。 - 生产者在发送消息时,会指定 Partition 或者通过 key 得到到一个 Partition,然后根据 Partition 从缓存中获取相应的 Leader 信息。

生产者 API
Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。
- 构造生产者对象所需的参数对象。
- 利用第 1 步的参数对象,创建
KafkaProducer对象实例。 - 使用
KafkaProducer的send方法发送消息。 - 调用
KafkaProducer的close方法关闭生产者并释放各种系统资源
1. 创建生产者
Kafka 生产者核心配置:
bootstrap.servers- 指定了 Producer 启动时要连接的 Broker 地址。注:如果你指定了 1000 个 Broker 连接信息,那么,Producer 启动时就会首先创建与这 1000 个 Broker 的 TCP 连接。在实际使用过程中,并不建议把集群中所有的 Broker 信息都配置到bootstrap.servers中,通常你指定 3 ~ 4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为bootstrap.servers指定所有的 Broker。key.serializer- 键的序列化器。value.serializer- 值的序列化器。
// 指定生产者的配置
final Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
// 设置 key 的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置 value 的序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);
2. 异步发送
直接发送消息,不关心消息是否到达。这种方式吞吐量最高,但有小概率会丢失消息。
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
3. 同步发送
返回一个 Future 对象,调用 get() 方法,会一直阻塞等待 Broker 返回结果。这是一种可靠传输方式,但吞吐量最差。
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
4. 异步响应发送
代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如:抛出异常、记录错误日志。这是一个折中的方案,即兼顾吞吐量,也保证消息不丢失。
首先,定义一个 callback:
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
然后,使用这个 callback:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
5. 关闭连接
调用 producer.close() 方法可以关闭 Kafka 生产者连接。
Producer<String, String> producer = new KafkaProducer<>(properties);
try {
producer.send(new ProducerRecord<>(topic, msg));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
producer.close();
}
TCP 连接
创建 TCP 连接
Kafka 生产者创建连接有三个时机:
- 在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,首先会创建与
bootstrap.servers中所有 Broker 的 TCP 连接。 - 当 Producer 更新集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。
- 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- 场景二:Producer 通过
metadata.max.age.ms参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
- 当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。
关闭 TCP 连接
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。
- 主动关闭是指调用
producer.close()方法来关闭生产者连接;甚至包括用户调用kill -9主动“杀掉” Producer 应用。 - 如果设置 Producer 端
connections.max.idle.ms参数大于 0(默认为 9 分钟),意味着,在connections.max.idle.ms指定时间内,如果没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置该参数为-1,TCP 连接将成为永久长连接。
值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。
序列化
Kafka 内置了常用 Java 基础类型的序列化器,如:StringSerializer、IntegerSerializer、DoubleSerializer 等。但如果要传输较为复杂的对象,推荐使用序列化性能更高的工具,如:Avro、Thrift、Protobuf 等。
使用方式是通过实现 org.apache.kafka.common.serialization.Serializer 接口来引入自定义的序列化器。
分区
分区概念
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。
- 为什么要分区
- 为什么 Kafka 的数据结构采用三级结构?
分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。
分区策略
所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。
分区器是生产者客户端的核心组件,所有分区计算逻辑(默认哈希 / 轮询、自定义规则)都在客户端本地完成,Broker 仅做分区合法性校验,不参与任何分区计算 —— 这种设计既降低了 Broker 压力,又保证了客户端分区逻辑的灵活性。注意分区器只是决定消息分区结果(送往哪个分区),至于 Topic 是如何分区的由 Kafka Controller 决定。
前文中已经提到,Kafka 生产者发送消息使用的对象 ProducerRecord ,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。
如果 ProducerRecord 指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。
- 没有 key 时的分发逻辑:每隔
topic.metadata.refresh.interval.ms的时间,随机选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。 - 根据 key 分发:Kafka 的选择分区策略是:根据 key 求 hash 值,然后将 hash 值对 partition 数量求模。这里的关键点在于,同一个 key 总是被映射到同一个 Partition 上。所以,在选择分区时,Kafka 会使用 Topic 的所有 Partition ,而不仅仅是可用的 Partition。这意味着,如果写入数据的 Partition 是不可用的,那么就会出错。
自定义分区策略
如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class。这个参数该怎么设定呢?
首先,要实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口定义了两个方法:partition 和 close,通常只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
这里的 topic、key、keyBytes、value 和 valueBytes 都属于消息数据,cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
接着,设置 partitioner.class 参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。
负载均衡算法常见的有:
- 随机算法 (Random): 类似“抽签”,请求随机分配到服务器。实现最简单,但在服务器性能不均或请求量较小时,分布不够均匀。
- 轮询算法 (Round Robin): 类似“报数”,按顺序循环分配请求。优点是绝对公平,缺点是不考虑服务器的实时压力和处理能力。
- 最小活跃数算法 (Least Active): 类似“看排队人数”,优先把请求发给当前连接数最少的服务器。它是动态的,能有效平衡长耗时任务带来的堆积。
- 源地址哈希算法 (Source Hash): 类似“认准老客户”,根据客户端 IP 计算哈希值,确保同一 IP 的请求始终落在同一台服务器,常用于解决 Session 保持或缓存命中问题。
可以根据实际需要去实现。
压缩
Kafka 消息格式
目前,Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。
论哪个版本,Kafka 都不直接操作单条消息,而是批量操作:
- Message (Record Item / 消息): 真正包含用户数据的最小单元。
- Message Set (Batch / 消息集合): 包含一组消息。Kafka 的写入、网络传输和磁盘存储都是以“集”为单位进行的。
那么社区引入 V2 版本的目的是什么呢?V2 版本主要是为了解决 V1 在 CPU 开销 和 压缩效率 上的痛点。
优化一:CRC 校验逻辑的改变
CRC (循环冗余校验) 是用来检测数据在传输或存储过程中是否损坏的。
-
V1 的问题(粒度太细):
- V1 为每一条消息都计算一个 CRC 值。
- 痛点: 消息在 Broker 端可能会被修改(比如重新打时间戳,或者为了兼容旧版本做格式转换)。一旦修改,Broker 必须为每一条消息重新计算 CRC。这非常消耗 CPU,且每个消息都要存一份 CRC,浪费空间。
-
V2 的改进(粒度变粗):
- 改为只对整个 Message Set(消息集) 进行一次 CRC 校验。
- 好处: 大大减少了校验次数,节省了 CPU 算力和存储空间。
优化二:压缩方式的进化
压缩是提升 Kafka 吞吐量的关键,V2 在这里做了一个很聪明的改动。
-
V1 的做法(包装式压缩):
- 它把几条压缩后的消息,塞进一条“外层消息”的 Value 字段里。
- 这种方式比较笨重,因为外层还需要维护各种元数据。
-
V2 的做法(整体式压缩):
- 直接对整个 Message Set(消息集) 进行压缩。
- 好处: 压缩算法在面对更大的数据块时,通常能获得更好的压缩率(重复的数据特征更多)。这种“全量压缩”让磁盘占用更小,网络传输效率更高。
压缩
Kafka 的压缩流程,一言以概之:Producer 端压缩、Broker 端保持、Consumer 端解压缩。在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。
【示例】开启 GZIP 的 Producer 对象
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
通常,Broker 从 Producer 端接收到消息后,不做任何处理。以下两种情况除外:
- 情况一:Broker 端指定了和 Producer 端不同的压缩算法。显然,应该尽量避免这种情况。
- 情况二:Broker 端发生了消息格式转换。所谓的消息格式转换,主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
所谓零拷贝,说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此如果 Kafka 享受不到这个特性的话,性能必然有所损失,所以尽量保证消息格式的统一,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。
解压缩
通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
压缩算法
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。
在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
如果客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩,这样能极大地节省网络资源消耗。
启用压缩的时机
压缩是在 Producer 端完成的工作,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。
如果环境中带宽资源有限,那么建议开启压缩。
幂等性
幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
Kafka Producer 的幂等性
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。
我们必须要了解幂等性 Producer 的作用范围:
- 首先,
enable.idempotence只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。 - 其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!
幂等性实现
为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。
- PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
- Sequence Numbler。对于每个 PID,该 Producer 发送数据的每个
<Topic, Partition>都对应一个从 0 开始单调递增的 Sequence Number。
Broker 端在缓存中保存了这 seq number,对于接收的每条消息,如果其序号比 Broker 缓存中序号大于 1 则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个 Producer 对于同一个 <Topic, Partition> 的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 partion 幂等。

实现幂等之后:

生成 PID 的流程
在执行创建事务时,如下:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
会创建一个 Sender,并启动线程,执行如下 run 方法,在 maybeWaitForProducerId() 中生成一个 producerId,如下:
====================================
类名:Sender
====================================
void run(long now) {
if (transactionManager != null) {
try {
........
if (!transactionManager.isTransactional()) {
// 为idempotent producer生成一个producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
........
幂等性的应用实例
- 配置属性
enable.idempotence,需要设置为 ture,此时就会默认把 acks 设置为 all,所以不需要再设置 acks 属性了。
// 指定生产者的配置
final Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
// 设置 key 的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置 value 的序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启幂等性
properties.put("enable.idempotence", true);
// 设置重试次数
properties.put("retries", 3);
//Reduce the no of requests less than 0
properties.put("linger.ms", 1);
// buffer.memory 控制生产者可用于缓冲的内存总量
properties.put("buffer.memory", 33554432);
// 使用配置初始化 Kafka 生产者
producer = new KafkaProducer<>(properties);
- 发送消息
跟一般生产者一样,如下
public void produceIdempotMessage(String topic, String message) {
// 创建Producer
Producer producer = buildIdempotProducer();
// 发送消息
producer.send(new ProducerRecord<String, String>(topic, message));
producer.flush();
}
此时,因为我们并没有配置 transaction.id 属性,所以不能使用事务相关 API,如下
producer.initTransactions();
否则会出现如下错误:
Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.
at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)
at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)
at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)
Kafka 事务
Kafka 的事务概念是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作,同时成功或者失败。
消息可靠性保障,由低到高为:
- 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
- 至少一次(at least once):消息不会丢失,但有可能被重复发送。
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
Kafka 支持事务功能主要是为了实现精确一次处理语义的,而精确一次处理是实现流处理的基石。
Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
事务型 Producer
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
事务属性实现前提是幂等性,即在配置事务属性 transaction.id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
在事务属性之前先引入了生产者幂等性,它的作用为:
- 生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败。
- consumer-transform-producer 模式下,因为消费者提交偏移量出现问题,导致重复消费。需要将这个模式下消费者提交偏移量操作和生产者一系列生成消息的操作封装成一个原子操作。
消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交偏移量 o2 前挂掉了(假设它最近提交的偏移量是 o1),此时执行再均衡时,其它消费者会重复消费消息 (o1 到 o2 之间的消息)。
事务型 Producer 是在幂等性基础上的拓展,核心能力有两个:
- 多分区 / 多 Topic 消息的原子写入:一批消息发往多个 Partition/Topic 时,要么全部成功,要么全部失败(回滚),这是你提到的 “原子性”,但需注意:事务仅保证 “写入原子性”,不保证 “消费原子性”;
- 消费偏移量提交 + 生产消息的原子绑定:解决你说的「consumer-transform-producer(消费 - 处理 - 生产)」模式下的重复消费问题 —— 将 “消费者提交偏移量” 和 “生产者发送新消息” 封装成一个事务,要么两者都成功,要么都失败。
事务操作的 API
Producer 提供了 initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction 五个事务方法。
/**
* 初始化事务。需要注意的有:
* 1、前提
* 需要保证transation.id属性被配置。
* 2、这个方法执行逻辑是:
* (1)Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* (2)Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
*/
public void initTransactions();
/**
* 开启事务
*/
public void beginTransaction() throws ProducerFencedException ;
/**
* 为消费者提供的在事务内提交偏移量的操作
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;
/**
* 提交事务
*/
public void commitTransaction() throws ProducerFencedException;
/**
* 放弃事务,类似回滚事务的操作
*/
public void abortTransaction() throws ProducerFencedException ;
使用 kafka 的事务 api 时的一些注意事项:
- 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行
consumer#commitSync或者consumer#commitAsyc - 设置 Producer 端参数
transctional.id。最好为其设置一个有意义的名字。 - 和幂等性 Producer 一样,开启
enable.idempotence = true。如果配置了transaction.id,则此时enable.idempotence会被设置为 true - 消费者需要配置事务隔离级别
isolation.level。在consume-trnasform-produce模式下使用事务时,必须设置为READ_COMMITTED。read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
事务应用示例
只有生产的事务操作
创建一个事务,在这个事务操作中,只有生成消息操作。代码如下:
/**
* 在一个事务只有生产消息操作
*/
public void onlyProduceInTransaction() {
Producer producer = buildProducer();
// 1.初始化事务
producer.initTransactions();
// 2.开启事务
producer.beginTransaction();
try {
// 3.kafka写操作集合
// 3.1 do业务逻辑
// 3.2 发送消息
producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));
producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
// 3.3 do其他业务逻辑,还可以发送其他topic的消息。
// 4.事务提交
producer.commitTransaction();
} catch (Exception e) {
// 5.放弃事务
producer.abortTransaction();
}
}
创建生产者,代码如下,需要:
- 配置
transactional.id属性 - 配置
enable.idempotence属性
/**
* 需要:
* 1、设置transactional.id
* 2、设置enable.idempotence
* @return
*/
private Producer buildProducer() {
// create instance for properties to access producer configs
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 设置事务id
props.put("transactional.id", "first-transactional");
// 设置幂等性
props.put("enable.idempotence",true);
//Set acknowledgements for producer requests.
props.put("acks", "all");
//If the request fails, the producer can automatically retry,
props.put("retries", 1);
//Specify buffer size in config,这里不进行设置这个属性,如果设置了,还需要执行producer.flush()来把缓存中消息发送出去
//props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
// Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
return producer;
}
消费 - 生产并存(consume-transform-produce)
是最典型场景的事务场景,如果不使用事务: 如果在第 3 步成功但第 4 步失败,程序重启后会重新消费,导致 Topic B 中出现重复数据。 使用事务后: 写入 Topic B 和提交 Offset 被绑定成一个原子操作。
- 从 Topic A 消费消息。
- 在内存中进行逻辑处理(如计算、过滤)。
- 将结果写入 Topic B。
- 提交 Offset 到
__consumer_offsets。
流程:
- Consume:从 Kafka 消费原始消息(可能来自集群中任意一台机器的生产者);
- Transform:对消息做加工(比如格式转换、数据清洗、业务计算);
- Produce:将加工后的消息写回 Kafka(供下游其他消费者使用)。
在一个事务中,既有生产消息操作又有消费消息操作,即常说的 Consume-tansform-produce 模式。如下实例代码
/**
* 在一个事务内,即有生产消息又有消费消息
*/
public void consumeTransferProduce() {
// 1.构建上产者
Producer producer = buildProducer();
// 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
producer.initTransactions();
// 3.构建消费者和订阅主题
Consumer consumer = buildConsumer();
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 4.开启事务
producer.beginTransaction();
// 5.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);
try {
// 5.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
// 5.2.2 记录提交的偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
// 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
producer.send(new ProducerRecord<String, String>("test", "data2"));
}
// 7.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");
// 8.事务提交
producer.commitTransaction();
} catch (Exception e) {
// 7.放弃事务
producer.abortTransaction();
}
}
}
创建消费者代码,需要:
- 将配置中的自动提交属性(auto.commit)进行关闭
- 而且在代码里面也不能使用手动提交 commitSync( ) 或者 commitAsync( )
- 设置 isolation.level
/**
* 需要:
* 1、关闭自动提交 enable.auto.commit
* 2、isolation.level为
* @return
*/
public Consumer buildConsumer() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "group0323");
// 设置隔离级别
props.put("isolation.level","read_committed");
// 关闭自动提交
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
return consumer;
}
只有消费的事务操作
创建一个事务,在这个事务操作中,只有生成消息操作,如下代码。这种操作其实没有什么意义,跟使用手动提交效果一样,无法保证消费消息操作和提交偏移量(这里是指 Producer 接收到偏移量)操作在一个事务。
/**
* 在一个事务只有消息操作
*/
public void onlyConsumeInTransaction() {
Producer producer = buildProducer();
// 1.初始化事务
producer.initTransactions();
// 2.开启事务
producer.beginTransaction();
// 3.kafka读消息的操作集合
Consumer consumer = buildConsumer();
while (true) {
// 3.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);
try {
// 3.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 3.2.1 处理消息 print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
// 3.2.2 记录提交偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
}
// 4.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");
// 5.事务提交
producer.commitTransaction();
} catch (Exception e) {
// 6.放弃事务
producer.abortTransaction();
}
}
}
生产者发送消息的流程
Kafka Controller 根据 Topic 分区规划(比如确定分区数量、副本分布、选举出 Leader 等)后,会将这些元数据信息同步给集群内的所有 Broker。每个 Broker 都会保存这些元数据,这样生产者和消费者在需要时,都能通过向任意一个 Broker 查询获取到 Topic 的分区信息。比如生产者要发送消息时,就会先查询元数据,然后根据自身的分区器来确定消息该发送到哪个 Broker 的哪个分区。
发送消息流程:
封装消息 → 序列化 → 分区路由 → 入缓冲区攒批 → 查询 Leader 地址 → 发送批次到 Leader → Leader 写入 + 副本同步 → 接收确认/重试
-
封装消息
生产者首先创建
ProducerRecord对象,封装消息的核心元数据 -
序列化
生产者通过配置的「序列化器」(
key.serializer/value.serializer),将ProducerRecord的key和value从字符串 / 对象转为字节数组(Kafka 仅传输字节数据) -
分区路由
若未手动指定
partition,生产者通过「分区器(Partitioner)」计算消息要发送到 Topic 的哪个分区,核心逻辑:- 默认分区器(DefaultPartitioner):
- 有 Key:对 Key 做哈希,取模分区数,保证相同 Key 落到同一分区;
- 无 Key:轮询(RoundRobin)分配分区,保证消息均匀分散;
- 自定义分区器:实现
Partitioner接口,比如按业务字段(如用户地域)路由分区;
- 默认分区器(DefaultPartitioner):
-
入缓冲区
缓冲区按「分区」分组,同一分区的消息会被封装为「批次(Batch)」,攒够批次或者时间到了发送
-
获取分区 Leader 地址(元数据查询)
发送批次前,生产者需知道目标分区的 Leader 所在 Broker 地址(生产者仅向 Leader 发送消息)
-
发送批次到 Leader Broker(网络传输)
生产者是要把消息发送到指定 partition,这里的 Leader Broker 是指有 Leader Partition 的 Broker,而不是所有 Broker 的 Leader!!!
-
Leader Broker 写入日志 + 副本同步(核心持久化)
Leader Partition 将批次写入本地磁盘日志;Follower Partition 定期从 Leader 拉取新消息,写入自己的日志;
-
生产者接收确认 + 回调(完成 / 异常处理)
生产者收到 Broker 的
ProduceResponse后,执行最终逻辑:- 成功:触发
Callback回调(若配置),标记消息发送完成; - 失败:
- 可重试异常(如 Leader 宕机、网络抖动):自动重试(
retries控制次数,retry.backoff.ms控制重试间隔); - 不可重试异常(如消息过大
RecordTooLargeException):触发回调抛出异常,需业务处理;
- 可重试异常(如 Leader 宕机、网络抖动):自动重试(
- 成功:触发
生产者配置
更详尽的生产者配置可以参考:Kafka 生产者官方配置说明
以下为生产者主要配置参数清单:
acks:指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。默认为acks=1acks=0如果设置为 0,则 Producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为 -1。acks=1如果设置为 1,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。acks=all如果设置为 all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1 与 acks=all 是等效的。
buffer.memory:用来设置 Producer 缓冲区大小。compression.type:Producer 生成数据时可使用的压缩类型。默认值是 none(即不压缩)。可配置的压缩类型包括:none、gzip、snappy、lz4或zstd。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。retries:用来设置发送失败的重试次数。batch.size:用来设置一个批次可占用的内存大小。linger.ms:用来设置 Producer 在发送批次前的等待时间。client.id:Kafka 服务器用它来识别消息源,可以是任意字符串。max.in.flight.requests.per.connection:用来设置 Producer 在收到服务器响应前可以发送多少个消息。timeout.ms:用来设置 Broker 等待同步副本返回消息确认的时间,与acks的配置相匹配。request.timeout.ms:Producer 在发送数据时等待服务器返回响应的时间。metadata.fetch.timeout.ms:Producer 在获取元数据时(如:分区的 Leader 是谁)等待服务器返回响应的时间。max.block.ms:该配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。max.request.size:请求的最大字节数。receieve.buffer.bytes:TCP 接收缓冲区的大小。send.buffer.bytes:TCP 发送缓冲区的大小。
消费者
消费者定义
Pull 模式
消息引擎获取消息有两种模式:
- push 模式:MQ 推送数据给消费者
- pull 模式:消费者主动向 MQ 请求数据

Kafka 消费者(Consumer)以 pull 方式从 Broker 拉取消息。相比于 push 方式,pull 方式灵活度和扩展性更好,因为消费的主动性由消费者自身控制。
push 模式的优缺点:
- 缺点:由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理了。push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。
pull 模式的优缺点:
- 优点:consumer 可以根据自己的消费能力自主的决定消费策略
- 缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞直到新消息到达
消费者
每个 Consumer 的唯一元数据是该 Consumer 在日志中消费的位置。这个偏移量是由 Consumer 控制的:Consumer 通常会在读取记录时线性的增加其偏移量。但实际上,由于位置由 Consumer 控制,所以 Consumer 可以采用任何顺序来消费记录。
一条消息只有被提交,才会被消费者获取到。如下图,只能消费 Message0、Message1、Message2:

消费者群组
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
Kafka 的写入数据量很庞大,如果只有一个消费者,消费消息速度很慢,时间长了,就会造成数据积压。为了减少数据积压,Kafka 支持消费者群组,可以让多个消费者并发消费消息,对数据进行分流。
Kafka 消费者从属于消费者群组,一个群组里的 Consumer 订阅同一个 Topic,一个主题有多个 Partition,每一个 Partition 只能隶属于消费者群组中的一个 Consumer。
如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
同一时刻,一条消息只能被同一消费者组中的一个消费者实例消费。

不同消费者群组之间互不影响:

消费流程
Kafka 消费者通过 poll 来获取消息,但是获取消息时并不是立刻返回结果,需要考虑两个因素:
- 消费者通过
customer.poll(time)中设置等待时间 - Broker 会等待累计一定量数据,然后发送给消费者。这样可以减少网络开销。

poll 除了获取消息外,还有其他作用:发送心跳信息。消费者通过向被指派为群组协调器的 Broker 发送心跳来维护他和群组的从属关系,当机器宕掉后,群组协调器触发再均衡。
消费者 API
创建消费者
Properties props = new Properties();
// 服务器地址
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "test");
// 关闭自动提交偏移量
props.put("enable.auto.commit", "false");
// 设置 key 反序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置 value 反序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
订阅主题
// 订阅主题列表
consumer.subscribe(Arrays.asList("t1", "t2"));
// 订阅所有与 test 相关的主题
consumer.subscribe("test.*");
subscribe 方法允许传入一个正则表达式,这样就可以匹配多个主题。如果有人创建了新的主题,并且主题名恰好匹配正则表达式,那么会立即触发一次分区再均衡,消费者就可以读取新添加的主题。
轮询获取消息
消息轮询是消费者 API 的核心。一旦消费者订阅了主题,轮询就会处理所有细节,包括:群组协调、分区再均衡、发送心跳和获取数据。
try {
// 3. 轮询
while (true) {
// 4. 消费消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.debug("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
}
} finally {
// 5. 退出程序前,关闭消费者
consumer.close();
}
手动提交偏移量
同步提交
使用 commitSync() 提交偏移量最简单也最可靠。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
同步提交的缺点:同步提交方式会一直阻塞,直到接收到 Broker 的响应请求,这会大大限制吞吐量。
异步提交
在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡(新消费者进来重复消费),就会出现重复消息。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}
commitAsync() 也支持回调,在 Broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,不过如果要用它来进行重试,则一定要注意提交的顺序。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) { log.error("Commit failed for offsets {}", offsets, e); }
}
});
}
重试异步提交
可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增全局序列号。在进行重试前,先检查回调的全局序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试;如果全局序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。
同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。
因此,在消费者关闭前一般会组合使用 commitSync() 和 commitAsync()。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
提交特定的偏移量
提交偏移量的频率和处理消息批次的频率是一样的。如果想要更频繁地提交该怎么办?如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。
解决办法是:消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map。
private int count = 0;
// 缓存每个分区待提交的偏移量
private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset() + 1, "no metadata"));
// 每 1000 条提交一次
if (count % 1000 == 0) { consumer.commitAsync(currentOffsets, null); }
count++;
}
}
从特定偏移量处开始处理
使用 poll() 方法可以从各个分区的最新偏移量处开始处理消息。不过,有时候,我们可能需要从特定偏移量处开始处理消息。
- 从分区的起始位置开始读消息:
seekToBeginning(Collection<TopicPartition> partitions)方法 - 从分区的末尾位置开始读消息:
seekToEnd(Collection<TopicPartition> partitions)方法 - 查找偏移量:
seek(TopicPartition partition, long offset)方法
通过 seek(TopicPartition partition, long offset) 可以实现处理消息和提交偏移量在一个事务中完成。思路就是需要在客户端建立一张数据表,保证处理消息和和消息偏移量位置写入到这张数据表。在一个事务中,此时就可以保证处理消息和记录偏移量要么同时成功,要么同时失败。
consumer.subscribe(topic);
// 1.第一次调用pool,加入消费者群组
// 直接 `subscribe` 后 Kafka 并不会立即分配分区,调用 `poll(0)` 会触发一次成员加入请求,确保消费者拿到了负责的分区列表(Assignment)。
consumer.poll(0);
// 2.获取负责的分区,并从本地数据库读取改分区最新偏移量,并通过seek方法修改poll获取消息的位置
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
// 只用数据的偏移量,处理成功 = 偏移量成功存入 DB
commitDBTransaction();
}
关闭连接
如果想让消费者从轮询消费消息的无限循环中退出,可以通过另一个线程调用 consumer.wakeup() 方法。 consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。仅作用于 Kafka 消费者的阻塞方法,比如 poll()(拉取消息)、commitSync()(同步提交偏移量)、seek()(定位偏移量)等。
调用 consumer.wakeup() 可以退出 poll() ,并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,调用 wakeup() 会标记 “待中断”,下次调用 poll() 时会立即抛出 WakeupException。
// 注册 JVM 关闭钩子(ShutdownHook)
// 当 JVM 即将退出时(如执行 `kill` 命令、Ctrl+C 终止进程),JVM 会启动这个钩子线程;
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup(); // 中断consumer的poll()阻塞
try {
// 等待消费主线程完成 `finally` 块的收尾逻辑(关闭消费者、提交偏移量等)
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
...
try {
while (true) { // 无限消费循环
ConsumerRecords<String, String> records = movingAvg.consumer.poll(1000); // 阻塞1秒拉取消息
// 打印消息和当前分区偏移量
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment()) {
System.out.println("Committing offset at position:" + consumer.position(tp));
}
movingAvg.consumer.commitSync(); // 每次拉取后同步提交偏移量
}
} catch (WakeupException e) {
// 捕获wakeup()触发的异常,仅退出循环,不处理(属于正常关闭)
} finally {
consumer.close(); // 最终关闭消费者,释放连接/资源
System.out.println("Closed consumer and we are done");
}
分区再均衡
基本概念
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均(Rebalance)。Rebalance 实现了消费者群组的高可用性和伸缩性。
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
当在群组里面 新增 / 移除消费者 或者 新增 / 移除 kafka 集群 broker 节点 时,群组协调器 Broker 会触发再均衡,重新为每一个 Partition 分配消费者。Rebalance 期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。
触发时机
- 消费者群组成员数发生变更:比如有新的 Consumer 加入群组或者离开群组,或者是有 Consumer 实例崩溃被“踢出”群组。
- 新增消费者:consumer 订阅主题之后,第一次执行 poll 方法
- 移除消费者:执行
consumer.close()操作或者消费客户端宕机,就不再通过 poll 向群组协调器发送心跳了,当群组协调器检测次消费者没有心跳,就会触发再均衡。
- 订阅主题数发生变更:Consumer Group 可以使用正则表达式的方式订阅主题,比如
consumer.subscribe(Pattern.compile(“t.*c”))就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。 - 订阅主题的分区数发生变更:Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
- 新增 broker:如重启 broker 节点
- 移除 broker:如 kill 掉 broker 节点。
总结:消费者 / 主题 / 分区,其中一个数量发生改变可能会触发分区再均衡。注意可能,比如不是被订阅的 Topic、不在消费(poll)的消费者数量发生变化不会触发,还有下面也是不触发:
- 消费者数 / 分区数 “变化后无分配调整”:比如消费者组有 3 个消费者,
order-topic有 3 个分区(1:1 分配),新增 1 个消费者但无新分区 —— 理论上无分区可分配,部分版本的 Kafka 会触发 “空再均衡”(仅校验分配,无实际分区调整),但新版(2.4+)会优化为不触发。
分区再均衡的过程
Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的
- 选择群主
当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区。
所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
-
消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。
-
群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。
- Range 策略,就是把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2,消费者 2 负责分区 3-4,消费者 3 负责分区 5。
- RoundRoin 策略,就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。
-
群主分配完成之后,把分配情况发送给群组协调器。
-
群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息。

如何判断消费者已经死亡
消费者通过向被指定为群组协调器的 Broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超时未发送心跳,会话就会过期,群组协调器认定它已经死亡,就会触发一次再均衡。
当一个消费者要离开群组时,会通知协调器,协调器会立即触发一次再均衡,尽量降低处理停顿。
查找协调者
所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。
目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。
- 第 1 步:确定由哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。 - 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
分区再均衡问题
- 首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
- 其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
- 最后,Rebalance 实在是太慢了。
那应该如何避免分区再平衡?
实际上,大部分情况下,导致分区再均衡的原因是:组成员数量发生变化。有两种情况,消费者并没有宕机,但也被视为消亡:
- 未及时发送心跳
- Consumer 消费时间过长
未及时发送心跳
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,你需要仔细地设置 session.timeout.ms 和 heartbeat.interval.ms 的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。
- 设置
session.timeout.ms= 6s。 - 设置
heartbeat.interval.ms= 2s。 - 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即
session.timeout.ms>= 3 *heartbeat.interval.ms。
将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。
Consumer 消费时间过长
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。我之前有一个客户,在他们的场景中,Consumer 消费数据时需要将消息处理之后写入到 MongoDB。显然,这是一个很重的消费逻辑。MongoDB 的一丁点不稳定都会导致 Consumer 程序消费时长的增加。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。
GC 参数
如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了
偏移量提交
每次调用 poll() 方法,它总是会返回由生产者写入 Kafka 但还没有被消费者读取过的记录,Kafka 因此可以追踪哪些记录是被哪个群组的哪个消费者读取的。
更新分区当前位置的操作叫作提交。
重复消费与消息丢失
如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
(1)如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

由此可知,处理偏移量,会对客户端处理数据产生影响。
提交偏移量方案
- 老版本方法
老版本的 Consumer Group 把偏移量保存在 ZooKeeper 中。ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将偏移量保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销,有利于实现伸缩性。
这种方案的问题在于:ZooKeeper 其实并不适合进行高频的写操作,而 Consumer Group 的偏移量更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 偏移量保存在 ZooKeeper 中是不合适的做法。
- 新版本方法
消费者向一个叫做 _consumer_offsets 的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
_consumer_offsets 主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >。
通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建偏移量主题。偏移量主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。如果偏移量主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。分区数可以通过 offsets.topic.num.partitions 设置;副本数可以通过 offsets.topic.replication.factor 设置
获取偏移量流程
__consumer_offsets 主题保存的是:某一消费者组 消费 某一主题 的 某一分区 的 偏移量。通过消费者发起普通请求给 Coordinator 去查偏移量,而不是订阅 __consumer_offsets 主题的方式去消费偏移量。
步骤 1:消费者启动,找到所属消费组的 Coordinator
- 消费者向任意 Broker 发送请求:“我是消费组 G1 的成员,请问我的 Coordinator 是谁?”;
- Broker 根据公式
hash(消费组ID) % __consumer_offsets的分区数,计算出该消费组的偏移量存在__consumer_offsets的哪个分区; - 负责这个分区的 Broker 就是该消费组的 Coordinator,Broker 把 Coordinator 的地址返回给消费者。
步骤 2:向 Coordinator 请求 “已提交的偏移量”
- 消费者给 Coordinator 发请求:“我要消费 Topic T1 的 P0/P1 分区,请告诉我这个消费组上次提交的偏移量是多少?”;(让协调组者去查,而不是消费者自己,而且偏移量所在机器和协调者是同一个,直接本地查找)
Coordinator 做两件事:
- 若该消费组首次消费(
__consumer_offsets中无该组的偏移量记录):返回 “无偏移量”,消费者按auto.offset.reset配置处理(默认latest从最新偏移量开始,earliest从零开始); - 若该消费组非首次消费:从
__consumer_offsets中查该组 “T1-P0/T1-P1” 的已提交偏移量(比如 P0 是 1000,P1 是 800),返回给消费者。
步骤 3:消费者从指定偏移量开始拉取消息
- 消费者拿到偏移量后,向对应分区的 Leader Broker 发请求:“拉取 T1-P0 从 1000 开始的消息,T1-P1 从 800 开始的消息”,而非从零开始。
自动 / 手动提交
自动提交
自动提交是 Kafka 处理偏移量最简单的方式。
当 enable.auto.commit 属性被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。
与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交最近一次 poll() 拉取到的消息的最后偏移量。
假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到 5s 的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。虽然可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗的时间跨度,不过这种情况是无法完全避免的。
在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。
自动提交的触发依赖 poll() 方法的调用 —— 消费者必须持续调用 poll(),后台的自动提交线程才会工作;若停止 poll(),自动提交也会停止(因为 Kafka 认为消费者 “未存活”)。
自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题。
手动提交
自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题。因此,可以通过手动提交偏移量,由开发者自行控制。
首先,把 enable.auto.commit 设为 false,关闭自动提交。
如果 Kafka 触发了再均衡,我们需要在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。可能还需要关闭文件句柄、数据库连接等。
在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。
onPartitionsRevoked(临终遗言/权限收回) 这是在 “旧分区被拿走之前” 调用的。- 任务: 赶紧把内存里攒着的、还没处理完的消息处理掉,并立刻提交位移。
- 重要性: 这是该消费者对该分区拥有的“最后一次发言权”。在这里提交了位移,下一个接班的人才能无缝衔接,不会重读。
- 资源清理: 图片里提到的“关闭数据库连接、文件句柄”也该在这里做。
onPartitionsAssigned(新官上任/权限分配) 这是在 “新分区分配下来之后,正式开始读数据之前” 调用的。- 任务: 这里的核心通常是初始化状态。
- 关联之前的 DB 方案: 如果你把位移存在了数据库里,那么在这里你应该执行
consumer.seek(),从数据库里查出这个新分区对应的位移,强行拉回到正确的位置。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets=new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}
try {
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new
OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// 忽略异常,正在关闭消费者
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
反序列化器
生产者需要用序列化器将 Java 对象转换成字节数组再发送给 Kafka;同理,消费者需要用反序列化器将从 Kafka 接收到的字节数组转换成 Java 对象。
独立消费者
通常,会有多个 Kafka 消费者组成群组,关注一个主题。
但可能存在这样的场景:只需要一个消费者从一个主题的所有分区或某个特定的分区读取数据。这时,就不需要消费者群组和再均衡了,只需要把主题或分区分配给消费者,然后开始读取消息并提交偏移量。
如果是这样,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或为自己分配分区,但不能同时做这两件事。
// 手动获取 Topic 的分区信息
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic");
// 手动指定消费者要消费的分区(跳过消费者组的自动分区分配逻辑)
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record: records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(),record.value());
}
consumer.commitSync();
}
}
消费者消费消息的流程
初始化消费者 → 订阅 Topic/分配分区 → 加入消费组 → 获取已提交偏移量 → poll() 拉取消息 → 解析 + 处理消息 → 提交偏移量 → 再均衡处理 → 优雅关闭
-
初始化消费者(配置 + 实例化)
创建
KafkaConsumer实例,配置核心参数(消费组、序列化、偏移量策略等) -
订阅 Topic / 手动分配分区(确定消费范围)
指定要消费的 Topic / 分区:
- 自动订阅 Topic:订阅后,消费者会加入消费组(调用 subscribe 并且 poll 后发生),由 Coordinator 自动分配分区,支持再均衡(容错 + 负载均衡)
- 手动分配分区(无再均衡,精准控制):精准消费指定分区、单消费者独占所有分区,无再均衡容错。
-
加入消费组 + 选举 Coordinator(集群协调)
- 消费者向任意 Broker 发送「加入消费组请求」,Broker 根据
hash(group.id) % __consumer_offsets分区数计算该组的 Coordinator(某台 Broker); - 消费者连接 Coordinator,成为消费组成员;
- Coordinator 触发「分区再均衡」(若需要),将 Topic 的分区分配给组内消费者。
- 消费者向任意 Broker 发送「加入消费组请求」,Broker 根据
-
获取已提交偏移量(确定消费起始位置)
- 若消费组首次消费(
__consumer_offsets无记录):按auto.offset.reset配置处理(latest从最新偏移量、earliest从最旧偏移量); - 若消费组非首次消费:Coordinator 从
__consumer_offsets读取已提交偏移量,返回给消费者; - 消费者可手动调整偏移量(如
seek()定位到指定位置)
- 若消费组首次消费(
-
消费者通过
poll()方法向分区 Leader Broker 拉取消息,这是消费的核心操作poll() 触发 → 检查本地元数据缓存 → 缓存有效:直接取分区 Leader 地址 → 缓存无效/Leader 变更:向任意 Broker 刷新元数据 → 获取最新 Leader 地址 → 向 Leader 发送拉取请求
-
解析消息(反序列化 + 业务处理)
消息处理失败时,不要更新偏移量(避免消息丢失),可重试或死信队列处理。
-
提交偏移量(确认消费完成)
消费者提交偏移量(已处理的最后偏移量 + 1),提交方式:
- 手动异步提交(生产环境首选,保性能):非阻塞式提交,提交请求发送后立即返回,失败仅触发回调(无自动重试)。
- 手动同步提交(兜底用,保一致性)
- 自动提交(仅测试 / 低要求场景):存在丢消息风险,因为逻辑与 “消息是否处理完成无关”。
-
再均衡处理(容错 + 负载均衡)
若消费组发生再均衡(消费者加入 / 退出、Topic 扩分区等),触发以下逻辑:
- 消费者暂停消费,释放持有的分区;
- Coordinator 重新分配分区;
- 消费者在
onPartitionsRevoked()中提交最后偏移量,在onPartitionsAssigned()中接管新分区。
-
优雅关闭(收尾 + 资源释放)
消费者配置
bootstrap.servers- Broker 集群地址,格式:ip1:port,ip2:port…,不需要设定全部的集群地址,设置两个或者两个以上即可。group.id- 消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。fetch.min.bytes- 单次拉取的最小字节数(Broker 端)。Kafka 会等到有足够的数据时才返回消息给消费者,以降低负载。fetch.max.wait.ms- Kafka 需要等待足够的数据才返回给消费者,如果一直没有足够的数据,消费者就会迟迟收不到消息。所以需要指定 Broker 的等待延迟,一旦超时,直接返回数据给消费者。max.partition.fetch.bytes- 单个分区单次拉取的最大字节数(Broker 端)。默认为 1 MB。session.timeout.ms- 指定了消费者的心跳超时时间。如果消费者没有在有效时间内发送心跳给群组协调器,协调器会视消费者已经消亡,从而触发分区再均衡。默认为 3 秒。auto.offset.reset- 指定了消费者在读取一个没有偏移量的分区或偏移量无效的情况下,该如何处理。latest- 表示在偏移量无效时,消费者将从最新的记录开始读取分区记录。earliest- 表示在偏移量无效时,消费者将从起始位置读取分区记录。
enable.auto.commit- 指定了是否自动提交消息偏移量,默认开启。partition.assignment.strategy- 消费者的分区分配策略。Range- 表示会将主题的若干个连续的分区分配给消费者。RoundRobin- 表示会将主题的所有分区按照轮询方式分配给消费者。
client.id- 客户端标识。max.poll.records- 单次 poll () 拉取的最大消息条数(Consumer 端)receive.buffer.bytes- 用于设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 64KB。如果设置为 -1,则使用操作系统的默认值。控制消费者与 Broker 通信的 Socket 接收缓冲区大小,仅影响网络传输效率,不直接限制拉取数据量。send.buffer.bytes- 用于设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 128KB。与 receive.buffer.bytes 参数一样,如果设置为 -1,则使用操作系统的默认值。
Kafka 集群
Kafka 是一个分布式的、可水平扩展的、基于发布/订阅模式的、支持容错的消息系统。
Kafka 和 Zookeeper
Kafka 使用 Zookeeper 来维护集群成员的信息。每个 Broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在 Broker 启动的时候,它通过创建临时节点把自己的 ID 注册到 Zookeeper。
Kafka 组件订阅 Zookeeper 的 /broker/ids 路径,当有 Broker 加入集群或退出集群时,这些组件就可以获得通知。
如果要启动另一个具有相同 ID 的 Broker,会得到一个错误——新 Broker 会试着进行注册,但不会成功,因为 ZooKeeper 中已经有一个具有相同 ID 的 Broker。
在 Broker 停机、出现网络分区或长时间垃圾回收停顿时,Broker 会与 ZooKeeper 断开连接,此时 Broker 在启动时创建的临时节点会自动被 ZooKeeper 移除。监听 Broker 列表的 Kafka 组件会被告知 Broker 已移除。

Kafka 在 ZooKeeper 的关键存储信息:
admin:存储管理信息。主要为删除主题事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)brokers:存储 Broker 相关信息。broker 节点以及节点上的主题相关信息cluster:存储 kafka 集群信息config:存储 broker,client,topic,user 以及 changer 相关的配置信息consumers:存储消费者相关信息controller:存储控制器节点信息controller_epoch:存储控制器节点当前的年龄(说明控制器节点变更次数)
ZooKeeper 两个重要特性:
- 客户端会话结束时,ZooKeeper 就会删除临时节点。
- 客户端注册监听它关心的节点,当节点状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端。
- 详细内容可以参考:ZooKeeper 原理
控制器
控制器(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。控制器其实就是一个 Broker,只不过它除了具有一般 Broker 的功能以外,还负责 Leader 的选举。

如何选举控制器
集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个在 ZooKeeper 成功创建 /controller 临时节点的 Broker 会被指定为控制器。
选举控制器的详细流程:

- 第一个在 ZooKeeper 中成功创建
/controller临时节点的 Broker 会被指定为控制器。 - 其他 Broker 在控制器节点上创建 Zookeeper watch 对象。
- 如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 临时节点就会消失。集群中的其他 Broker 通过 watch 对象得到状态变化的通知,它们会尝试让自己成为新的控制器。
- 第一个在 Zookeeper 里创建一个临时节点
/controller的 Broker 成为新控制器。其他 Broker 在新控制器节点上创建 Zookeeper watch 对象。 - 每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他节点会忽略旧的 epoch 的消息。
- 当控制器发现一个 Broker 已离开集群,并且这个 Broker 是某些 Partition 的 Leader。此时,控制器会遍历这些 Partition,并用轮询方式确定谁应该成为新 Leader,随后,新 Leader 开始处理生产者和消费者的请求,而 Follower 开始从 Leader 那里复制消息。
简而言之,Kafka 使用 Zookeeper 的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行 Partition Leader 选举。控制器使用 epoch 来避免“脑裂”,“脑裂”是指两个节点同时被认为自己是当前的控制器。
控制器的作用
Topic 管理(创建、删除、增加分区)
这里的 Topic 管理,就是指控制器帮助我们完成对 Kafka Topic 的创建、删除以及分区增加的操作。换句话说,当我们执行 kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。
分区重分配
分区重分配主要是指,kafka-reassign-partitions 脚本(关于这个脚本,后面我也会介绍)提供的对已有 Topic 分区进行细粒度的分配功能。这部分功能也是控制器实现的。
选举 Leader
Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。在专栏后面说到工具的时候,我们再详谈 Preferred 领导者选举,这里你只需要了解这也是控制器的职责范围就可以了。
集群成员管理
集群成员管理,包括自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。
比如,控制器组件会利用 Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
数据服务
控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
控制器中保存了多种数据,比较重要的的数据有:
- 所有 Topic 信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
- 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
- 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。
值得注意的是,这些数据其实在 ZooKeeper 中也保存了一份。每当控制器初始化时,它都会从 ZooKeeper 上读取对应的元数据并填充到自己的缓存中。有了这些数据,控制器就能对外提供数据服务了。这里的对外主要是指对其他 Broker 而言,控制器通过向这些 Broker 发送请求的方式将这些数据同步到其他 Broker 上。
副本
副本机制是分布式系统实现高可用的不二法门,Kafka 也不例外。
副本机制有哪些好处?
- 提供可用性:有句俗语叫:鸡蛋不要放在一个篮子里。副本机制也是一个道理——当部分节点宕机时,系统仍然可以依靠其他正常运转的节点,从整体上对外继续提供服务。
- 提供伸缩性:通过增加、减少机器可以控制系统整体的吞吐量。
- 改善数据局部性:允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
但是,Kafka 只实现了第一个好处,原因后面会阐述。
Kafka 使用 Topic 来组织数据,每个 Topic 被分为若干个 Partition,每个 Partition 有多个副本。每个 Broker 可以保存成百上千个属于不同 Topic 和 Partition 的副本。Kafka 副本的本质是一个只能追加写入的提交日志。

(注意图片有个逻辑问题:同一个 Topic 的所有分区,副本因子是统一的,创建 Topic 时指定,比如 replication-factor=2,则该 Topic 下所有分区的副本数都是 2)
Kafka 副本有两种角色:
- Leader 副本(主):每个 Partition 都有且仅有一个 Leader 副本。为了保证数据一致性,Leader 处理一切对 Partition (分区)的读写请求;
- Follower 副本(从):Leader 副本以外的副本都是 Follower 副本。Follower 唯一的任务就是从 Leader 那里复制消息,保持与 Leader 一致的状态。
- 如果 Leader 宕机,其中一个 Follower 会被选举为新的 Leader。

为了与 Leader 保持同步,Follower 向 Leader 发起获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。请求消息里包含了 Follower 想要获取消息的偏移量,而这些偏移量总是有序的。
Leader 另一个任务是搞清楚哪个 Follower 的状态与自己是一致的。通过查看每个 Follower 请求的最新偏移量,Leader 就会知道每个 Follower 复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但是在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本是不同步的,在 Leader 失效时,就不可能成为新的 Leader——毕竟它没有包含全部的消息。
除了当前首领之外,每个分区都有一个首选首领——创建 Topic 时选定的首领就是分区的首选首领。之所以叫首选 Leader,是因为在创建分区时,需要在 Broker 之间均衡 Leader。
ISR 同步副本
ISR 即 In-sync Replicas,表示同步副本。Follower 副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,说明和 Leader 并非数据强一致性的。
判断 Follower 是否与 Leader 同步的标准:
Kafka Broker 端参数 replica.lag.time.max.ms 参数,指定了 Follower 副本能够落后 Leader 副本的最长时间间隔,默认为 10s。这意味着:只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
ISR 是一个动态调整的集合,会不断将同步副本加入集合,将不同步副本移除集合。Leader 副本天然就在 ISR 中。
选举 Leader
因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
开启 Unclean 领导者选举可能会造成数据丢失,但好处是:它使得 Partition Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
处理请求
Broker 的大部分工作是处理客户端、Partition 副本和控制器发送给 Partition Leader 的请求。Kafka 提供了一个二进制协议(基于 TCP),指定了请求消息的格式以及 Broker 如何对请求作出响应。
broker 会在它所监听的每一个端口上运行一个 Acceptor 线程,这个线程会创建一个连接,并把它交给 Processor 线程去处理。Processor 线程的数量是可配置的。Processor 线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。
当请求放进请求队列后,IO 线程负责进行处理。
生产请求和获取请求都需要发送给 Partition 的 Leader 副本处理。如果 Broker 收到一个针对特定分区的请求,而该分区的 Leader 在另一个 Broker 上,那么发送请求的客户端会收到一个“非分区 Leader”的错误响应。Kafka 客户端要自己负责把生成请求和获取请求发送到正确的 Broker 上。
元数据请求
客户端怎么知道哪个是 Leader 呢?客户端通过使用另一种类型的请求来实现,那就是元数据请求(metadata request)。这种请求包含了客户端感兴趣的 Topic 列表。broker 的响应消息指明了这些 Topic 所包含的 Partition、Partition 有哪些副本,以及哪个副本是 Leader。元数据请求可以发给任意一个 broker,因为所有 Broker 都缓存了这些信息。
客户端会把这些信息缓存起来,并直接往目标 Broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过 metadata.max.age.ms 来配置),从而知道元数据是否发生了变化。

生产请求
acks 参数控制多少个副本确认写入成功后生产者才认为消息生产成功。这个参数的取值可以为:
acks=0- 消息发送完毕,生产者认为消息写入成功;acks=1- Leader 写入成功,生产者认为消息写入成功;acks=all- 所有同步副本写入成功,生产者才认为消息写入成功。
如果 Leader 收到生产消息,它会执行一些检查逻辑,包含:
- 发送的用户是否有权限写入 Topic?
- 请求的
acks参数取值是否合法(只允许0,1,all)? - 如果
acks设置为all,是否有足够的同步副本已经安全写入消息?(我们可以配置如果同步副本数量不足,Leader 拒绝处理新消息)
之后,消息被写入到本地磁盘。一旦消息本地持久化后,如果 acks 被设为 0 或 1,那么会返回结果给客户端;如果 acks 被设为 all 那么会将请求放置在一个称为 purgatory 的缓冲区中等待其他的副本写入完成。
消费请求
Leader 处理拉取请求和处理生产请求的方式很相似:
- 请求需要先到达指定的 Partition Leader 上,然后客户端通过查询元数据来确保请求的路由是正确的。
- Leader 在收到请求时,会先检查请求是否有效。
- 如果请求的偏移量存在,Broker 将按照客户端指定的数量上限从 Partition 里读取消息,再把消息返回给客户端。Kafka 使用零拷贝技术向客户端发送消息——也就是说,Kafka 直接把消息从文件(更准确的说,是文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这避免了内存的字节拷贝和缓冲区维护,极大地提高了性能。
客户端可以指定 Broker 返回数据量的上限和下限,防止数据量过大造成客户端内存溢出。同时,客户端也可以指定返回的最小数据量,当消息数据量没有达到最小数据量时,请求会一直阻塞直到有足够的数据返回。指定最小的数据量在负载不高的情况下非常有用,通过这种方式可以减轻网络往返的额外开销。当然请求也不能永远的阻塞,客户端可以指定最大的阻塞时间,如果到达指定的阻塞时间,即便没有足够的数据也会返回。

不是所有 Leader 的数据都能够被读取。消费者只能读取已提交的消息。只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
因为还没有被足够的副本持久化的消息,被认为是不安全的——如果 Leader 发生故障,另一个副本成为新的 Leader,这些消息就丢失了。如果允许读取这些消息,就可能会破坏数据一致性。
这也意味着,如果 Broker 间的消息复制因为某些原因变慢了,那么消息到达消费者的时间也会随之变长。
延迟时间可以通过 replica.lag.time.max.ms 来配置,它指定了副本在复制消息时可被允许的最大延迟时间。

其他请求
我们讨论了 Kafka 中最常见的三种请求类型:元信息请求,生产请求和拉取请求。这些请求都是使用的是 Kafka 的自定义二进制协议。集群中 Broker 间的通信请求也是使用同样的协议,这些请求是内部使用的,客户端不能发送。比如在选举 Partition Leader 过程中,控制器会发送 LeaderAndIsr 请求给新的 Leader 和其他跟随副本。
这个协议目前已经支持 20 种请求类型,并且仍然在演进以支持更多的类型
总结
副本机制
- 每个 Partition 都有一个 Leader,零个或多个 Follower。
- Leader 处理一切对 Partition (分区)的读写请求;而 Follower 只需被动的同步 Leader 上的数据。
- 同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份。
选举机制
Follower 宕机,啥事儿没有;Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。 生产者/消费者如何知道谁是 Leader
- Kafka 将这种元数据存储在 Zookeeper 服务中。
- 生产者和消费者都和 Zookeeper 连接并通信。
可靠传输
消息不丢失
如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。
一条消息从生产到消费,可以划分三个阶段:

- 生产阶段:Producer 创建消息,并通过网络发送给 Broker。
- 存储阶段:Broker 收到消息并存储,如果是集群,还要同步副本给其他 Broker。
- 消费阶段:Consumer 向 Broker 请求消息,Broker 通过网络传输给 Consumer。
这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。
存储阶段
存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
上面的话可以解读为:
- 已提交:只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
- 持久化:Kafka 的数据存储在磁盘上,所以只要写入成功,天然就是持久化的。
- 只要还有一个副本是存活的,那么已提交的消息就不会丢失。
- 消费者只能读取已提交的消息。
副本机制
Kafka 的副本机制是 kafka 可靠性保证的核心。
Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。
Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。
1. 副本数
replication.factor 作用是设置每个分区的副本数。replication.factor 是主题级别配置; default.replication.factor 是 broker 级别配置。
副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。
2. 不完全的选主
unclean.leader.election.enable 用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable 是 broker 级别(实际上是集群范围内)配置,默认值为 true。
- 如果设为 true,代表着允许不同步的副本成为主副本(即不完全的选举),那么将面临丢失消息的风险;
- 如果设为 false,就要等待原先的主副本重新上线,从而降低了可用性。
3. 最少同步副本
min.insync.replicas 控制的是消息至少要被写入到多少个副本才算是“已提交”。min.insync.replicas 是主题级别和 broker 级别配置。
尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。
如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。
注意:要确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
生产阶段
在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。Kafka 有三种发送方式:同步、异步、异步回调。
同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。
解决生产者丢失消息的方案:
生产者使用异步回调方式 producer.send(msg, callback) 发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。
- 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;
- 如果是消息不合格造成的,那么可以调整消息格式后再次发送。
然后,需要基于以下几点来保证 Kafka 生产者的可靠性
ACK
生产者可选的确认模式有三种:acks=0、acks=1、acks=all。
acks=0、acks=1都有丢失数据的风险。acks=all意味着会等待所有同步副本都收到消息。再结合min.insync.replicas,就可以决定在得到确认响应前,至少有多少副本能够收到消息。这是最保险的做法,但也会降低吞吐量。
重试
如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。
- 可重试错误,如:
LEADER_NOT_AVAILABLE,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。 - 不可重试错误,如:
INVALID_CONFIG,即使重试,也无法改变配置选项,重试没有意义。
需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。
设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
错误处理
开发者需要自行处理的错误:
- 不可重试的 broker 错误,如消息大小错误、认证错误等;
- 消息发送前发生的错误,如序列化错误;
- 生产者达到重试次数上限或消息占用的内存达到上限时发生的错误。
消费阶段
前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。
消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。

消费者的可靠性配置
group.id- 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的group.id。auto.offset.reset- 有两个选项:earliest- 消费者会从分区的开始位置读取数据latest- 消费者会从分区末尾位置读取数据
enable.auto.commit- 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。auto.commit.interval.ms- 自动提交的频率,默认为每 5 秒提交一次。
显示提交偏移量
如果 enable.auto.commit 设为 true,即自动提交,就无需考虑提交偏移量的问题。如果选择显示提交偏移量,需要考虑以下问题:
- 必须在处理完消息后再发送确认(提交偏移量),不要收到消息立即确认。
- 提交频率是性能和重复消息数之间的权衡
- 分区再均衡
- 消费可能需要重试机制
- 超时处理
- 消费者可能需要维护消费状态,如:处理完消息后,记录在数据库中。
- 幂等性设计
- 写数据库:根据主键判断记录是否存在
- 写 Redis:set 操作天然具有幂等性
- 复杂的逻辑处理,则可以在消息中加入全局 ID
重复消息
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
- At most once:至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
- At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
- Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性。常用的实现幂等操作的方法:
- 利用数据库的唯一约束实现幂等
关系型数据库可以使用 INSERT IF NOT EXIST 语句防止重复;Redis 可以使用 SETNX 命令来防止重复;其他数据库只要支持类似语义,也是一个道理。
- 为更新的数据设置前置条件
如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
- 记录并检查操作
还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
需要注意的是,“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。这一组操作可以通过分布式事务或分布式锁来保证其原子性。
消息有序性
某些场景下,可能会要求按序发送消息。
-
单 Partition
Kafka 每一个 Partition 只能隶属于消费者群组中的一个 Consumer,换句话说,每个 Partition 只能被一个 Consumer 消费。所以,如果 Topic 是单 Partition,自然是有序的。
- 优点:简单粗暴。开发者什么也不用做。
- 缺点:Kafka 基于 Partition 实现其高并发能力,如果使用单 Partition,会严重限制 Kafka 的吞吐量。
- 结论:作为分布式消息引擎,限制并发能力,显然等同于自废武功,所以,这个方案几乎是不可接受的。
-
同一个 key 的消息发送给指定 Partition
- 生产者端显示指定 key 发往一个指定的 Partition,就可以保证同一个 key 在这个 Partition 中是有序的。
- 接下来,消费者端为每个 key 设定一个缓存队列,然后让一个独立线程负责消费指定 key 的队列,这就保证了消费消息也是有序的。
消息积压
- 先修复消费者,然后停掉当前所有消费者。
- 新建 Topic,扩大分区,以提高并发处理能力。
- 创建临时消费者程序,并部署在多节点上,扩大消费处理能力。
- 最后处理完积压消息后,恢复原先部署架构。
验证系统可靠性
建议从 3 个层面验证系统的可靠性:
- 配置验证
- 应用验证
- 客户端和服务器断开连接
- 选举
- 依次重启 broker
- 依次重启生产者
- 依次重启消费者
- 监控可靠性
- 对于生产者来说,最重要的两个指标是消息的
error-rate和retry-rate。如果这两个指标上升,说明系统出了问题。 - 对于消费者来说,最重要的指标是
consumer-lag,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。
- 对于生产者来说,最重要的两个指标是消息的
最佳实践
- 生产者
- 不要使用
producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的send方法。 - 设置
acks = all。acks是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。 - 设置
retries为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0的 Producer 能够自动重试消息发送,避免消息丢失。
- 服务器(Kafka Broker)
- 设置
unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。 - 设置
replication.factor>= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 - 设置
min.insync.replicas> 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 - 确保
replication.factor>min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1。
- 消费者
- 确保消息消费完成再提交。Consumer 端有个参数
enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的
Kafka 存储
逻辑存储

持久化
持久化是 Kafka 的一个重要特性。
Kafka 集群持久化保存(使用可配置的保留期限)所有发布记录 —— 无论它们是否被消费。但是,Kafka 不会一直保留数据,也不会等待所有的消费者读取了消息才删除消息。只要数据量达到上限(比如 1G)或者数据达到过期时间(比如 7 天),Kafka 就会删除旧消息。Kafka 的性能和数据大小无关,所以长时间存储数据没有什么问题。
Kafka 对消息的存储和缓存严重依赖于文件系统。
- 顺序磁盘访问在某些情况下比随机内存访问还要快!在 Kafka 中,所有数据一开始就被写入到文件系统的持久化日志中,而不用在 cache 空间不足的时候 flush 到磁盘。实际上,这表明数据被转移到了内核的 pagecache 中。所以,虽然 Kafka 数据存储在磁盘中,但其访问性能也不低。
- Kafka 的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。Consumer 每次获取多个大型有序的消息块,并由服务端依次将消息块一次加载到它的日志中。这可以有效减少大量的小型 I/O 操作。
- 由于 Kafka 在 Producer、Broker 和 Consumer 都共享标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。这可以避免字节拷贝带来的开销。(这里的 “零” 是指减少用户态与内核态之间的拷贝,让数据从内核态缓冲区直接传递,减少了用户态与内核态之间的拷贝开销,但 CPU 还是会参与控制流程比如调度、协议处理等,只是不再处理字节拷贝的工作)
- Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 Consumer 消费时解压缩。压缩传输数据,可以有效减少网络带宽开销。Kafka 支持 GZIP,Snappy 和 LZ4 压缩协议。
所有这些优化都允许 Kafka 以接近网络速度传递消息。
物理存储
Log
Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:
请注意:这里的主题只是一个逻辑上的抽象概念,实际上,Kafka 的基本存储单元是 Partition。Partition 无法在多个 Broker 间进行再细分,也无法在同一个 Broker 的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制。
Partiton 命名规则为 Topic 名称 + 有序序号,第一个 Partiton 序号从 0 开始,序号最大值为 Partition 数量减 1。
Log 是 Kafka 用于表示日志文件的组件。每个 Partiton 对应一个 Log 对象,在物理磁盘上则对应一个目录。如:创建一个双分区的主题 test,那么,Kafka 会在磁盘上创建两个子目录:test-0 和 test-1;而在服务器端,这就对应两个 Log 对象
Log Segment

因为在一个大文件中查找和删除消息是非常耗时且容易出错的。所以,Kafka 将每个 Partition 切割成若干个片段,即日志段(Log Segment)。默认每个 Segment 大小不超过 1G,且只包含 7 天的数据。如果 Segment 的消息量达到 1G,那么该 Segment 会关闭,同时打开一个新的 Segment 进行写入。
Broker 会为 Partition 里的每个 Segment 打开一个文件句柄(包括不活跃的 Segment),因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。正在写入的分片称为活跃片段(active segment),活跃片段永远不会被删除。
Segment 文件命名规则:Partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
Segment 文件可以分为两类:
-
索引文件
- 偏移量索引文件(
.index) - 时间戳索引文件(
.timeindex) - 已终止事务的索引文件(
.txnindex):如果没有使用 Kafka 事务,则不会创建该文件
- 偏移量索引文件(
-
日志数据文件(
.log)
文件格式
Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式和从生产者发送过来或消费者读取的数据格式是一样的。因为使用了相同的数据格式,使得 Kafka 可以进行零拷贝技术给消费者发送消息,同时避免了压缩和解压。
除了键、值和偏移量外,消息里还包含了消息大小、校验和(检测数据损坏)、魔数(标识消息格式版本)、压缩算法(Snappy、GZip 或者 LZ4)和时间戳(0.10.0 新增)。时间戳可以是生产者发送消息的时间,也可以是消息到达 Broker 的时间,这个是可配的。
如果生产者发送的是压缩的消息,那么批量发送的消息会压缩在一起,以“包装消息”(wrapper message)来发送,如下所示:

如果生产者使用了压缩功能,发送的批次越大,就意味着能获得更好的网络传输效率,并且节省磁盘存储空间。
Kafka 附带了一个叫 DumpLogSegment 的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。
索引
Kafka 允许消费者从任意有效的偏移量位置开始读取消息。Kafka 为每个 Partition 都维护了一个偏移量索引(即 .index 文件),该索引将偏移量映射到片段文件以及偏移量在文件里的位置。
索引也被分成片段,所以在删除消息时,也可以删除相应的索引。Kafka 不维护索引的校验和。如果索引出现损坏,Kafka 会通过重读消息并录制偏移量和位置来重新生成索引。如果有必要,管理员可以删除索引,这样做是绝对安全的,Kafka 会自动重新生成这些索引。
索引文件用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由 offset 和 position 组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka 采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过“index.interval.bytes”设置索引的跨度;
有了偏移量索引文件,通过它,Kafka 就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的 position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。下面是 Kafka 中分段的日志数据文件和偏移量索引文件的对应映射关系图(其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。

- N: 消息的偏移量(offset);
- position: 偏移量对应的消息在日志文件(.log)中的物理位置
清理
log.cleanup.policy 是 Kafka 中控制 Topic 日志清理规则的核心配置,主要有两种取值:
- delete(默认):按「时间」或「大小」删除过期日志片段。
- compact(日志压缩):按「消息键(key)」去重,保留每个 key 的最新版本消息。
每个日志片段可以分为以下两个部分:
- 干净的部分:这部分消息之前已经被清理过,每个键只存在一个值。
- 污浊的部分:在上一次清理后写入的新消息。
如果在 Kafka 启动时启用了清理功能(通过 log.cleaner.enabled 配置),每个 Broker 会启动一个清理管理器线程和若干个清理线程,每个线程负责一个 Partition。
清理线程会读取污浊的部分,并在内存里创建一个 map。map 的 key 是消息键的哈希吗,value 是消息的偏移量。对于相同的键,只保留最新的位移。其中 key 的哈希大小为 16 字节,位移大小为 8 个字节。也就是说,一个映射只有 24 字节,假设消息大小为 1KB,那么 1GB 的段有 1 百万条消息,建立这个段的映射只需要 24MB 的内存,映射的内存效率是非常高效的。
在配置 Kafka 时,管理员需要设置这些清理线程可以使用的总内存。如果设置 1GB 的总内存同时有 5 个清理线程,那么每个线程只有 200MB 的内存可用。在清理线程工作时,它不需要把所有脏的段文件都一起在内存中建立上述映射,但需要保证至少能够建立一个段的映射。如果不能同时处理所有脏的段,Kafka 会一次清理最老的几个脏段,然后在下一次再处理其他的脏段。
一旦建立完脏段的键与位移的映射后,清理线程会从最老的干净的段开始处理。如果发现段中的消息的键没有在映射中出现,那么可以知道这个消息是最新的,然后简单的复制到一个新的干净的段中;否则如果消息的键在映射中出现,这条消息需要抛弃,因为对于这个键,已经有新的消息写入。处理完会将产生的新段替代原始段,并处理下一个段。
分区有以下消息(从老到新):旧段 (Clean/Old) + 脏段 (Dirty/New)。扫描脏段生成 Map,根据 Map 去清理旧段
对于一个段,清理前后的效果如下:

这个清理过程不会删除消息的键,而是根据键的唯一性保留最新的消息、删除旧的重复键消息。
删除事件
对于只保留最新消息的清理策略来说,Kafka 还支持删除相应键的消息操作(而不仅仅是保留最新的消息内容)。这是通过生产者发送一条特殊的消息来实现的,该消息包含一个键以及一个 null 的消息内容。当清理线程发现这条消息时,它首先仍然进行一个正常的清理并且保留这个包含 null 的特殊消息一段时间,在这段时间内消费者消费者可以获取到这条消息并且知道消息内容已经被删除。过了这段时间,清理线程会删除这条消息,这个键会从 Partition 中消失。这段时间是必须的,因为它可以使得消费者有一定的时间余地来收到这条消息。
流式处理
流处理定义
是什么
数据流是无边界数据集的抽象表示。无边界意味着无限和持续增长。无边界数据集之所以是无限的,是因为随着时间的推移,新的记录会不断加入进来。
- 事件流是有序的。事件的发生总是有先后顺序。而数据库里的记录是无序的。
- 不可变的数据记录。事件一旦发生,就不能被改变。我们不执行 UPDATE(更新)操作。如果一个用户改了家庭地址,流处理不会去修改旧的那条“地址事件”,而是产生一条新的“地址变更事件”。
- 事件流是可重播的。对于大多数业务来说,重播发生在几个月前(甚至几年前)的原始事件流是一个很重要的需求。可能是为了尝试使用新的分析方法纠正过去的错误,或是为了进行审计。如果没有这项能力,流式处理充其量只是数据科学实验室里的一个玩具而已。
流式处理是指实时地处理一个或多个事件流。流式处理是一种编程范式,就像请求与响应范式和批处理范式那样。
编程范式对比
- 请求与响应 - 这是延迟最小的一种范式,响应时间处于亚毫秒到毫秒之间,而且响应时间一般非常稳定。这种处理模式一般是阻塞的,应用程序向处理系统发出请求,然后等待响应。
- 批处理 - 这种范式具有高延迟和高吞吐量的特点。处理系统按照设定的时间启动处理进程,读取所有的输入数据(从上一次执行之后的所有可用数据,或者从月初开始的所有数据等),输出结果,然后等待下一次启动。处理时间从几分钟到几小时不等,并且用户从结果里读到的都是旧数据。一般用于 BI 生成分析报表。
- 流式处理 - 这种范式介于上述两者之间。大部分的业务不要求亚毫秒级的响应,不过也接受不了长时间的等待。大部分业务流程都是持续进行的,只要业务报告保持更新,业务产品线能够持续响应,那么业务流程就可以进行下去,而无需等待特定的响应,也不要求在几毫秒内得到响应。一些业务流程具有持续性和非阻塞的特点。 流的定义不依赖任何一个特定的框架、 API 或特性。只要持续地从一个无边界的数据集读取数据,然后对它们进行处理并生成结果,那就是在进行流式处理。重点是,整个处理过程必须是持续的。
流处理的核心概念
- 时间
时间或许是流式处理最为重要的概念。大部分流式应用的操作都是基于时间窗口的。有这么几个时间概念:
- 事件时间 - 事件时间是指所追踪事件的发生时间和记录的创建时间。
- 日志追加时间 - 日志追加时间是指事件保存到 broker 的时间。
- 处理时间 - 处理时间是指应用程序在收到事件之后要对其进行处理的时间。这个时间可以是在事件发生之后的几毫秒、几小时或几天。同一个事件可能会被分配不同的时间戳,这取决于应用程序何时读取这个事件。如果应用程序使用了两个线程来读取同一个事件,这个时间戳也会不一样!所以这个时间戳非常不可靠,应该避免使用它。
注意:在处理与时间有关的问题时,需要注意时区问题。整个数据管道应该使用同一个时区。
- 状态
如果只是单独处理每一个事件,那么流式处理就很简单。如果操作里包含了多个事件,流式处理就会变得复杂而有趣。事件与事件之间的信息被称为状态。这些状态一般被保存在应用程序的本地变量里。
流式处理含以下几种状态:
- 本地状态或内部状态 - 这种状态只能被单个应用程序实例访问,它们一般使用内嵌在应用程序里的数据库进行维护和管理。本地状态的优势在于它的速度,不足之处在于它受到内存大小的限制 。 所以,流式处理的很多设计模式都将数据拆分到多个子流,这样就可以使用有限的本地状态来处理它们。
- 外部状态 - 这种状态使用外部的数据存储来维护,一般使用 NoSQL 系统,比如 Cassandra。大部分流式处理应用尽量避免使用外部存储,或者将信息缓存在本地,减少与外部存储发生交互,以此来降低延迟,而这就引入了如何维护内部和外部状态一致性的问题。
- 流和表
流是一系列事件,每个事件就是一个变更。表包含了当前的状态,是多个变更所产生的结果。所以说,表和流是同一个硬币的两面,世界总是在发生变化,用户有时候关注变更事件,有时候则关注世界的当前状态。如果一个系统允许使用这两种方式来查看数据,那么它就比只支持一种方式的系统强大。
- 时间窗口
时间窗口有不同的类型,基于以下属性决定:
- 窗口的大小
- 窗口移动的频率
- 窗口的可更新时间多长
流处理设计模式
单个事件处理
处理单个事件是流式处理最基本的模式。这个模式也叫 map 或 filter 模式,因为它经常被用于过滤无用的事件或者用于转换事件( map 这个术语是从 Map-Reduce 模式中来的, map 阶段转换事件, reduce 阶段聚合转换过的事件)。
在这种模式下,应用程序读取流中的事件 ,修改它们,然后把事件生成到另一个流上。
使用本地状态
大部分流式处理应用程序关心的是如何聚合信息,特别是基于时间窗口进行聚合。
要实现这些聚合操作,需要维护流的状态,可以通过本地状态(而不是共享状态)来实现。
如果流式处理应用包含了本地状态,会变得非常复杂,还需要解决下列问题:
- 内存使用 - 应用实例必须有可用的内存来保存本地状态。
- 持久化 - 要确保在应用程序关闭时不会丢失状态,并且在应用程序重启后或者切换到另一个应用实例时可以恢复状态。
- 再均衡 - 有时候,分区会被重新分配给不同的消费者。在这种情况下,失去分区的实例必须把最后的状态保存起来 , 同时获得分区的实例必须知道如何恢复到正确的状态。
多阶段处理和重分区
数据量不大的时候,可以使用本地状态。但面对海量的流数据时,可以使用多阶段处理(类似 Hadoop 的 map reduce
流和表的连接
有些场景下,流式处理需要将外部数据和流集成在一起。
可以考虑将外部的数据信息(如数据库存储)缓存到流式处理应用程序里
流和流的连接
有些场景下,需要连接两个真实的事件流。
将两个流里具有相同键和发生在相同时间窗口内的事件匹配起来。这就是为什么流和流的连接也叫作基于时间窗口的连接( windowed-join )。
乱序的事件
不管是对于流式处理还是传统的 ETL 系统来说,处理乱序事件都是一个挑战。
要让流处理应用程序处理好这些场景,需要做到以下几点:
- 识别乱序的事件。应用程序需要检查事件的时间,并将其与当前时间进行比较。
- 规定一个时间段用于重排乱序的事件。比如 3 个小时以内的事件可以重排,但 3 周以外的事件就可以直接扔掉。
- 具有在一定时间段内重排乱序事件的能力。这是流式处理应用与批处理作业的一个主要不同点。假设有一个每天运行的作业, 一些事件在作业结束之后才到达,那么可以重新运行昨天的作业来更新事件。而在流式处理中,“重新运行昨天的作业”这种情况是不存在的,乱序事件和新到达的事件必须一起处理。
- 具备更新结果的能力。如果处理的结果保存到数据库里,那么可以通过 put 或 update 对结果进行更新。如果流应用程序通过邮件发送结果,那么要对结果进行更新,就需要很巧妙的手段。
重新处理
有两种模式:
模式一:使用新版本应用处理同一个事件流,生成新的结果,并比较两种版本的结果,然后在某个时间点将客户端切换到新的结果流上。
模式二:重置应用,让应用回到输入流的起始位置开始处理,同时重置本地状态(这样就不会将两个版本应用的处理结果棍淆起来了),而且还可能需要清理之前的输出流。
Kafka Streams 架构
每个流式应用程序至少会实现和执行一个拓扑。拓扑(在其他流式处理框架里叫作 DAG,即有向无环图)是一个操作和变换的集合,每个事件从输入到输出都会流经它。

分区和任务
Kafka 的消息传递层对数据进行分区以进行存储和传输。 Kafka Streams 对数据进行分区以进行处理。Kafka Streams 使用分区和任务的概念作为基于 Kafka 主题分区的并行模型的逻辑单元。
每个流分区都是数据记录的完全有序序列,并映射到 Kafka 主题分区。流中的数据记录映射到该主题的 Kafka 消息。更具体地说,Kafka Streams 根据应用程序的输入流分区创建固定数量的任务,每个任务分配了输入流中的分区列表(即 Kafka 主题)。分区对任务的分配永远不会改变,因此每个任务都是应用程序并行性的固定单元。然后,任务可以根据分配的分区实例化其自己的处理器拓扑。它们还为其分配的每个分区维护一个缓冲区,并一次从这些记录缓冲区处理消息。结果,可以在没有人工干预的情况下独立且并行地处理流任务。

Coordinator/controller/分区器/leader
- Controller 管集群全局的 Topic / 分区元数据
- Leader 管单个分区的读写
- Coordinator 管消费组的成员和偏移量
- 分区器管生产者的消息投递目标
| 角色 | 核心职责 | 作用场景 | 执行 / 运行位置 | 关键关联对象 |
|---|---|---|---|---|
| Controller 控制器 | 1. 管理所有 Topic 的分区数量、副本分布; 2. 选举每个分区的 Leader; 3. 同步元数据给所有 Broker | 集群启动、Topic 创建 / 扩容、Broker 宕机 | 集群中一台 Broker(选举产生,宕机自动切换) | 所有 Topic、所有 Broker、分区副本 |
| Leader 分区主副本 | 1. 处理对应分区的所有生产 / 消费请求; 2. 同步数据到 Follower 副本; 3. 维护分区的消息日志 | 消息生产、消息消费 | 某台 Broker(每个分区的 Leader 可不同) | 单个分区的 Follower 副本、生产者、消费者 |
| Coordinator 消费组协调者 | 1. 管理消费组成员的加入 / 退出; 2. 触发消费组的分区再均衡; 3. 存储消费组的已提交偏移量 | 消费者加入消费组、分区分配、偏移量提交 | 某台 Broker(由 hash(消费组ID) 计算得出) | 消费组、消费者、__consumer_offsets 主题 |
| 分区器 Partitioner | 1. 为生产者的每条消息计算目标分区; 2. 支持默认策略(按 Key 哈希 / 轮询)或自定义策略 | 生产者发送消息前 | 生产者客户端本地(完全在客户端执行) | 生产者、ProducerRecord(消息) |
Kafka 运维
单点部署
下载解压
进入官方下载地址:http://kafka.apache.org/downloads
解压到本地:
tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0
现在您已经在您的机器上下载了最新版本的 Kafka。
启动服务器
由于 Kafka 依赖于 ZooKeeper,所以运行前需要先启动 ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
然后,启动 Kafka
$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
停止服务器
执行所有操作后,可以使用以下命令停止服务器
bin/kafka-server-stop.sh config/server.properties
集群部署
修改配置
复制配置为多份(Windows 使用 copy 命令代理):
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
修改配置:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
其中,broker.id 这个参数必须是唯一的。
端口故意配置的不一致,是为了可以在一台机器启动多个应用节点。
启动
根据这两份配置启动三个服务器节点:
$ bin/kafka-server-start.sh config/server.properties &
...
$ bin/kafka-server-start.sh config/server-1.properties &
...
$ bin/kafka-server-start.sh config/server-2.properties &
...
创建一个新的 Topic 使用 三个备份:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看主题:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
- leader - 负责指定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
- replicas - 是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
- isr - 是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
常用命令
主题(Topic)
- 创建 Topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
- 查看 Topic 列表
kafka-topics --list --zookeeper localhost:2181
- 添加 Partition
kafka-topics --zookeeper localhost:2181 --alter --topic my-topic --partitions 16
- 删除 Topic
kafka-topics --zookeeper localhost:2181 --delete --topic my-topic
- 查看 Topic 详细信息
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe
- 查看备份分区
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions
生产者(Producers)
- 通过控制台输入生产消息
kafka-console-producer --broker-list localhost:9092 --topic my-topic
- 通过文件输入生产消息
kafka-console-producer --broker-list localhost:9092 --topic test < messages.txt
- 通过控制台输入 Avro 生产消息
kafka-avro-console-producer --broker-list localhost:9092 --topic my.Topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=http://localhost:8081
- 然后,可以选择输入部分 json key:
{ "f1": "value1" }
- 生成消息性能测试
kafka-producer-perf-test --topic position-reports --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers="localhost:9092"
消费者(Consumers)
- 消费所有未消费的消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning
- 消费一条消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --max-messages 1
- 从指定的 offset
__consumer_offsets消费一条消息:
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1
- 从指定 Group 消费消息
kafka-console-consumer --topic my-topic --new-consumer --bootstrap-server localhost:9092 --consumer-property group.id=my-group
- 消费 avro 消息
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081 --max-messages 10
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081
- 查看消费者 Group 列表
kafka-consumer-groups --new-consumer --list --bootstrap-server localhost:9092
- 查看消费者 Group 详细信息
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testgroup
配置(Config)
- 设置 Topic 的保留时间
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000
- 查看 Topic 的所有配置
kafka-configs --zookeeper localhost:2181 --describe --entity-type topics --entity-name my-topic
- 修改 Topic 的配置
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
ACL(访问控制列表)
- 查看指定 Topic 的 ACL
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic topicA
- 添加 ACL
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic topicA --group groupA
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic topicA
ZooKeeper
zookeeper-shell localhost:2182 ls /
工具
核心配置
Broker 级别配置
存储配置
首先 Broker 是需要配置存储信息的,即 Broker 使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:
log.dirs:指定了 Broker 需要使用的若干个文件目录路径。这个参数是没有默认值的,必须由使用者亲自指定。log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。
log.dirs 具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如 /home/kafka1,/home/kafka2,/home/kafka3 这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。
zookeeper 配置
Kafka 与 ZooKeeper 相关的最重要的参数当属 zookeeper.connect。这也是一个 CSV 格式的参数,比如我可以指定它的值为 zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。
现在问题来了,如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。
如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的 zookeeper.connect 参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1 和 zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。
Broker 连接配置
listeners:告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
我们具体说说监听器的概念,从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为 <协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如 CONTROLLER: //localhost:9092。
最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。
Topic 管理
auto.create.topics.enable:是否允许自动创建 Topic。一般设为 false,由运维把控创建 Topic。unclean.leader.election.enable:是否允许 Unclean Leader 选举。auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。
第二个参数 unclean.leader.election.enable 是关闭 Unclean Leader 选举的。何谓 Unclean?还记得 Kafka 有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。那么问题来了,这些副本都有资格竞争 Leader 吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。好了,现在出现这种情况了:假设那些保存数据比较多的副本都挂了怎么办?我们还要不要进行 Leader 选举了?此时这个参数就派上用场了。如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之如果是 true,那么 Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它本人就变得膨胀了,认为自己的数据才是权威的。
这个参数在最新版的 Kafka 中默认就是 false,本来不需要我特意提的,但是比较搞笑的是社区对这个参数的默认值来来回回改了好几版了,鉴于我不知道你用的是哪个版本的 Kafka,所以建议你还是显式地把它设置成 false 吧。
第三个参数 auto.leader.rebalance.enable 的影响貌似没什么人提,但其实对生产环境影响非常大。设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若 auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。
你要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。
数据留存
log.retention.{hour|minutes|ms}:都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。通常情况下我们还是设置 hour 级别的多一些,比如log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使用,那么这个值就要相应地调大。log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。message.max.bytes:控制 Broker 能够接收的最大消息大小。默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。
Topic 级别配置
retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
操作系统参数
- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间
文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如 ulimit -n 1000000。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。
其次是文件系统类型的选择。这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。对了,最近有个 Kafka 使用 ZFS 的 数据报告,貌似性能更加强劲,有条件的话不妨一试。
第三是 swap 的调优。网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。
最后是提交时间或者说是 Flush 落盘时间。向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。
集群规划
操作系统
部署生产环境的 Kafka,强烈建议操作系统选用 Linux。在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。
磁盘
Kafka 集群部署选择普通的机械磁盘还是固态硬盘?前者成本低且容量大,但易损坏;后者性能优势大,不过单价高。
结论是:使用普通机械硬盘即可。
Kafka 采用顺序读写操作,一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。从这一点上来说,使用 SSD 似乎并没有太大的性能优势,毕竟从性价比上来说,机械磁盘物美价廉,而它因易损坏而造成的可靠性差等缺陷,又由 Kafka 在软件层面提供机制来保证,故使用普通机械磁盘是很划算的。
带宽
大部分公司使用普通的以太网络,千兆网络(1Gbps)应该是网络的标准配置。
通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其他应用或进程留一些资源。此外,通常要再额外预留出 2/3 的资源,因为不能让带宽资源总是保持在峰值。
基于以上原因,一个 Kafka 集群数量的大致推算公式如下:
Kafka 机器数 = 单位时间需要处理的总数据量 / 单机所占用带宽
使用指导
Kafka 服务端
- 获取、启动 Kafka
下载最新的 Kafka 版本并解压到本地:
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0
保证本地必须已安装 Java。执行以下指令,保证所有服务按照正确的顺序启动:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
打开另一个终端会话,并执行:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务成功启动,您就已经成功运行了一个基本的 kafka 环境。
- 创建一个 Topic 并存储事件
Kafka 是一个分布式事件流处理平台,它可以让您通过各种机制读、写、存储并处理事件(events 被称为记录或消息)
示例事件包括付款交易,手机的地理位置更新,运输订单,物联网设备或医疗设备的传感器测量等等。 这些事件被组织并存储在主题中(topics)。 简单来说,主题类似于文件系统中的文件夹,而事件是该文件夹中的文件。
因此,在您写入第一个事件之前,您必须先创建一个 Topic。执行以下指令:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
所有的 Kafka 命令行工具都有附加可选项:不加任何参数,运行 kafka-topics.sh 命令会显示使用信息。例如,会显示新 Topic 的分区数等细节。
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- 向 Topic 写入 Event
Kafka 客户端和 Kafka Broker 的通信是通过网络读写 Event。一旦收到信息,Broker 会将其以您需要的时间(甚至永久化)、容错化的方式存储。
执行 kafka-console-producer.sh 命令将 Event 写入 Topic。默认,您输入的任意行会作为独立 Event 写入 Topic:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以通过 Ctrl-C 在任何时候中断 kafka-console-producer.sh
- 读取 Event
执行 kafka-console-consumer.sh 以读取写入 Topic 中的 Event
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
由于 Event 被持久化存储在 Kafka 中,因此您可以根据需要任意多次地读取它们。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松地验证这一点。
- 通过 Kafka Connect 将数据作为事件流导入/导出
您可能有大量数据,存储在传统的关系数据库或消息队列系统中,并且有许多使用这些系统的应用程序。 通过 Kafka Connect,您可以将来自外部系统的数据持续地导入到 Kafka 中,反之亦然。 因此,将已有系统与 Kafka 集成非常容易。为了使此过程更加容易,有数百种此类连接器可供使用。
需要了解有关如何将数据导入和导出 Kafka 的更多信息,可以参考:Kafka Connect section。
- 使用 Kafka Streams 处理事件
一旦将数据作为 Event 存储在 Kafka 中,就可以使用 Kafka Streams 的 Java / Scala 客户端。它允许您实现关键任务的实时应用程序和微服务,其中输入(和/或)输出数据存储在 Kafka Topic 中。
Kafka Streams 结合了 Kafka 客户端编写和部署标准 Java 和 Scala 应用程序的简便性,以及 Kafka 服务器集群技术的优势,使这些应用程序具有高度的可伸缩性、弹性、容错性和分布式。该库支持一次性处理,有状态的操作,以及聚合、窗口化化操作、join、基于事件时间的处理等等。
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));
- 终止 Kafka 环境
- 如果尚未停止,请使用
Ctrl-C停止生产者和消费者客户端。 - 使用
Ctrl-C停止 Kafka 代理。 - 最后,使用
Ctrl-C停止 ZooKeeper 服务器。
如果您还想删除本地 Kafka 环境的所有数据,包括您在此过程中创建的所有事件,请执行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
Kafka 客户端
- Maven 依赖
Stream API 的 maven 依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
其他 API 的 maven 依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
- Kafka 核心 API
Kafka 有 5 个核心 API
- Producer API- 允许一个应用程序发布一串流式数据到一个或者多个 Kafka Topic。
- Consumer API- 允许一个应用程序订阅一个或多个 Kafka Topic,并且对发布给他们的流式数据进行处理。
- Streams API- 允许一个应用程序作为一个流处理器,消费一个或者多个 Kafka Topic 产生的输入流,然后生产一个输出流到一个或多个 Kafka Topic 中去,在输入输出流中进行有效的转换。
- Connector API- 允许构建并运行可重用的生产者或者消费者,将 Kafka Topic 连接到已存在的应用程序或数据库。例如,连接到一个关系型数据库,捕捉表的所有变更内容。
- Admin API- 支持管理和检查 Topic,Broker,ACL 和其他 Kafka 对象
- 发送消息
- 发送并忽略返回
代码如下,直接通过 send 方法来发送
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
- 同步发送
代码如下,与“发送并忘记”的方式区别在于多了一个 get 方法,会一直阻塞等待 Broker 返回结果:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
- 异步发送
代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如记录错误或者成功日志。
首先,定义一个 callback
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
然后,使用这个 callback
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
- 发送消息示例
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Kafka 生产者生产消息示例 生产者配置参考:https://kafka.apache.org/documentation/#producerconfigs
*/
public class ProducerDemo {
private static final String HOST = "localhost:9092";
public static void main(String[] args) {
// 1. 指定生产者的配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 2. 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);
try {
// 3. 使用 send 方法发送异步消息
for (int i = 0; i < 100; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<>("HelloWorld", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭生产者
producer.close();
}
}
}
- 消费消息流程
具体步骤如下:
- 创建消费者。
- 订阅主题。除了订阅主题方式外还有使用指定分组的模式,但是常用方式都是订阅主题方式
- 轮询消息。通过 poll 方法轮询。
- 关闭消费者。在不用消费者之后,会执行 close 操作。close 操作会关闭 socket,并触发当前消费者群组的再均衡。
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();
// 2.设置主题
consumer.subscribe(Arrays.asList(topic));
// 3.接受消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("customer Message---");
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
} finally {
// 4.关闭消息
consumer.close();
}
创建消费者的代码如下:
public Consumer buildCustomer() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
return consumer;
}
消费消息方式,分为订阅主题和指定分组两种方式:
- 消费者分组模式。通过订阅主题方式时,消费者必须加入到消费者群组中,即消费者必须有一个自己的分组;
- 独立消费者模式。这种模式就是消费者是独立的不属于任何消费者分组,自己指定消费那些
Partition。
(1)订阅主题方式
consumer.subscribe(Arrays.asList(topic));
(2)独立消费者模式
通过 consumer 的 assign(Collection<TopicPartition> partitions) 方法来为消费者指定分区。
public void consumeMessageForIndependentConsumer(String topic){
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();
// 2.指定分区
// 2.1获取可用分区
List<PartitionInfo> partitionInfoList = buildCustomer().partitionsFor(topic);
// 2.2指定分区,这里是指定了所有分区,也可以指定个别的分区
if(null != partitionInfoList){
List<TopicPartition> partitions = Lists.newArrayList();
for(PartitionInfo partitionInfo : partitionInfoList){
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
}
consumer.assign(partitions);
}
// 3.接受消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("consume Message---");
for (ConsumerRecord<String, String> record : records) {
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
// 异步提交
consumer.commitAsync();
}
}
}