任務(wù)調(diào)度框架
Java中如何實(shí)現(xiàn)定時(shí)任務(wù)?
比如:
1.每天早上6點(diǎn)定時(shí)執(zhí)行
2.每月最后一個(gè)工作日,考勤統(tǒng)計(jì)
3.每個(gè)月25號(hào)信用卡還款
4.會(huì)員生日祝福
5.每隔3秒,自動(dòng)提醒
10分鐘的超時(shí)訂單的自動(dòng)取消,每隔30秒或1分鐘查詢一次訂單,拿當(dāng)前的時(shí)間上前推10分鐘
定時(shí)任務(wù),資源會(huì)有誤差的存在,如果使用定時(shí)任務(wù)
定時(shí)任務(wù),用于統(tǒng)計(jì)的時(shí)候最多。
自動(dòng)統(tǒng)計(jì)考勤,一般0點(diǎn)之后開始統(tǒng)計(jì),可以使用定時(shí)任務(wù)
nacos心跳
晚上要求和采購部門生成采購單,達(dá)到最低預(yù)警值的時(shí)候,去發(fā)給采購部門
我們可以通過任務(wù)調(diào)度框架實(shí)現(xiàn)上述的需求
任務(wù)調(diào)度框架,可以實(shí)現(xiàn)定時(shí)任務(wù),實(shí)現(xiàn)間隔多少時(shí)間的重復(fù)執(zhí)行,實(shí)現(xiàn)指定日期的重復(fù)執(zhí)行
電商自動(dòng)好評,間隔時(shí)間長的,誤差幾分鐘,影響不大。
java中任務(wù)調(diào)度框架:
1、Spring Task spring自帶的
2、Quartz 古老的框架
3、XXL-Job
4、第三方云平臺(tái)-比如說:阿里云-SchedulerX等等
選擇一個(gè):Spring Task
2個(gè)注解+
包:task任務(wù)、job
使用步驟:
1、開關(guān)類,使用注解
@EnableScheduling // 開啟任務(wù)調(diào)度
2、定義定時(shí)任務(wù)
@Scheduled(cron = "0/3 * * * ?")
CORN表達(dá)式:
秒 分 時(shí) 日 月 星期幾 年 其中,只有年可以省略
定時(shí)任務(wù),需要重復(fù)執(zhí)行的方法
CORN表達(dá)式:
特殊字符串,主要用來描述時(shí)間的,用于任務(wù)調(diào)度等
https://cron.qqe2.com/
/ 間隔
- 是連續(xù)
, 枚舉值
L 最后,星期、日中用
W 有效工作日
LW 某個(gè)月最后一個(gè)工作日
# 用于確定每個(gè)月第幾個(gè)星期幾,母親節(jié)或父親節(jié)
4#2 某個(gè)月的第二個(gè)星期三 4代表星期三,中文的時(shí)候有可能不影響
項(xiàng)目名:SpringTask01
Spring Web、Lombok
RabbitMq實(shí)現(xiàn)延遲:
死信+延遲消息處理
死信:RabbitMQ的隊(duì)列中的消息,滿足以下條件任意其一就會(huì)成為死信消息:
1.消息被拒絕
2.消息過期
3.隊(duì)列滿了
死信交換器:專門用來轉(zhuǎn)發(fā)隊(duì)列中的死信消息,將死信消息轉(zhuǎn)發(fā)到指定的隊(duì)列中
十分鐘未支付,取消訂單?
十分鐘之后,消息會(huì)過期,過期后,通過死信交換器轉(zhuǎn)發(fā)隊(duì)列中的死信消息,將死信消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,由消費(fèi)者去處理。
我們可以通過死信+死信交換器實(shí)現(xiàn)延遲消息處理
RabbitMQ實(shí)現(xiàn)延遲消息處理有2種方式:
1、死信+死信交換器 代碼實(shí)現(xiàn)(可控,更方便一些)
2、延遲消息插件
1、死信+死信交換器 代碼實(shí)現(xiàn)(可控,更方便一些)
發(fā)送消息,到隊(duì)列1,(產(chǎn)生延遲隊(duì)列,產(chǎn)生死信)
一個(gè)消息,過一段時(shí)間,實(shí)現(xiàn)消費(fèi)。
核心:
1.隊(duì)列 是2個(gè)隊(duì)列,第1個(gè)隊(duì)列: 目的 產(chǎn)生死信(1.設(shè)置有效期2.不設(shè)置消費(fèi)者),借助死信交換器,把產(chǎn)生的死信發(fā)到指定的隊(duì)列中
第二個(gè)對了:目的 消費(fèi)死信,這里獲取的信息,時(shí)間就是延遲的,延遲的就是上面的有效期
2.1個(gè)交換器
死信交換器,整個(gè)系統(tǒng)一般就一個(gè),可以用來轉(zhuǎn)發(fā)各個(gè)功能產(chǎn)生的死信,是Direct類型的交換,通過RK進(jìn)行消息匹配到對應(yīng)的隊(duì)列中。
RabbitMQ的消息的有效期有2種設(shè)置方式:
1.設(shè)置隊(duì)列上的有效期,整個(gè)隊(duì)列中所有消息都使用
2.可以在每個(gè)消息上設(shè)置有效期,這種適用于有多個(gè)不同有效期的消息
如果隊(duì)列和消息都有有效期,誰短聽誰的。
代碼:
RabbitMQ02
實(shí)現(xiàn):
1.pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
2.代碼:
config-RabbitMQConfig
package com.yd.rabbitmq02.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @author MaoXiqi
* @organization: Lucky
* @create 2023-10-16 11:35:54
* @description RabbitMQConfigApi
*/
@Configuration
public class RabbitMQConfig {
// 1.創(chuàng)建2個(gè)隊(duì)列
@Bean
public Queue createQ1() {
// 1.設(shè)置隊(duì)列 內(nèi)部消息有效期 設(shè)置死信交換器 設(shè)置RK
HashMap<String, Object> params = new HashMap<>();
// 設(shè)置隊(duì)列中每個(gè)消息的有效期 單位 毫秒
params.put("x-message-ttl", 3000);
// 設(shè)置對應(yīng)的死信交換器
params.put("x-dead-letter-exchange", "dead-ex-yd");
// 設(shè)置交換器匹配的路由名稱
params.put("x-dead-letter-routing-key", "test");
return QueueBuilder.durable("dl-q01").withArguments(params).build();
}
@Bean
public Queue createQ2() {
return new Queue("dl-q02");
}
// 2.創(chuàng)建1個(gè)交換器(1.fanout 2,direct 3.topic 4.header)-死信交換器direct類型
@Bean
public DirectExchange createDe() {
return new DirectExchange("dead-ex-yd");
}
// 3.創(chuàng)建1個(gè)綁定
@Bean
public Binding createBd1(DirectExchange de) {
return BindingBuilder.bind(createQ2()).to(de).with("test");
}
}
2.1.創(chuàng)建兩個(gè)隊(duì)列
1、設(shè)置隊(duì)列,內(nèi)部消息有效期 設(shè)置死信交換器 設(shè)置RK 注意:參數(shù)名是固定的,值根據(jù)業(yè)務(wù)需求去改,值 單位是毫秒
2.2創(chuàng)建1個(gè)交換器(1.fanout 2.direct 3.tipic
4.header)-死信交換器direct類型
2.3.創(chuàng)建一個(gè)綁定
controller-DeadController
@GetMapping("send")
public String sendDead(String msg) {
System.out.println("發(fā)送消息," + msg + ",發(fā)送時(shí)間:" + System.currentTimeMillis());
template.convertAndSend("", "dl-q01", msg);
return "ok";
}
發(fā)送
監(jiān)聽消息-主要是為了消費(fèi):
listener-DeadListener
@RabbitListener(queues = "dl-q02")
public void handler(String m) {
System.out.println("延遲消息,"+m+",接受時(shí)間:" +System.currentTimeMillis());
}
yml:
spring:
rabbitmq:
host: 121.36.5.100
port: 5672
username: guest
password: guest
server:
port: 8082
測試:
RabbitMQ事務(wù):
數(shù)據(jù)庫中事務(wù):保證數(shù)據(jù)一致性,特別是多個(gè)操作要么都成功,要么都失敗
RabbitMQ事務(wù):一次性發(fā)多條消息,需要開啟事務(wù),
RabbitMQ也有自己的事務(wù),如果本次
使用步驟:
源碼:RabbitMQ02
1.創(chuàng)建配置類
2.使用基于事務(wù)發(fā)送
3.監(jiān)聽消息
1.創(chuàng)建配置類
config-RabbitMQTranConfig
1.準(zhǔn)備一個(gè)隊(duì)列
2.創(chuàng)建事務(wù)管理器
// 創(chuàng)建事務(wù)管理器
@Bean
public RabbitTransactionManager createTran(ConnectionFactory factory) {
return new RabbitTransactionManager(factory);
}
2.controller-TranController
開啟事務(wù)
發(fā)送消息
// 事務(wù)
@Transactional // 需要開啟SpringBoot的事務(wù)機(jī)制
@GetMapping("sendmsg")
public String sendMsg(String msg, int count) {
// 開啟 RabbitMQ的通道的事務(wù)
template.setChannelTransacted(true);
// 發(fā)送消息
for (int i = 0; i < count; i++) {
template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
// 出錯(cuò),看看 事務(wù)是否生效
if (i==2) {
System.out.println(1/0);
}
}
return "ok";
}
3.listener-DeadListener
// 事務(wù)
@RabbitListener(queues = "yd-tran-q01")
public void handler2(String msg) {
System.out.println("監(jiān)聽消息"+msg);
// 處理業(yè)務(wù)邏輯 出錯(cuò)了
}
手動(dòng)ACK
RabbitMQ怎么防止消息丟失:
1.發(fā)送端沒有發(fā)送過去
解決:
1.用事務(wù)
2.confirm消息確認(rèn)機(jī)制 萬能:轉(zhuǎn)人工處理
2.MQ服務(wù)器丟失,MQ服務(wù)器蹦了,
解決:開啟持久化
3.消費(fèi)端消息丟失:
解決:自動(dòng)應(yīng)答,改成開始手動(dòng)ACK
消息確認(rèn)機(jī)制:默認(rèn)是自動(dòng)確認(rèn)
消息的發(fā)送和接收是異步
RabbitMQ如何防止消息丟失:
代碼:
config-RabbitMqTranConfig
//消費(fèi)消息的?動(dòng)應(yīng)答
@Bean
public Queue createQ4() {
return new Queue("yd-ack-q01");
}
controller-DeadController
// 事務(wù)
@Transactional // 需要開啟SpringBoot的事務(wù)機(jī)制
@GetMapping("sendmsg")
public String sendMsg(String msg, int count) {
// 開啟 RabbitMQ的通道的事務(wù)
template.setChannelTransacted(true);
// 發(fā)送消息
for (int i = 0; i < count; i++) {
template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
// 出錯(cuò),看看 事務(wù)是否生效
if (i==2) {
System.out.println(1/0);
}
}
return "ok";
}
listener-DeadListener
一般設(shè)置個(gè)上限,比如最多三次
@RabbitListener(queues = "yd-ack-q01")
public void handler3(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
//消費(fèi)者獲取消息,默認(rèn)采用的自動(dòng)應(yīng)答,就是獲取就應(yīng)答,這樣MQ服務(wù)器就刪除消息
//還可以手動(dòng)應(yīng)答:結(jié)果:1.成功(MQ刪除)2.失敗(MQ消息)
System.out.println("收到ACK消息,監(jiān)聽消息:" + msg);
//拒絕消息 參數(shù)說明:1.消息id 2.結(jié)果 true 成功 false 拒絕 3.是否把消息添加回隊(duì)列中
channel.basicNack(tag,false,true);
//成功消息 參數(shù)說明:1.消息id 2.結(jié)果 true 成功 false 拒絕
//channel.basicAck(tag,true);
// 處理業(yè)務(wù)邏輯 出錯(cuò)了
}
消費(fèi)消息的手動(dòng)應(yīng)答:
RabbitMQ默認(rèn)的消費(fèi)者消息獲取模式采用的是手動(dòng)應(yīng)答
但是這種有缺陷,可能會(huì)出現(xiàn),消息獲取了但是業(yè)務(wù)出了問題,導(dǎo)致MQ也自動(dòng)刪除了消息,最終導(dǎo)致業(yè)務(wù)沒有執(zhí)行
所以為了解決這種問題,可以開啟手動(dòng)應(yīng)答模式,結(jié)合自己的業(yè)務(wù)執(zhí)行情況,如果業(yè)務(wù)執(zhí)行成功,那么就成功應(yīng)答,如果失敗了,就拒絕消息,同時(shí)把消息再加回隊(duì)列,這樣就可以再次消息再次處理(加個(gè)上限)
測試
RabbitMQ如何保證消息的冪等性:
冪等性就是重復(fù)消費(fèi)。
解決:
1.生成一個(gè)全局id,存入redis或者數(shù)據(jù)庫,在消費(fèi)者消費(fèi)消息之前,查詢一下該消息是否有小費(fèi)過。
2.
用戶充值,重復(fù)消費(fèi),相當(dāng)如充值了多次,是一定要杜絕的。
不要返回值的時(shí)候,可以用RabbitMQ替代OpenFegin,因?yàn)橄M(fèi)完就不回了。MQ默認(rèn)是單向的
短信發(fā)信可以用MQ,這個(gè)業(yè)務(wù)要做,做起來可能會(huì)很耗時(shí),中間要經(jīng)過運(yùn)營商,過程不可控
RabbitMQ應(yīng)用場景
消息通信,發(fā)送消息和接收消息,是異步
1.實(shí)現(xiàn)服務(wù)通信
用在微服務(wù)中,實(shí)現(xiàn)2個(gè)服務(wù)的通信,這種不帶返回值的,只是為了執(zhí)行另一個(gè)服務(wù)的方法執(zhí)行
2.解決耗時(shí)操作
比如:郵件、短信、第三方接口等,比較耗時(shí)
3.解耦
4.提升性能
5.重復(fù)代碼封裝
6.削峰填谷(訂單先下到redis中,再通過MQ和延遲隊(duì)列,慢慢的從redis搬到mysql中)文章來源:http://www.zghlxwxcb.cn/news/detail-715311.html
關(guān)鍵詞:異步、解耦、延遲文章來源地址http://www.zghlxwxcb.cn/news/detail-715311.html
到了這里,關(guān)于任務(wù)調(diào)度框架-如何實(shí)現(xiàn)定時(shí)任務(wù)+RabbitMQ事務(wù)+手動(dòng)ACK的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!