一、什么是站内信?
站内信(In-App Messaging 或 Internal Messaging)是指在一个软件系统或平台内部,用户之间或系统与用户之间进行非实时或准实时文字通信的功能模块。它不依赖外部通信渠道(如短信、邮件),而是完全在应用内部完成消息的发送、接收与管理。
在企业级管理系统(如 OA、ERP、CRM、HRM、项目管理平台等)中,站内信是信息触达和协同办公的重要基础设施
二、站内信的核心功能
功能类别
- 消息收发 支持系统自动发送通知,或用户之间发送私信。
- 消息分类 如:系统通知、审批提醒、任务指派、公告、私聊等。
- 已读/未读状态 用户可标记消息为已读,系统可统计未读数量(常用于红点提示)。
- 消息列表与详情 提供消息中心页面,支持分页、筛选、搜索、按时间排序。
- 批量操作 如“全部标为已读”、“批量删除”等,提升操作效率。
- 实时提醒(可选) 通过 WebSocket 或轮询,在新消息到达时即时通知用户。
- 多端同步 Web、移动端等不同终端的消息状态保持一致。
- 权限与安全 用户只能查看自己的消息,敏感内容需防泄露、防篡改。
三、对比多种实现方式
(一)、按消息投递模型分类
在企业级管理系统中,实现站内信(In-App Messaging)有多种技术路径和架构方案。不同的实现方式适用于不同规模、性能要求、实时性需求和系统复杂度。下面从核心维度出发,系统性地介绍实现站内信的多种方式,并对比其优缺点与适用场景。
一、按消息投递模型分类
- 写扩散(Push 模型 / Fan-out on Write)
<blockquote>
原理:发送消息时,为每个接收者单独写入一条记录到其“收件箱”。
优点:
查询快:用户读消息只需查自己的 inbox 表,无需 join。
支持个性化:可标记不同用户是否已读、是否删除。
缺点:
写压力大:群发 1000 人 = 写 1000 条记录。
存储冗余:相同内容重复存储。
适用场景:
用户量中等( new ConcurrentHashMap()); // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 SseEmitter emitter = new SseEmitter(86400000L); emitters.put(token, emitter); // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token emitter.onCompletion(() -> { SseEmitter remove = emitters.remove(token); if (remove != null) { remove.complete(); } }); emitter.onTimeout(() -> { SseEmitter remove = emitters.remove(token); if (remove != null) { remove.complete(); } }); emitter.onError((e) -> { SseEmitter remove = emitters.remove(token); if (remove != null) { remove.complete(); } }); try { // 向客户端发送一条连接成功的事件 emitter.send(SseEmitter.event().comment("connected")); } catch (IOException e) { // 如果发送消息失败,则从映射表中移除 emitter emitters.remove(token); } return emitter; } /** * 断开指定用户的 SSE 连接 * * @param userIdAndTenantId 用户的唯一标识符,用于区分不同用户的连接 * @param token 用户的唯一令牌,用于识别具体的连接 */ public void disconnect(String userIdAndTenantId, String token) { if (userIdAndTenantId == null || token == null) { return; } Map emitters = USER_TOKEN_EMITTERS.get(userIdAndTenantId); if (MapUtil.isNotEmpty(emitters)) { try { SseEmitter sseEmitter = emitters.get(token); sseEmitter.send(SseEmitter.event().comment("disconnected")); sseEmitter.complete(); } catch (Exception ignore) { } emitters.remove(token); } else { USER_TOKEN_EMITTERS.remove(userIdAndTenantId); } } /** * 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息 * * @param consumer 处理SSE消息的消费者函数 */ public void subscribeMessage(Consumer consumer) { // 使用RedisTemplate实现订阅逻辑 redisTemplate.execute(connection -> { connection.subscribe((message, pattern) -> { try { // 反序列化消息 String body = new String(message.getBody()); // 添加数据格式检查 if (body.startsWith("[")) { log.warn("接收到意外的数组格式数据: {}", body); // 如果确实是数组格式,可以选择处理第一个元素或跳过 JSONArray array = JSONUtil.parseArray(body); if (!array.isEmpty()) { SseMessageDto sseMessage = JSONUtil.toBean(array.getJSONObject(0), SseMessageDto.class); consumer.accept(sseMessage); } return; } SseMessageDto sseMessage = JSONUtil.toBean(body, SseMessageDto.class); // 执行消费逻辑 consumer.accept(sseMessage); } catch (Exception e) { log.error("处理SSE订阅消息异常", e); } }, SSE_TOPIC.getBytes()); return null; }, true); } /** * 向指定的用户会话发送消息 * * @param userIdAndTenantId 要发送消息的用户id * @param message 要发送的消息内容 */ public void sendMessage(String userIdAndTenantId, String message) { Map emitters = USER_TOKEN_EMITTERS.get(userIdAndTenantId); if (MapUtil.isNotEmpty(emitters)) { for (Map.Entry entry : emitters.entrySet()) { try { SseEmitter sseEmitter = entry.getValue(); sseEmitter.send(SseEmitter.event() .name("message") .data(message)); } catch (Exception e) { SseEmitter remove = emitters.remove(entry.getKey()); if (remove != null) { remove.complete(); } } } } else { USER_TOKEN_EMITTERS.remove(userIdAndTenantId); } } /** * 推送未读数量 * @param userIdAndTenantId userId+租户Id * @param unreadCount 未读 数量 */ public void sendMessage(String userIdAndTenantId, Long unreadCount) { Map emitters = USER_TOKEN_EMITTERS.get(userIdAndTenantId); if (MapUtil.isNotEmpty(emitters)) { for (Map.Entry entry : emitters.entrySet()) { try { SseEmitter sseEmitter = entry.getValue(); sseEmitter.send(SseEmitter.event() .name("unreadCount") .data(unreadCount < 0 ? 0 : unreadCount)); } catch (Exception e) { SseEmitter remove = emitters.remove(entry.getKey()); if (remove != null) { remove.complete(); } } } } else { USER_TOKEN_EMITTERS.remove(userIdAndTenantId); } } /** * 本机全用户会话发送消息 * * @param message 要发送的消息内容 */ public void sendMessage(String message) { for (String userIdAndTenantId : USER_TOKEN_EMITTERS.keySet()) { sendMessage(userIdAndTenantId, message); } } /** * 发布SSE订阅消息 * * @param sseMessageDto 要发布的SSE消息对象 */ public void publishMessage(SseMessageDto sseMessageDto) { SseMessageDto broadcastMessage = new SseMessageDto(); broadcastMessage.setMessage(sseMessageDto.getMessage()); broadcastMessage.setUserIds(sseMessageDto.getUserIds()); // 使用RedisTemplate发布消息 redisTemplate.convertAndSend(SSE_TOPIC, broadcastMessage); log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); } /** * 向所有的用户发布订阅的消息(群发) * * @param message 要发布的消息内容 */ public void publishAll(String message) { SseMessageDto broadcastMessage = new SseMessageDto(); broadcastMessage.setMessage(message); // 使用RedisTemplate发布消息 redisTemplate.convertAndSend(SSE_TOPIC, broadcastMessage); log.info("向所有的用户发布订阅的消息:{} session keys:{} message:{}", SSE_TOPIC, "all", message); }}[/code]SSE的监听器
- <!DOCTYPE html>
- <html lang="zh-CN">
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
- <title>SSE站内信系统</title>
-
- </head>
- <body>
- <h1>SSE站内信系统</h1>
-
- 未连接
-
-
- <button id="connectBtn">连接SSE</button>
- <button id="disconnectBtn">断开连接</button>
- <button id="refreshBtn">刷新历史消息</button>
- <button id="clearBtn">清空消息</button>
-
-
-
- <h2>消息列表</h2>
-
-
-
- </body>
- </html>
复制代码 SSE收发消息的工具类
- CREATE TABLE "public"."sys_message" (
- "message_id" int8 NOT NULL,
- "title" varchar(255) COLLATE "pg_catalog"."default",
- "business_type" varchar(255) COLLATE "pg_catalog"."default",
- "content" text COLLATE "pg_catalog"."default",
- "sender_id" int8,
- "sender_name" varchar(255) COLLATE "pg_catalog"."default",
- "receiver_id" int8,
- "receiver_name" varchar(255) COLLATE "pg_catalog"."default",
- "is_read" bool,
- "read_time" timestamp(6),
- "create_time" timestamp(6),
- "update_time" timestamp(6),
- "tenant_id" varchar(64) COLLATE "pg_catalog"."default" DEFAULT '000000'::character varying,
- CONSTRAINT "sys_message_pkey" PRIMARY KEY ("message_id")
- )
- ;
- ALTER TABLE "public"."sys_message"
- OWNER TO "postgres";
- COMMENT ON COLUMN "public"."sys_message"."message_id" IS '主键';
- COMMENT ON COLUMN "public"."sys_message"."title" IS '标题';
- COMMENT ON COLUMN "public"."sys_message"."business_type" IS '业务分类【服务消息|系统消息|预警消息】';
- COMMENT ON COLUMN "public"."sys_message"."content" IS '站内信内容';
- COMMENT ON COLUMN "public"."sys_message"."sender_id" IS '站内信发送者Id';
- COMMENT ON COLUMN "public"."sys_message"."sender_name" IS '发送者名称';
- COMMENT ON COLUMN "public"."sys_message"."receiver_id" IS '站内信接收者Id';
- COMMENT ON COLUMN "public"."sys_message"."receiver_name" IS '接受者名称';
- COMMENT ON COLUMN "public"."sys_message"."is_read" IS 'true=已读';
- COMMENT ON COLUMN "public"."sys_message"."read_time" IS '站内信阅读时间';
- COMMENT ON COLUMN "public"."sys_message"."create_time" IS '站内信生产时间';
- COMMENT ON COLUMN "public"."sys_message"."update_time" IS '排序时间,默认就是生产时间';
- COMMENT ON COLUMN "public"."sys_message"."tenant_id" IS '租户Id';
复制代码 SSE手动装配
- import com.baomidou.mybatisplus.annotation.IdType;
- import com.baomidou.mybatisplus.annotation.TableId;
- import com.baomidou.mybatisplus.annotation.TableName;
- import com.fasterxml.jackson.annotation.JsonFormat;
- import lombok.Data;
- import org.springframework.format.annotation.DateTimeFormat;
- import java.time.LocalDateTime;
- /** 站内信息
- * 万里悲秋常作客,百年多病独登台
- * @author : makeJava
- */
- @Data
- @TableName("sys_message")
- public class SysMessage {
- // 站内信消息Id
- @TableId(value = "message_id", type = IdType.ASSIGN_ID)
- @JsonFormat(shape = JsonFormat.Shape.STRING)
- private Long messageId;
- // 站内信标题
- private String title;
- // 业务分类【服务消息|系统消息|预警消息】
- private String businessType;
- // 站内信内容
- private String content;
- // 站内信发送者Id {如果发送者Id为-1就是所有人都能搜到}
- private Long senderId;
- // 站内信发送者名称
- private String senderName;
- // 站内信接收者Id
- private Long receiverId;
- // 站内信接收者名称
- private String receiverName;
- // true=已读
- private Boolean isRead;
- // 站内信阅读时间
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")
- private LocalDateTime readTime;
- // 站内信创建时间
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")
- private LocalDateTime createTime;
- // 排序使用时间
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")
- private LocalDateTime updateTime;
- // 租户隔离
- private String tenantId;
- }
复制代码 SSE接口层
- import com.baomidou.mybatisplus.core.mapper.BaseMapper;
- import com.dcqc.irs.system.domain.SysMessage;
- import org.apache.ibatis.annotations.Mapper;
- /**
- * 万里悲秋常作客,百年多病独登台
- * @author : makeJava
- */
- @Mapper
- public interface SysMessageMapper extends BaseMapper<SysMessage> {
- }
复制代码 实体对象
- import com.baomidou.mybatisplus.extension.service.IService;
- import com.dcqc.irs.system.domain.SysMessage;
- /**
- * 万里悲秋常作客,百年多病独登台
- * @author : makeJava
- */
- public interface SysMessageService extends IService<SysMessage> {
- long countReadMessage(Long userId,String tenantId);
- }
- /**
- * 万里悲秋常作客,百年多病独登台
- * @author : makeJava
- */
- @Service
- @Slf4j
- public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMessage> implements SysMessageService {
- @Resource
- private SysMessageMapper sysMessageMapper;
- /**
- * 接收系统---站内信消息
- * @param event event
- */
- @EventListener
- public void onSysMessageEvent(SysMessageEvent event) {
- log.info("接收到系统消息: {}", event.getTitle());
- // 创建消息
- SysMessage sysMessage = new SysMessage();
- sysMessage.setMessageId(event.getMessageId());
- sysMessage.setTitle(event.getTitle());
- sysMessage.setContent(event.getContent());
- sysMessage.setSenderId(event.getSenderId());
- sysMessage.setSenderName(event.getSenderName());
- sysMessage.setReceiverId(event.getReceiverId());
- sysMessage.setReceiverName(event.getReceiverName());
- sysMessage.setTenantId(event.getTenantId());
- sysMessage.setIsRead(false);
- sysMessage.setBusinessType(event.getBusinessType());
- sysMessage.setCreateTime(LocalDateTime.now());
- sysMessage.setUpdateTime(LocalDateTime.now());
- if (sysMessage.getMessageId() == null) {
- sysMessage.setMessageId(IdWorker.getId());
- }
- save(sysMessage);
- }
- /**
- * 统计未读消息
- * @param userId userId
- * @param tenantId tenantId
- * @return long
- */
- @Override
- public long countReadMessage(Long userId, String tenantId) {
- return sysMessageMapper.selectCount(Wrappers.lambdaQuery(SysMessage.class).eq(SysMessage::getReceiverId, userId).eq(SysMessage::getTenantId, tenantId).eq(SysMessage::getIsRead, false));
- }
- }
复制代码 效果
模拟给他发一条消息
检查SSE的连接客户端收到
断开连接
F12查看SSE长连接
SSE搭建的站内信结束
出处:http://www.cnblogs.com/gtnotgod】/个性签名:独学而无友,则孤陋而寡闻。做一个灵魂有趣的人!
如果觉得这篇文章对你有小小的帮助的话,记得在右下角点个“推荐”哦,博主在此感谢!
Java入门到入坟
万水千山总是情,打赏一分行不行,所以如果你心情还比较高兴,也是可以扫码打赏博主,哈哈哈(っ•̀ω•́)っ✎⁾⁾!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |