国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ可靠性消息發(fā)送(java實(shí)現(xiàn))

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ可靠性消息發(fā)送(java實(shí)現(xiàn))。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

本博客屬于 《RabbitMQ基礎(chǔ)組件封裝—整體結(jié)構(gòu)》的子博客

一、整體架構(gòu)

step1:消息落庫,業(yè)務(wù)數(shù)據(jù)存庫的同時(shí),也要將消息記錄存入數(shù)據(jù)庫,二者要保證原子性;

step2:Producer發(fā)送消息到MQ Broker;

step3:Producer收到 broker 返回的確認(rèn)消息;

step4:更改消息記錄庫的狀態(tài)(定義三種狀態(tài):0待確認(rèn)、1已確認(rèn)、2確認(rèn)失敗);

step5:定時(shí)任務(wù)獲取長時(shí)間處于待確認(rèn)狀態(tài)的消息;

step6:Producer重試發(fā)送消息;

step7:重試次數(shù)超過3次,將消息狀態(tài)更新為確認(rèn)失敗,后續(xù)根據(jù)具體業(yè)務(wù)再處理確認(rèn)失敗的消息;

RabbitMQ可靠性消息發(fā)送(java實(shí)現(xiàn))

二、消息記錄的增刪改查

1. 當(dāng)前項(xiàng)目名為 rabbit-core-producer,為了實(shí)現(xiàn)消息記錄入庫,需要跟數(shù)據(jù)庫打交道,這里首先添加依賴:

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
	    <dependency>
	      <groupId>org.mybatis.spring.boot</groupId>
	      <artifactId>mybatis-spring-boot-starter</artifactId>
	      <version>1.1.1</version>
	    </dependency>
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>druid</artifactId>
		    <version>1.1.10</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency> 

?2. 消息記錄的建表語句?rabbit-producer-message-schema.sql

-- 表 broker_message.broker_message 結(jié)構(gòu)
CREATE TABLE IF NOT EXISTS `broker_message` (
  `message_id` varchar(128) NOT NULL,
  `message` varchar(4000),
  `try_count` int(4) DEFAULT 0,
  `status` varchar(10) DEFAULT '',
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

將?rabbit-producer-message-schema.sql 放在 rabbit-core-producer 項(xiàng)目下的 /src/main/resources/rabbit-producer-message-schema.sql, rabbit-core-producer項(xiàng)目?在 RabbitMQ基礎(chǔ)組件封裝—整體結(jié)構(gòu)?有具體說明(當(dāng)前博客是?RabbitMQ基礎(chǔ)組件封裝—整體結(jié)構(gòu) 的其中一個(gè)章節(jié))。

3. 數(shù)據(jù)源的配置文件?rabbit-producer-message.properties

rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://localhost:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=root
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true

同樣需要將該文件放在 rabbit-core-producer 項(xiàng)目下的 /src/main/resources/rabbit-producer-message.properties。

因?yàn)樯厦媾渲弥杏杏玫綌?shù)據(jù)庫 broker_message,所以需要自己提前建好一個(gè)數(shù)據(jù)庫 broker_message。

4.?BrokerMessage.java

public class BrokerMessage implements Serializable {
	
	private static final long serialVersionUID = 7447792462810110841L;

	private String messageId;

    private Message message;

    private Integer tryCount = 0;

    private String status;

    private Date nextRetry;

    private Date createTime;

    private Date updateTime;

    // getter、setter方法省略
}

5. BrokerMessageMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.didiok.rabbit.producer.mapper.BrokerMessageMapper" >
  <resultMap id="BaseResultMap" type="com.didiok.rabbit.producer.entity.BrokerMessage" >
    <id column="message_id" property="messageId" jdbcType="VARCHAR" />
    <result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler" />
    <result column="try_count" property="tryCount" jdbcType="INTEGER" />
    <result column="status" property="status" jdbcType="VARCHAR" />
    <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>
  <sql id="Base_Column_List" >
    message_id, message, try_count, status, next_retry, create_time, update_time
  </sql>
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
    select 
    <include refid="Base_Column_List" />
    from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
    delete from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </delete>
  <insert id="insert" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message (message_id, message, try_count, 
      status, next_retry, create_time, 
      update_time)
    values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER}, 
      #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP}, 
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>
  <insert id="insertSelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        message_id,
      </if>
      <if test="message != null" >
        message,
      </if>
      <if test="tryCount != null" >
        try_count,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="nextRetry != null" >
        next_retry,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        #{messageId,jdbcType=VARCHAR},
      </if>
      <if test="message != null" >
        #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    <set >
      <if test="message != null" >
        message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    set message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      try_count = #{tryCount,jdbcType=INTEGER},
      status = #{status,jdbcType=VARCHAR},
      next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>
  

  <update id="changeBrokerMessageStatus" >
    update broker_message bm
    set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
      	bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}  
  </update>
  
  
  <select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
  	<![CDATA[
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
	    and bm.next_retry < sysdate()
    ]]>
  </select>
  
  <select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
	    select message_id, message, try_count, status, next_retry, create_time, update_time
	    from broker_message bm
	    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}  	
  </select>
  
  
   <update id="update4TryCount" >
    update broker_message bm
    set bm.try_count = bm.try_count + 1,
      bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
   </update>
   
   
</mapper>

6.?BrokerMessageMapper.java

@Mapper
public interface BrokerMessageMapper {
	
    int deleteByPrimaryKey(String messageId);
    
    int insert(BrokerMessage record);

    int insertSelective(BrokerMessage record);

    BrokerMessage selectByPrimaryKey(String messageId);

    int updateByPrimaryKeySelective(BrokerMessage record);

    int updateByPrimaryKeyWithBLOBs(BrokerMessage record);

    int updateByPrimaryKey(BrokerMessage record);
	
	void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);

	List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);
	
	List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);
	
	int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime")Date updateTime);
	
}

7.?MessageStoreService.java(這里不加接口類了,直接在MessageStoreService.java中寫具體邏輯實(shí)現(xiàn))

@Service
public class MessageStoreService {

	@Autowired
	private BrokerMessageMapper brokerMessageMapper;
	
	public int insert(BrokerMessage brokerMessage) {
		return this.brokerMessageMapper.insert(brokerMessage);
	}
	
	public BrokerMessage selectByMessageId(String messageId) {
		return this.brokerMessageMapper.selectByPrimaryKey(messageId);
	}

	public void succuess(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_OK.getCode(),
				new Date());
	}
	
	public void failure(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_FAIL.getCode(),
				new Date());
	}
	
	public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
		return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
	}
	
	public int updateTryCount(String brokerMessageId) {
		return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
	}
}

三、整合數(shù)據(jù)源

1. 讀取配置文件,生成數(shù)據(jù)源,RabbitProducerDataSourceConfiguration.java

@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
	
	private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
	
	@Value("${rabbit.producer.druid.type}")
	private Class<? extends DataSource> dataSourceType;
	
	@Bean(name = "rabbitProducerDataSource")
	@Primary
	// 以這個(gè)rabbit.producer.druid.jdbc為前綴的屬性值都會(huì)注入到DataSource中
	@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
	public DataSource rabbitProducerDataSource() throws SQLException {
		DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
		LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
		return rabbitProducerDataSource;
	}
	
    public DataSourceProperties primaryDataSourceProperties(){
        return new DataSourceProperties();
    }
    
    public DataSource primaryDataSource(){
        return primaryDataSourceProperties().initializeDataSourceBuilder().build();
    }
	
}

2. 執(zhí)行指定的sql腳本 ,BrokerMessageConfiguration.java

/**
 * 	$BrokerMessageConfiguration 
 * 	幫我執(zhí)行SQL腳本
 * 	幫我進(jìn)行數(shù)據(jù)庫表結(jié)構(gòu)的創(chuàng)建
 *
 */
@Configuration
public class BrokerMessageConfiguration {

    @Autowired
    private DataSource rabbitProducerDataSource;

