SpringCloud GateWay+RocketMQ實(shí)現(xiàn)API訪問(wèn)日志收集
需求背景
產(chǎn)品經(jīng)理突然找到我說(shuō),咱們這個(gè)產(chǎn)品貌似沒(méi)有實(shí)現(xiàn)之前舊的系統(tǒng)平臺(tái)操作日志了;希望我盡快實(shí)現(xiàn)這個(gè)需求,以應(yīng)對(duì)一些檢查;因?yàn)闀r(shí)間關(guān)系再加上人員問(wèn)題,跟我原先規(guī)劃得有些背道而馳
草擬方案
1.寫一個(gè)AOP日志Starter,再需要的模塊中引入,對(duì)應(yīng)方法去標(biāo)記注解,工程量比較大,目前所有的模塊的都得逐步去添加,個(gè)人比較懶,因此該方案?jìng)溥x
2. 在網(wǎng)關(guān)層通過(guò)全局?jǐn)r截器Filter攔截所有請(qǐng)求,通過(guò)MQ記錄日志,再通過(guò)監(jiān)聽(tīng)MQ實(shí)現(xiàn)日志入庫(kù),因?yàn)樵鹊募軜?gòu)已經(jīng)有MQ了,所以覺(jué)得這種方案更快捷,因?yàn)閿]起袖子往下干
具體實(shí)現(xiàn)(推薦使用方式1)
之前一直在看如何去獲取請(qǐng)求體;各種區(qū)分MediaType跟Method對(duì)應(yīng)不同的讀取方式,解析重新構(gòu)建請(qǐng)求往下游傳遞,中間出現(xiàn)了各種問(wèn)題;沒(méi)有解決的一個(gè)情況是有幾個(gè)接口都是base64圖片傳參的,早前的通過(guò)BodyInserter 去重新構(gòu)建請(qǐng)求體跟獲取響應(yīng)體,遇到這幾個(gè)接口都會(huì)出現(xiàn)報(bào)錯(cuò)
private Mono<Void> writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
.flatMap(body -> {
gatewayLog.setRequestBody(body);
return Mono.just(body);
});
// 通過(guò) BodyInserter 插入 body, 避免 request body 只能獲取一次
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
// 重新封裝請(qǐng)求
ServerHttpRequest decoratedRequest = requestDecorate(exchange,headers,outputMessage);
// 處理響應(yīng)日志
ServerHttpResponseDecorator decoratedResponse =recordResponse(exchange,gatewayLog);
return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build()).then(Mono.fromRunnable(() -> { writeAccessLog(gatewayLog);}));}));
}
報(bào)錯(cuò)IllegalReferenceCountException異常(io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1)。具體如下
[TID:N/A] 2023-03-27 11:38:34.767 ERROR 30056 --- [ctor-http-nio-4] r.n.channel.ChannelOperationsHandler : [id: 0x53e73793, L:/192.168.1.53:6868 ! R:/192.168.1.62:56218] Error was received while reading the incoming data. The connection will be closed.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.51.Final.jar:4.1.51.Final]
at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92) ~[netty-codec-http-4.1.51.Final.jar:4.1.51.Final]
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:344) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:487) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.51.Final.jar:4.1.51.Fina
造成這個(gè)問(wèn)題的點(diǎn)有一種說(shuō)法:DataBufferUtils.release(buffer)在低版本spring-core下是有問(wèn)題,詳見(jiàn):https://github.com/spring-projects/spring-framework/issues/26060;如果依賴的spring-cloud-starter-gateway版本較低,可以單獨(dú)升spring-core的版本spring-core升級(jí)為5.2.13.RELEASE及以上【本人嘗試后還是報(bào)錯(cuò)但不是上面的錯(cuò)誤了,沒(méi)再去定位】
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.2.13.RELEASE</version>
</dependency>
后來(lái)更改了requeset.getBody()的方式也可以完成日志實(shí)現(xiàn),完整代碼如下:
@Component
@Slf4j
@RequiredArgsConstructor
public class GatewayLogFilterBak230329 implements GlobalFilter, Ordered {
private final ApplicationEventPublisher applicationEventPublisher;
private static final String CONTENT_TYPE = "application/json";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 請(qǐng)求路徑
String requestPath = request.getPath().pathWithinApplication().value();
// 獲取路由信息
Route route = getGatewayRoute(exchange);
String ipAddress = IpUtils.getIp(request);
GatewayLog gatewayLog = new GatewayLog();
gatewayLog.setDevice(IpUtils.getServerDevices(request));
gatewayLog.setProtocol(request.getURI().getScheme());
gatewayLog.setRequestMethod(request.getMethodValue());
gatewayLog.setRequestPath(requestPath);
gatewayLog.setTargetServer(route.getUri().toString());
gatewayLog.setStartTime(new Date().getTime());
gatewayLog.setIp(ipAddress);
Map<String, Object> headers = new HashMap<>();
for (String key : request.getHeaders().keySet()) {
headers.put(key, request.getHeaders().getFirst(key));
}
gatewayLog.setHeaders(JSON.toJSONString(headers));
MediaType mediaType = request.getHeaders().getContentType();
if (request.getHeaders().getContentType() != null) {
gatewayLog.setRequestContentType(request.getHeaders().getContentType().toString());
}
if (mediaType != null && (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType) || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType))) {
return writeBodyLog(exchange, chain, gatewayLog);
} else {
return writeBasicLog(exchange, chain, gatewayLog);
}
}
@Override
public int getOrder() {
// 過(guò)濾器鏈路上的排序要在NettyWriteResponseFilter(這個(gè)攔截器默認(rèn)是-1)之前
return -2;
}
/**
* 獲取路由信息
*
* @param exchange
* @return
*/
private Route getGatewayRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
}
private Mono<Void> writeBasicLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
gatewayLog.setRequestBody(getUrlParamsByMap(queryParams));
//獲取響應(yīng)體
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
// 打印日志
writeAccessLog(gatewayLog);
}));
}
/**
* 解決 request body 只能讀取一次問(wèn)題,
* 參考: org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory
*
* @param exchange
* @param chain
* @param gatewayLog
* @return
*/
private Mono<Void> writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
ServerHttpRequest request = exchange.getRequest();
return DataBufferUtils.join(request.getBody())
.flatMap(d -> Mono.just(Optional.of(d))).defaultIfEmpty(Optional.empty())
.flatMap(optional -> {
try {
URI uri = request.getURI();
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
byte[] bodyBytes = null;
if (optional.isPresent()) {
byte[] oldBytes = new byte[optional.get().readableByteCount()];
optional.get().read(oldBytes);
bodyBytes = oldBytes;
}
// 無(wú)Body請(qǐng)求重寫
if (ArrayUtils.isEmpty(bodyBytes)) {
return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(request.mutate().uri(uri).build()) {
@Override
public HttpHeaders getHeaders() {
return headers;
}
}).response(recordResponseLog(exchange, gatewayLog)).build());
}
String body = new String(bodyBytes, StandardCharsets.UTF_8);
gatewayLog.setRequestBody(body);
final byte[] finalBodyBytes = bodyBytes;
return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(request.mutate().uri(uri).build()) {
@Override
public Flux<DataBuffer> getBody() {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(finalBodyBytes);
DataBufferUtils.retain(buffer);
return Flux.just(buffer);
}
@Override
public HttpHeaders getHeaders() {
return headers;
}
}).response(recordResponseLog(exchange, gatewayLog)).build()).then(Mono.fromRunnable(() -> {
writeAccessLog(gatewayLog);
}));
} catch (Exception ex) {
return chain.filter(exchange);
} finally {
if (optional.isPresent()) {
DataBufferUtils.release(optional.get());
}
}
});
}
/**
* 打印日志
*
* @param gatewayLog 網(wǎng)關(guān)日志
*/
private void writeAccessLog(GatewayLog gatewayLog) {
applicationEventPublisher.publishEvent(new GatewayLogEvent(this, gatewayLog));
}
/**
* 記錄響應(yīng)日志
* 通過(guò) DataBufferFactory 解決響應(yīng)體分段傳輸問(wèn)題。
*/
private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Date responseTime = new Date();
gatewayLog.setEndTime(responseTime.getTime());
// 計(jì)算執(zhí)行時(shí)間
long executeTime = (responseTime.getTime() - gatewayLog.getStartTime());
gatewayLog.setExecuteTime(executeTime);
gatewayLog.setStatus(response.getStatusCode().value() == 200 ? "成功" : "失敗");
// 獲取響應(yīng)類型,如果是 json 就打印
String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
if (Objects.equals(this.getStatusCode(), HttpStatus.OK)
&& StringUtils.isNotBlank(originalResponseContentType)
&& originalResponseContentType.contains(CONTENT_TYPE)) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
// 合并多個(gè)流集合,解決返回體分段傳輸
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// 釋放掉內(nèi)存
DataBufferUtils.release(join);
String responseResult = new String(content, StandardCharsets.UTF_8);
gatewayLog.setResponseData(responseResult);
return bufferFactory.wrap(content);
}));
}
}
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
}
/**
* 將map參數(shù)轉(zhuǎn)換成url參數(shù)
*
* @param map
* @return
*/
private String getUrlParamsByMap(MultiValueMap<String, String> map) {
if (ObjectUtils.isEmpty(map)) {
return "";
}
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue().get(0));
sb.append("&");
}
String s = sb.toString();
if (s.endsWith("&")) {
s = StringUtils.substringBeforeLast(s, "&");
}
return s;
}
}
后來(lái)也參考了蠻多大牛文章,自己也去找了很久gateway源代碼,終于在這個(gè)工具類ServerWebExchangeUtils中發(fā)現(xiàn)有更好實(shí)現(xiàn)的點(diǎn),cacheRequestBody()這個(gè)方法,英文注釋大概是說(shuō)可以緩存請(qǐng)求正文到這個(gè)屬性中;后續(xù)可以通過(guò)獲取屬性的方式獲取到請(qǐng)求正文;拿著這個(gè)方法去百度果然有人也是這么解決請(qǐng)求體的問(wèn)題,直接上代碼。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-665352.html
方式一:通過(guò)兩個(gè)攔截器終于實(shí)現(xiàn)了日志記錄
@Slf4j
@Component
// 頂級(jí)過(guò)濾器用來(lái)緩存請(qǐng)求正文
public class CacheGlobalRequestBodyFilter implements Ordered, GatewayFilter, GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return ServerWebExchangeUtils
.cacheRequestBody(
exchange,
(request) -> chain.filter(
exchange.mutate().request(request).build()));
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class GatewayLogFilter implements GlobalFilter, Ordered {
private final ApplicationEventPublisher applicationEventPublisher;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 請(qǐng)求路徑
String requestPath = request.getPath().pathWithinApplication().value();
// 獲取路由信息
Route route = getGatewayRoute(exchange);
String ipAddress = IpUtils.getIp(request);
GatewayLog gatewayLog = new GatewayLog();
gatewayLog.setDevice(IpUtils.getServerDevices(request));
gatewayLog.setProtocol(request.getURI().getScheme());
gatewayLog.setRequestMethod(request.getMethodValue());
gatewayLog.setRequestPath(requestPath);
gatewayLog.setTargetServer(route.getUri().toString());
gatewayLog.setStartTime(new Date().getTime());
gatewayLog.setIp(ipAddress);
Map<String, Object> headers = new HashMap<>();
for (String key : request.getHeaders().keySet()) {
headers.put(key, request.getHeaders().getFirst(key));
}
gatewayLog.setHeaders(JSON.toJSONString(headers));
if (request.getHeaders().getContentType() != null) {
gatewayLog.setRequestContentType(request.getHeaders().getContentType().toString());
}
// GatewayUtil.getRequestBodyContent(exchange)這里實(shí)際上就是一個(gè)獲取 exchange.getAttribute(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR)屬性而已
gatewayLog.setRequestBody(GatewayUtil.getRequestBodyContent(exchange));
//獲取響應(yīng)體
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
// 打印日志
writeAccessLog(gatewayLog);
}));
}
@Override
public int getOrder() {
return 0;
}
/**
* 獲取路由信息
*
* @param exchange
* @return
*/
private Route getGatewayRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
}
/**
* 打印日志
*
* @param gatewayLog 網(wǎng)關(guān)日志
*/
private void writeAccessLog(GatewayLog gatewayLog) {
applicationEventPublisher.publishEvent(new GatewayLogEvent(this, gatewayLog));
}
/**
* 記錄響應(yīng)日志
* 通過(guò) DataBufferFactory 解決響應(yīng)體分段傳輸問(wèn)題。
*/
private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Date responseTime = new Date();
gatewayLog.setEndTime(responseTime.getTime());
// 執(zhí)行時(shí)間
long executeTime = (responseTime.getTime() - gatewayLog.getStartTime());
gatewayLog.setExecuteTime(executeTime);
gatewayLog.setStatus(response.getStatusCode().value() == 200 ? "成功" : "失敗");
// 獲取響應(yīng)類型json
String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
if (Objects.equals(this.getStatusCode(), HttpStatus.OK) && StringUtils.isNotBlank(originalResponseContentType)
&& originalResponseContentType.contains(MediaType.APPLICATION_JSON_VALUE)) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
// 合并多個(gè)流集合,解決返回體分段傳輸
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// 釋放掉內(nèi)存
DataBufferUtils.release(join);
String responseResult = new String(content, StandardCharsets.UTF_8);
gatewayLog.setResponseData(responseResult);
return bufferFactory.wrap(content);
}));
}
}
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
}
}
方式二 通過(guò)AdaptCachedBodyGlobalFilter實(shí)現(xiàn)請(qǐng)求體緩存
@Component
@RequiredArgsConstructor
public class GatewayCommonConfig{
private final GatewayProperties gatewayProperties;
private final ApplicationContext applicationContext;
@PostConstruct
public void init(){
//發(fā)布對(duì)應(yīng)路由的EnableBodyCachingEvent事件
gatewayProperties.getRoutes().forEach(e->{
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), e.getId());
//發(fā)布事件
applicationContext.publishEvent(enableBodyCachingEvent);
});
}
}
然后 就可以在自定義的攔截器中通過(guò)request.getBody()獲取請(qǐng)求體了文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-665352.html
方式一參考鏈接:
方式一參考鏈接:
方式二參考鏈接:
到了這里,關(guān)于SpringCloud GateWay網(wǎng)關(guān)通過(guò)全局?jǐn)r截器GlobalFilter實(shí)現(xiàn)API日志的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!