找回密码
 立即注册
首页 业界区 安全 Apache Kafka实战:Spring Boot消息队列完整指南 ...

Apache Kafka实战:Spring Boot消息队列完整指南

斜素欣 昨天 22:00
前言

Apache Kafka 是分布式消息队列的事实标准,本文带你实战 Spring Boot 整合 Kafka,完成生产者和消费者的完整开发。
一、Kafka 核心概念


  • Producer:消息生产者
  • Consumer:消息消费者
  • Broker:Kafka 服务节点
  • Topic:消息主题分类
  • Partition:Topic 的分区,实现并行处理
  • Consumer Group:消费者组,实现负载均衡
二、Docker 安装 Kafka
  1. # docker-compose.yml
  2. services:
  3.   zookeeper:
  4.     image: confluentinc/cp-zookeeper:7.5.0
  5.     environment:
  6.       ZOOKEEPER_CLIENT_PORT: 2181
  7.   kafka:
  8.     image: confluentinc/cp-kafka:7.5.0
  9.     depends_on: [zookeeper]
  10.     ports: ["9092:9092"]
  11.     environment:
  12.       KAFKA_BROKER_ID: 1
  13.       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  14.       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
  15.       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
复制代码
三、Spring Boot 整合 Kafka
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     spring-kafka</artifactId>
  4. </dependency>
  5. # application.yml
  6. spring:
  7.   kafka:
  8.     bootstrap-servers: localhost:9092
  9.     producer:
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12.     consumer:
  13.       group-id: my-group
  14.       auto-offset-reset: earliest
  15.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  16.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码
四、生产者
  1. @Service
  2. public class OrderProducer {
  3.     @Autowired
  4.     private KafkaTemplate<String, String> kafkaTemplate;
  5.     public void sendOrder(String orderId) {
  6.         String message = "{"orderId":""+orderId+"","time":""+LocalDateTime.now()+""}";
  7.         kafkaTemplate.send("order-topic", orderId, message);
  8.         System.out.println("Sent: " + message);
  9.     }
  10. }
复制代码
五、消费者
  1. @Component
  2. public class OrderConsumer {
  3.     @KafkaListener(topics = "order-topic", groupId = "my-group")
  4.     public void consume(ConsumerRecord<String, String> record) {
  5.         System.out.println("Received: key=" + record.key());
  6.         System.out.println("Value: " + record.value());
  7.         System.out.println("Partition: " + record.partition());
  8.         System.out.println("Offset: " + record.offset());
  9.     }
  10. }
复制代码
六、手动提交 Offset
  1. @KafkaListener(topics = "order-topic", groupId = "my-group")
  2. public void consumeWithManualAck(
  3.         ConsumerRecord<String, String> record,        Acknowledgment ack) {
  4.     try {
  5.         processOrder(record.value());
  6.         ack.acknowledge();  // 手动确认
  7.     } catch (Exception e) {
  8.         // 处理失败,消息会被重新消费
  9.         log.error("Failed to process: {}", record.key(), e);
  10.     }
  11. }
  12. # 开启手动提交
  13. spring.kafka.listener.ack-mode: manual
复制代码
七、常用命令速查
  1. # 创建 Topic
  2. kafka-topics.sh --create --topic order-topic \
  3.   --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  4. # 查看 Topic 列表
  5. kafka-topics.sh --list --bootstrap-server localhost:9092
  6. # 查看消费者组
  7. kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
  8. # 控制台消费者
  9. kafka-console-consumer.sh --topic order-topic \
  10.   --bootstrap-server localhost:9092 --from-beginning
复制代码
总结

Kafka 是微服务架构中异步通信的核心组件。核心要点:Producer/Consumer 模式实现解耦、Partition 实现并行处理、Consumer Group 实现负载均衡、手动提交 Offset 保证消息可靠性。

觉得有帮助请点赞收藏!有问题欢迎评论区交流
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册