Appearance
Kafka 极简教程
一、基础理论
1. 定位
- 分布式高吞吐消息队列:基于 发布订阅模型 的流式处理平台,承担微服务间的异步解耦与削峰填谷。
- Spring 生态集成:通过
spring-kafka原生接入;Spring Cloud Stream 以 Kafka Binder 作为默认消息中间件之一。 - 日志/事件驱动首选:天生为顺序写磁盘设计,吞吐量极高,适合日志收集、事件溯源、流处理场景。
2. 核心架构
| 角色 | 说明 |
|---|---|
| Producer | 生产者,向 Topic 发送消息。 |
| Consumer | 消费者,从 Topic 订阅并处理消息。 |
| Broker | Kafka 服务器节点,负责存储和转发消息。多个 Broker 组成集群。 |
| ZooKeeper / KRaft | 旧版用 ZK 协调元数据;Kafka 2.8+ / 3.x 引入 KRaft 模式去除 ZK 依赖。 |
3. 核心概念
| 概念 | 说明 |
|---|---|
| Topic | 逻辑消息主题,生产者发送、消费者订阅的目标。 |
| Partition | Topic 的物理分片,消息按 Key Hash 或轮询写入不同 Partition。Partition 内消息有序。 |
| Offset | 消息在 Partition 内的唯一位移标识;消费者用 Offset 记录消费进度。 |
| Consumer Group | 消费者组;组内广播消费(一条消息只被组内一个消费者消费),组间共享(多组可同时消费同一条消息)。 |
| Replication Factor | 副本数;Partition 可在多个 Broker 上保存副本,Leader 负责读写,Follower 同步数据。 |
| ISR | In-Sync Replicas,与 Leader 保持同步的副本集合;ISR 内的 Follower 才有资格竞选新 Leader。 |
4. 生产者 ACK 机制
| ACK 值 | 行为 | 可靠性 | 吞吐量 |
|---|---|---|---|
acks=0 | 发完不管,不等 Broker 确认 | 最低 | 最高 |
acks=1 | Leader 写入成功即确认 | 中 | 中 |
acks=all(默认) | Leader + 所有 ISR Follower 写入成功才确认 | 最高 | 较低 |
生产环境推荐
acks=all+retries=3+enable.idempotence=true(幂等生产者)。
5. 消费者提交 Offset
| 方式 | 说明 |
|---|---|
| 自动提交 | enable.auto.commit=true,按 auto.commit.interval.ms 周期提交。简单但可能丢消息或重复消费。 |
| 手动同步提交 | consumer.commitSync();处理完业务后立即提交,可靠但阻塞。 |
| 手动异步提交 | consumer.commitAsync();非阻塞,吞吐量高,可能重复消费。 |
推荐:关闭自动提交,业务处理成功后手动异步提交;配合幂等/去重机制防重复消费。
6. Rebalance(分区重分配)
- 消费者组内成员变化(新消费者加入 / 消费者退出 / 消费超时)时,Coordinator 触发 Rebalance,重新分配 Partition。
- 问题:Rebalance 期间整个组暂停消费。
- 优化:合理设置
session.timeout.ms和heartbeat.interval.ms;使用静态成员(group.instance.id)减少不必要的 Rebalance。
二、工作实践(复制即用)
1. 基础依赖
xml
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>2. 基础配置(application.yml)
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
acks: all # 最高可靠性
retries: 3 # 发送失败重试
batch-size: 16384 # 批量发送大小
buffer-memory: 33554432 # 缓冲区 32MB
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
enable.idempotence: true # 幂等生产者,防重复发送
consumer:
group-id: order-consumer-group
auto-offset-reset: earliest # 无初始偏移时从最早开始(可选 latest)
enable-auto-commit: false # 关闭自动提交,改手动
max-poll-records: 500 # 单次拉取最大条数
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual_immediate # 手动立即确认
concurrency: 3 # 并发消费者线程数3. 生产者发送(KafkaTemplate)
java
@Service
@Slf4j
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 同步发送(简单场景)
*/
public void sendSync(String topic, String key, String message) {
try {
SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get();
log.info("发送成功: topic={}, partition={}, offset={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
} catch (Exception e) {
log.error("发送失败", e);
}
}
/**
* 异步发送 + 回调(推荐生产用)
*/
public void sendAsync(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("发送失败: topic={}, key={}", topic, key, ex);
} else {
log.info("发送成功: partition={}, offset={}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
/**
* 发送带 Header 的消息(如透传 traceId)
*/
public void sendWithHeader(String topic, String message, String traceId) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
record.headers().add("X-Trace-Id", traceId.getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(record);
}
}4. 消费者消费(@KafkaListener)
手动提交 + 业务异常处理
java
@Component
@Slf4j
public class KafkaConsumerService {
@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
// 业务处理
processOrder(record.value());
// 业务成功后再提交 Offset
ack.acknowledge();
} catch (Exception e) {
log.error("消费异常, 不提交 Offset, 消息将进入重试或死信: offset={}", record.offset(), e);
// 不调用 ack.acknowledge(),消息不会被确认,Spring Kafka 会根据配置决定重试
}
}
private void processOrder(String message) {
// 业务逻辑...
}
}5. 批量消费(提升吞吐量)
java
@Component
@Slf4j
public class BatchKafkaConsumer {
@KafkaListener(topics = "log-topic", groupId = "log-consumer-group")
public void consumeBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
log.info("本批收到 {} 条消息", records.size());
for (ConsumerRecord<String, String> record : records) {
try {
processLog(record.value());
} catch (Exception e) {
log.error("单条处理失败: offset={}", record.offset(), e);
// 建议:失败记录写入异常表,不要影响整批提交
}
}
ack.acknowledge();
}
private void processLog(String value) {
// 批量处理逻辑
}
}配合
spring.kafka.consumer.max-poll-records调整批量大小。
6. 死信队列(DLQ)——失败消息不阻塞主流程
配置 + 消费异常处理器
java
@Configuration
public class KafkaConfig {
/**
* 消费异常时,将失败消息发送到死信 Topic
*/
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<String, String> operations) {
// 最多重试 3 次(间隔 1s),仍失败则发送到 DLT
DefaultErrorHandler handler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(operations),
new FixedBackOff(1000L, 3L)
);
// 不阻塞其他分区
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
}消费者应用 ErrorHandler
java
@Component
@Slf4j
public class OrderConsumer {
@KafkaListener(topics = "order-topic", groupId = "order-consumer-group",
errorHandler = "kafkaListenerErrorHandler")
public void consume(String message) {
processOrder(message);
}
}死信 Topic 命名规则:
order-topic.DLT,可在 UI 或控制台监控死信重放。
7. JSON 序列化(对象消息)
配置序列化器
yaml
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: com.example.dto # 允许反序列化的包路径发送对象
java
@Service
public class EventProducer {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void sendOrderEvent(OrderEvent event) {
kafkaTemplate.send("order-event-topic", event.getOrderId(), event);
}
}消费对象
java
@KafkaListener(topics = "order-event-topic", groupId = "event-consumer")
public void consumeOrderEvent(ConsumerRecord<String, OrderEvent> record) {
OrderEvent event = record.value();
// 处理...
}8. Spring Cloud Stream Kafka Binder(函数式,云原生推荐)
Spring Cloud Stream 以 Binder 抽象消息中间件,代码不耦合 Kafka 具体 API。
依赖
xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>函数式编程模型(Spring Cloud Stream 3.x+ 推荐)
java
@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
/**
* Consumer:接收 order-in-0 通道的消息
*/
@Bean
public Consumer<OrderEvent> orderIn() {
return event -> {
System.out.println("收到订单事件: " + event.getOrderId());
};
}
/**
* Supplier:定时向 order-out-0 通道发送消息
*/
@Bean
public Supplier<OrderEvent> orderOut() {
return () -> new OrderEvent("ORD-" + System.currentTimeMillis(), 199.99);
}
/**
* Function:接收 input 处理后发送到 output
*/
@Bean
public Function<OrderEvent, OrderEvent> processOrder() {
return event -> {
event.setStatus("PROCESSED");
return event;
};
}
}application.yml
yaml
spring:
cloud:
stream:
function:
definition: orderIn;processOrder # 声明激活的函数
bindings:
orderIn-in-0:
destination: order-topic # 绑定到 Kafka topic
group: order-consumer-group
processOrder-in-0:
destination: raw-order-topic
processOrder-out-0:
destination: processed-order-topic
kafka:
binder:
brokers: localhost:9092函数名命名规则:
orderIn→ 输入通道orderIn-in-0,输出通道orderIn-out-0。
三、核心速查表
| 概念/配置 | 说明 |
|---|---|
bootstrap-servers | Kafka Broker 地址,集群用逗号分隔 |
acks=all | 生产者最高可靠性确认 |
enable.idempotence=true | 幂等生产者,防网络抖动导致重复发送 |
auto.offset.reset | 无初始偏移时:earliest 从头 / latest 从最新 |
enable-auto-commit: false | 关闭自动提交,推荐手动 ack |
ack-mode: manual_immediate | 手动立即确认,业务成功后提交 Offset |
max-poll-records | 单次拉取最大消息数,配合批量消费 |
concurrency | 消费者并发线程数,建议 ≤ Partition 数 |
group-id | 消费者组;同组内一条消息只被一个消费者消费 |
KafkaTemplate | 生产者发送模板 |
@KafkaListener | 消费者注解,指定 topic 和 groupId |
ConsumerRecord | 单条消息封装,含 key/value/offset/partition |
Acknowledgment | 手动确认对象,ack.acknowledge() 提交 Offset |
DeadLetterPublishingRecoverer | 死信发布恢复器,失败消息转 DLT |
DefaultErrorHandler | Spring Kafka 消费异常处理器 |
四、排错速查
| 现象 | 解决 |
|---|---|
| 消费者启动不了 / 连不上 Kafka | 1. 检查 bootstrap-servers 地址和端口 90922. 检查 Kafka 是否正常监听( advertised.listeners 配置) |
| 消息发送成功但消费者收不到 | 1. 检查 Topic 名是否一致 2. 检查 group-id 是否被其他消费者占用了 Offset3. 检查 auto.offset.reset,若为 latest 且已存在消费组,不会消费旧消息 |
| 消息重复消费 | 1. 检查是否手动提交 Offset,异常时未提交导致重拉 2. 开启生产者幂等 enable.idempotence=true3. 消费者做业务幂等(如唯一键/去重表) |
| 消息丢失 | 1. 生产者 acks 是否为 all2. 是否手动提交 Offset 但在业务处理前提交(应该先处理业务再 ack) 3. 检查 retries 是否为 0 |
| 消费速度慢 / 积压 | 1. 增加 concurrency(不超过 Partition 数)2. 开启批量消费 max-poll-records 配合 List 参数3. 业务逻辑异步化(注意保证顺序需求) |
| 消息顺序错乱 | 1. 保证同 Key 的消息发到同一 Partition(Kafka 会按 Key Hash 路由) 2. 一个 Partition 只由一个消费者线程消费 |
| Rebalance 频繁 | 1. 检查消费者是否频繁上下线 2. 设置 group.instance.id(静态成员)3. 调整 session.timeout.ms 和 heartbeat.interval.ms |
| 序列化异常 | 1. 检查序列化器与反序列化器是否匹配 2. JsonDeserializer 需配置 spring.json.trusted.packages |
| Spring Cloud Stream 函数不触发 | 1. 检查 spring.cloud.stream.function.definition 是否声明了函数名2. 检查绑定名格式: functionName-in-0 / functionName-out-0 |
| Kafka 与 Spring Boot 版本冲突 | Spring Boot 3.x 对应 spring-kafka 3.x 和 Kafka Client 3.x;版本不匹配可能抛 NoSuchMethodError |