前言
Apache Kafka 是分布式消息队列的事实标准,本文带你实战 Spring Boot 整合 Kafka,完成生产者和消费者的完整开发。
一、Kafka 核心概念
- Producer:消息生产者
- Consumer:消息消费者
- Broker:Kafka 服务节点
- Topic:消息主题分类
- Partition:Topic 的分区,实现并行处理
- Consumer Group:消费者组,实现负载均衡
二、Docker 安装 Kafka
- # docker-compose.yml
- services:
- zookeeper:
- image: confluentinc/cp-zookeeper:7.5.0
- environment:
- ZOOKEEPER_CLIENT_PORT: 2181
- kafka:
- image: confluentinc/cp-kafka:7.5.0
- depends_on: [zookeeper]
- ports: ["9092:9092"]
- environment:
- KAFKA_BROKER_ID: 1
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
复制代码 三、Spring Boot 整合 Kafka
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- spring-kafka</artifactId>
- </dependency>
- # application.yml
- spring:
- kafka:
- bootstrap-servers: localhost:9092
- producer:
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer:
- group-id: my-group
- auto-offset-reset: earliest
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码 四、生产者
- @Service
- public class OrderProducer {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- public void sendOrder(String orderId) {
- String message = "{"orderId":""+orderId+"","time":""+LocalDateTime.now()+""}";
- kafkaTemplate.send("order-topic", orderId, message);
- System.out.println("Sent: " + message);
- }
- }
复制代码 五、消费者
- @Component
- public class OrderConsumer {
- @KafkaListener(topics = "order-topic", groupId = "my-group")
- public void consume(ConsumerRecord<String, String> record) {
- System.out.println("Received: key=" + record.key());
- System.out.println("Value: " + record.value());
- System.out.println("Partition: " + record.partition());
- System.out.println("Offset: " + record.offset());
- }
- }
复制代码 六、手动提交 Offset
- @KafkaListener(topics = "order-topic", groupId = "my-group")
- public void consumeWithManualAck(
- ConsumerRecord<String, String> record, Acknowledgment ack) {
- try {
- processOrder(record.value());
- ack.acknowledge(); // 手动确认
- } catch (Exception e) {
- // 处理失败,消息会被重新消费
- log.error("Failed to process: {}", record.key(), e);
- }
- }
- # 开启手动提交
- spring.kafka.listener.ack-mode: manual
复制代码 七、常用命令速查
- # 创建 Topic
- kafka-topics.sh --create --topic order-topic \
- --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
- # 查看 Topic 列表
- kafka-topics.sh --list --bootstrap-server localhost:9092
- # 查看消费者组
- kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
- # 控制台消费者
- kafka-console-consumer.sh --topic order-topic \
- --bootstrap-server localhost:9092 --from-beginning
复制代码 总结
Kafka 是微服务架构中异步通信的核心组件。核心要点:Producer/Consumer 模式实现解耦、Partition 实现并行处理、Consumer Group 实现负载均衡、手动提交 Offset 保证消息可靠性。
觉得有帮助请点赞收藏!有问题欢迎评论区交流
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |