# rocketmq-producer **Repository Path**: ryan.van/rocketmq-producer ## Basic Information - **Project Name**: rocketmq-producer - **Description**: 个基于 Spring Boot 的 RocketMQ 生产者集成 starter,提供了简单易用的 RocketMQ 生产者客户端接入能力,简化了 RocketMQ 生产者的配置和使用流程,支持多种消息发送方式和重试策略。 - **Primary Language**: Unknown - **License**: MIT-0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-20 - **Last Updated**: 2025-08-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RocketMQ Producer Spring Boot Starter [![Maven Central](https://img.shields.io/maven-central/v/com.petalsdata.rocketmq/rocketmq-producer-spring-boot-starter.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22com.petalsdata.rocketmq%22%20AND%20a:%22rocketmq-producer-spring-boot-starter%22) ## 项目介绍 `rocketmq-producer-spring-boot-starter` 是一个基于 Spring Boot 的 RocketMQ 生产者集成 starter,提供了简单易用的 RocketMQ 生产者客户端接入能力,简化了 RocketMQ 生产者的配置和使用流程,支持多种消息发送方式和重试策略。 ## 功能特性 - 自动配置 RocketMQ 生产者客户端 - 支持多生产者实例配置与管理 - 支持 ACL 权限认证 - 支持消息发送重试机制(包含指数退避策略) - 支持消息压缩配置 - 支持最大消息大小限制 - 与 Spring Boot 无缝集成 - 支持 Spring Boot 的配置属性绑定 - 提供健康检查指标 ## 版本支持 - Spring Boot: 2.7.0+ (可通过 pom.xml 调整) - RocketMQ: 5.1.4+ (可通过 pom.xml 调整) - Java: 1.8+ (推荐使用 Java 11+) ## 快速开始 ### 1. 添加依赖 在 `pom.xml` 文件中添加以下依赖: ```xml com.petalsdata.rocketmq rocketmq-producer-spring-boot-starter 1.0.0 ``` ### 2. 配置参数 在 `application.yml` 或 `application.properties` 中添加 RocketMQ 配置: ```yaml rocketmq: name-server: 127.0.0.1 port: 9876 # 可选:全局 ACL 配置 access-key: your-access-key secret-key: your-secret-key # 可选:全局重试配置 retry: max-attempts: 3 initial-delay-millis: 100 delay-multiplier: 2.0 # 多生产者配置 producers: producer1: group: group1 send-msg-timeout: 60000 retry-times-when-send-failed: 2 compress-msg-body-over-howmuch: 4096 max-message-size: 4194304 # 可选:生产者级 ACL 配置(覆盖全局配置) # access-key: producer1-access-key # secret-key: producer1-secret-key producer2: group: group2 unit-name: unit2 ``` ### 3. 使用示例 #### 3.1 发送同步消息 ```java import com.petalsdata.rocketmq.producer.RocketMQProducerFactory; import com.petalsdata.rocketmq.producer.RocketProducer; import org.apache.rocketmq.client.producer.SendResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageController { @Autowired private RocketMQProducerFactory producerFactory; @GetMapping("/send-sync") public String sendSyncMessage() { // 获取指定生产者 RocketProducer producer = producerFactory.getProducer("producer1"); if (producer == null) { return "Producer not found"; } // 发送同步消息 try { SendResult result = producer.sendSyncMessage("topic-test", "Hello RocketMQ"); return "Message sent successfully, msgId: " + result.getMsgId(); } catch (Exception e) { e.printStackTrace(); return "Failed to send message: " + e.getMessage(); } } } ``` #### 3.2 发送带重试机制的同步消息 ```java @GetMapping("/send-sync-with-retry") public String sendSyncMessageWithRetry() { RocketProducer producer = producerFactory.getProducer("producer1"); if (producer == null) { return "Producer not found"; } try { // 使用默认重试配置 SendResult result = producer.sendSyncMessageWithRetry("topic-test", "Hello RocketMQ with retry"); return "Message sent successfully, msgId: " + result.getMsgId(); } catch (Exception e) { return "Failed to send message after retries: " + e.getMessage(); } } ``` #### 3.3 发送带标签的消息 ```java @GetMapping("/send-with-tag") public String sendMessageWithTag() { RocketProducer producer = producerFactory.getProducer("producer1"); if (producer == null) { return "Producer not found"; } try { SendResult result = producer.sendSyncMessage("topic-test", "tag1", "Hello RocketMQ with tag"); return "Message sent successfully, msgId: " + result.getMsgId(); } catch (Exception e) { return "Failed to send message: " + e.getMessage(); } } ``` ## 配置说明 ### 核心配置项 | 配置项 | 描述 | 默认值 | |-------|-----|-------| | `rocketmq.name-server` | RocketMQ NameServer 地址 | 无,必须配置 | | `rocketmq.port` | RocketMQ NameServer 端口 | 无,可选 | | `rocketmq.access-key` | 全局访问密钥 | 无,可选 | | `rocketmq.secret-key` | 全局密钥 | 无,可选 | ### 重试配置项 | 配置项 | 描述 | 默认值 | |-------|-----|-------| | `rocketmq.retry.max-attempts` | 最大重试次数 | 3 | | `rocketmq.retry.initial-delay-millis` | 初始重试延迟时间(毫秒) | 100 | | `rocketmq.retry.delay-multiplier` | 重试延迟倍数 | 2.0 | ### 生产者配置项 | 配置项 | 描述 | 默认值 | |-------|-----|-------| | `rocketmq.producers.{key}.group` | 生产者组名 | 无,必须配置 | | `rocketmq.producers.{key}.unit-name` | 单元名称 | 无,可选 | | `rocketmq.producers.{key}.send-msg-timeout` | 发送消息超时时间(毫秒) | 60000 | | `rocketmq.producers.{key}.retry-times-when-send-failed` | 发送失败重试次数 | 2 | | `rocketmq.producers.{key}.compress-msg-body-over-howmuch` | 消息压缩阈值(字节) | 4096 | | `rocketmq.producers.{key}.max-message-size` | 最大消息大小(字节) | 1048576 | | `rocketmq.producers.{key}.access-key` | 生产者访问密钥 | 无,可选 | | `rocketmq.producers.{key}.secret-key` | 生产者密钥 | 无,可选 | | `rocketmq.producers.{key}.retry` | 生产者级重试配置 | 继承全局配置 | ## 项目结构 ``` rocketmq-spring-boot-starter/ ├── src/main/java/com/petalsdata/rocketmq/ │ ├── autoconfigure/ │ │ ├── RocketMQAutoConfiguration.java // 核心配置类 │ │ └── RocketMQProperties.java // 配置属性类 │ ├── acl/ │ │ └── RocketMQACLUtil.java // ACL工具类 │ ├── producer/ │ │ ├── RocketProducer.java // 生产者封装类 │ │ └── RocketMQProducerFactory.java // 生产者工厂类 └── src/main/resources/META-INF/ └── spring.factories // Spring Boot自动配置入口 ├── pom.xml // Maven依赖配置 └── README.md // 项目文档 ``` ## 注意事项 1. 确保 RocketMQ 服务已正确安装和启动 2. 生产者组名必须唯一且不为空 3. 如需使用 ACL 认证,请确保 RocketMQ 服务已开启 ACL 功能 4. 发送消息时,确保主题已存在或 RocketMQ 服务配置为自动创建主题