RabbitMQ消息队列
什么是消息队列?
一般我们在进行远程调用时,可以使用发送HTTP请求来完成,现在可以使用第二种方式,那就是消息队列。
他能将发送方发送的消息放入队列中,当新消息入队时,会通知接收方进行处理,一般消息发送方称为生产者,接收方称为消费者。
这样所有的请求都直接丢到消息队列中,再由消费者取出,不再是直接连接消费者的形式了,而是增加了一个中间商,这也时一种很好的解耦方案,并且在高并发情况下,由于消费者能力有限,消息队列能起到削峰填谷的作用,堆积一部分请求,再由消费者来慢慢处理,而不会像直接调用那样请求蜂拥而至!
消息队列的具体实现有哪些?
- RabbitMQ:老牌、稳定、功能全,适合传统企业、小中型项目、复杂路由
- Kafka:吞吐量爆炸、高吞吐、日志 / 大数据专用,不适合做业务消息
- RocketMQ:阿里出品,高吞吐 + 业务可靠,电商 / 金融 / 互联网主流
那到底要怎么选择呢?
- 做电商 / 订单 / 支付 → RocketMQ
- 做日志 / 大数据 / 流处理 → Kafka
- 做企业系统 / 微服务解耦 → RabbitMQ
整体对比如下:
对比维度RabbitMQKafkaRocketMQ开发语言ErlangScala/JavaJava协议AMQP、MQTT、STOMP自定义 TCP 协议自定义 TCP 协议吞吐量万级 TPS中等并发几十万 TPS超高吞吐十万级 TPS高吞吐延迟微秒级极低延迟毫秒级毫秒级可靠性高持久化 + 消息确认中配置不当易丢消息最高金融级可靠功能丰富度最全复杂路由、死信、优先级、延迟极简只做发布订阅、流处理很全事务、定时、重试、顺序消息集群依赖自带集群依赖 Zookeeper依赖 NameServer运维难度一般Erlang 问题难排查较高依赖 ZK,参数多较低轻量、易部署社区生态国外成熟、文档全大数据生态极强国内生态好、阿里系完善典型场景企业系统、微服务解耦、复杂路由日志收集、大数据流、埋点电商、金融、订单、秒杀、支付安装消息队列
RabbitMQ运行需要Erlang环境,所以需要先安装Erlang环境
- sudo apt install rabbitmq-server
复制代码 安装完成后可输入sudo rabbitmqctl status 查看当前RabbitMQ的运行状态
可以看到有两个端口,一会使用的就是amqp协议那个端口来连接。
25672时集群化端口。
- 开启RabbitMQ管理面板,这样就可以在浏览器上进行实时访问和监控了:
- sudo rabbitmq-plugins enable rabbitmq_management
复制代码 输入网址http://127.0.0.1:15672 就可以访问了
默认用户和密码都是guest,guest只能在本地进行登录,如果要从远程服务器访问管理面板,要创建一个新的管理员账号,不能使用guest。
创建命令如下 :- # sudo rabbitmqctl add_user 用户名 密码
- sudo rabbitmqctl add_user admin admin
复制代码 然后将管理员权限授予创建的用户- sudo rabbitmqctl set_user_tags admin administrator
复制代码 然后就可以登录了。
RabbitMQ的设计架构如下:
- Channel:客户端连接会使用Channel,再通过Channel访问Rabbit MQ服务器,这里的通信协议不是http,而是amqp协议。
- Exchange:类似于交换机,会根据请求,转发给对应的消息队列,每个队列都可以绑定到Exchange上,这样Exchange就可以将数据转发给队列。可以存在多个。不同的Exchange类型可以用于实现不同的消息模式。
- Queue:消息队列,生产者所有的消息都存放再消息队列中,等待消费者取出。
- Virtual Host:类似于环境隔离,不同环境可以单独配置一个Virtual Host,每个Virtual Host可以包含多个Exchange和Queue,每个Virtual Host互不影响。
使用消息队列
简单模式
一个生产者--------输入数据-----》消息队列---------取出数据-----------》一个消费者
- 先进入管理页面,创建一个新的实验环境,只需新建一个Virtual Host即可:
主页选择admin标签,右侧选择Virtual Host选项,然后创建一个新的Virtual Host
这时,系统会自动增加对应的Exchange
这里先说明前面2个direct类型的交换机。(AMQP default)和amq.direct,他们都是直连模式的交换机。
这个交换机是所有虚拟主机都会自带的一个默认交换机,此交换机不可删除,默认绑定到所有的消息队列,如果通过默认交换机发送消息,那么会根据消息的routingKey决定发送给哪个同名的消息队列,同时也不能显示地将消息队列绑定或解绑到此交换机。
当前交换机的特性是持久化的,如果不持久化,那么一重启,增加的交换机就会消失。所有带D字样的。都表示是持久化的。所有自动生成的交换机都是持久化的。
此交换机和默认交换机类型一致,并且也是持久化,但是它具有绑定关系。如果没指定消息队列绑定到此交换机,那么此交换机无法将信息存放到指定队列。
此时没有消息队列,需要添加一个。
第一行选择刚创建的虚拟主机。
类型选择Classic
持久化可以选择Transient 暂时的(也可以选择持久化)
自动删除选择No(需要至少一个消费者连接到此队列,之后,一旦这个队列没有消费者连接,就会自动删除此队列)
然后就可以保存了。
然后将此消息队列绑定到第二个交换机上。
然后可以在页面上直接发送消息。
这时可以在页面上获取队列中的消息,
Ack Mode (应答模式选择),一共4个选项:
- Nack message requeue true:拒绝消息,不会将消息从队列中取出,并且重新排队,一次可以拒绝多个消息。
- Ack message requeue false:确认应答,确认后会从队列中移除,一次可以确认多个消息。
- Reject message requeue true/false: 拒绝消息,可以指定是否重新排队。
这里使用默认的,只会查看消息。不会取出。
第二个参数是编码格式,使用默认就行
第三个参数是指定取出消息的个数
使用Java操作消息队列
使用简单的maven项目整合消息队列
- <dependency>
- <groupId>com.rabbitmq</groupId>
- amqp-client</artifactId>
- <version>5.14.2</version>
- </dependency>
复制代码
- 实现生产者和消费者。首先是生产者。生产者复制把消息发送到消息队列:
- public static void main( String[] args )
- {
- // 使用 RabbitMQ Java 客户端连接到 RabbitMQ 服务器
- ConnectionFactory factory = new ConnectionFactory();
- // 连接基础配置
- factory.setHost("localhost");
- factory.setPort(5672); // 5672 是 RabbitMQ 的默认 AMQP 端口
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setVirtualHost("/test");
- try {
- // 创建连接
- Connection connection = factory.newConnection();
- // 创建频道
- Channel channel = connection.createChannel();
- // 声明队列,并绑定它们
- channel.queueDeclare("test", false, false, false, null);
- // 将队列绑定到交换机
- channel.queueBind("yyds", "amq.direct", "my-yyds");
- // 发送消息 ,消息需要转换成字节数组
- channel.basicPublish("amq.direct", "my-yyds", null, "Hello World".getBytes());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
复制代码 其中,queueDeclare方法的参数如下:
- queue:队列名称(默认创建后routingKey和队列名称一致)
- durable:是否持久化
- exclusive:是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个排他队列,并且如果一个Connection已经声明了一个排他队列,其他的Connection不允许建立同名的排他队列,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列会被自动删除。
- autoDelete:是否自动删除
- arguments:设置队列的其他一些参数,这里暂时不需要设置。
其中,queueBind方法参数如下:
- queue:需要绑定的队列名称
- exchange:需要绑定的交换机名称
- routingKey:指定routingKey
其中,basicPublish方法的参数如下:
- exchange:对应的exchange名称,我们这里使用第二个直连交换机
- routingKey:绑定时指定的routingKey
- props:其他配置
- body:消息本体
- public static void main(String[] args) throws IOException, TimeoutException {
- // 使用 RabbitMQ Java 客户端连接到 RabbitMQ 服务器
- ConnectionFactory factory = new ConnectionFactory();
- // 连接基础配置
- factory.setHost("localhost");
- factory.setPort(5672); // 5672 是 RabbitMQ 的默认 AMQP 端口
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setVirtualHost("/test");
- // 创建连接
- // 这里不使用try-with-resources,因为消费者需要持续监听消息,不能在try块结束后关闭连接
- Connection connection = factory.newConnection();
- // 创建频道
- Channel channel = connection.createChannel();
- // 声明队列,并绑定它们
- channel.queueDeclare("test", false, false, false, null);
- // 将队列绑定到交换机
- channel.queueBind("yyds", "amq.direct", "my-yyds");
- // 创建消息消费者,监听队列并处理消息
- channel.basicConsume("yyds", false, (s, delivery) -> {
- String msg = new String(delivery.getBody());
- System.out.println("Received: " + msg);
- // basicAck确认应答,
- // 第一个参数是消息的标签
- // 第二个参数是是否批量确认,如果为true,则会一次性确认所有小于等于该标签的消息,如果为false,则只确认当前消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- // basicNack是拒绝应答,前两个参数与basicAck相同,
- // 第三个参数表示是否重新入队,如果为true,则消息会重新放回队列等待被消费,如果为false,则消息会被丢弃
- //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
- // 跟上面一样,最后一个参数是false,表示不重新入队,而是直接丢弃消息,只不过这里省略了
- //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
- }, s -> {});
- }
复制代码 其中,basicConsume方法参数如下:
- queue:队列名称
- autoAck:自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
- deliver:消息接收后的回调函数,可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。
- cancel:当消费者取消订阅时进行的函数回调,这里暂时用不到。
spring boot整合消息队列
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码- spring:
- rabbitmq:
- addresses: localhost
- username: admin
- password: admin
- virtual-host: /test
复制代码- @Configuration
- public class RabbitConfiguration {
- @Bean("directExchange")// 定义交换机,可以很多个
- public Exchange exchange() {
- return ExchangeBuilder.directExchange("amq.direct").build();
- }
- @Bean("yydsQueue") // 定义消息队列
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds") // 非持久化队列
- .build();
- }
- @Bean("binding")
- public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("yydsQueue") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- .with("my-yyds") // 绑定路由键routingKey
- .noargs();
- }
- }
复制代码- // RabbitTemplate封装了大量的方法来简化RabbitMQ的使用
- @Resource
- RabbitTemplate rabbitTemplate;
- @Test
- void publisher() {
- // 最后一个消息本体可以是Object类型
- rabbitTemplate.convertAndSend("amqp.direct", "my-yyds", "Hello, RabbitMQ!");
- }
复制代码- @Component // 注册为bean
- public class TestLinster {
- // 定义此方法为队列yyds的监听器,一旦监听到消息,就会接收并处理
- @RabbitListener(queues = "yyds")
- public void test(Message message) {
- System.out.println(new String(message.getBody()));
- }
- }
复制代码 那如何获取消费者的处理结果呢?
生产者需要这样写:- @Test
- void publisher() {
- Object res = rabbitTemplate.convertSendAndReceive("amqp.direct", "my-yyds", "Hello, RabbitMQ!");
- System.out.println("消费者的相应:"+res);
- }
复制代码 消费者这样写:- @RabbitListener(queues = "yyds")
- public String test1(Message message) {
- System.out.println(new String(message.getBody()));
- return "相应成功";
- }
复制代码 那如果我需要直接接收一个json格式的消息,并且希望直接获取到实体对象呢?
- // 创建一个消息转换器,使用Jackson2JsonMessageConverter将消息转换为JSON格式
- @Bean("jacksonConverter")
- public Jackson2JsonMessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
复制代码
- 消费者的@RabbitListener注解里指定这个消息转换器
- @RabbitListener(queues = "yyds",messageConverter = "jacksonConverter")
- public String test1(User user) {
- System.out.println(user);
- return "相应成功";
- }
复制代码 生产者可以这样直接发送对象消息- @Test
- void publisher() {
- Object res = rabbitTemplate.convertSendAndReceive("amqp.direct", "my-yyds", new User(1, "张三", "123456"));
- System.out.println("消费者的相应:"+res);
- }
复制代码 死信队列
如果队列中的数据迟迟没有消费者处理,就会一直占用队列的空间。
比如抢车票。如果用户下单后不付款,这张票就会一直被这个用户占用,直到超时后才可以被他人购买!
这时可以使用死信队列,将用户超时未付款或者主动取消的订单,进行处理,以下类型的消息都会被判定为死信:
- 消息被拒绝(basic.reject/basic.nack),并且requeue= false
- 消息TTL过期
- 队列达到最大长度
那么如何构建这样的模式呢?
其实本质上就是一个死信交换机+死信队列,当正常队列中的消息被判定为死信时,会被发送到对应的死信交换机,然后通过交换机发送到死信队列,死信队列也有对应的消费者去处理信息!
具体实现步骤如下:
- 在配置类中创建一个死信交换机和死信队列,并进行绑定:
- @Bean("directDLExchange")
- public Exchange dlExchange() {
- // 创建一个死信交换机,类型为direct,名称为dlx.direct
- return ExchangeBuilder.directExchange("dlx.direct").build();
- }
- @Bean("yydsDLQueue") // 创建一个死信队列,名称为dl-yyds
- public Queue dlQueue() {
- return QueueBuilder.nonDurable("dl-yyds") // 非持久化队列
- .build();
- }
- @Bean("dlBinding") // 创建一个绑定,将死信队列绑定到死信交换机上,使用路由键dl-yyds
- public Binding dlBingding(@Qualifier("directDLExchange") Exchange exchange,@Qualifier("yydsDLQueue") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- .with("dl-yyds") // 绑定路由键routingKey
- .noargs();
- }
- @Bean("yydsQueue")
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds") // 非持久化队列
- .deadLetterExchange("dlx.direct") // 设置死信交换机为dlx.direct
- .deadLetterRoutingKey("dl-yyds") // 设置死信路由键为dl-yyds
- .build();
- }
复制代码- @RabbitListener(queues = "dl-yyds",messageConverter = "jacksonConverter")
- public String test2(Message message) {
- System.out.println(new String(message.getBody()));
- return "相应成功";
- }
复制代码 另外,Rabbit MQ支持将超过一定时间没被消费的消息自动删除,这需要消息队列设定TTL值,如果消息的存货时间超过了TIme To Live 值,就会被自动删除,然后消息进入死信队列。- @Bean("yydsQueue")
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds") // 非持久化队列
- .deadLetterExchange("dlx.direct") // 设置死信交换机为dlx.direct
- .deadLetterRoutingKey("dl-yyds") // 设置死信路由键为dl-yyds
- .ttl(5000) // 设置消息的过期时间为5000毫秒(5秒),超时自动删除
- .build();
- }
复制代码 当消息队列长度达到最大,会把先进入队列的消息放进死信队列- @Bean("yydsQueue")
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds") // 非持久化队列
- .deadLetterExchange("dlx.direct") // 设置死信交换机为dlx.direct
- .deadLetterRoutingKey("dl-yyds") // 设置死信路由键为dl-yyds
- .ttl(5000) // 设置消息的过期时间为5000毫秒(5秒)
- .maxLength(3) // 设置队列的最大长度为3条消息
- .build();
- }
复制代码 工作队列模式
这种模式非常适合多个工人等待新的任务到来的场景。
当我们把多个任务丢进消息队列,而此时工人有多个,可以将这些任务分配给各个工人!
实现起来非常简单,只需要创建两个监听器即可:- @RabbitListener(queues = "yyds")
- public void test1(String data) {
- System.out.println("一号队列监听器:"+data);
- }
- @RabbitListener(queues = "yyds")
- public void test1_1(String data) {
- System.out.println("二号队列监听器:"+data);
- }
复制代码 这些监听器会自动轮询获取队列中的消息。
如果一开始,队列中就有一部分消息,这时再开始启用消费者还是轮询获取队列中的消息码?
不是,如果一开就存在部分消息,会被一个消费者一次性全部消耗,因为我们没有对消费者的Prefetch Count(预获取数量,一次性获取消息的最大数量)进行限制。如果想要轮询获取,就需要将这个值设置为1,即消费者一次只能拿一个消息,而不是将所有消息全部获取。
那如何对消费者的预获取数量进行设置呢?
需要再配置类中定义一个自定义的 ,在这里设置消费者Channel的Prefetch Count- @Resource
- private CachingConnectionFactory cachingConnectionFactory;
- @Bean("listenerContainer") // 创建一个RabbitListenerContainerFactory,设置连接工厂和预取计数
- public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(cachingConnectionFactory);
- factory.setPrefetchCount(1); // 设置预取计数为1,表示每次只处理一条消息,处理完后再处理下一条消息
- return factory;
- }
复制代码 然后在监听器指定刚才设置的监听工厂- @RabbitListener(queues = "yyds",containerFactory = "listenerContainer")
- public void test1(String data) {
- System.out.println("一号队列监听器:"+data);
- }
- @RabbitListener(queues = "yyds",containerFactory = "listenerContainer")
- public void test1_1(String data) {
- System.out.println("二号队列监听器:"+data);
- }
复制代码 除了去定义两个相同的监听器之外,我们也可以在注解中定义,来开启多个消费者,比如现在开启10个相同的消费者- @RabbitListener(queues = "yyds",containerFactory = "listenerContainerFactory",concurrency = "10")
- public void test1_1(String data) {
- System.out.println("二号队列监听器:"+data);
- }
复制代码 发布订阅模式
比如,当购买的云服务器快到期时,就会给手机、邮箱发送续费消息,但是手机短信和邮件发送并不是同一个业务提供的,但是希望能够都去执行,这时,就需要用到发布订阅模式,简而言之就是发布一个,消费多个。
实现这种模式也非常简单,这里需要用到另一种交换机(fanout 扇出类型),这是一种广播类型,消息会被广播到所有与此交换机绑定的消息队列中。
具体实现步骤如下:
- 在配置类中设置2个消息队列绑定到fanout交换机:
- @Bean("fanoutExchange")
- public Exchange exchange() {
- return ExchangeBuilder.fanoutExchange("amq.fanout").build();
- }
- @Bean("yydsQueue1")
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds1") // 非持久化队列
- .build();
- }
- @Bean("yydsQueue2")
- public Queue queue2() {
- return QueueBuilder.nonDurable("yyds2") // 非持久化队列
- .build();
- }
- @Bean("binding")
- public Binding binding(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("yydsQueue1") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- .with("yyds1") // 绑定路由键routingKey
- .noargs();
- }
- @Bean("binding2")
- public Binding binding2(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("yydsQueue2") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- .with("yyds2") // 绑定路由键routingKey
- .noargs();
- }
复制代码- @RabbitListener(queues = "yyds1")
- public void test1(String data) {
- System.out.println("一号队列监听器:"+data);
- }
- @RabbitListener(queues = "yyds2")
- public void test1_1(String data) {
- System.out.println("二号队列监听器:"+data);
- }
复制代码 发送到队列中的消息,这两个监听器都能获取到。这样就实现了发布订阅模式。
路由模式
路由模式就是在绑定时指定想要的routingKey,只有生产者发送时指定了routingKey,带能到达对应的队列。
除了之前的一次绑定外,同一个队列可以多次绑定到交换机,并且使用不同的routingKey,这样只要满足其中一个routingKey,都可以将消息发送到此队列中。
- @Bean("directExchange")
- public Exchange exchange() {
- return ExchangeBuilder.directExchange("amq.direct").build();
- }
- @Bean("yydsQueue")
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds") // 非持久化队列
- .build();
- }
- @Bean("binding") // routingKey是yyds1
- public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("yydsQueue") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- .with("yyds1") // 绑定路由键routingKey
- .noargs();
- }
- @Bean("binding2") // routingKey是yyds2
- public Binding binding2(@Qualifier("directExchange") Exchange exchange,@Qualifier("yydsQueue") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- .with("yyds2") // 绑定路由键routingKey
- .noargs();
- }
复制代码- @RabbitListener(queues = "yyds")
- public void test1(String data) {
- System.out.println("一号队列监听器:"+data);
- }
复制代码 主题模式
实际上就是一种模糊匹配的模式,将routingKey以模糊匹配的方式去进行转发。
使用* 或# 来表示
- *表示任意一个单词,比如*.test 可以匹配 a.test
- #表示0或多个单词,比如#.test可以匹配test、aaa.test
具体使用如下:
- @Bean("topicExchange") // 使用topic类型交换机
- public Exchange exchange() {
- return ExchangeBuilder.topicExchange("amq.topic").build();
- }
- @Bean("yydsQueue")
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds") // 非持久化队列
- .build();
- }
- @Bean("binding")
- public Binding binding(@Qualifier("topicExchange") Exchange exchange,@Qualifier("yydsQueue") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- .with("*.test.*") // 绑定路由键routingKey
- .noargs();
- }
复制代码 除了使用默认的主题交换机(amqp.topic)之外,还有一个叫做amq.rabbitmq.trace交换机,这个交换机也是topic类型,那这个交换机是做什么的呢?
它是帮助我们记录和追踪生产者和消费者使用消息队列的,它是一个内部交换机,记录消息进入哪个队列,被哪个消费者消费了,具体的使用方法如下:
- sudo rabbitmqctl trace_on -p /test
复制代码
- 开启后,将队列trace绑定到上面的交换机上,publish.# 表示记录所有生产者(无论绑定的是哪个交换机)发布的消息,deliver.# 表示记录所有消费者(无论绑定的是哪个队列)消费的消息:
由于发送到此交换机上的routingKey为publish.交换机名称和deliver.队列名称,分别对应生产者投递到交换机的消息和消费者从队列上获取的消息,因此这里使用通配符进行绑定。
此时,生产者给某个队列发送消息后,trace队列会记录这些消息,此时trace队列中的消息如下:
第四种交换机header
它是根据头部信息决定路由到哪一个消息队列中。在发送消息时,可以携带一些头部信息。
- @Bean("headerExchange") // 注意。,这里返回的是HeadersExchange
- public HeadersExchange exchange() {
- return ExchangeBuilder.headersExchange("amq.headers").build();
- }
- @Bean("yydsQueue")
- public Queue queue() {
- return QueueBuilder.nonDurable("yyds") // 非持久化队列
- .build();
- }
- @Bean("binding")
- public Binding binding(@Qualifier("headerExchange") HeadersExchange exchange,@Qualifier("yydsQueue") Queue queue) {
- return BindingBuilder.bind(queue)// 绑定队列
- .to(exchange)// 绑定交换机
- //.whereAny("a","b").exist() // 绑定路由,whereAny表示满足任意一个条件即可,exist表示存在这个key即可
- //.whereAll("a","b").exist() // 绑定路由,whereAll表示满足所有条件,a和b都必须存在
- .where("test").matches("123456"); // 绑定路由,where表示满足某个条件,头部信息必须包含test,matches表示test的值为123456
- //.whereany(Collections.singletonMap("test","123456")).match() // 传入map要绑定的条件,match表示满足这个条件即可
- }
复制代码 集群搭建
让RabbitMQ之间相互进行数据复制(镜像模式)。主节点的消息队列复制到从节点上。
当前只有一个节点。现在开始设置:
- 在从节点上,先关闭:
- sudo rabbitmqctl stop_app
复制代码 - 注意从节点服务器上的host是否匹配
- 保证2个节点上的erlang.cookie一致,将主节点的erlang.cookie内容复制到从节点上
- 然后重启从节点的rabbitmq
- sudo systemctl restart rabbitmq-server.service
复制代码- sudo rabbitmqctl stop_app
复制代码- sudo rabbitmqctl join_cluster rabbit@ubuntu-server
复制代码- sudo rabbitmqctl start_app
复制代码
这时会发现有2个节点,主节点内部发送了一条消息,从节点也会显示主节点的那个消息。此时若关闭主节点,那么从节点就访问不到这个消息。
此时消息还在主节点上,并没有拷贝到从节点上。
此时,主节点上的队列会创建一个镜像队列,+1 表示有一个镜像队列
此时,在主节点发送消息后,然后关闭服务,从节点上的那个队列就成自己的了,就能从队列中获取到这个消息了。
这时,在启动主节点,然后点同步按钮,会发现,此时从节点变成新的主节点,原来的主节点成为新的从节点。
SpringCloud消息组件
Spring cloud Stream
当不同的系统使用不同的消息队列,比如系统A使用Kafka,系统B使用Rabbit MQ,这时候该怎么办,我又不会Kafka,有没有一种方式像JDBC一样,只需要关心sql和业务本身,而不关心数据库的具体实现呢?
Spring cloud Stream就能够实现,它能屏蔽底层实现,使用统一的消息队列操作方式就能够操作多种不同类型的消息队列。
它屏蔽了Rabbit MQ底层操作,让我们使用同一的input和output形式,以Binder为中间件,这样就算我们切换了不同的消息队列,也无需修改代码,而消息队列底层实现交给了Stream。
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- spring-cloud-dependencies</artifactId>
- <version>2021.0.1</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
复制代码
- 创建两个模块publisher-service和consumer-service,然后引入依赖:
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter-web</artifactId>
- </dependency>
复制代码- server:
- port: 8001
- spring:
- cloud:
- stream:
- binders: # 配置要绑定的消息中间件
- local-server: # 绑定一个名为local-server的消息中间件,名称随便起
- type: rabbit
- environment:
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: admin
- password: admin
- virtual-host: /
- bindings:
- test-out-0:
- destination: test-exchange
复制代码 4.编写一个controller- @RestController
- public class PublisherController {
- // 注入 StreamBridge 用于发送消息
- @Resource
- StreamBridge streamBridge;
- @RequestMapping("/publish")
- public String publishMessage(String message) {
- // 第一个参数就是RabbitMQ的交换机名称(数据会发送给这个交换机,到达哪个消息队列,不由我们决定)
- // 交换机的命名规则如下:
- // 输入:《名称》-in-《index》
- // 输出:《名称》-out-《index》
- // 这里使用了 "test-out-0" 作为交换机名称,表示这是一个输出交换机,index为0,注意这里的交换机名称会和消费者bean名称对应
- streamBridge.send("test-out-0", "Hello, this is a message from the publisher service: " + message);
- return "消息发送成功:" + new Date();
- }
- }
复制代码
- 编写一个消费者,配置文件和生产者的大部分配置一样,注意端口改成别的端口。
- server:
- port: 8002
- spring:
- cloud:
- stream:
- binders: # 配置要绑定的消息中间件
- local-server: # 绑定一个名为local-server的消息中间件,名称随便起
- type: rabbit
- environment:
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: admin
- password: admin
- virtual-host: /
- bindings:
- test-in-0:
- destination: test-exchange
复制代码 到此为止,生产者和消费者就编写完成了。生产者发送消息,消费者就可以消费了。
Spring Cloud Bus
Spring Cloud Bus是一个消息总线,用于向各个服务广播某些状态的更改(比如云端配置更改,可以结合Config组件实现动态更新配置,当然nacos已经实现了这个功能)或者其他管理指令。
Bus需要基于一个具体的消息队列实现,比如Rabbit MQ或者Kafka,这里使用RabbitMQ
在微服务项目中,我们希望借阅服务的某个接口被调用时,能够给用户服务和图书服务发送一个通知。
具体实现如下:
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- spring-cloud-starter-bus-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter-actuator</artifactId>
- </dependency>
复制代码- spring:
- rabbitmq:
- addresses: localhost:5672
- username: admin
- password: admin
- virtual-host: /
- management:
- endpoints:
- web:
- exposure:
- include: "*" # 暴露所有端点
复制代码- spring:
- rabbitmq:
- addresses: localhost:5672
- username: guest
- password: guest
- virtual-host: /
复制代码 然后启动这三个服务。
RabbitMQ 会新增一个SpringCloudBus交换机,并且生成三个消息队列。
这样就可以监听并接收消息了。
现在访问借阅服务/actuator/busrefresh端口,通知其他服务进行刷新
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |