Skip to content

Kafka 极简教程


一、基础理论

1. 定位

  • 分布式高吞吐消息队列:基于 发布订阅模型 的流式处理平台,承担微服务间的异步解耦削峰填谷
  • Spring 生态集成:通过 spring-kafka 原生接入;Spring Cloud Stream 以 Kafka Binder 作为默认消息中间件之一。
  • 日志/事件驱动首选:天生为顺序写磁盘设计,吞吐量极高,适合日志收集事件溯源流处理场景。

2. 核心架构

角色说明
Producer生产者,向 Topic 发送消息。
Consumer消费者,从 Topic 订阅并处理消息。
BrokerKafka 服务器节点,负责存储和转发消息。多个 Broker 组成集群。
ZooKeeper / KRaft旧版用 ZK 协调元数据;Kafka 2.8+ / 3.x 引入 KRaft 模式去除 ZK 依赖。

3. 核心概念

概念说明
Topic逻辑消息主题,生产者发送、消费者订阅的目标。
PartitionTopic 的物理分片,消息按 Key Hash 或轮询写入不同 Partition。Partition 内消息有序
Offset消息在 Partition 内的唯一位移标识;消费者用 Offset 记录消费进度。
Consumer Group消费者组;组内广播消费(一条消息只被组内一个消费者消费),组间共享(多组可同时消费同一条消息)。
Replication Factor副本数;Partition 可在多个 Broker 上保存副本,Leader 负责读写,Follower 同步数据。
ISRIn-Sync Replicas,与 Leader 保持同步的副本集合;ISR 内的 Follower 才有资格竞选新 Leader。

4. 生产者 ACK 机制

ACK 值行为可靠性吞吐量
acks=0发完不管,不等 Broker 确认最低最高
acks=1Leader 写入成功即确认
acks=all(默认)Leader + 所有 ISR Follower 写入成功才确认最高较低

生产环境推荐 acks=all + retries=3 + enable.idempotence=true(幂等生产者)。

5. 消费者提交 Offset

方式说明
自动提交enable.auto.commit=true,按 auto.commit.interval.ms 周期提交。简单但可能丢消息或重复消费。
手动同步提交consumer.commitSync();处理完业务后立即提交,可靠但阻塞。
手动异步提交consumer.commitAsync();非阻塞,吞吐量高,可能重复消费。

推荐:关闭自动提交,业务处理成功后手动异步提交;配合幂等/去重机制防重复消费。

6. Rebalance(分区重分配)

  • 消费者组内成员变化(新消费者加入 / 消费者退出 / 消费超时)时,Coordinator 触发 Rebalance,重新分配 Partition。
  • 问题:Rebalance 期间整个组暂停消费
  • 优化:合理设置 session.timeout.msheartbeat.interval.ms;使用静态成员group.instance.id)减少不必要的 Rebalance。

二、工作实践(复制即用)

1. 基础依赖

xml
<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 基础配置(application.yml)

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: all                    # 最高可靠性
      retries: 3                   # 发送失败重试
      batch-size: 16384            # 批量发送大小
      buffer-memory: 33554432      # 缓冲区 32MB
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        enable.idempotence: true     # 幂等生产者,防重复发送
    consumer:
      group-id: order-consumer-group
      auto-offset-reset: earliest   # 无初始偏移时从最早开始(可选 latest)
      enable-auto-commit: false    # 关闭自动提交,改手动
      max-poll-records: 500        # 单次拉取最大条数
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate   # 手动立即确认
      concurrency: 3               # 并发消费者线程数

3. 生产者发送(KafkaTemplate)

java
@Service
@Slf4j
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 同步发送(简单场景)
     */
    public void sendSync(String topic, String key, String message) {
        try {
            SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get();
            log.info("发送成功: topic={}, partition={}, offset={}",
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        } catch (Exception e) {
            log.error("发送失败", e);
        }
    }

    /**
     * 异步发送 + 回调(推荐生产用)
     */
    public void sendAsync(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        log.error("发送失败: topic={}, key={}", topic, key, ex);
                    } else {
                        log.info("发送成功: partition={}, offset={}",
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    }
                });
    }

    /**
     * 发送带 Header 的消息(如透传 traceId)
     */
    public void sendWithHeader(String topic, String message, String traceId) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        record.headers().add("X-Trace-Id", traceId.getBytes(StandardCharsets.UTF_8));
        kafkaTemplate.send(record);
    }
}

4. 消费者消费(@KafkaListener)

手动提交 + 业务异常处理

java
@Component
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());

            // 业务处理
            processOrder(record.value());

            // 业务成功后再提交 Offset
            ack.acknowledge();
        } catch (Exception e) {
            log.error("消费异常, 不提交 Offset, 消息将进入重试或死信: offset={}", record.offset(), e);
            // 不调用 ack.acknowledge(),消息不会被确认,Spring Kafka 会根据配置决定重试
        }
    }

    private void processOrder(String message) {
        // 业务逻辑...
    }
}

5. 批量消费(提升吞吐量)

java
@Component
@Slf4j
public class BatchKafkaConsumer {

    @KafkaListener(topics = "log-topic", groupId = "log-consumer-group")
    public void consumeBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        log.info("本批收到 {} 条消息", records.size());

        for (ConsumerRecord<String, String> record : records) {
            try {
                processLog(record.value());
            } catch (Exception e) {
                log.error("单条处理失败: offset={}", record.offset(), e);
                // 建议:失败记录写入异常表,不要影响整批提交
            }
        }

        ack.acknowledge();
    }

    private void processLog(String value) {
        // 批量处理逻辑
    }
}

配合 spring.kafka.consumer.max-poll-records 调整批量大小。


6. 死信队列(DLQ)——失败消息不阻塞主流程

配置 + 消费异常处理器

java
@Configuration
public class KafkaConfig {

    /**
     * 消费异常时,将失败消息发送到死信 Topic
     */
    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<String, String> operations) {
        // 最多重试 3 次(间隔 1s),仍失败则发送到 DLT
        DefaultErrorHandler handler = new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(operations),
                new FixedBackOff(1000L, 3L)
        );
        // 不阻塞其他分区
        handler.addNotRetryableExceptions(IllegalArgumentException.class);
        return handler;
    }
}

消费者应用 ErrorHandler

java
@Component
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group",
                   errorHandler = "kafkaListenerErrorHandler")
    public void consume(String message) {
        processOrder(message);
    }
}

死信 Topic 命名规则:order-topic.DLT,可在 UI 或控制台监控死信重放。


7. JSON 序列化(对象消息)

配置序列化器

yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.example.dto   # 允许反序列化的包路径

发送对象

java
@Service
public class EventProducer {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void sendOrderEvent(OrderEvent event) {
        kafkaTemplate.send("order-event-topic", event.getOrderId(), event);
    }
}

消费对象

java
@KafkaListener(topics = "order-event-topic", groupId = "event-consumer")
public void consumeOrderEvent(ConsumerRecord<String, OrderEvent> record) {
    OrderEvent event = record.value();
    // 处理...
}

8. Spring Cloud Stream Kafka Binder(函数式,云原生推荐)

Spring Cloud Stream 以 Binder 抽象消息中间件,代码不耦合 Kafka 具体 API。

依赖

xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

函数式编程模型(Spring Cloud Stream 3.x+ 推荐)

java
@SpringBootApplication
public class StreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    /**
     * Consumer:接收 order-in-0 通道的消息
     */
    @Bean
    public Consumer<OrderEvent> orderIn() {
        return event -> {
            System.out.println("收到订单事件: " + event.getOrderId());
        };
    }

    /**
     * Supplier:定时向 order-out-0 通道发送消息
     */
    @Bean
    public Supplier<OrderEvent> orderOut() {
        return () -> new OrderEvent("ORD-" + System.currentTimeMillis(), 199.99);
    }

    /**
     * Function:接收 input 处理后发送到 output
     */
    @Bean
    public Function<OrderEvent, OrderEvent> processOrder() {
        return event -> {
            event.setStatus("PROCESSED");
            return event;
        };
    }
}

application.yml

yaml
spring:
  cloud:
    stream:
      function:
        definition: orderIn;processOrder   # 声明激活的函数
      bindings:
        orderIn-in-0:
          destination: order-topic        # 绑定到 Kafka topic
          group: order-consumer-group
        processOrder-in-0:
          destination: raw-order-topic
        processOrder-out-0:
          destination: processed-order-topic
      kafka:
        binder:
          brokers: localhost:9092

函数名命名规则:orderIn → 输入通道 orderIn-in-0,输出通道 orderIn-out-0


三、核心速查表

概念/配置说明
bootstrap-serversKafka Broker 地址,集群用逗号分隔
acks=all生产者最高可靠性确认
enable.idempotence=true幂等生产者,防网络抖动导致重复发送
auto.offset.reset无初始偏移时:earliest 从头 / latest 从最新
enable-auto-commit: false关闭自动提交,推荐手动 ack
ack-mode: manual_immediate手动立即确认,业务成功后提交 Offset
max-poll-records单次拉取最大消息数,配合批量消费
concurrency消费者并发线程数,建议 ≤ Partition 数
group-id消费者组;同组内一条消息只被一个消费者消费
KafkaTemplate生产者发送模板
@KafkaListener消费者注解,指定 topic 和 groupId
ConsumerRecord单条消息封装,含 key/value/offset/partition
Acknowledgment手动确认对象,ack.acknowledge() 提交 Offset
DeadLetterPublishingRecoverer死信发布恢复器,失败消息转 DLT
DefaultErrorHandlerSpring Kafka 消费异常处理器

四、排错速查

现象解决
消费者启动不了 / 连不上 Kafka1. 检查 bootstrap-servers 地址和端口 9092
2. 检查 Kafka 是否正常监听(advertised.listeners 配置)
消息发送成功但消费者收不到1. 检查 Topic 名是否一致
2. 检查 group-id 是否被其他消费者占用了 Offset
3. 检查 auto.offset.reset,若为 latest 且已存在消费组,不会消费旧消息
消息重复消费1. 检查是否手动提交 Offset,异常时未提交导致重拉
2. 开启生产者幂等 enable.idempotence=true
3. 消费者做业务幂等(如唯一键/去重表)
消息丢失1. 生产者 acks 是否为 all
2. 是否手动提交 Offset 但在业务处理前提交(应该先处理业务再 ack)
3. 检查 retries 是否为 0
消费速度慢 / 积压1. 增加 concurrency(不超过 Partition 数)
2. 开启批量消费 max-poll-records 配合 List 参数
3. 业务逻辑异步化(注意保证顺序需求)
消息顺序错乱1. 保证同 Key 的消息发到同一 Partition(Kafka 会按 Key Hash 路由)
2. 一个 Partition 只由一个消费者线程消费
Rebalance 频繁1. 检查消费者是否频繁上下线
2. 设置 group.instance.id(静态成员)
3. 调整 session.timeout.msheartbeat.interval.ms
序列化异常1. 检查序列化器与反序列化器是否匹配
2. JsonDeserializer 需配置 spring.json.trusted.packages
Spring Cloud Stream 函数不触发1. 检查 spring.cloud.stream.function.definition 是否声明了函数名
2. 检查绑定名格式:functionName-in-0 / functionName-out-0
Kafka 与 Spring Boot 版本冲突Spring Boot 3.x 对应 spring-kafka 3.x 和 Kafka Client 3.x;版本不匹配可能抛 NoSuchMethodError