找回密码
 立即注册
首页 业界区 业界 SpringBoot进阶教程(八十九)rabbitmq长链接及域名TTL, ...

SpringBoot进阶教程(八十九)rabbitmq长链接及域名TTL,多机房切换配置重连能力

醋辛 昨天 22:25
在Spring Boot中配置RabbitMQ以解决长连接稳定性、域名TTL问题及机房切换后的自动重连能力,需结合 连接工厂参数优化、DNS缓存刷新、自定义重连策略 三个核心方向。下面将介绍可直接落地的完整配置方案。
v一、基础依赖与核心配置

首先确保pom.xml 中引入Spring AMQP依赖(默认集成RabbitMQ 客户端):
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
v二、连接工厂配置(解决长连接稳定性)

通过ConnectionFactory配置RabbitMQ连接参数,重点优化心跳检测、自动恢复、超时控制等参数,确保长连接稳定。在application.yml中配置:
  1. spring:
  2.   rabbitmq:
  3.     addresses: rabbitmq.example.com:5672  # 域名+端口(多节点用逗号分隔)
  4.     username: admin
  5.     password: your_password
  6.     virtual-host: /
  7.     # 连接超时设置
  8.     connection-timeout: 30000  # 连接超时30秒
  9.     # 心跳检测(关键:检测连接存活)
  10.     requested-heartbeat: 60  # 心跳间隔60秒,避免被防火墙/负载均衡器断开
  11.     # 自动恢复配置(客户端内置重连机制)
  12.     publisher-confirm-type: CORRELATED  # 确保消息发布确认可靠性
  13.     publisher-returns: true  # 开启消息返回机制
  14.     listener:
  15.       simple:
  16.         retry:
  17.           enabled: true  # 开启消费者重试(避免连接抖动导致消息丢失)
  18.           max-attempts: 3  # 最大重试次数
  19.           initial-interval: 1000  # 重试初始间隔1秒
  20.     # 连接工厂高级配置(通过Java代码进一步定制)
复制代码
v三、解决域名 TTL(DNS 缓存)问题

JVM会缓存DNS解析结果,机房切换后域名IP变更时,需强制刷新解析。通过以下两种方式实现:
3.1JVM 层面控制 DNS 缓存(全局生效)在应用启动参数中添加JVM系统属性,缩短DNS缓存时间:
  1. java -Dsun.net.inetaddr.ttl=10 -Dsun.net.inetaddr.negative.ttl=5 -jar your-app.jar
复制代码

  • sun.net.inetaddr.ttl=10:正缓存(成功解析的 IP)10 秒后过期,强制重新解析。
  • sun.net.inetaddr.negative.ttl=5:负缓存(解析失败的记录)5 秒后过期,避免长期无法连接。
3.2连接工厂层面主动刷新 DNS(精准控制)通过自定义ConnectionFactory,在每次创建连接前主动解析域名获取最新IP,绕过本地缓存:
  1. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionListener;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import com.rabbitmq.client.Connection;
  8. import java.net.InetAddress;
  9. import java.net.UnknownHostException;
  10. @Configuration
  11. public class RabbitMQConfig {
  12.     @Value("${spring.rabbitmq.addresses}")
  13.     private String addresses;  // 原始配置的域名地址(如 rabbitmq.example.com:5672)
  14.     @Value("${spring.rabbitmq.username}")
  15.     private String username;
  16.     @Value("${spring.rabbitmq.password}")
  17.     private String password;
  18.     @Value("${spring.rabbitmq.virtual-host}")
  19.     private String virtualHost;
  20.     @Bean
  21.     public ConnectionFactory connectionFactory() {
  22.         // 1. 解析域名获取最新IP(核心:绕过DNS缓存)
  23.         String resolvedAddresses = resolveDomainToIp(addresses);
  24.         
  25.         // 2. 创建RabbitMQ连接工厂
  26.         CachingConnectionFactory factory = new CachingConnectionFactory();
  27.         factory.setAddresses(resolvedAddresses);  // 使用解析后的IP地址
  28.         factory.setUsername(username);
  29.         factory.setPassword(password);
  30.         factory.setVirtualHost(virtualHost);
  31.         factory.setConnectionTimeout(30000);  // 连接超时30秒
  32.         factory.setRequestedHeartbeat(60);  // 心跳间隔60秒
  33.         // 3. 启用自动恢复(关键:连接断开后自动重建)
  34.         factory.setAutomaticRecoveryEnabled(true);  // 开启自动恢复
  35.         factory.setNetworkRecoveryInterval(5000);  // 网络恢复重试间隔5秒
  36.         factory.setTopologyRecoveryEnabled(true);  // 恢复队列/交换机绑定(拓扑恢复)
  37.         // 4. 注册连接监听器(监控连接状态,触发自定义重连)
  38.         factory.addConnectionListener(new ConnectionListener() {
  39.             @Override
  40.             public void onClose(Connection connection) {
  41.                 // 连接关闭时触发(非主动关闭)
  42.                 if (!connection.isOpen()) {
  43.                     System.out.println("RabbitMQ连接已关闭,准备重连...");
  44.                     // 可在此处添加额外重连逻辑(如刷新IP后重建连接)
  45.                     refreshConnection(factory);
  46.                 }
  47.             }
  48.             @Override
  49.             public void onShutDown(ShutdownSignalException signal) {
  50.                 // 处理服务端主动关闭信号
  51.                 if (!signal.isInitiatedByApplication()) {
  52.                     System.err.println("RabbitMQ连接被强制关闭,原因:" + signal.getReason());
  53.                     refreshConnection(factory);
  54.                 }
  55.             }
  56.         });
  57.         return factory;
  58.     }
  59.     // 解析域名获取最新IP(替换域名中的主机为IP)
  60.     private String resolveDomainToIp(String addresses) {
  61.         try {
  62.             // 分割多节点地址(如 "host1:5672,host2:5672")
  63.             String[] addressArray = addresses.split(",");
  64.             StringBuilder resolved = new StringBuilder();
  65.             for (String addr : addressArray) {
  66.                 String[] hostPort = addr.split(":");
  67.                 String host = hostPort[0];
  68.                 String port = hostPort.length > 1 ? hostPort[1] : "5672";
  69.                
  70.                 // 解析域名获取最新IP
  71.                 InetAddress[] inetAddresses = InetAddress.getAllByName(host);
  72.                 String latestIp = inetAddresses[0].getHostAddress();  // 取第一个IP(多IP可轮询)
  73.                 resolved.append(latestIp).append(":").append(port).append(",");
  74.             }
  75.             // 移除最后一个逗号
  76.             return resolved.substring(0, resolved.length() - 1);
  77.         } catch (UnknownHostException e) {
  78.             System.err.println("域名解析失败,使用原始地址:" + e.getMessage());
  79.             return addresses;  // 解析失败时 fallback 到原始域名
  80.         }
  81.     }
  82.     // 刷新连接(重建连接工厂)
  83.     private void refreshConnection(CachingConnectionFactory factory) {
  84.         try {
  85.             // 1. 关闭旧连接
  86.             factory.destroy();
  87.             // 2. 重新解析域名获取新IP
  88.             String newAddresses = resolveDomainToIp(addresses);
  89.             // 3. 更新连接工厂地址并重建连接
  90.             factory.setAddresses(newAddresses);
  91.             factory.createConnection();  // 主动创建新连接
  92.             System.out.println("RabbitMQ重连成功,新地址:" + newAddresses);
  93.         } catch (Exception e) {
  94.             System.err.println("重连失败,5秒后重试:" + e.getMessage());
  95.             // 延迟重试(避免频繁失败)
  96.             try {
  97.                 Thread.sleep(5000);
  98.             } catch (InterruptedException ie) {
  99.                 Thread.currentThread().interrupt();
  100.             }
  101.             refreshConnection(factory);  // 递归重试
  102.         }
  103.     }
  104. }
