1、課程發(fā)布
為了提高網(wǎng)站的速度需要將課程信息進(jìn)行緩存,并且要將課程信息加入索引庫方便搜索,下圖顯示了課程發(fā)布后課程信息的流轉(zhuǎn)情況:
1、向內(nèi)容管理數(shù)據(jù)庫的課程發(fā)布表存儲(chǔ)課程發(fā)布信息,更新課程基本信息表中發(fā)布狀態(tài)為已發(fā)布。
2、向Redis存儲(chǔ)課程緩存信息。
3、向Elasticsearch存儲(chǔ)課程索引信息。
4、請(qǐng)求分布文件系統(tǒng)存儲(chǔ)課程靜態(tài)化頁面(即html頁面),實(shí)現(xiàn)快速瀏覽課程詳情頁面。
課程發(fā)布表的數(shù)據(jù)來源于課程預(yù)發(fā)布表,它們的結(jié)構(gòu)基本一樣,只是課程發(fā)布表中的狀態(tài)是課程發(fā)布狀態(tài),如下圖:
redis中的課程緩存信息是將課程發(fā)布表中的數(shù)據(jù)轉(zhuǎn)為json進(jìn)行存儲(chǔ)。
elasticsearch中的課程索引信息是根據(jù)搜索需要將課程名稱、課程介紹等信息進(jìn)行索引存儲(chǔ)。
MinIO中存儲(chǔ)了課程的靜態(tài)化頁面文件(html網(wǎng)頁),查看課程詳情是通過文件系統(tǒng)去瀏覽課程詳情頁面。
2、什么是分布式事務(wù)
一次課程發(fā)布操作需要向數(shù)據(jù)庫、redis、elasticsearch、MinIO寫四份數(shù)據(jù),這里存在分布式事務(wù)問題。
什么是分布式事務(wù)?
首先理解什么是本地事務(wù)?
平常我們?cè)诔绦蛑型ㄟ^spring去控制事務(wù)是利用數(shù)據(jù)庫本身的事務(wù)特性來實(shí)現(xiàn)的,因此叫數(shù)據(jù)庫事務(wù),由于應(yīng)用主要靠關(guān)系數(shù)據(jù)庫來控制事務(wù),此數(shù)據(jù)庫只屬于該應(yīng)用,所以基于本應(yīng)用自己的關(guān)系型數(shù)據(jù)庫的事務(wù)又被稱為本地事務(wù)。
本地事務(wù)具有ACID四大特性,數(shù)據(jù)庫事務(wù)在實(shí)現(xiàn)時(shí)會(huì)將一次事務(wù)涉及的所有操作全部納入到一個(gè)不可分割的執(zhí)行單元,該執(zhí)行單元中的所有操作 要么都成功,要么都失敗,只要其中任一操作執(zhí)行失敗,都將導(dǎo)致整個(gè)事務(wù)的回滾。
理解了本地事務(wù),什么是分布式事務(wù)?
現(xiàn)在的需求是課程發(fā)布操作后將數(shù)據(jù)寫入數(shù)據(jù)庫、redis、elasticsearch、MinIO四個(gè)地方,這四個(gè)地方已經(jīng)不限制在一個(gè)數(shù)據(jù)庫內(nèi),是由四個(gè)分散的服務(wù)去提供,與這四個(gè)服務(wù)去通信需要網(wǎng)絡(luò)通信,而網(wǎng)絡(luò)存在不可到達(dá)性,這種分布式系統(tǒng)環(huán)境下,通過與不同的服務(wù)進(jìn)行網(wǎng)絡(luò)通信去完成事務(wù)稱之為分布式事務(wù)。
3、 什么是CAP理論
控制分布式事務(wù)首先需要理解CAP理論,什么是CAP理論?
CAP是 Consistency、Availability、Partition tolerance三個(gè)詞語的縮寫,分別表示一致性、可用性、分區(qū)容忍性。
使用下邊的分布式系統(tǒng)結(jié)構(gòu) 進(jìn)行說明:
客戶端經(jīng)過網(wǎng)關(guān)訪問用戶服務(wù)的兩個(gè)結(jié)點(diǎn),一致性是指用戶不管訪問哪一個(gè)結(jié)點(diǎn)拿到的數(shù)據(jù)都是最新的,比如查詢小明的信息,不能出現(xiàn)在數(shù)據(jù)沒有改變的情況下兩次查詢結(jié)果不一樣。
可用性是指任何時(shí)候查詢用戶信息都可以查詢到結(jié)果,但不保證查詢到最新的數(shù)據(jù)。
分區(qū)容忍性也叫分區(qū)容錯(cuò)性,當(dāng)系統(tǒng)采用分布式架構(gòu)時(shí)由于網(wǎng)絡(luò)通信異常導(dǎo)致請(qǐng)求中斷、消息丟失,但系統(tǒng)依然對(duì)外提供服務(wù)。
CAP理論要強(qiáng)調(diào)的是在分布式系統(tǒng)中這三點(diǎn)不可能全部滿足,由于是分布式系統(tǒng)就要滿足分區(qū)容忍性,因?yàn)榉?wù)之間難免出現(xiàn)網(wǎng)絡(luò)異常,不能因?yàn)榫植烤W(wǎng)絡(luò)異常導(dǎo)致整個(gè)系統(tǒng)不可用。
滿足P那么C和A不能同時(shí)滿足:
比如我們添加一個(gè)用戶小明的信息,該信息先添加到結(jié)點(diǎn)1中,再同步到結(jié)點(diǎn)2中,如下圖:
如果要滿足C一致性,必須等待小明的信息同步完成系統(tǒng)才可用(否則會(huì)出現(xiàn)請(qǐng)求到結(jié)點(diǎn)2時(shí)查詢不到數(shù)據(jù),違反了一致性),在信息同步過程中系統(tǒng)是不可用的,所以滿足C的同時(shí)無法滿足A。
如果要滿足A可用性,要時(shí)刻保證系統(tǒng)可用就不用等待信息同步完成,此時(shí)系統(tǒng)的一致性無法滿足。
所以在分布式系統(tǒng)中進(jìn)行分布式事務(wù)控制,要么保證CP、要么保證AP。
4、什么是BASE理論?
BASE 是 Basically Available(基本可用)、Soft state(軟狀態(tài))和 Eventually consistent (最終一致性)三個(gè)短語的縮寫。
基本可用:當(dāng)系統(tǒng)無法滿足全部可用時(shí)保證核心服務(wù)可用即可,比如一個(gè)外賣系統(tǒng),每到中午12點(diǎn)左右系統(tǒng)并發(fā)量很高,此時(shí)要保證下單流程涉及的服務(wù)可用,其它服務(wù)暫時(shí)不可用。
軟狀態(tài):是指可以存在中間狀態(tài),比如:打印自己的社保統(tǒng)計(jì)情況,該操作不會(huì)立即出現(xiàn)結(jié)果,而是提示你打印中,請(qǐng)?jiān)赬XX時(shí)間后查收。雖然出現(xiàn)了中間狀態(tài),但最終狀態(tài)是正確的。
最終一致性:退款操作后沒有及時(shí)到賬,經(jīng)過一定的時(shí)間后賬戶到賬,舍棄強(qiáng)一致性,滿足最終一致性。
5、分布式事務(wù)控制有哪些常用的技術(shù)方案?
實(shí)現(xiàn)CP就是要實(shí)現(xiàn)強(qiáng)一致性:
使用Seata框架基于AT模式實(shí)現(xiàn)
使用Seata框架基于TCC模式實(shí)現(xiàn)。
實(shí)現(xiàn)AP則要保證最終數(shù)據(jù)一致性:
使用消息隊(duì)列通知的方式去實(shí)現(xiàn),通知失敗自動(dòng)重試,達(dá)到最大失敗次數(shù)需要人工處理;
使用任務(wù)調(diào)度的方案,啟動(dòng)任務(wù)調(diào)度將課程信息由數(shù)據(jù)庫同步到elasticsearch、MinIO、redis中。
6、課程發(fā)布的事務(wù)控制方案
學(xué)習(xí)了這么多的理論,回到課程發(fā)布,執(zhí)行課程發(fā)布操作后要向數(shù)據(jù)庫、redis、elasticsearch、MinIO寫四份數(shù)據(jù),這個(gè)場(chǎng)景用哪種方案?
滿足CP?
如果要滿足CP就表示課程發(fā)布操作后向數(shù)據(jù)庫、redis、elasticsearch、MinIO寫四份數(shù)據(jù),只要有一份寫失敗其它的全部回滾。
滿足AP?
課程發(fā)布操作后,先更新數(shù)據(jù)庫中的課程發(fā)布狀態(tài),更新后向redis、elasticsearch、MinIO寫課程信息,只要在一定時(shí)間內(nèi)最終向redis、elasticsearch、MinIO寫數(shù)據(jù)成功即可。
目前我們已經(jīng)有了任務(wù)調(diào)度的技術(shù)積累,這里選用任務(wù)調(diào)度的方案去實(shí)現(xiàn)分布式事務(wù)控制,課程發(fā)布滿足AP即可。
下圖是具體的技術(shù)方案:
1、在內(nèi)容管理服務(wù)的數(shù)據(jù)庫中添加一個(gè)消息表,消息表和課程發(fā)布表在同一個(gè)數(shù)據(jù)庫。
2、點(diǎn)擊課程發(fā)布通過本地事務(wù)向課程發(fā)布表寫入課程發(fā)布信息,同時(shí)向消息表寫課程發(fā)布的消息。通過數(shù)據(jù)庫進(jìn)行控制,只要課程發(fā)布表插入成功消息表也插入成功,消息表的數(shù)據(jù)就記錄了某門課程發(fā)布的任務(wù)。
3、啟動(dòng)任務(wù)調(diào)度系統(tǒng)定時(shí)調(diào)度內(nèi)容管理服務(wù)去定時(shí)掃描消息表的記錄。
4、當(dāng)掃描到課程發(fā)布的消息時(shí)即開始完成向redis、elasticsearch、MinIO同步數(shù)據(jù)的操作。
5、同步數(shù)據(jù)的任務(wù)完成后刪除消息表記錄。
時(shí)序圖如下:
下圖是課程發(fā)布操作的流程:
1、執(zhí)行發(fā)布操作,內(nèi)容管理服務(wù)存儲(chǔ)課程發(fā)布表的同時(shí)向消息表添加一條“課程發(fā)布任務(wù)”。這里使用本地事務(wù)保證課程發(fā)布信息保存成功,同時(shí)消息表也保存成功。
2、任務(wù)調(diào)度服務(wù)定時(shí)調(diào)度內(nèi)容管理服務(wù)掃描消息表,由于課程發(fā)布操作后向消息表插入一條課程發(fā)布任務(wù),此時(shí)掃描到一條任務(wù)。
3、拿到任務(wù)開始執(zhí)行任務(wù),分別向redis、elasticsearch及文件系統(tǒng)存儲(chǔ)數(shù)據(jù)。
4、任務(wù)完成后刪除消息表記錄。
7、課程發(fā)布接口開發(fā)
@Transactional
@Override
public void publish(Long companyId, Long courseId) {
//查詢預(yù)發(fā)布表
CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
if(coursePublishPre == null){
XueChengPlusException.cast("課程沒有審核記錄,無法發(fā)布");
}
//狀態(tài)
String status = coursePublishPre.getStatus();
//課程如果沒有審核通過不允許發(fā)布
if(!status.equals("202004")){
XueChengPlusException.cast("課程沒有審核通過不允許發(fā)布");
}
//向課程發(fā)布表寫入數(shù)據(jù)
CoursePublish coursePublish = new CoursePublish();
BeanUtils.copyProperties(coursePublishPre,coursePublish);
//先查詢課程發(fā)布,如果有則更新,沒有再添加
CoursePublish coursePublishObj = coursePublishMapper.selectById(courseId);
if(coursePublishObj == null){
coursePublishMapper.insert(coursePublish);
}else{
coursePublishMapper.updateById(coursePublish);
}
//向消息表寫入數(shù)據(jù)
// mqMessageService.addMessage("course_publish",String.valueOf(courseId),null,null);
saveCoursePublishMessage(courseId);
//將預(yù)發(fā)布表數(shù)據(jù)刪除
coursePublishPreMapper.deleteById(courseId);
}
8、課程發(fā)布消息sdk
課程發(fā)布操作執(zhí)行后需要掃描消息表的記錄,有關(guān)消息表處理的有哪些?
上圖中紅色框內(nèi)的都是與消息處理相關(guān)的操作:
1、新增消息表
2、掃描消息表。
3、更新消息表。
4、刪除消息表。
使用消息表這種方式實(shí)現(xiàn)最終事務(wù)一致性的地方除了課程發(fā)布還有其它業(yè)務(wù)場(chǎng)景。
如果在每個(gè)地方都實(shí)現(xiàn)一套針對(duì)消息表定時(shí)掃描、處理的邏輯基本上都是重復(fù)的,軟件的可復(fù)用性太低,成本太高。如何解決這個(gè)問題
?
針對(duì)這個(gè)問題可以想到將消息處理相關(guān)的邏輯做成一個(gè)通用的東西。
是做成通用的服務(wù),還是做成通用的代碼組件呢?
通用的服務(wù)是完成一個(gè)通用的獨(dú)立功能,并提供獨(dú)立的網(wǎng)絡(luò)接口,比如:項(xiàng)目中的文件系統(tǒng)服務(wù),提供文件的分布式存儲(chǔ)服務(wù)。
代碼組件也是完成一個(gè)通用的獨(dú)立功能,通常會(huì)提供API的方式供外部系統(tǒng)使用,比如:fastjson、Apache commons工具包等。
如果將消息處理做成一個(gè)通用的服務(wù),該服務(wù)需要連接多個(gè)數(shù)據(jù)庫,因?yàn)樗獟呙栉⒎?wù)數(shù)據(jù)庫下的消息表,并且要提供與微服務(wù)通信的網(wǎng)絡(luò)接口,單就針對(duì)當(dāng)前需求而言開發(fā)成本有點(diǎn)高。
如果將消息處理做一個(gè)SDK工具包相比通用服務(wù)不僅可以解決將消息處理通用化的需求,還可以降低成本。
所以,本項(xiàng)目確定將對(duì)消息表相關(guān)的處理做成一個(gè)SDK組件供各微服務(wù)使用,如下圖所示:
下邊對(duì)消息SDK的設(shè)計(jì)內(nèi)容進(jìn)行說明:sdk需要提供執(zhí)行任務(wù)的邏輯嗎
?
拿課程發(fā)布任務(wù)舉例,執(zhí)行課程發(fā)布任務(wù)是要向redis、索引庫等同步數(shù)據(jù),其它任務(wù)的執(zhí)行邏輯是不同的,所以執(zhí)行任務(wù)在sdk中不用實(shí)現(xiàn)任務(wù)邏輯,只需要提供一個(gè)抽象方法由具體的執(zhí)行任務(wù)方去實(shí)現(xiàn)。如何保證任務(wù)的冪等性
?
在視頻處理章節(jié)介紹的視頻處理的冪等性方案,這里可以采用類似方案,任務(wù)執(zhí)行完成后會(huì)從消息表刪除,如果消息的狀態(tài)是完成或不存在消息表中則不用執(zhí)行。如何保證任務(wù)不重復(fù)執(zhí)行
?
采用和視頻處理章節(jié)一致方案,除了保證任務(wù)的冪等性外,任務(wù)調(diào)度采用分片廣播,根據(jù)分片參數(shù)去獲取任務(wù),另外阻塞調(diào)度策略為丟棄任務(wù)。
注意:這里是信息同步類任務(wù),即使任務(wù)重復(fù)執(zhí)行也沒有關(guān)系,不再使用搶占任務(wù)的方式保證任務(wù)不重復(fù)執(zhí)行。
還有一個(gè)問題,根據(jù)消息表記錄是否存在或消息表中的任務(wù)狀態(tài)去保證任務(wù)的冪等性,如果一個(gè)任務(wù)有好幾個(gè)小任務(wù),比如:課程發(fā)布任務(wù)需要執(zhí)行三個(gè)同步操作:存儲(chǔ)課程到redis、存儲(chǔ)課程到索引庫,存儲(chǔ)課程頁面到文件系統(tǒng)。如果其中一個(gè)小任務(wù)已經(jīng)完成也不應(yīng)該去重復(fù)執(zhí)行。這里該如何設(shè)計(jì)?文章來源:http://www.zghlxwxcb.cn/news/detail-512653.html
將小任務(wù)作為任務(wù)的不同的階段,在消息表中設(shè)計(jì)階段狀態(tài)。文章來源地址http://www.zghlxwxcb.cn/news/detail-512653.html
9、課程發(fā)布任務(wù)調(diào)度
package com.xuecheng.content.service.jobhandler;
import com.xuecheng.base.exception.XueChengPlusException;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MessageProcessAbstract;
import com.xuecheng.messagesdk.service.MqMessageService;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class CoursePublishTask extends MessageProcessAbstract {
//任務(wù)調(diào)度入口
@XxlJob("CoursePublishJobHandler")
public void coursePublishJobHandler() throws Exception {
// 分片參數(shù)
int shardIndex = XxlJobHelper.getShardIndex();//執(zhí)行器的序號(hào),從0開始
int shardTotal = XxlJobHelper.getShardTotal();//執(zhí)行器總數(shù)
//調(diào)用抽象類的方法執(zhí)行任務(wù)
process(shardIndex,shardTotal, "course_publish",30,60);
}
//執(zhí)行課程發(fā)布任務(wù)的邏輯,如果此方法拋出異常說明任務(wù)執(zhí)行失敗
@Override
public boolean execute(MqMessage mqMessage) {
//從mqMessage拿到課程id
Long courseId = Long.parseLong(mqMessage.getBusinessKey1());
//課程靜態(tài)化上傳到minio
generateCourseHtml(mqMessage,courseId);
//向elasticsearch寫索引數(shù)據(jù)
saveCourseIndex(mqMessage,courseId);
//向redis寫緩存
//返回true表示任務(wù)完成
return true;
}
//生成課程靜態(tài)化頁面并上傳至文件系統(tǒng)
private void generateCourseHtml(MqMessage mqMessage,long courseId){
//消息id
Long taskId = mqMessage.getId();
MqMessageService mqMessageService = this.getMqMessageService();
//做任務(wù)冪等性處理
//查詢數(shù)據(jù)庫取出該階段執(zhí)行狀態(tài)
int stageOne = mqMessageService.getStageOne(taskId);
if(stageOne>0){
log.debug("課程靜態(tài)化任務(wù)完成,無需處理...");
return ;
}
//開始進(jìn)行課程靜態(tài)化
int i=1/0;//制造一個(gè)異常表示任務(wù)執(zhí)行中有問題
//..任務(wù)處理完成寫任務(wù)狀態(tài)為完成
mqMessageService.completedStageOne(taskId);
}
//保存課程索引信息 第二個(gè)階段任務(wù)
private void saveCourseIndex(MqMessage mqMessage,long courseId){
//任務(wù)id
Long taskId = mqMessage.getId();
MqMessageService mqMessageService = this.getMqMessageService();
//取出第二個(gè)階段狀態(tài)
int stageTwo = mqMessageService.getStageTwo(taskId);
//任務(wù)冪等性處理
if(stageTwo>0){
log.debug("課程索引信息已寫入,無需執(zhí)行...");
return;
}
//查詢課程信息,調(diào)用搜索服務(wù)添加索引 ...
//完成本階段的任務(wù)
mqMessageService.completedStageTwo(taskId);
}
}
到了這里,關(guān)于學(xué)成在線----day8的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!