国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

記一次kafka消息積壓的排查

這篇具有很好參考價(jià)值的文章主要介紹了記一次kafka消息積壓的排查。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

kafka消息積壓報(bào)警,首先進(jìn)行了自查,這個現(xiàn)象頻頻出現(xiàn),之前每次都是先重新分配分區(qū)或者回溯(消息可丟棄防止大量積壓消費(fèi)跟不上)。
根據(jù)手冊首先排查下消息拉取是否正常,看到了消息拉取線程是waiting狀態(tài),然后看到kafka這塊邏輯是消費(fèi)線程阻塞了拉取線程。

對比了其他消費(fèi)者,消費(fèi)線程都是在runing和waiting中切換,但是當(dāng)前消費(fèi)者的消費(fèi)狀態(tài)一直處于runing,阻塞了消息拉取線程。

問題定位成功,然后去看了線程的棧信息,發(fā)現(xiàn)是里面的邏輯卡在了socket.read,當(dāng)即想到了socket的超時(shí),去看了代碼邏輯,是httpclinet,果然沒有設(shè)置超時(shí)時(shí)間。

按照定義解釋為如果sockettimeout設(shè)置為0的話,應(yīng)該是等待無限長的時(shí)間(直到進(jìn)程重啟),這里有個老哥用個更詳細(xì)的排查https://cloud.tencent.com/developer/news/698654。
所以解決方案就是在請求是設(shè)置一下:
使用的是fluent api

import org.apache.http.client.fluent.Request;
 Request request = Request.Post(uri).connectTimeout(1000).socketTimeout(1000);
            String response = request.execute().returnContent().asString();

后面考慮到這個請求量比較大,可能會影響交易流程(這次的問題查詢是一個同步信息接口),因此決定不使用公共連接池,寫法如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-843087.html

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

/**
 * @version 1.0
 */
public class HttpFluentUtil {
    private Logger logger = LoggerFactory.getLogger(HttpFluentUtil.class);
    private final static int MaxPerRoute = 100;
    private final static int MaxTotal = 200;
    final static PoolingHttpClientConnectionManager CONNMGR;
    final static HttpClient CLIENT;
    final static Executor executor;

    static {
        LayeredConnectionSocketFactory ssl = null;
        try {
            ssl = SSLConnectionSocketFactory.getSystemSocketFactory();
        } catch (final SSLInitializationException ex) {
            final SSLContext sslcontext;
            try {
                sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
                sslcontext.init(null, null, null);
                ssl = new SSLConnectionSocketFactory(sslcontext);
            } catch (final SecurityException ignore) {
            } catch (final KeyManagementException ignore) {
            } catch (final NoSuchAlgorithmException ignore) {
            }
        }

        final Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", ssl != null ? ssl : SSLConnectionSocketFactory.getSocketFactory()).build();

        CONNMGR = new PoolingHttpClientConnectionManager(sfr);
        CONNMGR.setDefaultMaxPerRoute(MaxPerRoute);
        CONNMGR.setMaxTotal(MaxTotal);
        CLIENT = HttpClientBuilder.create().setConnectionManager(CONNMGR).build();
        executor = Executor.newInstance(CLIENT);
    }

    public static String Get(String uri, int connectTimeout, int socketTimeout) throws IOException {
        return executor.execute(Request.Get(uri).connectTimeout(connectTimeout).socketTimeout(socketTimeout))
                .returnContent().asString();
    }

    public static String Post(String uri, int connectTimeout, int socketTimeout)
            throws IOException {
        return executor.execute(Request.Post(uri).socketTimeout(socketTimeout)
                ).returnContent().asString();
    }
}

到了這里,關(guān)于記一次kafka消息積壓的排查的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    01. 創(chuàng)建消費(fèi)者 在讀取消息之前,需要先創(chuàng)建一個KafkaConsumer對象。創(chuàng)建KafkaConsumer對象與創(chuàng)建KafkaProducer對象非常相似——把想要傳給消費(fèi)者的屬性放在Properties對象里。 為簡單起見,這里只提供4個必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負(fù)載均衡的能力,或者說對數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進(jìn)行的,這樣每個節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(28)
  • 【新星計(jì)劃】Kafka分布式發(fā)布訂閱消息系統(tǒng)

    【新星計(jì)劃】Kafka分布式發(fā)布訂閱消息系統(tǒng)

    ? 目錄 Kafka分布式發(fā)布訂閱消息系統(tǒng) 1. 概述 1.1 點(diǎn)對點(diǎn)消息傳遞模式 1.2 發(fā)布-訂閱消息傳遞模式 1.3 Kafka特點(diǎn) 1.4 kafka拓?fù)鋱D 2. Kafka工作原理 2.1 Kafka核心組件介紹 2.2 Kafka工作流程分析 2.2.1 生產(chǎn)者生產(chǎn)消息過程 2.2.2 消費(fèi)者消費(fèi)消息過程 2.2.3 Kafka Topics 2.2.4 Kafka Partition 2.2.4 Kafka

    2024年02月08日
    瀏覽(27)
  • 分布式消息隊(duì)列Kafka(四)- 消費(fèi)者

    分布式消息隊(duì)列Kafka(四)- 消費(fèi)者

    1.Kafka消費(fèi)方式 2.Kafka消費(fèi)者工作流程 (1)總體工作流程 (2)消費(fèi)者組工作流程 3.消費(fèi)者API (1)單個消費(fèi)者消費(fèi) 實(shí)現(xiàn)代碼 (2)單個消費(fèi)者指定分區(qū)消費(fèi) 代碼實(shí)現(xiàn): (3)消費(fèi)者組消費(fèi) 復(fù)制上面CustomConsumer三個,同時(shí)去訂閱統(tǒng)一個主題,消費(fèi)數(shù)據(jù),發(fā)現(xiàn)一個分區(qū)只能被一個

    2023年04月26日
    瀏覽(33)
  • 分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者的消費(fèi)位移

    分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者的消費(fèi)位移

    01. Kafka 分區(qū)位移 對于Kafka中的分區(qū)而言,它的每條消息都有唯一的offset,用來表示消息在分區(qū)中對應(yīng)的位置。偏移量從0開始,每個新消息的偏移量比前一個消息的偏移量大1。 每條消息在分區(qū)中的位置信息由一個叫位移(Offset)的數(shù)據(jù)來表征。分區(qū)位移總是從 0 開始,假設(shè)一

    2024年02月12日
    瀏覽(27)
  • 分布式應(yīng)用之zookeeper集群+消息隊(duì)列Kafka

    分布式應(yīng)用之zookeeper集群+消息隊(duì)列Kafka

    ? ? ? ?ZooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個開源的實(shí)現(xiàn),是Hadoop和Hbase的重要組件。它是一個為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。為分布式框架提供協(xié)調(diào)服務(wù)的

    2024年02月06日
    瀏覽(139)
  • zookeeper+kafka分布式消息隊(duì)列集群的部署

    zookeeper+kafka分布式消息隊(duì)列集群的部署

    目錄 一、zookeeper 1.Zookeeper 定義 2.Zookeeper 工作機(jī)制 3.Zookeeper 特點(diǎn) 4.Zookeeper 數(shù)據(jù)結(jié)構(gòu) 5.Zookeeper 應(yīng)用場景 (1)統(tǒng)一命名服務(wù) (2)統(tǒng)一配置管理 (3)統(tǒng)一集群管理 (4)服務(wù)器動態(tài)上下線 6.Zookeeper 選舉機(jī)制 (1)第一次啟動選舉機(jī)制 (2)非第一次啟動選舉機(jī)制 7.部署zookeepe

    2024年02月14日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包