背景
我們希望實現(xiàn)全鏈路信息,但是代碼中一般都會異步的線程處理。
解決思路
我們可以對以前的 Runable 和 Callable 進(jìn)行增強。
可以使用 ali 已經(jīng)存在的實現(xiàn)方式。
TransmittableThreadLocal (TTL) 解決異步執(zhí)行時上下文傳遞的問題
核心的實現(xiàn)思路如下:
1)異步執(zhí)行前,把當(dāng)前線程的 MDC 信息放入執(zhí)行對象中。
2)異步執(zhí)行時,把執(zhí)行對象中的信息放入 MDC 等信息。
3) 異步執(zhí)行后,清空執(zhí)行對象。
問題
Runable 和 Callable 只是接口,沒有額外信息,所以需要進(jìn)行增強。
實現(xiàn)方式
接口定義
package com.github.houbb.heaven.support.concurrent.context;
import java.util.Map;
/**
* 跨線程處理類
*
* @since 0.3.0
*/
public interface CrossThreadProcessor {
/**
* 初始化上下文
* @param contextMap 上下文
*/
void initContext(Map<String, Object> contextMap);
/**
* 執(zhí)行之前
* @param contextMap 上下文
*/
void beforeExecute(Map<String, Object> contextMap);
/**
* 執(zhí)行之后
* @param contextMap 上下文
*/
void afterExecute(Map<String, Object> contextMap);
}
對可執(zhí)行接口進(jìn)行增強
package com.github.houbb.heaven.support.concurrent.context;
import com.github.houbb.heaven.util.lang.SpiUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* 跨線程處理
*
* 作用:用來跨線程處理傳遞信息,比如 async,線程池等。
*
* 比如在 aop 中,直接處理。
*
* <pre>
* Object[] args = point.args();
* Object arg0 = args[0];
*
* // 直接轉(zhuǎn)換為當(dāng)前的對象
* if(arg0 instanceOf Runnable) {
* args[0] = new CrossThreadWrapper((Runnable)arg0);
* } else if(arg0 instanceOf Callable) {
* args[0] = new CrossThreadWrapper((Callable)arg0);
* }
*
* // 繼續(xù)處理
* </pre>
* @param <T> 泛型
* @since 0.3.0
*/
public class CrossThreadWrapper<T> implements Runnable, Callable<T> {
private Runnable runnable;
private Callable<T> callable;
/**
* 通過 spi 獲取所有的實現(xiàn)類
*/
private static List<CrossThreadProcessor> processorList = new ArrayList<>();
/**
* 上下文
*/
private final Map<String, Object> context = new HashMap<>();
static {
processorList = SpiUtil.getClassImplList(CrossThreadProcessor.class);
}
public CrossThreadWrapper(Runnable runnable) {
// 任務(wù)執(zhí)行之前
this.initContext();
this.runnable = runnable;
}
public CrossThreadWrapper(Callable<T> callable) {
this.initContext();
this.callable = callable;
}
@Override
public void run() {
try {
beforeExecute();
this.runnable.run();
} finally {
afterExecute();
}
}
@Override
public T call() throws Exception {
try {
beforeExecute();
return this.callable.call();
} finally {
afterExecute();
}
}
/**
* 初始化上下文
*/
protected void initContext() {
for(CrossThreadProcessor processor : processorList) {
processor.initContext(context);
}
}
/**
* 執(zhí)行前
*/
protected void beforeExecute() {
for(CrossThreadProcessor processor : processorList) {
processor.beforeExecute(context);
}
}
/**
* 執(zhí)行之后
*/
protected void afterExecute() {
for(CrossThreadProcessor processor : processorList) {
processor.afterExecute(context);
}
}
}
用法
實現(xiàn)接口
我們只需要實現(xiàn) CrossThreadProcessor 接口。
然后 spi 中配置,服務(wù)會自動發(fā)現(xiàn)。文章來源:http://www.zghlxwxcb.cn/news/detail-724905.html
aop
可以在 spring aop 中,對以前的方法執(zhí)行進(jìn)行增強。文章來源地址http://www.zghlxwxcb.cn/news/detail-724905.html
到了這里,關(guān)于分布式鏈路追蹤如何跨線程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!