今天整理了.NET 10类库新增的几个常用功能,按老规矩,分享给大家:
.NET 10 新增了 WebSocketStream一个新的 API,用于简化 .NET 中一些最常见的WebSocket 的流式处理方案。
传统 WebSocket API 级别较低,需要大量的代码:处理缓冲和框架、重建消息、管理编码/解码以及编写自定义包装器以与流、通道或其他传输抽象集成。
这些复杂性使得很难将 WebSocket 用作传输,尤其是对于具有流式处理或基于文本的协议或事件驱动的处理程序的应用。
WebSocketStream 通过提供 Stream基于 WebSocket 的抽象来解决这个难题。
这样就可以与现有 API 无缝集成,以便读取、写入和分析数据,无论是二进制数据还是文本,并减少了手动管道的需求。
WebSocketStream 为常见的 WebSocket 生产环境应用启用了高级高阶API。
WebSocketStream 到底解决了什么问题
在 .NET 10 之前:
- System.Net.WebSockets.WebSocket
- 消息模型(Message-based)
- 必须自己处理:
- 分片(Fragment)
- 消息边界
- 循环 Receive / Send
- 无法像 Stream 一样统一抽象
结果是:
- WebSocket 写法 复杂、重复、易错
- 很难和现有 Stream 生态(压缩、加密、管道、序列化)整合
- WebSocket 和 TCP / NamedPipe / HTTP Body 的编程模型割裂
WebSocketStream 的本质变化
.NET 10 提供的 WebSocketStream:
把 WebSocket 连接抽象为一个真正的 Stream 这意味着:
- 不再关心消息帧、分片、边界
- 可以直接:
- 能无缝接入:
- PipeReader / PipeWriter
- System.IO.Stream 全套中间件
这是一次模型级升级,而不是语法糖。
给出一个示例代码:- using System;
- using System.IO;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- // Streaming binary protocol (for example, AMQP).
- Stream transportStream = WebSocketStream.Create(
- connectedWebSocket,
- WebSocketMessageType.Binary,
- closeTimeout: TimeSpan.FromSeconds(10));
- await message.SerializeToStreamAsync(transportStream, cancellationToken);
- var receivePayload = new byte[payloadLength];
- await transportStream.ReadExactlyAsync(receivePayload, cancellationToken);
- transportStream.Dispose();
- // `Dispose` automatically handles closing handshake.
复制代码 流式处理文本协议(例如 STOMP)
- using System.IO;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- // Streaming text protocol (for example, STOMP).
- using Stream transportStream = WebSocketStream.Create(
- connectedWebSocket,
- WebSocketMessageType.Text,
- ownsWebSocket: true);
- // Integration with Stream-based APIs.
- // Don't close the stream, as it's also used for writing.
- using var transportReader = new StreamReader(transportStream, leaveOpen: true);
- var line = await transportReader.ReadLineAsync(cancellationToken); // Automatic UTF-8 and new line handling.
- transportStream.Dispose(); // Automatic closing handshake handling on `Dispose`.
复制代码 流式处理二进制协议(例如 AMQP)
- using System;
- using System.IO;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- // Streaming binary protocol (for example, AMQP).
- Stream transportStream = WebSocketStream.Create(
- connectedWebSocket,
- WebSocketMessageType.Binary,
- closeTimeout: TimeSpan.FromSeconds(10));
- await message.SerializeToStreamAsync(transportStream, cancellationToken);
- var receivePayload = new byte[payloadLength];
- await transportStream.ReadExactlyAsync(receivePayload, cancellationToken);
- transportStream.Dispose();
- // `Dispose` automatically handles closing handshake.
复制代码 以流形式读取单个消息(例如 JSON 反序列化)
- using System.IO;
- using System.Net.WebSockets;
- using System.Text.Json;
- // Reading a single message as a stream (for example, JSON deserialization).
- using Stream messageStream = WebSocketStream.CreateReadableMessageStream(connectedWebSocket, WebSocketMessageType.Text);
- // JsonSerializer.DeserializeAsync reads until the end of stream.
- var appMessage = await JsonSerializer.DeserializeAsync(messageStream);
复制代码 将单个消息编写为流(例如,二进制序列化)
- using System;
- using System.IO;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- // Writing a single message as a stream (for example, binary serialization).
- public async Task SendMessageAsync(AppMessage message, CancellationToken cancellationToken)
- {
- using Stream messageStream = WebSocketStream.CreateWritableMessageStream(_connectedWebSocket, WebSocketMessageType.Binary);
- foreach (ReadOnlyMemory<byte> chunk in message.SplitToChunks())
- {
- await messageStream.WriteAsync(chunk, cancellationToken);
- }
- } // EOM sent on messageStream.Dispose().
复制代码 总结:WebSocketStream 的真实应用场景
场景一:实时数据流(Streaming Data)
典型业务
- 充电站实时状态推送(功率、电流、电压)
- 实时监控大屏(指标流)
- 行情 / 订单流
- 日志 / Trace 实时订阅
使用 WebSocketStream 后WebSocket = 一条持续的数据流
场景二:AI / LLM 实时输出(Token Streaming)
典型业务
- 大模型推理流式返回
- Copilot / Chat / Agent 推理过程
- SSE + WebSocket 混合方案
场景三:大文件 / 二进制流实时传输
using var file = File.OpenRead(path);
await file.CopyToAsync(webSocketStream);
更像 TCP,但仍是 WebSocket
场景四:RPC / 协议隧道(Protocol Tunneling)
- 可以直接跑 已有 Stream 协议
- gRPC-like
- 自定义 framing
- 设备通信协议
await protocolHandler.RunAsync(webSocketStream);
场景五:与 System.IO.Pipelines 深度整合
构建高性能服务端
var reader = PipeReader.Create(webSocketStream);
var writer = PipeWriter.Create(webSocketStream);
- 用 Pipelines 做:
- 和 Kestrel / gRPC / 自研协议一致
适合:
以上总结,分享给大家。
周国庆
2026/1/5
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |