找回密码
 立即注册
首页 业界区 业界 WebSocket 的使用

WebSocket 的使用

拍棹 前天 10:00
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器和客户端之间进行实时双向通信。
基本使用

1. 创建 WebSocket 连接
  1. // 创建 WebSocket 连接
  2. const socket = new WebSocket('ws://localhost:8080');
  3. // 或者使用安全连接
  4. const secureSocket = new WebSocket('wss://example.com/socket');
复制代码
2. WebSocket 事件
  1. // 连接建立时触发
  2. socket.onopen = function(event) {
  3.     console.log('连接已建立');
  4.     socket.send('Hello Server!');
  5. };
  6. // 接收到消息时触发
  7. socket.onmessage = function(event) {
  8.     console.log('收到消息:', event.data);
  9.     // 处理接收到的数据
  10. };
  11. // 发生错误时触发
  12. socket.onerror = function(error) {
  13.     console.error('WebSocket 错误:', error);
  14. };
  15. // 连接关闭时触发
  16. socket.onclose = function(event) {
  17.     console.log('连接关闭', event.code, event.reason);
  18.     // 可以在这里尝试重连
  19. };
复制代码
完整示例

客户端示例
  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4.     <title>WebSocket 示例</title>
  5. </head>
  6. <body>
  7.    
  8.         <input type="text" id="messageInput" placeholder="输入消息">
  9.         <button onclick="sendMessage()">发送</button>
  10.    
  11.    
  12.    
  13. </body>
  14. </html>
复制代码
Node.js 服务器端示例
  1. // 使用 ws 库
  2. const WebSocket = require('ws');
  3. // 创建 WebSocket 服务器
  4. const wss = new WebSocket.Server({ port: 8080 });
  5. console.log('WebSocket 服务器启动在 ws://localhost:8080');
  6. // 连接处理
  7. wss.on('connection', function connection(ws) {
  8.     console.log('新客户端连接');
  9.    
  10.     // 发送欢迎消息
  11.     ws.send(JSON.stringify({
  12.         type: 'system',
  13.         message: '欢迎连接到服务器!'
  14.     }));
  15.    
  16.     // 接收客户端消息
  17.     ws.on('message', function incoming(message) {
  18.         console.log('收到消息:', message);
  19.         
  20.         try {
  21.             const data = JSON.parse(message);
  22.             
  23.             // 广播消息给所有客户端
  24.             wss.clients.forEach(function each(client) {
  25.                 if (client !== ws && client.readyState === WebSocket.OPEN) {
  26.                     client.send(JSON.stringify({
  27.                         type: 'message',
  28.                         sender: '用户',
  29.                         message: data.content,
  30.                         timestamp: new Date().toISOString()
  31.                     }));
  32.                 }
  33.             });
  34.         } catch (error) {
  35.             console.error('消息解析错误:', error);
  36.         }
  37.     });
  38.    
  39.     // 连接关闭
  40.     ws.on('close', function() {
  41.         console.log('客户端断开连接');
  42.     });
  43.    
  44.     // 错误处理
  45.     ws.on('error', function(error) {
  46.         console.error('WebSocket 错误:', error);
  47.     });
  48. });
复制代码
WebSocket 状态
  1. // 检查连接状态
  2. switch(socket.readyState) {
  3.     case WebSocket.CONNECTING:  // 0 - 连接中
  4.         console.log('连接中...');
  5.         break;
  6.     case WebSocket.OPEN:        // 1 - 已连接
  7.         console.log('已连接');
  8.         break;
  9.     case WebSocket.CLOSING:     // 2 - 关闭中
  10.         console.log('正在关闭...');
  11.         break;
  12.     case WebSocket.CLOSED:      // 3 - 已关闭
  13.         console.log('已关闭');
  14.         break;
  15. }
复制代码
高级特性

1. 心跳检测
  1. // 心跳检测
  2. let heartbeatInterval;
  3. socket.onopen = function() {
  4.     console.log('连接建立');
  5.    
  6.     // 开始心跳
  7.     heartbeatInterval = setInterval(() => {
  8.         if (socket.readyState === WebSocket.OPEN) {
  9.             socket.send(JSON.stringify({ type: 'ping' }));
  10.         }
  11.     }, 30000);
  12. };
  13. socket.onclose = function() {
  14.     // 清除心跳
  15.     clearInterval(heartbeatInterval);
  16. };
复制代码
2. 重连机制
  1. class WebSocketClient {
  2.     constructor(url) {
  3.         this.url = url;
  4.         this.socket = null;
  5.         this.reconnectAttempts = 0;
  6.         this.maxReconnectAttempts = 5;
  7.         this.reconnectDelay = 1000;
  8.     }
  9.    
  10.     connect() {
  11.         this.socket = new WebSocket(this.url);
  12.         
  13.         this.socket.onopen = () => {
  14.             console.log('连接成功');
  15.             this.reconnectAttempts = 0;
  16.         };
  17.         
  18.         this.socket.onclose = (event) => {
  19.             console.log('连接断开,尝试重连...');
  20.             this.reconnect();
  21.         };
  22.         
  23.         this.socket.onerror = (error) => {
  24.             console.error('连接错误:', error);
  25.         };
  26.     }
  27.    
  28.     reconnect() {
  29.         if (this.reconnectAttempts < this.maxReconnectAttempts) {
  30.             this.reconnectAttempts++;
  31.             const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
  32.             
  33.             setTimeout(() => {
  34.                 console.log(`第 ${this.reconnectAttempts} 次重连`);
  35.                 this.connect();
  36.             }, delay);
  37.         } else {
  38.             console.error('重连次数已达上限');
  39.         }
  40.     }
  41.    
  42.     send(data) {
  43.         if (this.socket.readyState === WebSocket.OPEN) {
  44.             this.socket.send(data);
  45.         }
  46.     }
  47. }
复制代码
3. 二进制数据传输
  1. // 发送二进制数据
  2. socket.onopen = function() {
  3.     // 发送 ArrayBuffer
  4.     const buffer = new ArrayBuffer(4);
  5.     const view = new Uint8Array(buffer);
  6.     view[0] = 1;
  7.     view[1] = 2;
  8.     view[2] = 3;
  9.     view[3] = 4;
  10.    
  11.     socket.send(buffer);
  12.    
  13.     // 发送 Blob
  14.     const blob = new Blob(['Hello'], { type: 'text/plain' });
  15.     socket.send(blob);
  16. };
  17. // 接收二进制数据
  18. socket.binaryType = 'arraybuffer'; // 或 'blob'
  19. socket.onmessage = function(event) {
  20.     if (event.data instanceof ArrayBuffer) {
  21.         // 处理 ArrayBuffer
  22.         const view = new Uint8Array(event.data);
  23.         console.log('收到二进制数据:', view);
  24.     } else {
  25.         // 处理文本数据
  26.         console.log('收到文本数据:', event.data);
  27.     }
  28. };
复制代码
Spring Boot 中使用 WebSocket

添加依赖
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     spring-boot-starter-websocket</artifactId>
  4. </dependency>
复制代码
基础配置类
  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4.    
  5.     @Override
  6.     public void configureMessageBroker(MessageBrokerRegistry config) {
  7.         // 消息代理前缀
  8.         config.enableSimpleBroker("/topic", "/queue");
  9.         // 应用目的地前缀
  10.         config.setApplicationDestinationPrefixes("/app");
  11.         // 用户目的地前缀(一对一消息)
  12.         config.setUserDestinationPrefix("/user");
  13.     }
  14.    
  15.     @Override
  16.     public void registerStompEndpoints(StompEndpointRegistry registry) {
  17.         // 注册 WebSocket 端点
  18.         registry.addEndpoint("/ws")
  19.                 .setAllowedOriginPatterns("*")
  20.                 .withSockJS();  // 支持 SockJS 降级
  21.         
  22.         // 也可以添加多个端点
  23.         registry.addEndpoint("/ws-native")
  24.                 .setAllowedOriginPatterns("*");
  25.     }
  26.    
  27.     @Override
  28.     public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
  29.         // 配置传输限制
  30.         registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 128KB
  31.         registration.setSendTimeLimit(20 * 1000);    // 发送超时 20秒
  32.         registration.setSendBufferSizeLimit(512 * 1024); // 发送缓冲区限制 512KB
  33.     }
  34. }
复制代码
控制器示例
  1. @Controller
  2. public class WebSocketController {
  3.    
  4.     // 注入消息模板
  5.     @Autowired
  6.     private SimpMessagingTemplate messagingTemplate;
  7.    
  8.     /**
  9.      * 处理客户端发送的消息
  10.      * 目的地:/app/chat
  11.      */
  12.     @MessageMapping("/chat")
  13.     @SendTo("/topic/messages")
  14.     public ChatMessage handleMessage(ChatMessage message) {
  15.         message.setTimestamp(new Date());
  16.         System.out.println("收到消息: " + message.getContent());
  17.         return message;
  18.     }
  19.    
  20.     /**
  21.      * 发送广播消息
  22.      */
  23.     @GetMapping("/broadcast")
  24.     public void broadcast(String content) {
  25.         ChatMessage message = new ChatMessage();
  26.         message.setContent(content);
  27.         message.setSender("系统");
  28.         message.setTimestamp(new Date());
  29.         
  30.         // 发送到 /topic/messages
  31.         messagingTemplate.convertAndSend("/topic/messages", message);
  32.     }
  33.    
  34.     /**
  35.      * 发送点对点消息
  36.      */
  37.     @GetMapping("/sendToUser")
  38.     public void sendToUser(String userId, String content) {
  39.         ChatMessage message = new ChatMessage();
  40.         message.setContent(content);
  41.         message.setSender("管理员");
  42.         message.setTimestamp(new Date());
  43.         
  44.         // 发送给指定用户:/user/{userId}/queue/messages
  45.         messagingTemplate.convertAndSendToUser(
  46.             userId,
  47.             "/queue/messages",
  48.             message
  49.         );
  50.     }
  51. }
  52. // 消息实体类
  53. @Data
  54. @AllArgsConstructor
  55. @NoArgsConstructor
  56. public class ChatMessage {
  57.     private String sender;
  58.     private String content;
  59.     private Date timestamp;
  60. }
复制代码
连接拦截器
  1. @Component
  2. public class WebSocketInterceptor extends ChannelInterceptorAdapter {
  3.    
  4.     @Override
  5.     public Message<?> preSend(Message<?> message, MessageChannel channel) {
  6.         StompHeaderAccessor accessor =
  7.             MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
  8.         
  9.         if (StompCommand.CONNECT.equals(accessor.getCommand())) {
  10.             // 连接建立时处理
  11.             String token = accessor.getFirstNativeHeader("token");
  12.             // 验证 token...
  13.             System.out.println("用户连接: " + accessor.getSessionId());
  14.         } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
  15.             // 连接断开时处理
  16.             System.out.println("用户断开: " + accessor.getSessionId());
  17.         }
  18.         
  19.         return message;
  20.     }
  21. }
复制代码
原生 Java WebSocket(JSR 356)

注解方式
  1. @ServerEndpoint("/chat/{userId}")
  2. @Component
  3. public class ChatEndpoint {
  4.    
  5.     // 存储所有连接
  6.     private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
  7.    
  8.     // 存储用户ID和session的映射
  9.     private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();
  10.    
  11.     /**
  12.      * 连接建立时调用
  13.      */
  14.     @OnOpen
  15.     public void onOpen(Session session, @PathParam("userId") String userId) {
  16.         System.out.println("连接建立: " + session.getId() + ", 用户: " + userId);
  17.         
  18.         // 保存连接
  19.         sessions.put(session.getId(), session);
  20.         userSessionMap.put(userId, session.getId());
  21.         
  22.         // 通知其他用户有新用户上线
  23.         broadcast("系统", "用户 " + userId + " 上线了");
  24.     }
  25.    
  26.     /**
  27.      * 收到消息时调用
  28.      */
  29.     @OnMessage
  30.     public void onMessage(String message, Session session,
  31.                          @PathParam("userId") String userId) {
  32.         System.out.println("收到消息: " + message + " from: " + userId);
  33.         
  34.         try {
  35.             // 解析消息
  36.             JSONObject json = new JSONObject(message);
  37.             String content = json.getString("content");
  38.             String toUserId = json.optString("to", null);
  39.             
  40.             if (toUserId != null) {
  41.                 // 私聊消息
  42.                 sendToUser(userId, toUserId, content);
  43.             } else {
  44.                 // 群发消息
  45.                 broadcast(userId, content);
  46.             }
  47.         } catch (Exception e) {
  48.             e.printStackTrace();
  49.         }
  50.     }
  51.    
  52.     /**
  53.      * 连接关闭时调用
  54.      */
  55.     @OnClose
  56.     public void onClose(Session session, @PathParam("userId") String userId) {
  57.         System.out.println("连接关闭: " + session.getId());
  58.         
  59.         // 移除连接
  60.         sessions.remove(session.getId());
  61.         userSessionMap.remove(userId);
  62.         
  63.         // 通知其他用户
  64.         broadcast("系统", "用户 " + userId + " 下线了");
  65.     }
  66.    
  67.     /**
  68.      * 发生错误时调用
  69.      */
  70.     @OnError
  71.     public void onError(Session session, Throwable error) {
  72.         System.out.println("连接错误: " + session.getId());
  73.         error.printStackTrace();
  74.     }
  75.    
  76.     /**
  77.      * 广播消息给所有用户
  78.      */
  79.     private void broadcast(String sender, String content) {
  80.         JSONObject message = new JSONObject();
  81.         message.put("sender", sender);
  82.         message.put("content", content);
  83.         message.put("timestamp", System.currentTimeMillis());
  84.         message.put("type", "broadcast");
  85.         
  86.         // 发送给所有连接的客户端
  87.         for (Session session : sessions.values()) {
  88.             if (session.isOpen()) {
  89.                 try {
  90.                     session.getAsyncRemote().sendText(message.toString());
  91.                 } catch (Exception e) {
  92.                     e.printStackTrace();
  93.                 }
  94.             }
  95.         }
  96.     }
  97.    
  98.     /**
  99.      * 发送私聊消息
  100.      */
  101.     private void sendToUser(String fromUserId, String toUserId, String content) {
  102.         String toSessionId = userSessionMap.get(toUserId);
  103.         if (toSessionId != null) {
  104.             Session toSession = sessions.get(toSessionId);
  105.             if (toSession != null && toSession.isOpen()) {
  106.                 try {
  107.                     JSONObject message = new JSONObject();
  108.                     message.put("sender", fromUserId);
  109.                     message.put("content", content);
  110.                     message.put("timestamp", System.currentTimeMillis());
  111.                     message.put("type", "private");
  112.                     
  113.                     toSession.getAsyncRemote().sendText(message.toString());
  114.                 } catch (Exception e) {
  115.                     e.printStackTrace();
  116.                 }
  117.             }
  118.         }
  119.     }
  120. }
