找回密码
 立即注册
首页 业界区 业界 Serilog 日志库简单实践(四)消息队列 Sinks(.net8) ...

Serilog 日志库简单实践(四)消息队列 Sinks(.net8)

僻嘶 5 天前
〇、前言

前文已经介绍过什么是 Serilog,以及其核心特点,详见:https://www.cnblogs.com/hnzhengfy/p/19167414/Serilog_basic。
本文继续对各种类型的 Sink 进行简单的实践,主题是消息队列 Sinks,供参考。
在现代分布式系统中,将日志事件通过消息队列 Sinks 发布出去是一种实现系统解耦、异步处理和流量削峰的高效方式。
消息队列 Sinks 常见的有以下三种:
排名消息队列主要优势领域适用人群1Kafka高吞吐、日志、流处理大数据、互联网大厂2RabbitMQ可靠投递、灵活路由、易运维中小企业、传统行业3RocketMQ事务消息、顺序消费、高可靠中国互联网、电商金融本文就前两种进行简单的介绍和示例,对于 RocketMQ 待后续有机会进行补充。
一、消息队列 Sinks 的用法

1.1 Serilog.Sinks.Kafka:将日志信息写入 Kafka 队列

Serilog.Sinks.Kafka 是 Serilog 日志库的一个 Sink(接收器)插件,用于将结构化日志数据发送到 Apache Kafka 主题。它利用 Kafka 高吞吐量、分布式发布-订阅消息系统的特点,构建实时日志管道,使日志系统能够轻松处理海量日志数据,并支持多系统集成分析。
1.1.1 在 Windows 上安装 Kafka(用于开发测试,生产环境建议 Linux)

注意:Kafka 官方主要支持 Linux 环境,Windows 安装不是官方推荐的方式。但如果是在 Windows 上进行开发测试,还是可以的。
本文使用现今最新的版本 4.1.1,Kafka 4.1.1 默认使用 KRaft(Kafka Raft)模式,即去 ZooKeeper 架构,因此只需安装 java 环境,无需在安装 ZooKeeper 。
1)安装 Java 环境
由于 Kafka 4.1.1 是用 Java 17(class version 61)编译的,对应关系如下:
Java 版本与 class file version 对照关系:
Java 8 → 52
Java 11 → 55
Java 17 → 61
Java 21 → 65
因此,在安装 Kafka 之前可以先安装下 JDK 17。官方下载地址:https://www.oracle.com/java/technologies/javase/jdk17-archive-downloads.html。
直接默认选项安装即可。
然后配置环境变量:JAVA_HOME:C:\Program Files\Java\jdk-17。安装时若改了路径需要改成对应的路径。
Path 中添加一行:%JAVA_HOME%\bin。
然后打开 cmd 窗口验证安装成功:
  1. C:\Users\Administrator>java -version
  2. java version "17.0.12" 2024-07-16 LTS
  3. Java(TM) SE Runtime Environment (build 17.0.12+8-LTS-286)
  4. Java HotSpot(TM) 64-Bit Server VM (build 17.0.12+8-LTS-286, mixed mode, sharing)
复制代码
2)安装 Kafka
下载如下图中的二进制版本(Binary),可以直接运行,无需自己编译。文件名中的 2.13 表示该 Kafka 版本是用 Scala 2.13 编译的,可以忽略。
官方下载地址:https://kafka.apache.org/downloads。
1.png

解压到文件夹:D:\kafka_2.13-4.1.1。
修改配置文件夹D:\kafka_2.13-4.1.1\config\server.properties:
  1. # 【已默认】启用 KRaft 模式(同时作为 broker 和 controller)
  2. process.roles=broker,controller
  3. node.id=1
  4. broker.id=1
  5. # 【必须设置,需手动新增】
  6. # 定义 controller quorum 的投票成员
  7. # 格式:nodeId@host:port
  8. controller.quorum.voters=1@localhost:9093
  9. # controller.quorum.voters=1@localhost:9093 中的 1 必须和 node.id=1 一致
  10. # 9093 是 CONTROLLER 监听端口,必须与 listeners 中的 CONTROLLER://:9093 匹配
  11. # 必须设置!定义 controller quorum 的投票成员
  12. # 格式:nodeId@host:port
  13. controller.quorum.voters=1@localhost:9093
  14. # 【已默认】监听器配置
  15. listeners=PLAINTEXT://:9092,CONTROLLER://:9093
  16. inter.broker.listener.name=PLAINTEXT
  17. controller.listener.names=CONTROLLER
  18. # 日志目录(建议使用 Windows 路径)
  19. log.dirs=/tmp/kraft-combined-logs
复制代码
然后生成集群 ID 并格式化日志目录:(必要步骤,若直接启动,会报错:找不到 meta.properties 文件)
  1. # 虽然出现了 Log4j 的 ERROR 日志,但实际上 UUID 已成功生成
  2. PS D:\kafka_2.13-4.1.1> bin\windows\kafka-storage.bat random-uuid
  3. 2025-12-04T11:26:06.769887100Z main ERROR Reconfiguration failed: No configuration found for '4dc63996' at 'null' in 'null'
  4. wTL_esmQSuKMtvzx6eclsQ
  5. # 使用生成的 Cluster ID 格式化 Kafka 存储目录
  6. PS D:\kafka_2.13-4.1.1> bin\windows\kafka-storage.bat format -t wTL_esmQSuKMtvzx6eclsQ -c config/server.properties
  7. 2025-12-04T11:29:22.735759Z main ERROR Reconfiguration failed: No configuration found for '4dc63996' at 'null' in 'null'
  8. Formatting metadata directory /tmp/kraft-combined-logs with metadata.version 4.1-IV1.
  9. # 关于 Log4j ERROR 警告
  10. # main ERROR Reconfiguration failed: No configuration found for '4dc63996' at 'null' in 'null'
  11. # 是因为 Kafka 工具脚本(如 kafka-storage.bat)没有找到 Log4j 2 配置文件
  12. # 它不影响功能,只是日志系统初始化失败,但仍会使用默认配置。
复制代码
最后再启动 Kafka:bin\windows\kafka-server-start.bat config/server.properties。
看到如下输出就是启动成功了:
2.png

注意:Windows 安装 Kafka 仅适用于开发测试环境,生产环境建议使用 Linux 或 Docker 部署。
server.properties 示例
  1.  # Licensed to the Apache Software Foundation (ASF) under one or more
  2. # contributor license agreements.  See the NOTICE file distributed with
  3. # this work for additional information regarding copyright ownership.
  4. # The ASF licenses this file to You under the Apache License, Version 2.0
  5. # (the "License"); you may not use this file except in compliance with
  6. # the License.  You may obtain a copy of the License at
  7. #
  8. #    http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. ############################# Server Basics #############################
  16. # The role of this server. Setting this puts us in KRaft mode
  17. process.roles=broker,controller
  18. # The node id associated with this instance's roles
  19. node.id=1
  20. broker.id=1
  21. # List of controller endpoints used connect to the controller cluster
  22. controller.quorum.bootstrap.servers=localhost:9093
  23. # 必须设置!定义 controller quorum 的投票成员
  24. # 格式:nodeId@host:port
  25. controller.quorum.voters=1@localhost:9093
  26. ############################# Socket Server Settings #############################
  27. # The address the socket server listens on.
  28. # Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
  29. # If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
  30. # with PLAINTEXT listener name, and port 9092.
  31. #   FORMAT:
  32. #     listeners = listener_name://host_name:port
  33. #   EXAMPLE:
  34. #     listeners = PLAINTEXT://your.host.name:9092
  35. listeners=PLAINTEXT://:9092,CONTROLLER://:9093
  36. # Name of listener used for communication between brokers.
  37. inter.broker.listener.name=PLAINTEXT
  38. # Listener name, hostname and port the broker or the controller will advertise to clients.
  39. # If not set, it uses the value for "listeners".
  40. advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
  41. # A comma-separated list of the names of the listeners used by the controller.
  42. # If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
  43. # This is required if running in KRaft mode.
  44. controller.listener.names=CONTROLLER
  45. # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
  46. listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
  47. # The number of threads that the server uses for receiving requests from the network and sending responses to the network
  48. num.network.threads=3
  49. # The number of threads that the server uses for processing requests, which may include disk I/O
  50. num.io.threads=8
  51. # The send buffer (SO_SNDBUF) used by the socket server
  52. socket.send.buffer.bytes=102400
  53. # The receive buffer (SO_RCVBUF) used by the socket server
  54. socket.receive.buffer.bytes=102400
  55. # The maximum size of a request that the socket server will accept (protection against OOM)
  56. socket.request.max.bytes=104857600
  57. ############################# Log Basics #############################
  58. # A comma separated list of directories under which to store log files
  59. # log.dirs=/tmp/kraft-combined-logs
  60. # 数据目录(确保路径存在且可写)
  61. log.dirs=D:/kafka_2.13-4.1.1/data/kafka-logs
  62. # The default number of log partitions per topic. More partitions allow greater
  63. # parallelism for consumption, but this will also result in more files across
  64. # the brokers.
  65. num.partitions=1
  66. # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
  67. # This value is recommended to be increased for installations with data dirs located in RAID array.
  68. num.recovery.threads.per.data.dir=1
  69. ############################# Internal Topic Settings  #############################
  70. # The replication factor for the group metadata internal topics "__consumer_offsets", "__share_group_state" and "__transaction_state"
  71. # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
  72. offsets.topic.replication.factor=1
  73. share.coordinator.state.topic.replication.factor=1
  74. share.coordinator.state.topic.min.isr=1
  75. transaction.state.log.replication.factor=1
  76. transaction.state.log.min.isr=1
  77. ############################# Log Flush Policy #############################
  78. # Messages are immediately written to the filesystem but by default we only fsync() to sync
  79. # the OS cache lazily. The following configurations control the flush of data to disk.
  80. # There are a few important trade-offs here:
  81. #    1. Durability: Unflushed data may be lost if you are not using replication.
  82. #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
  83. #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
  84. # The settings below allow one to configure the flush policy to flush data after a period of time or
  85. # every N messages (or both). This can be done globally and overridden on a per-topic basis.
  86. # The number of messages to accept before forcing a flush of data to disk
  87. #log.flush.interval.messages=10000
  88. # The maximum amount of time a message can sit in a log before we force a flush
  89. #log.flush.interval.ms=1000
  90. ############################# Log Retention Policy #############################
  91. # The following configurations control the disposal of log segments. The policy can
  92. # be set to delete segments after a period of time, or after a given size has accumulated.
  93. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
  94. # from the end of the log.
  95. # The minimum age of a log file to be eligible for deletion due to age
  96. log.retention.hours=168
  97. # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
  98. # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
  99. #log.retention.bytes=1073741824
  100. # The maximum size of a log segment file. When this size is reached a new log segment will be created.
  101. log.segment.bytes=1073741824
  102. # The interval at which log segments are checked to see if they can be deleted according
  103. # to the retention policies
  104. log.retention.check.interval.ms=300000
  105. # 允许自动创建 Topic
  106. auto.create.topics.enable=true
复制代码
3)创建 topic
创建语句:(注意,可以替换自定义的 topic 名:my-logs-topic1212)
  1. .\bin\windows\kafka-topics.bat --create --topic my-logs-topic1212 --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
复制代码
注意:此语句正常几秒钟就可以执行完成,若时间比较长,需要等待其返回错误提示。
可能会遇到如下错误:
  1. Error while executing topic command : Call(...createTopics...) timed out...
  2. Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled createTopics request... due to node 1 being disconnected
复制代码
原因:说明 Kafka 客户端(kafka-topics.bat)无法连接到 Kafka broker,或者 broker 虽然启动但未完全就绪(特别是 KRaft Controller 未选举完成)。
在 KRaft 模式 下,Kafka 启动分为两个角色:Controller:管理元数据(topics、partitions 等);Broker:处理生产/消费请求。如果 Controller 尚未完成初始化或选举,即使 kafka-server-start.bat 看似“已启动”,元数据操作(如创建 Topic)仍会失败。
解决方案:
  1. # 【步骤 1】确认 Kafka 是以 KRaft 模式 启动的
  2. # 检查是否使用了 正确的配置文件:
  3. # 正确启动命令(KRaft 模式)
  4. .\bin\windows\kafka-server-start.bat .\config\server.properties
  5. # 【步骤 2】检查 server.properties 中的关键配置
  6. # 打开 config/server.properties,确保包含:
  7. # 监听地址(必须!)
  8. process.roles=broker,controller
  9. node.id=1
  10. broker.id=1
  11. controller.quorum.voters=1@localhost:9093
  12. listeners=PLAINTEXT://:9092,CONTROLLER://:9093
  13. advertised.listeners=PLAINTEXT://localhost:9092
  14. inter.broker.listener.name=PLAINTEXT
  15. controller.listener.names=CONTROLLER
  16. # 数据目录(确保路径存在且可写)
  17. log.dirs=D:/kafka_2.13-4.1.1/data/kafka-logs
  18. # 注意:
  19. # controller.quorum.voters 必须与 node.id 匹配
  20. # log.dirs 路径不能包含空格或中文
  21. # 第一次启动前,清空 log.dirs 目录
  22. # 【步骤 3】首次启动前格式化存储目录(关键!)
  23. # KRaft 模式要求:先格式化存储目录,否则 broker 无法正常加入集群
  24. # 1. 生成 Cluster ID(只需一次)
  25. .\bin\windows\kafka-storage.bat random-uuid
  26. # 2. 格式化日志目录
  27. .\bin\windows\kafka-storage.bat format -t <上一步生成的UUID> -c .\config\kraft\server.properties
  28. # 【步骤 4】启动 Kafka 并等待完全就绪
  29. .\bin\windows\kafka-server-start.bat .\config\server.properties
  30. # Kafka 启动成功后,等待 10~15 秒 再执行 kafka-topics.bat
  31. # KRaft 启动比 ZooKeeper 模式慢,立即创建 Topic 会失败
复制代码
  1. # 实际操作过程示例:
  2. PS D:\kafka_2.13-4.1.1> .\bin\windows\kafka-storage.bat random-uuid
  3. nOhIgFFsQm2aiodLw8y7cQ
  4. PS D:\kafka_2.13-4.1.1> bin\windows\kafka-storage.bat format -t nOhIgFFsQm2aiodLw8y7cQ -c config/server.properties --ignore-formatted
  5. Formatting metadata directory /tmp/kraft-combined-logs with metadata.version 4.1-IV1.
  6. PS D:\kafka_2.13-4.1.1> bin\windows\kafka-server-start.bat config/server.properties
  7. DEPRECATED: A Log4j 1.x configuration file has been detected, which is no longer recommended.
  8. 。。。
  9. [2025-12-12 17:31:28,080] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
  10. [2025-12-12 17:31:28,082] INFO Kafka version: 4.1.1 (org.apache.kafka.common.utils.AppInfoParser)
  11. [2025-12-12 17:31:28,082] INFO Kafka commitId: be816b82d25370ce (org.apache.kafka.common.utils.AppInfoParser)
  12. [2025-12-12 17:31:28,084] INFO Kafka startTimeMs: 1765531888081 (org.apache.kafka.common.utils.AppInfoParser)
  13. [2025-12-12 17:31:28,085] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
  14. # 至此重新启动成功,然后另打开一个窗口,进行下面操作:
  15. # 添加 topic
  16. PS D:\kafka_2.13-4.1.1> .\bin\windows\kafka-topics.bat --create --topic my-logs-topic1212 --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  17. Created topic my-logs-topic1212.
  18. #查看已添加的 topic
  19. PS D:\kafka_2.13-4.1.1> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
  20. my-logs-topic1212
复制代码
4)验证 Kafka 已经启动
  1. # 可以列出 topic 表明:Kafka Broker 已接受客户端连接,并能响应元数据请求
  2. PS D:\kafka_2.13-4.1.1> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
  3. my-logs-topic1212
  4. # Kafka 是 Java 应用,必须有 java.exe 进程
  5. # 后续 netstat 显示该 PID 监听 9092 端口,确认就是 Kafka 进程
  6. PS D:\kafka_2.13-4.1.1> tasklist /fi "imagename eq java.exe"
  7. 映像名称                       PID 会话名              会话#       内存使用
  8. ========================= ======== ================ =========== ============
  9. java.exe                     90224 Console                    2    432,272 K
  10. # 9092 端口处于 LISTENING 状态,且 PID 匹配
  11. # 表示 Kafka Broker 正在监听所有 IPv4/IPv6 接口的 9092 端口
  12. # PID 90224 与 tasklist 中的 Java 进程一致,确认是 Kafka 在监听
  13. PS D:\kafka_2.13-4.1.1> netstat -ano | findstr :9092
  14.   TCP    0.0.0.0:9092           0.0.0.0:0              LISTENING       90224
  15.   TCP    127.0.0.1:52704        127.0.0.1:9092         TIME_WAIT       0
  16.   TCP    127.0.0.1:52706        127.0.0.1:9092         TIME_WAIT       0
  17.   TCP    127.0.0.1:52758        127.0.0.1:9092         TIME_WAIT       0
  18.   TCP    127.0.0.1:52766        127.0.0.1:9092         TIME_WAIT       0
  19.   TCP    [::]:9092              [::]:0                 LISTENING       90224
  20. # 多个 TIME_WAIT 是之前 kafka-topics.bat 建立的短连接关闭后的正常状态
复制代码
1.1.2 测试往 Kafka 队列中写入消息

1)创建 .NET 8.0 控制台应用程序,然后安装必要的包:
  1. dotnet add package Serilog
  2. dotnet add package Serilog.Sinks.Console
  3. dotnet add package Serilog.Sinks.ConfluentKafka
复制代码
3.png

2)修改 Program.cs
  1. using Confluent.Kafka;
  2. using Serilog;
  3. using Serilog.Formatting.Json;
  4. using Serilog.Sinks.Kafka;
  5. // 配置 Kafka Producer
  6. var kafkaConfig = new ProducerConfig
  7. {
  8.     BootstrapServers = "localhost:9092",
  9.     ClientId = "serilog-dotnet8-demo",
  10.     Acks = Acks.All,
  11.     EnableIdempotence = true,
  12.     MessageTimeoutMs = 30000
  13. };
  14. // 配置 Serilog
  15. Log.Logger = new LoggerConfiguration()
  16.     .MinimumLevel.Information()
  17.     .WriteTo.Console() // 可选:同时输出到控制台
  18.     .WriteTo.Kafka(
  19.         topic: "my-logs-topic1212",
  20.         null,
  21.         producerConfig: kafkaConfig,
  22.         formatter: new JsonFormatter() // 或自定义格式
  23.     )
  24. .CreateLogger();
  25. // 测试日志
  26. Log.Information("应用程序启动成功");
  27. Log.Warning("用户 {UserId} 尝试了高风险操作", 123);
  28. Log.Error(new Exception("数据库连接失败"), "数据库错误");
  29. // 模拟业务
  30. for (int i = 1; i <= 3; i++)
  31. {
  32.     Log.Information("处理任务 {TaskId},进度 {Progress:P}", i, i / 3.0);
  33.     await Task.Delay(200);
  34. }
  35. // 确保日志发送完成
  36. Log.CloseAndFlush();
  37. Console.WriteLine("\n日志已发送到 Kafka 主题 'app-logs'。按任意键退出...");
  38. Console.ReadKey();
复制代码
2)下载和安装 RabbitMQ
官方下载地址:https://www.rabbitmq.com/docs/install-windows#downloads。
下载 RabbitMQ Server Windows Installer (.exe)。例如:rabbitmq-server-4.2.1.exe。推荐使用 .exe 安装包(带图形界面和 Windows 服务),而非 .zip 压缩包。
4.png

安装完成后,同样需要修改环境变量:
新增一个,RABBITMQ_SERVER:C:\Program Files\RabbitMQ Server。
追加 Path:%RABBITMQ_SERVER%\bin。
最后,验证是否安装成功。
  1. # 使用命令查看
  2. .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic <your-topic> --from-beginning
  3. # 【kafka-console-consumer.bat】
  4. # Kafka 自带的 命令行消费者工具(Console Consumer)
  5. # 【--bootstrap-server localhost:9092】
  6. # 指定 Kafka Broker 的连接地址(KRaft 模式下必需)
  7. # 格式:host:port,多个 Broker 用逗号分隔,如 host1:9092,host2:9092
  8. # 为什么叫 "bootstrap"?客户端只需连接其中一个 Broker,即可获取整个集群的元数据(包括所有 Topic、Partition 分布等),后续会直接与对应 Partition 的 Leader 通信
  9. # 【--topic <your-topic>】
  10. # 指定要消费的 Topic 名称,这里一次只能指定 一个 Topic
  11. # 【--from-beginning】
  12. # 从该 Topic 的最早消息(offset = 0)开始消费,而不是从最新位置开始
  13. # 【--property print.key=true】同时打印消息的 Key(默认只打印 Value)
  14. # 【--property print.timestamp=true】打印消息的时间戳(CreateTime 或 LogAppendTime)
复制代码
另外,RabbitMQ 自带一个 Web 管理界面(Management Plugin),默认未启用。
  1. # 即使 Kafka 正确输出 UTF-8,Windows 控制台默认用 GBK 显示,仍会乱码
  2. # 在 PowerShell 或 CMD 中执行:
  3. chcp 65001  # 65001 是 UTF-8 的代码页
  4. # PowerShell 中设置环境变量后启动
  5. $env:KAFKA_OPTS="-Dfile.encoding=UTF-8"
  6. # 然后再重新运行 consumer 命令
  7. .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic <your-topic> --from-beginning
复制代码
启用后需要手动重启下 RabbitMQ 服务,然后通过 http://localhost:15672/ 访问。需要登录账号和密码,均为 guest。
注意:guest 用户默认只能从 localhost 登录,若需远程访问,需创建新用户并授权。
在 sbin 目录下可使用以下命令:
命令说明.\rabbitmqctl.bat status查看节点状态.\rabbitmqctl.bat list_users列出用户.\rabbitmqctl.bat add_user myuser mypass添加用户.\rabbitmqctl.bat set_user_tags myuser administrator设置用户为管理员.\rabbitmqctl.bat set_permissions -p / myuser ".*" ".*" ".*"授予 vhost / 的全部权限配置允许远程访问:
  1. WARN Stopping serving logs in dir D:\kafka_2.13-4.1.1\data\kafka-logs (kafka.log.LogManager)
  2. ERROR Shutdown broker because all log dirs in D:\kafka_2.13-4.1.1\data\kafka-logs have failed (kafka.log.LogManager)
复制代码
1.2.2 通过 Serilog.Sinks.RabbitMQ 8.0.0 实现讲消息写入到队列

示例程序的目标:将日志信息通过 localhost guest 账号,推送到 amq.fanout 交换机。
1)创建项目并引入必要的包
创建一个基于 .net8 的控制台应用程序,并引入以下包:
  1. C:\Windows\system32>erl
  2. Erlang/OTP 27 [erts-15.2.3] [source] [64-bit] [smp:20:20] [ds:20:20:10] [async-threads:1] [jit:ns]
  3. Eshell V15.2.3 (press Ctrl+G to abort, type help(). for help)
  4. 1>
复制代码
2)修改 Program.cs
  1. C:\Windows\system32>sc query RabbitMQ
  2. SERVICE_NAME: RabbitMQ
  3.         TYPE               : 10  WIN32_OWN_PROCESS
  4.         STATE              : 4  RUNNING
  5.                                 (STOPPABLE, NOT_PAUSABLE, ACCEPTS_SHUTDOWN)
  6.         WIN32_EXIT_CODE    : 0  (0x0)
  7.         SERVICE_EXIT_CODE  : 0  (0x0)
  8.         CHECKPOINT         : 0x0
  9.         WAIT_HINT          : 0x0
复制代码
在测试写入日志之前,需要先手动新增队列:logs_first,然后再将此队列加入到 amq.fanout 交换机中。
新增队列,填入 Name,然后其他默认,直接保存。
5.png

将新增的队列绑定到交换机 amq.fanout 中:(直接输入上一步新增的队列,点击 Bind)
6.png

然后运行程序,查看队列中的消息,确认是否写入成功。
1.2.3 查看消息写入结果

点击队列名,进入队列的详细信息。
7.png

点击(Get Message(s))获取指定条数队列中的消息。
8.png

如下是消息的详细信息:
  1. # 以管理员身份打开 命令提示符 或 PowerShell,执行(共两步):
  2. # 1)进入 RabbitMQ 安装目录的 sbin 文件夹(通常如下,版本号需要改成本地的)
  3. C:\Program Files\RabbitMQ Server>cd C:\Program Files\RabbitMQ Server\rabbitmq_server-4.2.1\sbin
  4. # 2)启用管理插件
  5. C:\Program Files\RabbitMQ Server\rabbitmq_server-4.2.1\sbin>.\rabbitmq-plugins.bat enable rabbitmq_management
  6. Enabling plugins on node rabbit@chengzijia:
  7. rabbitmq_management
  8. The following plugins have been configured:
  9.   rabbitmq_management
  10.   rabbitmq_management_agent
  11.   rabbitmq_web_dispatch
  12. Applying plugin configuration to rabbit@chengzijia...
  13. The following plugins have been enabled:
  14.   rabbitmq_management
  15.   rabbitmq_management_agent
  16.   rabbitmq_web_dispatch
  17. set 3 plugins.
  18. Offline change; changes will take effect at broker restart.
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册