背景
目前在職的公司,維護著Spring Cloud分布式微服務項目有25+個。其中有10個左右微服務都寫有定時任務邏輯,采用Spring @Scheduled這種方式。
Spring @Scheduled定時任務的缺點:
- 不支持集群:為避免重復執(zhí)行,需引入分布式鎖
- 死板不靈活:不支持手動執(zhí)行,單次執(zhí)行,補償執(zhí)行,修改任務參數(shù),暫停任務,刪除任務,修改調度時間,失敗重試
- 無報警機制:任務失敗之后沒有報警機制,邏輯執(zhí)行異常記錄ERROR日志接入Prometheus告警這種方式不算,這算是日志層面的告警,而不是任務層面的告警機制
- 不支持分片任務:處理有序數(shù)據(jù)時,多機器分片執(zhí)行任務處理不同數(shù)據(jù)
- ……
基于此,考慮引入輕量級分布式定時調度框架XXL-JOB,即把定時任務遷到XXL-JOB平臺。
關于XXL-JOB,可參考之前的blog。
設計方案
考慮到我們有10+個SC分布式應用,30+個定時任務。如果每個應用都需要遷移改造的話,則每個應用都需要配置XXL-JOB相關的信息。當然,這可以通過Apollo namespace共享繼承機制來實現(xiàn)。題外話:有空的話,后面會寫一篇Apollo namespace配置繼承的blog。
也就是說,我可以在一個應用里(一個應用對應著一個Apollo namespace)的Apollo里維護好XXL-JOB的配置信息,其他應用通過復用此應用(的Apollo)來實現(xiàn)配置復用。
但是每個應用還得新增一個配置類,配置類怎么實現(xiàn)復用呢?這也能解決。解決方案就是在commons組件庫里維護配置類(需要引入Spring @Configuration注解,即引入spring-context
依賴包),然后每個應用的Spring Boot啟動類里需要掃描到此配置類。
還得改造一下30+個定時任務對應的30+個@@Component定時任務類,所有的定時任務應用都需要引入maven依賴。
還得手動在XXL-JOB里新增定時任務類。
看起來還不錯的方案,但是不排除不同的應用有同名的配置,遇到同名的配置,則需要修改配置命名。Spring Boot啟動類改造可能會帶來未知的問題。
最后的最后,考慮到我們所有的應用都需要經(jīng)過Gateway網(wǎng)關服務來轉發(fā),不管是對內的應用,還是對外的應用,對外的應用有包括C端,B端,和第三方客戶。故而有下面的最終方案。
實現(xiàn)方案
在對內的網(wǎng)關應用里,引入maven依賴:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.0</version>
</dependency>
新增如下XXL-JOB配置類:
@Slf4j
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appName;
@Value("${xxl.job.executor.port:9999}")
private int port;
@Value("${xxl.job.accessToken:default_token}")
private String accessToken;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
executor.setAdminAddresses(adminAddresses);
executor.setAppname(appName);
executor.setPort(port);
executor.setAccessToken(accessToken);
return executor;
}
}
對應的,需要在Apollo里新增如下配置。其中有些配置是固定不變的,可以放在本地配置文件里;未來有可能變化的,放在Apollo里。
這里的appname實際上就是XXL-JOB的執(zhí)行器:
gateway服務是以pod形式運行在k8s集群里,不言而喻,采用自動注冊這種方式。
網(wǎng)關服務里新增定時任務解析,請求轉發(fā)配置類:
@Slf4j
@Component
public class XxlJobLogicConfig {
private static final String URL = "url:";
private static final String METHOD = "method:";
private static final String DATA = "data:";
private static final String GET = "GET";
private static final String POST = "POST";
@XxlJob("httpJobHandler")
public void httpJobHandler() {
// 參數(shù)解析及校驗
String jobParam = XxlJobHelper.getJobParam();
if (StringUtils.isBlank(jobParam)) {
XxlJobHelper.log("param[" + jobParam + "] invalid");
XxlJobHelper.handleFail();
return;
}
String[] httpParams = jobParam.split("\n");
String url = "";
String method = "";
String data = "null";
for (String httpParam : httpParams) {
if (httpParam.startsWith(URL)) {
url = httpParam.substring(httpParam.indexOf(URL) + URL.length()).trim();
}
if (httpParam.startsWith(METHOD)) {
method = httpParam.substring(httpParam.indexOf(METHOD) + METHOD.length()).trim().toUpperCase();
}
if (httpParam.startsWith(DATA)) {
data = httpParam.substring(httpParam.indexOf(DATA) + DATA.length()).trim();
}
}
if (StringUtils.isBlank(url)) {
XxlJobHelper.log("url[" + url + "] invalid");
XxlJobHelper.handleFail();
return;
}
if (!GET.equals(method) && !POST.equals(method)) {
XxlJobHelper.log("method[" + method + "] invalid");
XxlJobHelper.handleFail();
return;
}
log.info("xxlJob調度請求url={},請求method={},請求數(shù)據(jù)data={}", url, method, data);
// 判斷是否為POST請求
boolean isPostMethod = POST.equals(method);
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// 設置具體的方法,也就是具體的定時任務
connection.setRequestMethod(method);
// POST請求需要output
connection.setDoOutput(isPostMethod);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(900 * 1000);
connection.setConnectTimeout(600 * 1000);
// connection:Keep-Alive 表示在一次http請求中,服務器進行響應后,不再直接斷開TCP連接,而是將TCP連接維持一段時間。
// 在這段時間內,如果同一客戶端再次向服務端發(fā)起http請求,便可以復用此TCP連接,向服務端發(fā)起請求。
connection.setRequestProperty("connection", "keep_alive");
// Content-Type 表示客戶端向服務端發(fā)送的數(shù)據(jù)的媒體類型(MIME類型)
connection.setRequestProperty("content-type", "application/json;charset=UTF-8");
// Accept-Charset 表示客戶端希望服務端返回的數(shù)據(jù)的媒體類型(MIME類型)
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// gateway請求轉發(fā)到其他應用
connection.connect();
// 如果是POST請求,則判斷定時任務是否含有執(zhí)行參數(shù)
if (isPostMethod && StringUtils.isNotBlank(data)) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
// 寫參數(shù)
dataOutputStream.write(data.getBytes(Charset.defaultCharset()));
dataOutputStream.flush();
dataOutputStream.close();
}
int responseCode = connection.getResponseCode();
// 判斷請求轉發(fā)、定時任務觸發(fā)是否成功
if (responseCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + responseCode + ") Invalid");
}
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), Charset.defaultCharset()));
StringBuilder stringBuilder = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
stringBuilder.append(line);
}
String responseMsg = stringBuilder.toString();
log.info("xxlJob調度執(zhí)行返回數(shù)據(jù)={}", responseMsg);
XxlJobHelper.log(responseMsg);
} catch (Exception e) {
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e) {
XxlJobHelper.log(e);
}
}
}
}
稍微有點麻煩的是,每個Spring Cloud應用都需要手動新增一個ScheduleController:
/**
* 定時任務入口,所有服務的@RequestMapping滿足/schedule/appName這種格式,方便統(tǒng)一管理
**/
@RestController
@RequestMapping("/schedule/search")
public class ScheduleController {
@Resource
private ChineseEnglishStoreSchedule chineseEnglishStoreSchedule;
@GetMapping("/chineseEnglishStoreSchedule")
public Response<Boolean> chineseEnglishStoreSchedule() {
chineseEnglishStoreSchedule.execute();
return Response.success(true);
}
}
另外,需要在gateway網(wǎng)關服務里新增路由轉發(fā)規(guī)則:
每個有定時任務,且準備接入XXL-JOB平臺的SC微服務,都需要新增類似上面截圖里的4條配置信息。
優(yōu)點:所有帶有定時任務的服務一目了然,方便統(tǒng)一維護和管理。
這種方案無需改造具體的某個Schedule類:
@JobHander(value = "autoJobHandler")
public class AutoJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String... params) {
try {
// 既有的業(yè)務邏輯
// 執(zhí)行成功
return ReturnT.SUCCESS;
} catch (Exception e) {
logger.error("execute error id:{}, error info:{}", id, e);
return ReturnT.FAIL;
}
return ReturnT.SUCCESS;
}
}
最后都省卻不了的一個步驟,在XXL-JOB admin管理平臺新增一個個任務:文章來源:http://www.zghlxwxcb.cn/news/detail-665597.html
驗證
任務調度的執(zhí)行日志:
ELK日志查詢平臺里也可以搜索到邏輯代碼里打印的日志。文章來源地址http://www.zghlxwxcb.cn/news/detail-665597.html
參考
到了這里,關于Spring@Scheduled定時任務接入XXL-JOB的一種方案(基于SC Gateway)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!