复制代码
v四、机房切换重连能力增强(关键机制)

上述配置已实现基础重连,若需应对极端场景(如机房完全切换、多节点故障),可补充以下增强策略:
4.1多机房节点优先级配置若RabbitMQ部署在多机房(如主机房rabbitmq-primary.example.com、备机房rabbitmq-secondary.example.com),可在addresses中按优先级配置,重连时优先尝试主机房,失败后自动切换到备机房:
  1. spring:
  2.   rabbitmq:
  3.     addresses: rabbitmq-primary.example.com:5672,rabbitmq-secondary.example.com:5672  # 主备顺序
复制代码
客户端会按顺序尝试连接,主机房故障时自动切换到备机房。
4.2结合服务发现动态获取节点(适合大规模部署)若使用服务发现组件(如 Nacos、Consul)管理RabbitMQ节点,可在resolveDomainToIp方法中从服务发现获取健康节点,动态更新连接地址:
  1. // 从服务发现获取健康节点(伪代码)
  2. private List<String> getHealthyNodes() {
  3.     // 调用服务发现API,获取健康的RabbitMQ节点(如 "10.0.1.1:5672,10.0.2.1:5672")
  4.     return serviceDiscovery.getHealthyInstances("rabbitmq-service");
  5. }
复制代码
4.3重连时的资源恢复保障开启topologyRecoveryEnabled: true后,客户端会自动恢复队列、交换机、绑定关系及消费者。若需更严格的资源校验,可在重连成功后主动检查:
  1. // 重连成功后校验队列是否存在(示例)
  2. @Autowired
  3. private RabbitAdmin rabbitAdmin;
  4. public void validateQueue(String queueName) {
  5.     if (!rabbitAdmin.getQueueProperties(queueName).containsKey("queue")) {
  6.         // 队列不存在,重新声明
  7.         Queue queue = QueueBuilder.durable(queueName)
  8.                 .withArgument("x-message-ttl", 28800000)
  9.                 .build();
  10.         rabbitAdmin.declareQueue(queue);
  11.     }
  12. }
复制代码
v五、监控与告警(确保问题可感知)

通过Spring Boot Actuator监控RabbitMQ连接状态,配置如下:
  1. management:
  2.   endpoints:
  3.     web:
  4.       exposure:
  5.         include: rabbithealth,health
  6.   endpoint:
  7.     rabbithealth:
  8.       enabled: true  # 开启RabbitMQ专属健康检查
复制代码
健康检查会返回连接状态、通道数、消费者数等信息,结合Prometheus + Grafana 可实时监控连接波动,机房切换时及时告警。
注意:Spring Boot Actuator 是一个非常强大的监控和管理工具,但如果配置不当,确实会带来​​严重的安全风险​​。确保Actuator安全的关键在于遵循 ​​“最小权限原则”​​ ,即只暴露最少必要的信息,并严格控制访问。
v六、核心配置总结

配置方向关键操作长连接稳定性启用心跳检测(requested-heartbeat)、设置合理超时(connection-timeout)DNS 缓存问题缩短 JVM DNS 缓存时间 + 重连时主动解析域名机房切换重连启用自动恢复(automaticRecoveryEnabled)+ 自定义重连监听器 + 多节点优先级资源一致性开启拓扑恢复(topologyRecoveryEnabled)+ 主动校验队列 / 交换机v博客总结

通过以上配置,Spring Boot应用可在RabbitMQ机房切换时自动刷新DNS解析、重建连接并恢复资源,确保消息通信不中断。
其他参考/学习资料:v源码地址

https://github.com/toutouge/javademosecond

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册