This article explains the development details of integrating the RocketMQ client as a spring-boot-starter, then walks step by step through a simple example of using this spring-boot-starter package to configure, send, and consume RocketMQ messages.
I. Usage
Add the Maven dependency:
<!-- Add the dependency to pom.xml -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>
II. Sending messages
Edit application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
Note:
Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.
1. Write the code
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send a message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
        // Send a Spring message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        // Send a message asynchronously
        rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSuccess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }
        });
        // Send messages in order; the last argument is the hash key
        rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");
        // rocketMQTemplate.destroy(); // Note: once rocketMQTemplate is destroyed, it can no longer send messages
    }

    @Data
    @AllArgsConstructor
    public static class OrderPaidEvent implements Serializable {
        private String orderId;
        private BigDecimal paidMoney;
    }
}
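The syncSendOrderly call above passes a hash key as its last argument. As a rough illustration of why a stable key keeps related messages together, the plain-Java sketch below maps a key to one of a fixed number of queues. The class name HashKeyQueueSketch, the method selectQueue, and the queue count of 4 are assumptions made for this example; it is not the library's own selector code.

```java
/** Illustrative only: maps a hash key to a queue index; not RocketMQ's own selector. */
final class HashKeyQueueSketch {

    /** Returns a queue index in [0, queueCount) that is stable for a given key. */
    static int selectQueue(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // hypothetical number of queues on the topic
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.printf("key=%s -> queue %d%n", key, selectQueue(key, queueCount));
        }
    }
}
```

Both "order-1" lines print the same queue index, which is the property that ordered sending relies on: messages sharing a key always land in the same queue.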
III. Receiving messages
1. Push mode
Edit application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
Note:
Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.
Write the code
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Nested listeners are declared static so that Spring's component scan can register them as beans
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public static class MyConsumer1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }

    // OrderPaidEvent is the event class defined in the producer example
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public static class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
        @Override
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}
2. Pull mode
Starting with RocketMQ Spring 2.2.0, RocketMQ Spring supports pull-mode consumption.
Edit application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
# When rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic are both set, rocketMQTemplate starts a lite pull consumer
# If you do not want a lite pull consumer, leave rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic unset
rocketmq.pull-consumer.group=my-group1
rocketmq.pull-consumer.topic=test
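Note: the lite pull consumer was previously enabled by rocketmq.consumer.group and rocketmq.consumer.topic. Because these were very easy to confuse with the push consumer, they were renamed to rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic after version 2.2.3.
Write the code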
@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // A second template bean named "extRocketMQTemplate"; its definition is not shown in this example
    @Resource(name = "extRocketMQTemplate")
    private RocketMQTemplate extRocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Pull messages with rocketMQTemplate
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
        // Pull messages with extRocketMQTemplate
        messages = extRocketMQTemplate.receive(String.class);
        System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
    }
}
IV. Transactional messages
Edit application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
Note:
Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.
Write the code
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        try {
            // Build a Spring Message to send in a transaction
            Message msg = MessageBuilder.withPayload(..)...;
            // The first parameter is the destination topic; the last parameter (null here)
            // is an optional argument handed to executeLocalTransaction()
            rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
        } catch (MessagingException e) {
            // The template reports send failures as Spring's unchecked MessagingException
            e.printStackTrace(System.out);
        }
    }

    // Define the transaction listener with the @RocketMQTransactionListener annotation
    @RocketMQTransactionListener
    static class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... run the local transaction, then return ROLLBACK, COMMIT, or UNKNOWN
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check the transaction status, then return ROLLBACK, COMMIT, or UNKNOWN
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
}
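In the listener above, executeLocalTransaction returns UNKNOWN and checkLocalTransaction later returns COMMIT. The plain-Java sketch below is a simplified model of that flow: an undecided state triggers check-backs until the producer gives a definite answer. The names TransactionFlowSketch, resolve, and maxChecks are hypothetical, and what a real broker does once its check-backs are exhausted is not modeled here.

```java
import java.util.function.Supplier;

/** Illustrative only: a simplified model of how a transactional message is resolved. */
final class TransactionFlowSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /**
     * Resolves the state of a pending transactional message.
     * executeResult stands in for the value returned by executeLocalTransaction;
     * check stands in for checkLocalTransaction and is called at most maxChecks times.
     */
    static State resolve(State executeResult, Supplier<State> check, int maxChecks) {
        State state = executeResult;
        for (int i = 0; state == State.UNKNOWN && i < maxChecks; i++) {
            state = check.get(); // the broker asks the producer again
        }
        return state;
    }

    public static void main(String[] args) {
        // Mirrors the listener above: UNKNOWN first, then COMMIT on check-back.
        System.out.println(resolve(State.UNKNOWN, () -> State.COMMIT, 3));   // prints COMMIT
        // A definite answer from the local transaction needs no check-back.
        System.out.println(resolve(State.ROLLBACK, () -> State.COMMIT, 3));  // prints ROLLBACK
    }
}
```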
V. Message trace
To use message trace on the producer side, two additional configuration items are required:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic
On the consumer side, message trace is configured through the corresponding attributes of @RocketMQMessageListener:
@Service
@RocketMQMessageListener(
    topic = "test-topic-1",
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}
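Note:
By default, message trace is enabled for both the producer and the consumer, and the trace topic is RMQ_SYS_TRACE_TOPIC.
The consumer-side trace topic can be set once in the configuration file with the rocketmq.consumer.customized-trace-topic item, so it does not need to be configured in every @RocketMQMessageListener.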
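As a sketch of the configuration-file approach, the fragment below sets the consumer-side trace topic once. The property name is the one given in the note; the value my-trace-topic is reused from the earlier example and is only a placeholder.

```properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
# Consumer-side trace topic, set once here instead of in every @RocketMQMessageListener
rocketmq.consumer.customized-trace-topic=my-trace-topic
```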
To use Alibaba Cloud message trace, set accessChannel to CLOUD in @RocketMQMessageListener.
VI. ACL
To use ACL on the producer side, two additional configuration items are required:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK
On the consumer side, ACL is configured in @RocketMQMessageListener:
@Service
@RocketMQMessageListener(
    topic = "test-topic-1",
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}
Note:
You do not have to configure AK/SK in every @RocketMQMessageListener annotation. Set rocketmq.consumer.access-key and rocketmq.consumer.secret-key in the configuration file instead; the values of these two items are used as the defaults.
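A minimal sketch of that default configuration might look like the fragment below. The two property names come from the note; AK and SK are the same placeholders used in the producer example and must be replaced with real credentials.

```properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
# Default credentials for consumers that do not set accessKey/secretKey in the annotation
rocketmq.consumer.access-key=AK
rocketmq.consumer.secret-key=SK
```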
VII. Request/reply semantics
RocketMQ-Spring supports request/reply semantics.
- Producer side
Use the sendAndReceive method to send a request message.
Note:
For a synchronous send, specify the reply type in the method arguments.
For an asynchronous send, specify the reply type in the callback interface.
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send a request synchronously and wait for a reply of type String
        String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
        System.out.printf("send %s and receive %s %n", "request string", replyString);
        // Send a request asynchronously and wait for a reply of type User
        rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName", (byte) 9), new RocketMQLocalRequestCallback<User>() {
            @Override
            public void onSuccess(User message) {
                System.out.printf("send user object and receive %s %n", message.toString());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 5000);
    }

    @Data
    @AllArgsConstructor
    public static class User implements Serializable {
        private String userName;
        private Byte userAge;
    }
}
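To make the request/reply idea more concrete, the plain-Java sketch below pairs each request with its reply through a correlation id and a timeout. This is a common way to build such semantics and is offered only as an assumption-laden illustration, not as a description of RocketMQ's internals; the class RequestReplySketch and its methods are hypothetical names, and the in-process "consumer" function stands in for a reply listener.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Illustrative only: request/reply matched by a correlation id; not RocketMQ internals. */
final class RequestReplySketch {

    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Stands in for a reply listener on the consumer side.
    private final Function<String, String> consumer;

    RequestReplySketch(Function<String, String> consumer) {
        this.consumer = consumer;
    }

    /** Sends a request and blocks until the matching reply arrives or the timeout expires. */
    String sendAndReceive(String request, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // Simulated delivery: the "consumer" runs on another thread and replies by id.
        CompletableFuture.runAsync(() -> onReply(correlationId, consumer.apply(request)));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply for request: " + request, e);
        } finally {
            pending.remove(correlationId);
        }
    }

    /** Completes the pending request that carries the given correlation id, if any. */
    private void onReply(String correlationId, String body) {
        CompletableFuture<String> waiting = pending.get(correlationId);
        if (waiting != null) {
            waiting.complete(body);
        }
    }

    public static void main(String[] args) {
        RequestReplySketch sketch = new RequestReplySketch(request -> "reply to " + request);
        System.out.println(sketch.sendAndReceive("request string", 1000));
    }
}
```

The timeout argument plays the same role as the 5000 passed to the asynchronous sendAndReceive call in the producer example: the caller gives up if no matching reply arrives in time.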
- Consumer side
Implement the RocketMQReplyListener<T, R> interface, where T is the type of the received value and R is the type of the returned value.
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Nested listeners are declared static so that Spring's component scan can register them as beans
    @Service
    @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
    public static class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
        @Override
        public String onMessage(String message) {
            System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
            return "reply string";
        }
    }

    @Service
    @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
    public static class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
        @Override
        public User onMessage(User user) {
            System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
            User replyUser = new User("replyUserName", (byte) 10);
            return replyUser;
        }
    }

    @Data
    @AllArgsConstructor
    public static class User implements Serializable {
        private String userName;
        private Byte userAge;
    }
}