前言
okey,上一篇文章我們提到了,如何實現(xiàn)它的一個清單的一個代理。這里的話我們來捋一捋我們的這個執(zhí)行流程是啥:
所以的話,我們的我們這里今天要做的是這個執(zhí)行器的一個執(zhí)行。當然這里的話,我們也是分兩個部分,因為這個執(zhí)行器的話,是分兩個部分的,一個是正常的任務執(zhí)行,還有一個是這個宕機之后,我們對任務的一個恢復的處理。
執(zhí)行器流程
提交流程
那么在這里的話,我得先說說這個執(zhí)行器提交的流程,因為這個不說清楚的話,就比較麻煩了。
首先我們先來看到這幾個類:
然后的話,我們的流程是這樣的:
1. ExecuteMagaer 負責創(chuàng)建TaskWrapper
2. TaskWrapper 里面包含了代理對象,執(zhí)行代理對象的執(zhí)行方法
3. 將TaskWraaper 提交到線程池里面去
所以的話,是通過這三個環(huán)節(jié),最終任務提交到了我們的這個線程池里面,然后進行執(zhí)行。
線程池實現(xiàn)
okey,這里的話,我們當然需要去有一個線程池,但是這個線程池的話,有個特點,那就是:
- 如果你有ID,那么相同ID的任務排隊執(zhí)行
- 如果沒有ID,那么就直接異步執(zhí)行
這樣做的話有啥好處嘛,好處就是,假設這個ID是你的UserID,在用戶下單的時候,就算重復下單,由于兩次賬單是順序執(zhí)行的,第一個賬單執(zhí)行完畢之后,改變了狀態(tài),假設此時你對商品ID上鎖了,那么第二個賬單執(zhí)行的時候,發(fā)現(xiàn)商品ID鎖住了,就不會繼續(xù)無腦執(zhí)行了。主要是實現(xiàn)更加細致的操作。
package com.huterox.todoscheduler.common.impl;
import com.huterox.todoscheduler.common.SerializationUtils;
import com.huterox.todoscheduler.common.TaskManager;
import com.huterox.todoscheduler.config.Configuration;
import com.huterox.todoscheduler.core.wapper.TaskWrapper;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
/**
* 任務管理器
* */
public class DefaultTaskManager implements TaskManager, Serializable {
private final ThreadPoolExecutor executor;
private final Map<String, BlockingQueue<Runnable>> taskQueues;
private final Object lock;
public DefaultTaskManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>());
taskQueues = new HashMap<>();
lock = new Object();
}
public DefaultTaskManager() {
executor = new ThreadPoolExecutor(
Configuration.corePoolSize,
Configuration.maximumPoolSize,
Configuration.keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
taskQueues = new HashMap<>();
lock = new Object();
}
@Override
public void submitTask(TaskWrapper task, String id) {
if (id == null || id.isEmpty()) {
executor.execute(task); // 直接執(zhí)行任務
//然后保存當前的一個狀態(tài)
saveStatus();
} else {
synchronized (lock) {
BlockingQueue<Runnable> queue = taskQueues.computeIfAbsent(id, k -> new LinkedBlockingQueue<>());
if (queue.isEmpty()) {
// 之前沒有相同ID的任務在執(zhí)行,直接提交到線程池執(zhí)行
executor.execute(() -> {
try {
task.run(); // 執(zhí)行任務
} finally {
submitNextTask(id); // 執(zhí)行完畢后提交下一個任務
saveStatus();
}
});
} else {
// 將任務加入隊列中,等待前面的任務執(zhí)行完畢后再執(zhí)行
queue.offer(() -> {
try {
task.run(); // 執(zhí)行任務
} finally {
submitNextTask(id); // 執(zhí)行完畢后提交下一個任務
saveStatus();
}
});
}
}
}
}
@Override
public void saveStatus() {
//保存當前的一個狀態(tài)
SerializationUtils.serializeObject(this,"runningTask","task.ser");
}
private void submitNextTask(String id) {
synchronized (lock) {
BlockingQueue<Runnable> queue = taskQueues.get(id);
if (queue != null && !queue.isEmpty()) {
executor.execute(queue.poll()); // 提交下一個任務
} else {
taskQueues.remove(id); // 隊列為空時移除對應的ID
}
}
}
public ThreadPoolExecutor getExecutor() {
return executor;
}
public Map<String, BlockingQueue<Runnable>> getTaskQueues() {
return taskQueues;
}
public Object getLock() {
return lock;
}
@Override
public void shutdown() {
executor.shutdown();
}
}
這里面的代碼細節(jié),我就不說了,因為不難,再說篇幅太大了,還有好多東西要說呢。
執(zhí)行器實現(xiàn)
ok,我們說了這個流程,我們來看到這個執(zhí)行器是如何實現(xiàn)的。
接口
首先的話,我們是有一個接口的:
package com.huterox.todoscheduler.core.execute;
/**
* 我們核心的調度器,通過TodoListFactory可以得到可以執(zhí)行的任務清單
* */
public interface ExecuteCore {
void execute(String ListName);
String getClsId();
void run();
//服務器意外宕機之后,恢復這個任務的時候要進行的操作
void repair();
}
看到這個接口的話,有好幾個方法,首先是執(zhí)行提交的,然后是run,這個主要是這個原因:
package com.huterox.todoscheduler.core.wapper;
import com.huterox.todoscheduler.core.enumType.ExecuteType;
import com.huterox.todoscheduler.core.execute.ExecuteCore;
import java.io.Serializable;
public class TaskWrapper implements Runnable, Serializable {
private ExecuteCore executeCore;
private ExecuteType executeType = ExecuteType.Run;
public TaskWrapper() {
}
public ExecuteCore getExecuteCore() {
return executeCore;
}
public ExecuteType getExecuteType() {
return executeType;
}
public void setExecuteType(ExecuteType executeType) {
this.executeType = executeType;
}
public void setExecuteCore(ExecuteCore executeCore) {
this.executeCore = executeCore;
}
public TaskWrapper(ExecuteCore executeCore) {
this.executeCore = executeCore;
}
@Override
public void run() {
if(executeType==ExecuteType.Run){
executeCore.run();
}else if(executeType==ExecuteType.Repair){
executeCore.repair();
}
}
}
這個TaskWrapper是實現(xiàn)了Runable接口,里面有run,所以就索性這樣寫了。
狀態(tài)標志
之后的話,我們的項目到這里的話,只是實現(xiàn)了一個正向的過程,就是當項目宕機的時候,我們要盡可能去恢復任務清單的一個執(zhí)行,或者狀態(tài),比如你買東西的接口,后面執(zhí)行退款代碼的時候,服務器宕機了,那么這個時候,我要盡可能去恢復這個宕機退款的執(zhí)行。當然這里面要考慮的東西要多得多,小項目的話,你要相信第三方組件要比自己寫的代碼靠譜(狗頭)
這里主要是這兩個:
package com.huterox.todoscheduler.core.enumType;
import java.io.Serializable;
/**
* 當前的任務清單執(zhí)行的情況
* */
public enum TodoListStateType implements Serializable {
CreatFailed,
Running,
Fine,
Error,
Repairing;
}
package com.huterox.todoscheduler.core.enumType;
import java.io.Serializable;
public enum TodoItemStateType implements Serializable {
/**
* 需要重新運行啟動,適用于強一致的清單
* */
Again,
Running,
Error,
/**
* 運行正常
* */
Fine,
/**
* 只需要執(zhí)行修復,適用于弱一致的清單
* */
Repairing;
}
執(zhí)行周期實現(xiàn)
之后的話,就是我們要實現(xiàn)一個完整的執(zhí)行周期了:
清單代理創(chuàng)建
在執(zhí)行的時候 ,我們先要去創(chuàng)建這個代理對象。但是這個代理對象的話,也有創(chuàng)建前執(zhí)行處理器,之后處理器等待。所以這里也要進行一個處理:
@Override
public void execute(String ListName) {
//1. 先拿到可執(zhí)行清單對象
todoListExBean = TodoListFactory.getInstance(ListName);
//2. 查看返回的結果
if(todoListExBean==null){
//說明創(chuàng)建就失敗了,這個失敗是清單對象都沒有起來
this.running = false;
}else {
if(todoListExBean.getTodoListStateType()== TodoListStateType.CreatFailed){
//創(chuàng)建失敗了
if(todoListExBean.getTodoListElementType()== TodoListElementType.NothingConsistency
|| todoListExBean.getTodoListElementType()== TodoListElementType.WeakConsistency
){
//在創(chuàng)建階段,只要你不是強一致性,那么我就不管你,如果創(chuàng)建都失敗了
System.err.println("任務清單創(chuàng)建失敗,取消執(zhí)行");
}else if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
//強一致,這個時候,直接進入失敗隊列,這個時候也是創(chuàng)建失敗,但是這個失敗是指
//清單項目創(chuàng)建的時候有問題,清單對象起來了
TodoListFailedList.addFailedWithSerializable(todoListExBean);
this.running = false;
}
}
}
}
清單項執(zhí)行
之后的 話,才是執(zhí)行我們的 方法。執(zhí)行之后結束。
然后對于一個清單項,它其實有這樣的幾個過程:
- 清單項創(chuàng)建前(在Factory的時候就可以看到)
- 當前清單項執(zhí)行前
- 當前清單項執(zhí)行時刻
- 當前清單項執(zhí)行后
- 清單項執(zhí)行異常
一個完整的清單周期包括:
- 清單創(chuàng)建前(創(chuàng)建之后是立刻執(zhí)行的,因此沒有執(zhí)行前這個方法)
- 清單項周期
- 清單結束
- 清單執(zhí)行異常
@Override
public void run() {
if(!this.running){
System.err.println("執(zhí)行失敗,停止執(zhí)行該任務清單");
return;
}
todoListExBean.setExTimes(todoListExBean.getExTimes()+1);
todoListExBean.setTodoListStateType(TodoListStateType.Running);
//在這里完成方法任務清單項的執(zhí)行
//同時在這里完成狀態(tài)的持久化處理,方便恢復狀態(tài)
Map<Integer, TodoItemExBean> sortedMap = todoListExBean.getSortedMap();
//開始遍歷執(zhí)行清單項
for(Map.Entry<Integer, TodoItemExBean> entry:sortedMap.entrySet()){
Integer key = entry.getKey();
TodoItemExBean entryValue = entry.getValue();
entryValue.setTodoItemStateType(TodoItemStateType.Running);
//這里開始按照生命周期執(zhí)行代碼
try {
if (entryValue.getTodoItemBeforeRunningHandler()!=null)
{
//執(zhí)行第一個運行時,運行前的代碼
TodoItemBeforeRunningHandler todoItemBeforeRunningHandler = entryValue.
getTodoItemBeforeRunningHandler();
boolean concierge = todoItemBeforeRunningHandler.concierge(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
if(!concierge){
//沒有滿足通過條件,需要跳過或者終止
if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
System.err.println("任務清單:"+todoListExBean.getTodoListName()
+"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
"方法:"+entryValue.getWrapperMethod().getName()+"未通過運行時執(zhí)行前Handler"
+"正在前往執(zhí)行下一個任務項"
);
}else {
//查看當前任務清單類型,如果是那種強一致性的,那就加入失敗隊列,等待重啟
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
//強一致,這個時候,直接進入失敗隊列
entryValue.setTodoItemStateType(TodoItemStateType.Again);
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else if(
todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
){
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
}
return;
}
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
//這個時候由于沒有滿足條件,那么這個時候要執(zhí)行對應的恢復函數(shù)
todoItemBeforeRunningHandler.repair(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
}
}
//執(zhí)行運行時刻代碼
Method wrapperMethod = entryValue.getWrapperMethod();
wrapperMethod.setAccessible(true);
Parameter[] parameters = wrapperMethod.getParameters();
// 構造參數(shù)數(shù)組
Object[] argsArray = new Object[parameters.length];
for (int i = 0; i < parameters.length; i++) {
Parameter parameter = parameters[i];
if (parameter.getType() == ListStateWrapper.class) {
// 設置特定參數(shù)的值
argsArray[i] = todoListExBean.getStateWrapper();
}else if(parameter.getType() == ItemStateWrapper.class){
argsArray[i] = entryValue.getStateWrapper();
}
}
// 執(zhí)行方法
Object result = wrapperMethod.invoke(entryValue, argsArray);
//執(zhí)行后置處理,這個執(zhí)行流程和前置是一樣的
if (entryValue.getTodoItemAfterRunningHandler()!=null){
TodoItemAfterRunningHandler todoItemAfterRunningHandler = entryValue.getTodoItemAfterRunningHandler();
boolean concierge = todoItemAfterRunningHandler.concierge(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
if(!concierge){
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
System.err.println("任務清單:"+todoListExBean.getTodoListName()
+"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
"方法:"+entryValue.getWrapperMethod().getName()+"未通過運行時執(zhí)行前Handler"
+"正在前往執(zhí)行下一個任務項"
);
}else {
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
//強一致,這個時候,直接進入失敗隊列
entryValue.setTodoItemStateType(TodoItemStateType.Again);
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else if(
todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
){
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
}
return;
}
//這個時候由于沒有滿足條件,那么這個時候要執(zhí)行對應的恢復函數(shù)
todoItemAfterRunningHandler.repair(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
}
}
} catch (Exception e) {
e.printStackTrace();
//對錯誤進行處理
entryValue.setTodoItemStateType(TodoItemStateType.Error);
if(entryValue.getTodoItemErrorHandler()!=null){
try {
TodoItemErrorHandler todoItemErrorHandler = entryValue.getTodoItemErrorHandler();
todoItemErrorHandler.concierge(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
}catch (Exception e1){
e1.printStackTrace();
//如果這個都執(zhí)行失敗了,那真的沒救了
//加入失敗列表看看了,只能,如果是一定要執(zhí)行的話
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else {
System.err.println("任務強制終止");
}
return;
}
}
}
//此時這個任務清單的小項目才算執(zhí)行正常
entryValue.setTodoItemStateType(TodoItemStateType.Fine);
}
//清單項目是執(zhí)行完畢了,那么接下來是這個清單的后置處理部分
if(todoListExBean.getTodoListAfterHandler()!=null) {
TodoListAfterHandler todoListAfterHandler = todoListExBean.getTodoListAfterHandler();
try {
boolean concierge = todoListAfterHandler.concierge(todoListExBean.getStateWrapper());
if(!concierge) {
todoListExBean.setTodoListStateType(TodoListStateType.Repairing);
//這個時候由于沒有滿足條件,那么這個時候要執(zhí)行對應的恢復函數(shù)
todoListAfterHandler.repair(
todoListExBean.getStateWrapper()
);
}
} catch (Exception e) {
e.printStackTrace();
todoListExBean.setTodoListStateType(TodoListStateType.Error);
//對錯誤進行處理
if(todoListExBean.getTodoListErrorHandler()!=null){
try {
TodoListErrorHandler todoListErrorHandler = todoListExBean.getTodoListErrorHandler();
todoListErrorHandler.concierge(
todoListExBean.getStateWrapper()
);
}catch (Exception e1){
e1.printStackTrace();
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
todoListExBean.setTodoListStateType(TodoListStateType.Error);
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else {
System.err.println("任務強制終止");
}
}
}
}
}
todoListExBean.setTodoListStateType(TodoListStateType.Fine);
}
那么這里的核心代碼其實就是這一塊兒。當然這個家伙的實現(xiàn)還有一部分,關于這個狀態(tài)恢復的。文章來源:http://www.zghlxwxcb.cn/news/detail-641438.html
總結
那么這篇博文就到這里,今天還有一篇,這里再重復一次(第二次了),我們的項目地址是:https://gitee.com/Huterox/htodo-sechudling文章來源地址http://www.zghlxwxcb.cn/news/detail-641438.html
到了這里,關于造個輪子-任務調度執(zhí)行小框架-任務清單執(zhí)行器實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!