# MQ 统一推送入口设计文档 > **设计日期**: 2026-06-04 > **版本**: v1.0 > **状态**: 已批准 > **目标**: 创建统一消息队列推送入口 MqClient,封装多 Provider 路由、Action 注入、异常兜底,对标 spring-rui MqDefaultClient --- ## 一、背景与目标 ### 1.1 现状分析 当前项目已有基础 MQ 能力: - `MqService` 接口:提供 `send()` 方法,面向业务开发者直接使用 - `Message` 模型:含 id, topic, payload, headers, timestamp, retryCount - `RedisMqService` / `RabbitMqService`:分别基于 Redis Pub/Sub 和 RabbitMQ 的实现 - `MqTopic` 注解:用于方法级消息订阅 **存在的问题**: 1. 缺乏统一推送门面:业务代码需要直接注入 `MqService` 并选择实现,无法自动按 Provider 路由 2. 无 Provider 抽象:无法在多环境(开发用 Redis、生产用 RabbitMQ)间平滑切换 3. 无 Action 语义:消息缺乏"添加/删除/更新"等业务动作标识 4. 异常处理分散:各实现自行处理异常,缺乏统一兜底 ### 1.2 目标定义 1. 创建 `MqClient` 门面类,作为业务层唯一推送入口 2. 引入 `MqPublisher` Provider 接口,实现多 MQ 后端自动路由 3. 扩展 `Message` 模型,支持 action、provider、exchange、delay 等高级字段 4. 保持现有 `MqService` 接口不变,确保向后兼容 5. 所有推送操作统一异常捕获和日志记录 --- ## 二、详细设计 ### 2.1 整体架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ 业务层 (Business) │ │ MqClient.publish(...) │ └──────────────────────────┬──────────────────────────────────┘ │ ┌──────────────────────────▼──────────────────────────────────┐ │ MqClient (门面/统一入口) │ │ · 自动从 Spring 容器获取所有 MqPublisher 实现 │ │ · 按 support(MqProvider) 过滤匹配的 Publisher │ │ · 自动注入 action 到 payload │ │ · 统一 try-catch + 日志 │ └──────────────────────────┬──────────────────────────────────┘ │ ┌───────────────┼───────────────┐ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ RedisMqPublisher │ │ RabbitMqPublisher│ │ FutureProvider │ │ (Redis实现) │ │ (RabbitMQ实现) │ │ (可扩展) │ └─────────────────┘ └─────────────────┘ └─────────────────┘ ``` ### 2.2 核心组件 | 组件 | 职责 | 位置 | |------|------|------| | `MqClient` | 统一推送门面,业务层唯一入口 | `rui-common-mq` | | `MqPublisher` | Provider 能力接口,定义 support + publish | `rui-common-mq` | | `MqProvider` | Provider 枚举(RABBITMQ, REDIS) | `rui-common-mq` | | `MqProperties` | 配置属性(默认 provider、topic prefix) | `rui-common-mq` | | `MqAction` | 消息动作枚举(ADDED, DELETED 等) | `rui-common-mq` | | `Message` | 扩展后的消息模型 | `rui-common-mq` | ### 2.3 数据流 ``` 启动阶段 (一次) │ ▼ Spring 注入 List → 遍历注册到 EnumMap │ ▼ publisherMap = { RABBITMQ: RabbitMqPublisher, REDIS: RedisMqPublisher } 运行时 (每次 publish) │ ▼ MqClient.publish(String topic, MqAction action, T payload) │ ├─ 1. 构造 Message ├─ 2. 若 action != NONE,将 action 序列化后注入 payload ├─ 3. 若 message.provider == null,使用 MqProperties 默认值 ├─ 4. publisherMap.get(provider) → O(1) 直接取到 Publisher ├─ 5. 调用 publisher.publish(message) └─ 6. catch Exception → log.error,不抛异常 ``` ### 2.4 接口设计 **MqClient(门面类)** 采用**构造器注入 + 启动预构建 EnumMap**,避免运行时遍历: ```java @Component @Slf4j public class MqClient { private final MqProperties properties; private final EnumMap publisherMap = new EnumMap<>(MqProvider.class); public MqClient(MqProperties properties, List publishers) { this.properties = properties; for (MqPublisher p : publishers) { publisherMap.put(p.getProvider(), p); // O(1) 注册 } } /** 使用默认 Provider 推送 */ public void publish(String topic, T payload); /** 使用默认 Provider 推送,带 Action */ public void publish(String topic, MqAction action, T payload); /** 指定 Provider 推送 */ public void publish(MqProvider provider, String topic, T payload); /** 指定 Provider 推送,带 Action */ public void publish(MqProvider provider, String topic, MqAction action, T payload); /** 完整 Message 推送 */ public void publish(Message message); private MqPublisher resolve(MqProvider provider) { return publisherMap.get(provider); // O(1) 查找 } } ``` **MqPublisher(Provider 接口)** ```java public interface MqPublisher { /** 返回该 Publisher 支持的 Provider 类型 */ MqProvider getProvider(); /** 基础推送 */ void publish(String topic, T payload); /** 完整消息推送 */ void publish(Message message); } ``` **MqProvider(枚举)** ```java public enum MqProvider { RABBITMQ, REDIS } ``` **MqProperties(配置)** ```java @ConfigurationProperties(prefix = "mq") @Data public class MqProperties { private MqProvider provider = MqProvider.RABBITMQ; private String prefix = "rui"; public String enTopic(String topic) { ... } public String deTopic(String topic) { ... } } ``` **MqAction(动作枚举,精简版)** ```java public enum MqAction { NONE, ADDED, DELETED, UPDATED, CREATED, CANCEL, ENABLED, SUCCESSFUL, FAILURE } ``` ### 2.5 扩展现有实现 **RedisMqService** 调整: - 实现 `MqPublisher` 接口 - `support(MqProvider.REDIS)` 返回 true - `publish()` 复用现有 `send()` 逻辑 **RabbitMqService** 调整: - 实现 `MqPublisher` 接口 - `support(MqProvider.RABBITMQ)` 返回 true - `publish()` 复用现有 `send()` 逻辑 ### 2.6 错误处理 - **异常策略**:`MqClient` 所有 publish 方法统一 try-catch,记录 error 日志,不抛异常给业务层 - **Provider 不匹配**:若找不到支持该 Provider 的 Publisher,记录 warn 日志 - **Payload 为空**:自动创建空 JSON 对象 `{}` --- ## 三、验收标准 - [ ] `MqClient` 可正常注入并调用 `publish()` 发送消息 - [ ] 通过配置 `mq.provider=redis` 可自动切换到 Redis 实现 - [ ] `publish(topic, MqAction.ADDED, payload)` 发送的消息包含 action 字段 - [ ] 发送异常时不抛异常,仅记录日志 - [ ] 现有 `MqService.send()` 调用不受影响,向后兼容 - [ ] 项目可正常编译通过 --- ## 四、风险与依赖 | 风险 | 影响 | 缓解措施 | |------|------|---------| | 现有 `MqService` 被多处使用 | 中 | 不修改 `MqService`,新建 `MqPublisher` 接口 | | `Message` 扩展后序列化兼容性 | 低 | 新增字段均为可空,不影响现有序列化 | | SpringUtil 在静态方法中可能未初始化 | 低 | MqClient 通过构造器注入,非静态调用 | --- ## 五、对标分析 | 维度 | spring-rui (MqDefaultClient) | 本设计 (MqClient) | |------|------------------------------|-------------------| | 门面类名 | `MqDefaultClient` | `MqClient`(更简洁) | | Provider 接口 | `MqService`(含 support/publish) | `MqPublisher`(不冲突现有 `MqService`) | | 配置类名 | `MqAutoConfiguration` | `MqProperties`(更语义化) | | JSON 框架 | fastjson2 | Jackson(适配本项目) | | Provider 支持 | MQTT, RABBITMQ, REDIS | RABBITMQ, REDIS(MQTT 暂不需要) | | Action 枚举 | `Actions`(80+ 项) | `MqAction`(精简 9 项) | | 包名 | `org.rui.common.mq` | `com.rui.common.mq` |