复制代码
编程方式(继承 Endpoint 类)
  1. @ServerEndpoint("/game")
  2. public class GameEndpoint extends Endpoint {
  3.    
  4.     private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());
  5.    
  6.     @Override
  7.     public void onOpen(Session session, EndpointConfig config) {
  8.         System.out.println("新连接: " + session.getId());
  9.         sessions.add(session);
  10.         
  11.         // 添加消息处理器
  12.         session.addMessageHandler(new MessageHandler.Whole<String>() {
  13.             @Override
  14.             public void onMessage(String message) {
  15.                 System.out.println("收到: " + message);
  16.                 // 处理游戏逻辑
  17.                 handleGameMessage(session, message);
  18.             }
  19.         });
  20.         
  21.         // 发送欢迎消息
  22.         try {
  23.             JSONObject welcome = new JSONObject();
  24.             welcome.put("type", "welcome");
  25.             welcome.put("message", "欢迎加入游戏!");
  26.             welcome.put("sessionId", session.getId());
  27.             session.getBasicRemote().sendText(welcome.toString());
  28.         } catch (IOException e) {
  29.             e.printStackTrace();
  30.         }
  31.     }
  32.    
  33.     @Override
  34.     public void onClose(Session session, CloseReason closeReason) {
  35.         System.out.println("连接关闭: " + session.getId());
  36.         sessions.remove(session);
  37.         
  38.         // 通知其他玩家
  39.         broadcastPlayerLeft(session.getId());
  40.     }
  41.    
  42.     @Override
  43.     public void onError(Session session, Throwable thr) {
  44.         System.err.println("连接错误: " + session.getId());
  45.         thr.printStackTrace();
  46.     }
  47.    
  48.     private void handleGameMessage(Session session, String message) {
  49.         try {
  50.             JSONObject json = new JSONObject(message);
  51.             String type = json.getString("type");
  52.             
  53.             switch (type) {
  54.                 case "move":
  55.                     // 处理移动
  56.                     handlePlayerMove(session, json);
  57.                     break;
  58.                 case "chat":
  59.                     // 处理聊天
  60.                     handleChatMessage(session, json);
  61.                     break;
  62.                 default:
  63.                     System.out.println("未知消息类型: " + type);
  64.             }
  65.         } catch (Exception e) {
  66.             e.printStackTrace();
  67.         }
  68.     }
  69.    
  70.     private void handlePlayerMove(Session session, JSONObject moveData) {
  71.         // 处理玩家移动逻辑
  72.         // 广播给所有玩家
  73.         broadcastGameUpdate(moveData);
  74.     }
  75.    
  76.     private void handleChatMessage(Session session, JSONObject chatData) {
  77.         // 广播聊天消息
  78.         JSONObject broadcastMsg = new JSONObject();
  79.         broadcastMsg.put("type", "chat");
  80.         broadcastMsg.put("sender", session.getId());
  81.         broadcastMsg.put("message", chatData.getString("message"));
  82.         broadcastMsg.put("timestamp", System.currentTimeMillis());
  83.         
  84.         broadcast(broadcastMsg.toString());
  85.     }
  86.    
  87.     private void broadcast(String message) {
  88.         synchronized (sessions) {
  89.             for (Session s : sessions) {
  90.                 if (s.isOpen()) {
  91.                     try {
  92.                         s.getBasicRemote().sendText(message);
  93.                     } catch (IOException e) {
  94.                         e.printStackTrace();
  95.                     }
  96.                 }
  97.             }
  98.         }
  99.     }
  100. }
复制代码
 配置文件

application.yml 配置
  1. spring:
  2.   websocket:
  3.     # WebSocket 配置
  4.     enabled: true
  5.    
  6. server:
  7.   # 服务器配置
  8.   port: 8080
  9.   servlet:
  10.     context-path: /api
  11.   
  12. # 自定义配置
  13. websocket:
  14.   max-sessions: 1000
  15.   heartbeat-interval: 30000
  16.   max-message-size: 128KB
复制代码
心跳检测和连接管理
  1. @Component
  2. public class WebSocketHeartbeat {
  3.    
  4.     @Autowired
  5.     private SimpMessagingTemplate messagingTemplate;
  6.    
  7.     private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  8.    
  9.     @PostConstruct
  10.     public void init() {
  11.         // 每30秒发送一次心跳
  12.         scheduler.scheduleAtFixedRate(() -> {
  13.             try {
  14.                 messagingTemplate.convertAndSend("/topic/heartbeat",
  15.                     Map.of("timestamp", System.currentTimeMillis(),
  16.                            "type", "heartbeat"));
  17.             } catch (Exception e) {
  18.                 e.printStackTrace();
  19.             }
  20.         }, 0, 30, TimeUnit.SECONDS);
  21.     }
  22.    
  23.     @PreDestroy
  24.     public void destroy() {
  25.         scheduler.shutdown();
  26.     }
  27. }
复制代码
消息编码器/解码器
  1. // 自定义消息编解码器
  2. @Component
  3. public class ChatMessageConverter implements MessageConverter {
  4.    
  5.     @Override
  6.     public Message<?> toMessage(Object payload, MessageHeaders headers) {
  7.         if (payload instanceof ChatMessage) {
  8.             ChatMessage msg = (ChatMessage) payload;
  9.             byte[] bytes = serializeMessage(msg);
  10.             return MessageBuilder.createMessage(bytes, headers);
  11.         }
  12.         return null;
  13.     }
  14.    
  15.     @Override
  16.     public Object fromMessage(Message<?> message, Class<?> targetClass) {
  17.         if (targetClass == ChatMessage.class) {
  18.             byte[] bytes = (byte[]) message.getPayload();
  19.             return deserializeMessage(bytes);
  20.         }
  21.         return null;
  22.     }
  23.    
  24.     private byte[] serializeMessage(ChatMessage message) {
  25.         try {
  26.             return new ObjectMapper().writeValueAsBytes(message);
  27.         } catch (Exception e) {
  28.             throw new RuntimeException("序列化失败", e);
  29.         }
  30.     }
  31.    
  32.     private ChatMessage deserializeMessage(byte[] bytes) {
  33.         try {
  34.             return new ObjectMapper().readValue(bytes, ChatMessage.class);
  35.         } catch (Exception e) {
  36.             throw new RuntimeException("反序列化失败", e);
  37.         }
  38.     }
  39. }
复制代码
集群支持
  1. @Configuration
  2. @EnableRedisRepositories
  3. public class RedisConfig {
  4.    
  5.     @Bean
  6.     public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
  7.         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  8.         container.setConnectionFactory(connectionFactory);
  9.         return container;
  10.     }
  11.    
  12.     @Bean
  13.     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
  14.         RedisTemplate<String, Object> template = new RedisTemplate<>();
  15.         template.setConnectionFactory(factory);
  16.         template.setKeySerializer(new StringRedisSerializer());
  17.         template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
  18.         return template;
  19.     }
  20. }
  21. // Redis 广播消息
  22. @Component
  23. public class RedisMessagePublisher {
  24.    
  25.     @Autowired
  26.     private RedisTemplate<String, Object> redisTemplate;
  27.    
  28.     public void publish(String channel, Object message) {
  29.         redisTemplate.convertAndSend(channel, message);
  30.     }
  31. }
  32. @Component
  33. public class RedisMessageSubscriber implements MessageListener {
  34.    
  35.     @Autowired
  36.     private SimpMessagingTemplate messagingTemplate;
  37.    
  38.     @Override
  39.     public void onMessage(Message message, byte[] pattern) {
  40.         // 处理从 Redis 收到的消息
  41.         // 转发给 WebSocket 客户端
  42.         String channel = new String(pattern);
  43.         String msg = new String(message.getBody());
  44.         
  45.         messagingTemplate.convertAndSend("/topic/" + channel, msg);
  46.     }
  47. }
复制代码
Spring Boot 的 STOMP 实现更加完整和易于使用,而原生 WebSocket 则更加灵活。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册