WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器和客户端之间进行实时双向通信。
基本使用
1. 创建 WebSocket 连接
- // 创建 WebSocket 连接
- const socket = new WebSocket('ws://localhost:8080');
- // 或者使用安全连接
- const secureSocket = new WebSocket('wss://example.com/socket');
复制代码 2. WebSocket 事件
- // 连接建立时触发
- socket.onopen = function(event) {
- console.log('连接已建立');
- socket.send('Hello Server!');
- };
- // 接收到消息时触发
- socket.onmessage = function(event) {
- console.log('收到消息:', event.data);
- // 处理接收到的数据
- };
- // 发生错误时触发
- socket.onerror = function(error) {
- console.error('WebSocket 错误:', error);
- };
- // 连接关闭时触发
- socket.onclose = function(event) {
- console.log('连接关闭', event.code, event.reason);
- // 可以在这里尝试重连
- };
复制代码 完整示例
客户端示例
- <!DOCTYPE html>
- <html>
- <head>
- <title>WebSocket 示例</title>
- </head>
- <body>
-
- <input type="text" id="messageInput" placeholder="输入消息">
- <button onclick="sendMessage()">发送</button>
-
-
-
- </body>
- </html>
复制代码 Node.js 服务器端示例
- // 使用 ws 库
- const WebSocket = require('ws');
- // 创建 WebSocket 服务器
- const wss = new WebSocket.Server({ port: 8080 });
- console.log('WebSocket 服务器启动在 ws://localhost:8080');
- // 连接处理
- wss.on('connection', function connection(ws) {
- console.log('新客户端连接');
-
- // 发送欢迎消息
- ws.send(JSON.stringify({
- type: 'system',
- message: '欢迎连接到服务器!'
- }));
-
- // 接收客户端消息
- ws.on('message', function incoming(message) {
- console.log('收到消息:', message);
-
- try {
- const data = JSON.parse(message);
-
- // 广播消息给所有客户端
- wss.clients.forEach(function each(client) {
- if (client !== ws && client.readyState === WebSocket.OPEN) {
- client.send(JSON.stringify({
- type: 'message',
- sender: '用户',
- message: data.content,
- timestamp: new Date().toISOString()
- }));
- }
- });
- } catch (error) {
- console.error('消息解析错误:', error);
- }
- });
-
- // 连接关闭
- ws.on('close', function() {
- console.log('客户端断开连接');
- });
-
- // 错误处理
- ws.on('error', function(error) {
- console.error('WebSocket 错误:', error);
- });
- });
复制代码 WebSocket 状态
- // 检查连接状态
- switch(socket.readyState) {
- case WebSocket.CONNECTING: // 0 - 连接中
- console.log('连接中...');
- break;
- case WebSocket.OPEN: // 1 - 已连接
- console.log('已连接');
- break;
- case WebSocket.CLOSING: // 2 - 关闭中
- console.log('正在关闭...');
- break;
- case WebSocket.CLOSED: // 3 - 已关闭
- console.log('已关闭');
- break;
- }
复制代码 高级特性
1. 心跳检测
- // 心跳检测
- let heartbeatInterval;
- socket.onopen = function() {
- console.log('连接建立');
-
- // 开始心跳
- heartbeatInterval = setInterval(() => {
- if (socket.readyState === WebSocket.OPEN) {
- socket.send(JSON.stringify({ type: 'ping' }));
- }
- }, 30000);
- };
- socket.onclose = function() {
- // 清除心跳
- clearInterval(heartbeatInterval);
- };
复制代码 2. 重连机制
- class WebSocketClient {
- constructor(url) {
- this.url = url;
- this.socket = null;
- this.reconnectAttempts = 0;
- this.maxReconnectAttempts = 5;
- this.reconnectDelay = 1000;
- }
-
- connect() {
- this.socket = new WebSocket(this.url);
-
- this.socket.onopen = () => {
- console.log('连接成功');
- this.reconnectAttempts = 0;
- };
-
- this.socket.onclose = (event) => {
- console.log('连接断开,尝试重连...');
- this.reconnect();
- };
-
- this.socket.onerror = (error) => {
- console.error('连接错误:', error);
- };
- }
-
- reconnect() {
- if (this.reconnectAttempts < this.maxReconnectAttempts) {
- this.reconnectAttempts++;
- const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
-
- setTimeout(() => {
- console.log(`第 ${this.reconnectAttempts} 次重连`);
- this.connect();
- }, delay);
- } else {
- console.error('重连次数已达上限');
- }
- }
-
- send(data) {
- if (this.socket.readyState === WebSocket.OPEN) {
- this.socket.send(data);
- }
- }
- }
复制代码 3. 二进制数据传输
- // 发送二进制数据
- socket.onopen = function() {
- // 发送 ArrayBuffer
- const buffer = new ArrayBuffer(4);
- const view = new Uint8Array(buffer);
- view[0] = 1;
- view[1] = 2;
- view[2] = 3;
- view[3] = 4;
-
- socket.send(buffer);
-
- // 发送 Blob
- const blob = new Blob(['Hello'], { type: 'text/plain' });
- socket.send(blob);
- };
- // 接收二进制数据
- socket.binaryType = 'arraybuffer'; // 或 'blob'
- socket.onmessage = function(event) {
- if (event.data instanceof ArrayBuffer) {
- // 处理 ArrayBuffer
- const view = new Uint8Array(event.data);
- console.log('收到二进制数据:', view);
- } else {
- // 处理文本数据
- console.log('收到文本数据:', event.data);
- }
- };
复制代码 Spring Boot 中使用 WebSocket
添加依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter-websocket</artifactId>
- </dependency>
复制代码 基础配置类
- @Configuration
- @EnableWebSocketMessageBroker
- public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
-
- @Override
- public void configureMessageBroker(MessageBrokerRegistry config) {
- // 消息代理前缀
- config.enableSimpleBroker("/topic", "/queue");
- // 应用目的地前缀
- config.setApplicationDestinationPrefixes("/app");
- // 用户目的地前缀(一对一消息)
- config.setUserDestinationPrefix("/user");
- }
-
- @Override
- public void registerStompEndpoints(StompEndpointRegistry registry) {
- // 注册 WebSocket 端点
- registry.addEndpoint("/ws")
- .setAllowedOriginPatterns("*")
- .withSockJS(); // 支持 SockJS 降级
-
- // 也可以添加多个端点
- registry.addEndpoint("/ws-native")
- .setAllowedOriginPatterns("*");
- }
-
- @Override
- public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
- // 配置传输限制
- registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 128KB
- registration.setSendTimeLimit(20 * 1000); // 发送超时 20秒
- registration.setSendBufferSizeLimit(512 * 1024); // 发送缓冲区限制 512KB
- }
- }
复制代码 控制器示例
- @Controller
- public class WebSocketController {
-
- // 注入消息模板
- @Autowired
- private SimpMessagingTemplate messagingTemplate;
-
- /**
- * 处理客户端发送的消息
- * 目的地:/app/chat
- */
- @MessageMapping("/chat")
- @SendTo("/topic/messages")
- public ChatMessage handleMessage(ChatMessage message) {
- message.setTimestamp(new Date());
- System.out.println("收到消息: " + message.getContent());
- return message;
- }
-
- /**
- * 发送广播消息
- */
- @GetMapping("/broadcast")
- public void broadcast(String content) {
- ChatMessage message = new ChatMessage();
- message.setContent(content);
- message.setSender("系统");
- message.setTimestamp(new Date());
-
- // 发送到 /topic/messages
- messagingTemplate.convertAndSend("/topic/messages", message);
- }
-
- /**
- * 发送点对点消息
- */
- @GetMapping("/sendToUser")
- public void sendToUser(String userId, String content) {
- ChatMessage message = new ChatMessage();
- message.setContent(content);
- message.setSender("管理员");
- message.setTimestamp(new Date());
-
- // 发送给指定用户:/user/{userId}/queue/messages
- messagingTemplate.convertAndSendToUser(
- userId,
- "/queue/messages",
- message
- );
- }
- }
- // 消息实体类
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class ChatMessage {
- private String sender;
- private String content;
- private Date timestamp;
- }
复制代码 连接拦截器
- @Component
- public class WebSocketInterceptor extends ChannelInterceptorAdapter {
-
- @Override
- public Message<?> preSend(Message<?> message, MessageChannel channel) {
- StompHeaderAccessor accessor =
- MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
-
- if (StompCommand.CONNECT.equals(accessor.getCommand())) {
- // 连接建立时处理
- String token = accessor.getFirstNativeHeader("token");
- // 验证 token...
- System.out.println("用户连接: " + accessor.getSessionId());
- } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
- // 连接断开时处理
- System.out.println("用户断开: " + accessor.getSessionId());
- }
-
- return message;
- }
- }
复制代码 原生 Java WebSocket(JSR 356)
注解方式
- @ServerEndpoint("/chat/{userId}")
- @Component
- public class ChatEndpoint {
-
- // 存储所有连接
- private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
-
- // 存储用户ID和session的映射
- private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();
-
- /**
- * 连接建立时调用
- */
- @OnOpen
- public void onOpen(Session session, @PathParam("userId") String userId) {
- System.out.println("连接建立: " + session.getId() + ", 用户: " + userId);
-
- // 保存连接
- sessions.put(session.getId(), session);
- userSessionMap.put(userId, session.getId());
-
- // 通知其他用户有新用户上线
- broadcast("系统", "用户 " + userId + " 上线了");
- }
-
- /**
- * 收到消息时调用
- */
- @OnMessage
- public void onMessage(String message, Session session,
- @PathParam("userId") String userId) {
- System.out.println("收到消息: " + message + " from: " + userId);
-
- try {
- // 解析消息
- JSONObject json = new JSONObject(message);
- String content = json.getString("content");
- String toUserId = json.optString("to", null);
-
- if (toUserId != null) {
- // 私聊消息
- sendToUser(userId, toUserId, content);
- } else {
- // 群发消息
- broadcast(userId, content);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 连接关闭时调用
- */
- @OnClose
- public void onClose(Session session, @PathParam("userId") String userId) {
- System.out.println("连接关闭: " + session.getId());
-
- // 移除连接
- sessions.remove(session.getId());
- userSessionMap.remove(userId);
-
- // 通知其他用户
- broadcast("系统", "用户 " + userId + " 下线了");
- }
-
- /**
- * 发生错误时调用
- */
- @OnError
- public void onError(Session session, Throwable error) {
- System.out.println("连接错误: " + session.getId());
- error.printStackTrace();
- }
-
- /**
- * 广播消息给所有用户
- */
- private void broadcast(String sender, String content) {
- JSONObject message = new JSONObject();
- message.put("sender", sender);
- message.put("content", content);
- message.put("timestamp", System.currentTimeMillis());
- message.put("type", "broadcast");
-
- // 发送给所有连接的客户端
- for (Session session : sessions.values()) {
- if (session.isOpen()) {
- try {
- session.getAsyncRemote().sendText(message.toString());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- /**
- * 发送私聊消息
- */
- private void sendToUser(String fromUserId, String toUserId, String content) {
- String toSessionId = userSessionMap.get(toUserId);
- if (toSessionId != null) {
- Session toSession = sessions.get(toSessionId);
- if (toSession != null && toSession.isOpen()) {
- try {
- JSONObject message = new JSONObject();
- message.put("sender", fromUserId);
- message.put("content", content);
- message.put("timestamp", System.currentTimeMillis());
- message.put("type", "private");
-
- toSession.getAsyncRemote().sendText(message.toString());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
复制代码 编程方式(继承 Endpoint 类)
- @ServerEndpoint("/game")
- public class GameEndpoint extends Endpoint {
-
- private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());
-
- @Override
- public void onOpen(Session session, EndpointConfig config) {
- System.out.println("新连接: " + session.getId());
- sessions.add(session);
-
- // 添加消息处理器
- session.addMessageHandler(new MessageHandler.Whole<String>() {
- @Override
- public void onMessage(String message) {
- System.out.println("收到: " + message);
- // 处理游戏逻辑
- handleGameMessage(session, message);
- }
- });
-
- // 发送欢迎消息
- try {
- JSONObject welcome = new JSONObject();
- welcome.put("type", "welcome");
- welcome.put("message", "欢迎加入游戏!");
- welcome.put("sessionId", session.getId());
- session.getBasicRemote().sendText(welcome.toString());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onClose(Session session, CloseReason closeReason) {
- System.out.println("连接关闭: " + session.getId());
- sessions.remove(session);
-
- // 通知其他玩家
- broadcastPlayerLeft(session.getId());
- }
-
- @Override
- public void onError(Session session, Throwable thr) {
- System.err.println("连接错误: " + session.getId());
- thr.printStackTrace();
- }
-
- private void handleGameMessage(Session session, String message) {
- try {
- JSONObject json = new JSONObject(message);
- String type = json.getString("type");
-
- switch (type) {
- case "move":
- // 处理移动
- handlePlayerMove(session, json);
- break;
- case "chat":
- // 处理聊天
- handleChatMessage(session, json);
- break;
- default:
- System.out.println("未知消息类型: " + type);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void handlePlayerMove(Session session, JSONObject moveData) {
- // 处理玩家移动逻辑
- // 广播给所有玩家
- broadcastGameUpdate(moveData);
- }
-
- private void handleChatMessage(Session session, JSONObject chatData) {
- // 广播聊天消息
- JSONObject broadcastMsg = new JSONObject();
- broadcastMsg.put("type", "chat");
- broadcastMsg.put("sender", session.getId());
- broadcastMsg.put("message", chatData.getString("message"));
- broadcastMsg.put("timestamp", System.currentTimeMillis());
-
- broadcast(broadcastMsg.toString());
- }
-
- private void broadcast(String message) {
- synchronized (sessions) {
- for (Session s : sessions) {
- if (s.isOpen()) {
- try {
- s.getBasicRemote().sendText(message);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
复制代码 配置文件
application.yml 配置
- spring:
- websocket:
- # WebSocket 配置
- enabled: true
-
- server:
- # 服务器配置
- port: 8080
- servlet:
- context-path: /api
-
- # 自定义配置
- websocket:
- max-sessions: 1000
- heartbeat-interval: 30000
- max-message-size: 128KB
复制代码 心跳检测和连接管理
- @Component
- public class WebSocketHeartbeat {
-
- @Autowired
- private SimpMessagingTemplate messagingTemplate;
-
- private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
- @PostConstruct
- public void init() {
- // 每30秒发送一次心跳
- scheduler.scheduleAtFixedRate(() -> {
- try {
- messagingTemplate.convertAndSend("/topic/heartbeat",
- Map.of("timestamp", System.currentTimeMillis(),
- "type", "heartbeat"));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }, 0, 30, TimeUnit.SECONDS);
- }
-
- @PreDestroy
- public void destroy() {
- scheduler.shutdown();
- }
- }
复制代码 消息编码器/解码器
- // 自定义消息编解码器
- @Component
- public class ChatMessageConverter implements MessageConverter {
-
- @Override
- public Message<?> toMessage(Object payload, MessageHeaders headers) {
- if (payload instanceof ChatMessage) {
- ChatMessage msg = (ChatMessage) payload;
- byte[] bytes = serializeMessage(msg);
- return MessageBuilder.createMessage(bytes, headers);
- }
- return null;
- }
-
- @Override
- public Object fromMessage(Message<?> message, Class<?> targetClass) {
- if (targetClass == ChatMessage.class) {
- byte[] bytes = (byte[]) message.getPayload();
- return deserializeMessage(bytes);
- }
- return null;
- }
-
- private byte[] serializeMessage(ChatMessage message) {
- try {
- return new ObjectMapper().writeValueAsBytes(message);
- } catch (Exception e) {
- throw new RuntimeException("序列化失败", e);
- }
- }
-
- private ChatMessage deserializeMessage(byte[] bytes) {
- try {
- return new ObjectMapper().readValue(bytes, ChatMessage.class);
- } catch (Exception e) {
- throw new RuntimeException("反序列化失败", e);
- }
- }
- }
复制代码 集群支持
- @Configuration
- @EnableRedisRepositories
- public class RedisConfig {
-
- @Bean
- public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- return container;
- }
-
- @Bean
- public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
- RedisTemplate<String, Object> template = new RedisTemplate<>();
- template.setConnectionFactory(factory);
- template.setKeySerializer(new StringRedisSerializer());
- template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
- return template;
- }
- }
- // Redis 广播消息
- @Component
- public class RedisMessagePublisher {
-
- @Autowired
- private RedisTemplate<String, Object> redisTemplate;
-
- public void publish(String channel, Object message) {
- redisTemplate.convertAndSend(channel, message);
- }
- }
- @Component
- public class RedisMessageSubscriber implements MessageListener {
-
- @Autowired
- private SimpMessagingTemplate messagingTemplate;
-
- @Override
- public void onMessage(Message message, byte[] pattern) {
- // 处理从 Redis 收到的消息
- // 转发给 WebSocket 客户端
- String channel = new String(pattern);
- String msg = new String(message.getBody());
-
- messagingTemplate.convertAndSend("/topic/" + channel, msg);
- }
- }
复制代码 Spring Boot 的 STOMP 实现更加完整和易于使用,而原生 WebSocket 则更加灵活。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |