本博客屬于 《RabbitMQ基礎(chǔ)組件封裝—整體結(jié)構(gòu)》的子博客
一、整體架構(gòu)
step1:消息落庫,業(yè)務(wù)數(shù)據(jù)存庫的同時(shí),也要將消息記錄存入數(shù)據(jù)庫,二者要保證原子性;
step2:Producer發(fā)送消息到MQ Broker;
step3:Producer收到 broker 返回的確認(rèn)消息;
step4:更改消息記錄庫的狀態(tài)(定義三種狀態(tài):0待確認(rèn)、1已確認(rèn)、2確認(rèn)失敗);
step5:定時(shí)任務(wù)獲取長時(shí)間處于待確認(rèn)狀態(tài)的消息;
step6:Producer重試發(fā)送消息;
step7:重試次數(shù)超過3次,將消息狀態(tài)更新為確認(rèn)失敗,后續(xù)根據(jù)具體業(yè)務(wù)再處理確認(rèn)失敗的消息;
二、消息記錄的增刪改查
1. 當(dāng)前項(xiàng)目名為 rabbit-core-producer,為了實(shí)現(xiàn)消息記錄入庫,需要跟數(shù)據(jù)庫打交道,這里首先添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
?2. 消息記錄的建表語句?rabbit-producer-message-schema.sql
-- 表 broker_message.broker_message 結(jié)構(gòu)
CREATE TABLE IF NOT EXISTS `broker_message` (
`message_id` varchar(128) NOT NULL,
`message` varchar(4000),
`try_count` int(4) DEFAULT 0,
`status` varchar(10) DEFAULT '',
`next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
`create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
`update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
將?rabbit-producer-message-schema.sql 放在 rabbit-core-producer 項(xiàng)目下的 /src/main/resources/rabbit-producer-message-schema.sql, rabbit-core-producer項(xiàng)目?在 RabbitMQ基礎(chǔ)組件封裝—整體結(jié)構(gòu)?有具體說明(當(dāng)前博客是?RabbitMQ基礎(chǔ)組件封裝—整體結(jié)構(gòu) 的其中一個(gè)章節(jié))。
3. 數(shù)據(jù)源的配置文件?rabbit-producer-message.properties
rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://localhost:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=root
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true
同樣需要將該文件放在 rabbit-core-producer 項(xiàng)目下的 /src/main/resources/rabbit-producer-message.properties。
因?yàn)樯厦媾渲弥杏杏玫綌?shù)據(jù)庫 broker_message,所以需要自己提前建好一個(gè)數(shù)據(jù)庫 broker_message。
4.?BrokerMessage.java
public class BrokerMessage implements Serializable {
private static final long serialVersionUID = 7447792462810110841L;
private String messageId;
private Message message;
private Integer tryCount = 0;
private String status;
private Date nextRetry;
private Date createTime;
private Date updateTime;
// getter、setter方法省略
}
5. BrokerMessageMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.didiok.rabbit.producer.mapper.BrokerMessageMapper" >
<resultMap id="BaseResultMap" type="com.didiok.rabbit.producer.entity.BrokerMessage" >
<id column="message_id" property="messageId" jdbcType="VARCHAR" />
<result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler" />
<result column="try_count" property="tryCount" jdbcType="INTEGER" />
<result column="status" property="status" jdbcType="VARCHAR" />
<result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
<result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
<result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
</resultMap>
<sql id="Base_Column_List" >
message_id, message, try_count, status, next_retry, create_time, update_time
</sql>
<select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
select
<include refid="Base_Column_List" />
from broker_message
where message_id = #{messageId,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
delete from broker_message
where message_id = #{messageId,jdbcType=VARCHAR}
</delete>
<insert id="insert" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
insert into broker_message (message_id, message, try_count,
status, next_retry, create_time,
update_time)
values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER},
#{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
#{updateTime,jdbcType=TIMESTAMP})
</insert>
<insert id="insertSelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
insert into broker_message
<trim prefix="(" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
message_id,
</if>
<if test="message != null" >
message,
</if>
<if test="tryCount != null" >
try_count,
</if>
<if test="status != null" >
status,
</if>
<if test="nextRetry != null" >
next_retry,
</if>
<if test="createTime != null" >
create_time,
</if>
<if test="updateTime != null" >
update_time,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
#{messageId,jdbcType=VARCHAR},
</if>
<if test="message != null" >
#{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
</if>
<if test="tryCount != null" >
#{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
#{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
#{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
#{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
#{updateTime,jdbcType=TIMESTAMP},
</if>
</trim>
</insert>
<update id="updateByPrimaryKeySelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
update broker_message
<set >
<if test="message != null" >
message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
</if>
<if test="tryCount != null" >
try_count = #{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
status = #{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
create_time = #{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
update_time = #{updateTime,jdbcType=TIMESTAMP},
</if>
</set>
where message_id = #{messageId,jdbcType=VARCHAR}
</update>
<update id="updateByPrimaryKey" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
update broker_message
set message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
try_count = #{tryCount,jdbcType=INTEGER},
status = #{status,jdbcType=VARCHAR},
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
create_time = #{createTime,jdbcType=TIMESTAMP},
update_time = #{updateTime,jdbcType=TIMESTAMP}
where message_id = #{messageId,jdbcType=VARCHAR}
</update>
<update id="changeBrokerMessageStatus" >
update broker_message bm
set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
</update>
<select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
<![CDATA[
select message_id, message, try_count, status, next_retry, create_time, update_time
from broker_message bm
where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
and bm.next_retry < sysdate()
]]>
</select>
<select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
select message_id, message, try_count, status, next_retry, create_time, update_time
from broker_message bm
where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
</select>
<update id="update4TryCount" >
update broker_message bm
set bm.try_count = bm.try_count + 1,
bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
</update>
</mapper>
6.?BrokerMessageMapper.java
@Mapper
public interface BrokerMessageMapper {
int deleteByPrimaryKey(String messageId);
int insert(BrokerMessage record);
int insertSelective(BrokerMessage record);
BrokerMessage selectByPrimaryKey(String messageId);
int updateByPrimaryKeySelective(BrokerMessage record);
int updateByPrimaryKeyWithBLOBs(BrokerMessage record);
int updateByPrimaryKey(BrokerMessage record);
void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);
List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);
List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);
int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime")Date updateTime);
}
7.?MessageStoreService.java(這里不加接口類了,直接在MessageStoreService.java中寫具體邏輯實(shí)現(xiàn))
@Service
public class MessageStoreService {
@Autowired
private BrokerMessageMapper brokerMessageMapper;
public int insert(BrokerMessage brokerMessage) {
return this.brokerMessageMapper.insert(brokerMessage);
}
public BrokerMessage selectByMessageId(String messageId) {
return this.brokerMessageMapper.selectByPrimaryKey(messageId);
}
public void succuess(String messageId) {
this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
BrokerMessageStatus.SEND_OK.getCode(),
new Date());
}
public void failure(String messageId) {
this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
BrokerMessageStatus.SEND_FAIL.getCode(),
new Date());
}
public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
}
public int updateTryCount(String brokerMessageId) {
return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
}
}
三、整合數(shù)據(jù)源
1. 讀取配置文件,生成數(shù)據(jù)源,RabbitProducerDataSourceConfiguration.java
@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
@Value("${rabbit.producer.druid.type}")
private Class<? extends DataSource> dataSourceType;
@Bean(name = "rabbitProducerDataSource")
@Primary
// 以這個(gè)rabbit.producer.druid.jdbc為前綴的屬性值都會(huì)注入到DataSource中
@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
public DataSource rabbitProducerDataSource() throws SQLException {
DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
return rabbitProducerDataSource;
}
public DataSourceProperties primaryDataSourceProperties(){
return new DataSourceProperties();
}
public DataSource primaryDataSource(){
return primaryDataSourceProperties().initializeDataSourceBuilder().build();
}
}
2. 執(zhí)行指定的sql腳本 ,BrokerMessageConfiguration.java
/**
* $BrokerMessageConfiguration
* 幫我執(zhí)行SQL腳本
* 幫我進(jìn)行數(shù)據(jù)庫表結(jié)構(gòu)的創(chuàng)建
*
*/
@Configuration
public class BrokerMessageConfiguration {
@Autowired
private DataSource rabbitProducerDataSource;
/**
* 加載 rabbit-producer-message-schema.sql 腳本(這是一個(gè)建表語句)
*/
@Value("classpath:rabbit-producer-message-schema.sql")
private Resource schemaScript;
@Bean
public DataSourceInitializer initDataSourceInitializer() {
System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
final DataSourceInitializer initializer = new DataSourceInitializer();
// 設(shè)置之前生成的數(shù)據(jù)源
initializer.setDataSource(rabbitProducerDataSource);
// 執(zhí)行指定的sql腳本
initializer.setDatabasePopulator(databasePopulator());
return initializer;
}
/**
* 執(zhí)行指定的sql腳本
* @return
*/
private DatabasePopulator databasePopulator() {
final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(schemaScript);
return populator;
}
}
3. 接下來是和 Mybatis 配置相關(guān)的文件:RabbitProducerMyBatisConfiguration.java
@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration執(zhí)行完才能執(zhí)行,即數(shù)據(jù)源生成之后才能執(zhí)行當(dāng)前類
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {
@Resource(name= "rabbitProducerDataSource")
private DataSource rabbitProducerDataSource;
@Bean(name="rabbitProducerSqlSessionFactory")
public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(rabbitProducerDataSource);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
// mapper.xml文件加載,這些配置本可以寫在 application.yml 中,但是由于要作為一個(gè)基礎(chǔ)組件,所以寫在代碼里,跟業(yè)務(wù)層面解綁,讓業(yè)務(wù)層面無感知
bean.setMapperLocations(resolver.getResources("classpath:com/didiok/rabbit/producer/mapping/*.xml"));
SqlSessionFactory sqlSessionFactory = bean.getObject();
sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
return sqlSessionFactory;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Bean(name="rabbitProducerSqlSessionTemplate")
public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
4. Mapper掃描配置相關(guān)的文件:?RabbitProducerMybatisMapperScanerConfig.java
@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration執(zhí)行完才能執(zhí)行,即數(shù)據(jù)源生成之后才能執(zhí)行當(dāng)前類
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {
@Bean(name="rabbitProducerMapperScannerConfigurer")
public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
// mapper.java文件加載,這些配置本可以寫在 application.yml 中,但是由于要作為一個(gè)基礎(chǔ)組件,所以寫在代碼里,跟業(yè)務(wù)層面解綁,讓業(yè)務(wù)層面無感知
MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
mapperScannerConfigurer.setBasePackage("com.didiok.rabbit.producer.mapper");
return mapperScannerConfigurer;
}
}
四、可靠性發(fā)送消息代碼實(shí)現(xiàn)
/**
* $RabbitBrokerImpl 真正的發(fā)送不同類型的消息實(shí)現(xiàn)類
*
*/
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {
@Autowired
private RabbitTemplateContainer rabbitTemplateContainer;
@Autowired
private MessageStoreService messageStoreService;
/**
* 可靠性消息發(fā)送
*/
@Override
public void reliantSend(Message message) {
message.setMessageType(MessageType.RELIANT);
BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
if(bm == null) {
//1. 把數(shù)據(jù)庫的消息發(fā)送日志先記錄好
Date now = new Date();
BrokerMessage brokerMessage = new BrokerMessage();
brokerMessage.setMessageId(message.getMessageId());
brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
//tryCount默認(rèn)等于0 所以在最開始發(fā)送的時(shí)候不需要進(jìn)行設(shè)置
brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
brokerMessage.setCreateTime(now);
brokerMessage.setUpdateTime(now);
brokerMessage.setMessage(message);
messageStoreService.insert(brokerMessage);
}
//2. 執(zhí)行真正的發(fā)送消息邏輯
sendKernel(message);
}
@Override
public void rapidSend(Message message) {
// 省略...
}
/**
* $sendKernel 發(fā)送消息的核心方法 使用異步線程池進(jìn)行發(fā)送消息
* @param message
*/
private void sendKernel(Message message) {
AsyncBaseQueue.submit((Runnable) () -> {
CorrelationData correlationData =
// 回調(diào)函數(shù)confirm中需要用到message.getMessageId(), message.getMessageType()。所以可以放在CorrelationData中
new CorrelationData(String.format("%s#%s#%s",
message.getMessageId(),
System.currentTimeMillis(),
message.getMessageType()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());
});
}
@Override
public void confirmSend(Message message) {
// 省略...
}
@Override
public void sendMessages() {
// 省略...
}
}
并且在回調(diào)函數(shù)中,也要添加相應(yīng)的邏輯:
/**
* $RabbitTemplateContainer池化封裝
* 每一個(gè)topic 對(duì)應(yīng)一個(gè)RabbitTemplate
* 1. 提高發(fā)送的效率
* 2. 可以根據(jù)不同的需求制定化不同的RabbitTemplate, 比如每一個(gè)topic 都有自己的routingKey規(guī)則
*/
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {
private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
private Splitter splitter = Splitter.on("#");
private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private MessageStoreService messageStoreService;
public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
// 省略...
}
/**
* 無論是 confirm 消息 還是 reliant 消息 ,發(fā)送消息以后 broker都會(huì)去回調(diào)confirm
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 具體的消息應(yīng)答
List<String> strings = splitter.splitToList(correlationData.getId());
String messageId = strings.get(0);
long sendTime = Long.parseLong(strings.get(1));
String messageType = strings.get(2);
if(ack) {
// 當(dāng)Broker 返回ACK成功時(shí), 就是更新一下日志表里對(duì)應(yīng)的消息發(fā)送狀態(tài)為 SEND_OK
// 如果當(dāng)前消息類型為reliant 我們就去數(shù)據(jù)庫查找并進(jìn)行更新
if(MessageType.RELIANT.endsWith(messageType)) {
this.messageStoreService.succuess(messageId);
}
log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
} else {
log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
}
}
}
上面大部分代碼都是在實(shí)現(xiàn)迅速類型的消息發(fā)送時(shí)已經(jīng)編寫了,只是在 confirm()方法中添加了:
// 如果當(dāng)前消息類型為reliant 我們就去數(shù)據(jù)庫查找并進(jìn)行更新
if(MessageType.RELIANT.endsWith(messageType)) {
this.messageStoreService.succuess(messageId);
}
五、定時(shí)任務(wù)獲取長時(shí)間處于待確認(rèn)狀態(tài)的消息并重新發(fā)送
1. 實(shí)現(xiàn)分布式定時(shí)任務(wù)
這里的定時(shí)任務(wù)是使用 ElasticJob,并對(duì)其進(jìn)行封裝,封裝在項(xiàng)目?rabbit-task中,封裝成為了兩個(gè)注解 @EnableElasticJob 和?@ElasticJobConfig 。
具體的 ElasticJob 的使用和封裝過程可參考教程:ElasticJob使用與封裝
2.? 將封裝好的項(xiàng)目?rabbit-task 添加到 當(dāng)前項(xiàng)目中并使用
(1)引入 rabbit-task 的依賴
<dependency>
<groupId>com.bfxy.base.rabbit</groupId>
<artifactId>rabbit-task</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
(2)使用注解@EnableElasticJob
在當(dāng)前項(xiàng)目 rabbit-core-producer 中的 自動(dòng)裝配類 中添加注解 @EnableElasticJob,使得當(dāng) 應(yīng)用程序啟動(dòng)的時(shí)候,就能對(duì) ZooKeeper注冊(cè)中心進(jìn)行初始化,以及 ElasticJob的定時(shí)任務(wù)解析類 ElasticJobConfParser 的初始化。
/**
* $RabbitProducerAutoConfiguration 自動(dòng)裝配
*
*/
@EnableElasticJob
@Configuration
@ComponentScan({"com.didiok.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {
}
(3)實(shí)現(xiàn)定時(shí)任務(wù)的具體處理邏輯并在類上加注解@EnableElasticJob
這里為了消息的可靠性發(fā)送,我們需要抓取 超時(shí)卻仍處于待確認(rèn)狀態(tài) 的消息,進(jìn)行重新發(fā)送消息。這里使用 ElasticJob 的流式定時(shí)任務(wù) DataFlowJob。
@Component
@ElasticJobConfig(
name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
cron= "0/10 * * * * ?",
description = "可靠性投遞消息補(bǔ)償任務(wù)",
overwrite = true,
shardingTotalCount = 1
)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage>{
@Autowired
private MessageStoreService messageStoreService;
@Autowired
private RabbitBroker rabbitBroker;
private static final int MAX_RETRY_COUNT = 3;
@Override
public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
// 抓取狀態(tài)為未確認(rèn),而且 next_retry 小于當(dāng)前時(shí)間的這些消息,為了確定百分百能發(fā)送成功,需要再進(jìn)行重發(fā)
List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
log.info("--------@@@@@ 抓取數(shù)據(jù)集合, 數(shù)量: {} @@@@@@-----------" , list.size());
return list;
}
@Override
public void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {
dataList.forEach( brokerMessage -> {
String messageId = brokerMessage.getMessageId();
if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {
// 重試次數(shù)大于3,就不再進(jìn)行重發(fā)了,直接認(rèn)為發(fā)送失敗,更改標(biāo)記為失敗
this.messageStoreService.failure(messageId);
log.warn(" -----消息設(shè)置為最終失敗,消息ID: {} -------", messageId);
} else {
// 每次重發(fā)的時(shí)候要更新一下try_count和next_retry字段
this.messageStoreService.updateTryCount(messageId);
// 重發(fā)消息
this.rabbitBroker.reliantSend(brokerMessage.getMessage());
}
});
}
}
上面的代碼中加入了注解
@ElasticJobConfig( name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob", cron= "0/10 * * * * ?", description = "可靠性投遞消息補(bǔ)償任務(wù)", overwrite = true, shardingTotalCount = 1 )
則該類中的邏輯會(huì)定時(shí)執(zhí)行。文章來源:http://www.zghlxwxcb.cn/news/detail-442204.html
對(duì)于重發(fā)消息的代碼 this.rabbitBroker.reliantSend(brokerMessage.getMessage());,之前已經(jīng)做過說明,這里不再贅述。文章來源地址http://www.zghlxwxcb.cn/news/detail-442204.html
到了這里,關(guān)于RabbitMQ可靠性消息發(fā)送(java實(shí)現(xiàn))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!