    /**
     * 加載 rabbit-producer-message-schema.sql 腳本(這是一個(gè)建表語句)
     */
    @Value("classpath:rabbit-producer-message-schema.sql")
    private Resource schemaScript;
    
    @Bean
    public DataSourceInitializer initDataSourceInitializer() {
    	System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
        final DataSourceInitializer initializer = new DataSourceInitializer();
        // 設(shè)置之前生成的數(shù)據(jù)源
        initializer.setDataSource(rabbitProducerDataSource);
        // 執(zhí)行指定的sql腳本
        initializer.setDatabasePopulator(databasePopulator());
        return initializer;
    }

    /**
     * 執(zhí)行指定的sql腳本
     * @return
     */
    private DatabasePopulator databasePopulator() {
        final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(schemaScript);
        return populator;
    }
}

3. 接下來是和 Mybatis 配置相關(guān)的文件:RabbitProducerMyBatisConfiguration.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration執(zhí)行完才能執(zhí)行,即數(shù)據(jù)源生成之后才能執(zhí)行當(dāng)前類
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {

	@Resource(name= "rabbitProducerDataSource")
	private DataSource rabbitProducerDataSource;
	
	@Bean(name="rabbitProducerSqlSessionFactory")
	public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
		
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(rabbitProducerDataSource);
		ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
		try {
			// mapper.xml文件加載,這些配置本可以寫在 application.yml 中,但是由于要作為一個(gè)基礎(chǔ)組件,所以寫在代碼里,跟業(yè)務(wù)層面解綁,讓業(yè)務(wù)層面無感知
			bean.setMapperLocations(resolver.getResources("classpath:com/didiok/rabbit/producer/mapping/*.xml"));
			SqlSessionFactory sqlSessionFactory = bean.getObject();
			sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
			return sqlSessionFactory;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	@Bean(name="rabbitProducerSqlSessionTemplate")
	public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
		return new SqlSessionTemplate(sqlSessionFactory);
	}
	
}

4. Mapper掃描配置相關(guān)的文件:?RabbitProducerMybatisMapperScanerConfig.java

@Configuration
// @AutoConfigureAfter是指等到RabbitProducerDataSourceConfiguration執(zhí)行完才能執(zhí)行,即數(shù)據(jù)源生成之后才能執(zhí)行當(dāng)前類
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {
	
	@Bean(name="rabbitProducerMapperScannerConfigurer")
    public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
        // mapper.java文件加載,這些配置本可以寫在 application.yml 中,但是由于要作為一個(gè)基礎(chǔ)組件,所以寫在代碼里,跟業(yè)務(wù)層面解綁,讓業(yè)務(wù)層面無感知
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
        mapperScannerConfigurer.setBasePackage("com.didiok.rabbit.producer.mapper");
        return mapperScannerConfigurer;
    }

}

四、可靠性發(fā)送消息代碼實(shí)現(xiàn)

/**
 * 	$RabbitBrokerImpl 真正的發(fā)送不同類型的消息實(shí)現(xiàn)類
 *
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {

	@Autowired
	private RabbitTemplateContainer rabbitTemplateContainer;
	
	@Autowired
	private MessageStoreService messageStoreService;
	
	/**
	 * 可靠性消息發(fā)送
	 */
	@Override
	public void reliantSend(Message message) {
		message.setMessageType(MessageType.RELIANT);
		BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
		if(bm == null) {
			//1. 把數(shù)據(jù)庫的消息發(fā)送日志先記錄好
			Date now = new Date();
			BrokerMessage brokerMessage = new BrokerMessage();
			brokerMessage.setMessageId(message.getMessageId());
			brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
			//tryCount默認(rèn)等于0 所以在最開始發(fā)送的時(shí)候不需要進(jìn)行設(shè)置
			brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
			brokerMessage.setCreateTime(now);
			brokerMessage.setUpdateTime(now);
			brokerMessage.setMessage(message);
			messageStoreService.insert(brokerMessage);			
		}
		//2. 執(zhí)行真正的發(fā)送消息邏輯
		sendKernel(message);
	}

	@Override
	public void rapidSend(Message message) {
		// 省略...
	}
	
	/**
	 * 	$sendKernel 發(fā)送消息的核心方法 使用異步線程池進(jìn)行發(fā)送消息
	 * @param message
	 */
	private void sendKernel(Message message) {
		AsyncBaseQueue.submit((Runnable) () -> {
			CorrelationData correlationData =
					// 回調(diào)函數(shù)confirm中需要用到message.getMessageId(), message.getMessageType()。所以可以放在CorrelationData中
					new CorrelationData(String.format("%s#%s#%s",
							message.getMessageId(),
							System.currentTimeMillis(),
							message.getMessageType()));
			String topic = message.getTopic();
			String routingKey = message.getRoutingKey();
			RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
			rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
			log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());			
		});
	}

	@Override
	public void confirmSend(Message message) {
		// 省略...
	}

	@Override
	public void sendMessages() {
		// 省略...
	}

}

并且在回調(diào)函數(shù)中,也要添加相應(yīng)的邏輯:

/**
 * 	$RabbitTemplateContainer池化封裝
 * 	每一個(gè)topic 對(duì)應(yīng)一個(gè)RabbitTemplate
 *	1.	提高發(fā)送的效率
 * 	2. 	可以根據(jù)不同的需求制定化不同的RabbitTemplate, 比如每一個(gè)topic 都有自己的routingKey規(guī)則
 */
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

	private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
	
	private Splitter splitter = Splitter.on("#");
	
	private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
	
	@Autowired
	private ConnectionFactory connectionFactory;
	
	@Autowired
	private MessageStoreService messageStoreService;
	
	public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
		// 省略...
	}

	/**
	 * 	無論是 confirm 消息 還是 reliant 消息 ,發(fā)送消息以后 broker都會(huì)去回調(diào)confirm
	 */
	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		// 	具體的消息應(yīng)答
		List<String> strings = splitter.splitToList(correlationData.getId());
		String messageId = strings.get(0);
		long sendTime = Long.parseLong(strings.get(1));
		String messageType = strings.get(2);
		if(ack) {
			//	當(dāng)Broker 返回ACK成功時(shí), 就是更新一下日志表里對(duì)應(yīng)的消息發(fā)送狀態(tài)為 SEND_OK
			
			// 	如果當(dāng)前消息類型為reliant 我們就去數(shù)據(jù)庫查找并進(jìn)行更新
			if(MessageType.RELIANT.endsWith(messageType)) {
				this.messageStoreService.succuess(messageId);
			}
			log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
		} else {
			log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
			
		}
	}
}

上面大部分代碼都是在實(shí)現(xiàn)迅速類型的消息發(fā)送時(shí)已經(jīng)編寫了,只是在 confirm()方法中添加了:

            // 	如果當(dāng)前消息類型為reliant 我們就去數(shù)據(jù)庫查找并進(jìn)行更新
			if(MessageType.RELIANT.endsWith(messageType)) {
				this.messageStoreService.succuess(messageId);
			}

五、定時(shí)任務(wù)獲取長時(shí)間處于待確認(rèn)狀態(tài)的消息并重新發(fā)送

1. 實(shí)現(xiàn)分布式定時(shí)任務(wù)

這里的定時(shí)任務(wù)是使用 ElasticJob,并對(duì)其進(jìn)行封裝,封裝在項(xiàng)目?rabbit-task中,封裝成為了兩個(gè)注解 @EnableElasticJob 和?@ElasticJobConfig 。

具體的 ElasticJob 的使用和封裝過程可參考教程:ElasticJob使用與封裝

2.? 將封裝好的項(xiàng)目?rabbit-task 添加到 當(dāng)前項(xiàng)目中并使用

(1)引入 rabbit-task 的依賴

        <dependency>
    		<groupId>com.bfxy.base.rabbit</groupId>
    		<artifactId>rabbit-task</artifactId> 
    		<version>0.0.1-SNAPSHOT</version>			
  		</dependency>

(2)使用注解@EnableElasticJob

在當(dāng)前項(xiàng)目 rabbit-core-producer 中的 自動(dòng)裝配類 中添加注解 @EnableElasticJob,使得當(dāng) 應(yīng)用程序啟動(dòng)的時(shí)候,就能對(duì) ZooKeeper注冊(cè)中心進(jìn)行初始化,以及 ElasticJob的定時(shí)任務(wù)解析類 ElasticJobConfParser 的初始化。

/**
 * 	$RabbitProducerAutoConfiguration 自動(dòng)裝配 
 *
 */
@EnableElasticJob
@Configuration
@ComponentScan({"com.didiok.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {


}

(3)實(shí)現(xiàn)定時(shí)任務(wù)的具體處理邏輯并在類上加注解@EnableElasticJob

這里為了消息的可靠性發(fā)送,我們需要抓取 超時(shí)卻仍處于待確認(rèn)狀態(tài) 的消息,進(jìn)行重新發(fā)送消息。這里使用 ElasticJob 的流式定時(shí)任務(wù) DataFlowJob。

@Component
@ElasticJobConfig(
		name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
		cron= "0/10 * * * * ?",
		description = "可靠性投遞消息補(bǔ)償任務(wù)",
		overwrite = true,
		shardingTotalCount = 1
		)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage>{

	@Autowired
	private MessageStoreService messageStoreService;
	
	@Autowired
	private RabbitBroker rabbitBroker;
	
	private static final int MAX_RETRY_COUNT = 3;
	
	@Override
	public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
		// 抓取狀態(tài)為未確認(rèn),而且 next_retry 小于當(dāng)前時(shí)間的這些消息,為了確定百分百能發(fā)送成功,需要再進(jìn)行重發(fā)
		List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
		log.info("--------@@@@@ 抓取數(shù)據(jù)集合, 數(shù)量:	{} 	@@@@@@-----------" , list.size());
		return list;
	}

	@Override
	public void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {
		
		dataList.forEach( brokerMessage -> {
			
			String messageId = brokerMessage.getMessageId();
			if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {
				// 重試次數(shù)大于3,就不再進(jìn)行重發(fā)了,直接認(rèn)為發(fā)送失敗,更改標(biāo)記為失敗
				this.messageStoreService.failure(messageId);
				log.warn(" -----消息設(shè)置為最終失敗,消息ID: {} -------", messageId);
			} else {
				//	每次重發(fā)的時(shí)候要更新一下try_count和next_retry字段
				this.messageStoreService.updateTryCount(messageId);
				// 	重發(fā)消息
				this.rabbitBroker.reliantSend(brokerMessage.getMessage());
			}
			
		});
	}
}

上面的代碼中加入了注解

@ElasticJobConfig(
      name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
      cron= "0/10 * * * * ?",
      description = "可靠性投遞消息補(bǔ)償任務(wù)",
      overwrite = true,
      shardingTotalCount = 1
      )

則該類中的邏輯會(huì)定時(shí)執(zhí)行。

對(duì)于重發(fā)消息的代碼 this.rabbitBroker.reliantSend(brokerMessage.getMessage());,之前已經(jīng)做過說明,這里不再贅述。文章來源地址http://www.zghlxwxcb.cn/news/detail-442204.html

到了這里,關(guān)于RabbitMQ可靠性消息發(fā)送(java實(shí)現(xiàn))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制 rabbitmq在消息的發(fā)送與接收中,會(huì)經(jīng)過上面的流程,這些流程中每一步都有可能導(dǎo)致消息丟失,或者消費(fèi)失敗甚至直接是服務(wù)器宕機(jī)等,這是我們服務(wù)接受不了的,為了保證消息的可靠性,rabbitmq提供了以下幾種機(jī)制 生產(chǎn)者確認(rèn)機(jī)制 消息持久

    2024年02月08日
    瀏覽(37)
  • RabbitMQ高級(jí)篇---消息可靠性

    RabbitMQ高級(jí)篇---消息可靠性

    1、消息可靠性: 消息從發(fā)送到消費(fèi)者接受,會(huì)經(jīng)歷多個(gè)過程,每個(gè)消息傳遞的過程都可能導(dǎo)致消息的丟失: 常見的丟失原因: 發(fā)送時(shí)消息丟失原因: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) Rab

    2024年01月20日
    瀏覽(30)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場(chǎng)景 1.2 MQ導(dǎo)致消息丟失 1.3 消費(fèi)者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機(jī)制 2.2 生產(chǎn)者確認(rèn)機(jī)制 2.3 實(shí)現(xiàn)生產(chǎn)者確認(rèn) 2.3.1 配置yml開啟生產(chǎn)者確認(rèn) 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(25)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) 針對(duì)這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • 消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性

    消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性

    官方網(wǎng)址 源碼:https://kafka.apache.org/downloads 快速開始:https://kafka.apache.org/documentation/#gettingStarted springcloud整合 發(fā)送消息流程 主線程:主線程只負(fù)責(zé)組織消息,如果是同步發(fā)送會(huì)阻塞,如果是異步發(fā)送需要傳入一個(gè)回調(diào)函數(shù)。 Map集合:存儲(chǔ)了主線程的消息。 Sender線程:真正的

    2024年03月10日
    瀏覽(34)
  • 【RabbitMQ】之消息的可靠性方案

    【RabbitMQ】之消息的可靠性方案

    一、數(shù)據(jù)丟失場(chǎng)景 二、數(shù)據(jù)可靠性方案 1、生產(chǎn)者丟失消息解決方案 2、MQ 隊(duì)列丟失消息解決方案 3、消費(fèi)者丟失消息解決方案 MQ 消息數(shù)據(jù)完整的鏈路為 :從 Producer 發(fā)送消息到 RabbitMQ 服務(wù)器中,再由 Broker 服務(wù)的 Exchange 根據(jù) Routing_Key 路由到指定的 Queue 隊(duì)列中,最后投送到消

    2024年02月14日
    瀏覽(24)
  • RabbitMQ之消息的可靠性傳遞

    提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動(dòng)添加 RabbitMQ之消息的可靠性傳遞 提示:寫完文章后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔 提示:這里可以添加本文要記錄的大概內(nèi)容: 在當(dāng)今的信息化時(shí)代,消息傳遞在企業(yè)級(jí)應(yīng)用和分布式

    2024年01月19日
    瀏覽(21)
  • RabbitMQ消息可靠性問題及解決

    RabbitMQ消息可靠性問題及解決

    說明:在RabbitMQ消息傳遞過程中,有以下問題: 消息沒發(fā)到交換機(jī) 消息沒發(fā)到隊(duì)列 MQ宕機(jī),消息在隊(duì)列中丟失 消息者接收到消息后,未能正常消費(fèi)(程序報(bào)錯(cuò)),此時(shí)消息已在隊(duì)列中移除 針對(duì)以上問題,提供以下解決方案: 消息確認(rèn):確認(rèn)消息是否發(fā)送到交換機(jī)、隊(duì)列;

    2024年02月16日
    瀏覽(25)
  • rabbitmq如何保證消息的可靠性

    RabbitMQ可以通過以下方式來保證消息的可靠性: 在發(fā)布消息時(shí),可以設(shè)置消息的delivery mode為2,這樣消息會(huì)被持久化存儲(chǔ)在磁盤上,即使RabbitMQ服務(wù)器重啟,消息也不會(huì)丟失。 可以創(chuàng)建持久化的隊(duì)列,這樣即使RabbitMQ服務(wù)器重啟,隊(duì)列也不會(huì)丟失。 在消費(fèi)者端,可以 設(shè)置手動(dòng)

    2024年01月23日
    瀏覽(26)
  • RabbitMQ 能保證消息可靠性嗎

    RabbitMQ 能保證消息可靠性嗎

    手把手教你,本地RabbitMQ服務(wù)搭建(windows) 消息隊(duì)列選型——為什么選擇RabbitMQ RabbitMQ靈活運(yùn)用,怎么理解五種消息模型 推或拉? RabbitMQ 消費(fèi)模式該如何選擇 死信是什么,如何運(yùn)用RabbitMQ的死信機(jī)制? 前面我們?cè)谧鯩Q組件選型時(shí),提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包