A Kafka message-backlog alert fired, so I started with the usual self-check. This keeps happening; every previous time the stop-gap was to reassign partitions or reset the offsets (the messages are discardable, so skipping ahead prevents a huge backlog that consumption can't keep up with).
Following the runbook, the first thing to check is whether message fetching is healthy. The fetch thread turned out to be in the WAITING state, and in this part of the Kafka consumption code it is the consumer thread that blocks the fetch thread.
Comparing with other consumers, whose consumer threads alternate between RUNNABLE and WAITING, this consumer's thread stayed in RUNNABLE the whole time and kept the message fetch thread blocked.
That pinned the problem down, so I pulled the thread's stack trace and found the business logic stuck in socket.read. That immediately suggested a missing socket timeout; the call in question goes through HttpClient, and sure enough no timeout had been configured.
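The stack came from a thread dump. Purely as an illustration (a generic sketch, not the tooling used during this incident; the class name is made up), the same state-plus-top-frame view can be pulled in-process with ThreadMXBean:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadStateProbe {
    public static void main(String[] args) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Dump every live thread with its state and topmost stack frame;
        // a consumer thread pinned in RUNNABLE inside a socket read stands out here.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            StackTraceElement[] stack = info.getStackTrace();
            String top = stack.length > 0 ? stack[0].toString() : "<no frame>";
            System.out.printf("%-40s %-13s %s%n",
                    info.getThreadName(), info.getThreadState(), top);
        }
    }
}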
By definition, a socketTimeout of 0 means the read waits indefinitely (until the process is restarted). Someone else has written up a more detailed investigation of the same problem: https://cloud.tencent.com/developer/news/698654.
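For comparison, here is a minimal sketch of bounding those waits with the standard (non-fluent) HttpClient 4.x builder API; the endpoint and timeout values are placeholders, not taken from the original code:

import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class TimeoutConfigExample {
    public static void main(String[] args) throws Exception {
        // Without these settings SO_TIMEOUT stays at 0, i.e. a read can block forever.
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(1000)            // ms to establish the TCP connection
                .setSocketTimeout(1000)             // ms to wait for data on the socket
                .setConnectionRequestTimeout(1000)  // ms to wait for a connection from the pool
                .build();
        try (CloseableHttpClient client = HttpClients.custom()
                .setDefaultRequestConfig(config).build();
             CloseableHttpResponse response = client.execute(new HttpGet("http://example.com/"))) {
            System.out.println(response.getStatusLine());
        }
    }
}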
So the fix is simply to set the timeouts on each request.
We are using the fluent API:
import org.apache.http.client.fluent.Request;

Request request = Request.Post(uri)
        .connectTimeout(1000)   // ms allowed to establish the connection
        .socketTimeout(1000);   // ms allowed between data packets on the read
String response = request.execute().returnContent().asString();
后面考慮到這個請求量比較大,可能會影響交易流程(這次的問題查詢是一個同步信息接口),因此決定不使用公共連接池,寫法如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-843087.html
import org.apache.http.client.HttpClient;
import org.apache.http.client.fluent.Executor;
import org.apache.http.client.fluent.Request;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLInitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

/**
 * HTTP utility backed by its own connection pool; every call passes explicit timeouts.
 *
 * @version 1.0
 */
public class HttpFluentUtil {

    private static final Logger logger = LoggerFactory.getLogger(HttpFluentUtil.class);

    private final static int MaxPerRoute = 100;
    private final static int MaxTotal = 200;

    final static PoolingHttpClientConnectionManager CONNMGR;
    final static HttpClient CLIENT;
    final static Executor executor;

    static {
        // Prefer the system SSL socket factory; fall back to a default TLS context.
        LayeredConnectionSocketFactory ssl = null;
        try {
            ssl = SSLConnectionSocketFactory.getSystemSocketFactory();
        } catch (final SSLInitializationException ex) {
            final SSLContext sslcontext;
            try {
                sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
                sslcontext.init(null, null, null);
                ssl = new SSLConnectionSocketFactory(sslcontext);
            } catch (final SecurityException ignore) {
            } catch (final KeyManagementException ignore) {
            } catch (final NoSuchAlgorithmException ignore) {
            }
        }
        final Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", ssl != null ? ssl : SSLConnectionSocketFactory.getSocketFactory())
                .build();
        // Dedicated pool so heavy traffic on this path does not contend with the shared client.
        CONNMGR = new PoolingHttpClientConnectionManager(sfr);
        CONNMGR.setDefaultMaxPerRoute(MaxPerRoute);
        CONNMGR.setMaxTotal(MaxTotal);
        CLIENT = HttpClientBuilder.create().setConnectionManager(CONNMGR).build();
        executor = Executor.newInstance(CLIENT);
    }

    public static String Get(String uri, int connectTimeout, int socketTimeout) throws IOException {
        return executor.execute(Request.Get(uri)
                        .connectTimeout(connectTimeout)
                        .socketTimeout(socketTimeout))
                .returnContent().asString();
    }

    public static String Post(String uri, int connectTimeout, int socketTimeout) throws IOException {
        return executor.execute(Request.Post(uri)
                        .connectTimeout(connectTimeout) // was missing in the first cut: the connect itself could still hang
                        .socketTimeout(socketTimeout))
                .returnContent().asString();
    }
}
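With the utility in place, every call site passes explicit timeouts; an illustrative call (the endpoint and timeout values are placeholders):

// Hypothetical endpoint: give up after 1 s connecting or 2 s waiting for data.
String body = HttpFluentUtil.Get("http://example.com/api/sync-info", 1000, 2000);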
到了這里,關(guān)于記一次kafka消息積壓的排查的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!