国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

librdkafka的rdk:broker-1線程cpu百分百問題分析

這篇具有很好參考價值的文章主要介紹了librdkafka的rdk:broker-1線程cpu百分百問題分析。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

問題調(diào)用棧:

(gdb) bt
#0  0x000000000068307c in rd_kafka_q_pop_serve (rkq=0x1ff31a0, timeout_ms=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:373
#1  0x0000000000683130 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
#2  0x000000000066abcf in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x1ff28e0, timeout_ms=<optimized out>) at rdkafka_broker.c:2510
#3  0x000000000066ac84 in rd_kafka_broker_serve (rkb=rkb@entry=0x1ff28e0, abs_timeout=abs_timeout@entry=71168163411018) at rdkafka_broker.c:2532
#4  0x000000000066b157 in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x1ff28e0, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2617
#5  0x000000000066c796 in rd_kafka_broker_thread_main (arg=arg@entry=0x1ff28e0) at rdkafka_broker.c:3571
#6  0x00000000006b8d87 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:583
#7  0x00007f80a07e1eb5 in start_thread () from /lib64/libpthread.so.0
#8  0x00007f80a05098fd in clone () from /lib64/libc.so.6

相關(guān)代碼(rd_kafka_q_pop):

rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms,
                               int32_t version) {
	return rd_kafka_q_pop_serve(rkq, timeout_ms, version,
                                    RD_KAFKA_Q_CB_RETURN,
                                    NULL, NULL);
}

通過 gdb 觀察到 timeout_ms 值為 1,也就是 1 毫秒,這是導致 cpu 百分百的原因:

(gdb) f 1
#1  0x0000000000683130 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:399
399     in rdkafka_queue.c
(gdb) info args
rkq = <optimized out>
timeout_ms = <optimized out>
version = 0
(gdb) info reg
rax            0x0      0
rbx            0x0      0
rcx            0x64ab3c08       1688943624
rdx            0x7f7f537b4240   140184838160960
rsi            0x1ff31a0        33501600
rdi            0x1ff31c8        33501640
rbp            0x1ff28e0        0x1ff28e0
rsp            0x7f7f537b4288   0x7f7f537b4288
r8             0x7      7
r9             0x4748105a       1195905114
r10            0x7b     123
r11            0x1da063a5132567 8339124156310887
r12            0x7f7f537b42f0   140184838161136
r13            0x40ba2119744a   71168163411018
r14            0x0      0
r15            0x4      4
rip            0x683130 0x683130 <rd_kafka_q_serve>
eflags         0x246    [ PF ZF IF ]
cs             0x33     51
ss             0x2b     43
ds             0x0      0
es             0x0      0
fs             0x0      0
gs             0x0      0
(gdb) p *(rd_kafka_q_t*)0x1ff31c8
$1 = {rkq_lock = {__data = {__lock = 0, __count = 4289904, __owner = 2144952, __nusers = 0, __kind = 2144952, __spins = 0, __elision = 0, __list = {__prev = 0x20bab8, 
        __next = 0x1ff31a0}}, 
    __size = "\000\000\000\000puA\000\270\272 \000\000\000\000\000\270\272 \000\000\000\000\000\270\272 \000\000\000\000\000\240\061\377\001\000\000\000", 
    __align = 18424997382979584}, rkq_cond = {__data = {__lock = 0, __futex = 0, __total_seq = 0, __wakeup_seq = 0, __woken_seq = 33501696, __mutex = 0x0, __nwaiters = 0, 
      __broadcast_seq = 0}, __size = '\000' <repeats 25 times>, "\062\377\001", '\000' <repeats 19 times>, __align = 0}, rkq_fwdq = 0x300000001, rkq_q = {
    tqh_first = 0x1ff1930, tqh_last = 0x0}, rkq_qlen = 0, rkq_qsize = 0, rkq_refcnt = 9087760, rkq_flags = 0, rkq_rk = 0x0, rkq_qio = 0x261, rkq_serve = 0x23, 
  rkq_opaque = 0x0, rkq_name = 0x1 <Address 0x1 out of bounds>}
(gdb) p *(int*)0x1ff31a0
$2 = 1

繼續(xù)跟蹤,問題發(fā)生在函數(shù) cnd_timedwait_abs:

(gdb) b cnd_timedwait_abs
Breakpoint 1 at 0x6b92c0: file tinycthread_extra.c, line 95.
(gdb) c
Continuing.

Breakpoint 1, cnd_timedwait_abs (cnd=cnd@entry=0x1ff31c8, mtx=mtx@entry=0x1ff31a0, tspec=tspec@entry=0x7f7f537b4240) at tinycthread_extra.c:95
95      tinycthread_extra.c: No such file or directory.
(gdb) p *tspec
$3 = {tv_sec = 1688943624, tv_nsec = 1000000000}

函數(shù) cnd_timedwait_abs 源碼:

int cnd_timedwait_abs (cnd_t *cnd, mtx_t *mtx, const struct timespec *tspec) {
        if (tspec->tv_sec == RD_POLL_INFINITE)
                return cnd_wait(cnd, mtx);
        else if (tspec->tv_sec == RD_POLL_NOWAIT)
                return thrd_timedout;

        return cnd_timedwait(cnd, mtx, tspec); // 走這里來了
}

底層調(diào)用的是 Posix 的 pthread_cond_timedwait 函數(shù):

#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
      pthread_mutex_t *restrict mutex,
      const struct timespec *restrict abstime);

函數(shù) pthread_cond_timedwait 的參數(shù) abstime 是個絕對時間,不是相對時間。初始化如下:

rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
                                     int32_t version,
                                     rd_kafka_q_cb_type_t cb_type,
                                     rd_kafka_q_serve_cb_t *callback,
                                     void *opaque) {
    struct timespec timeout_tspec;

    rd_timeout_init_timespec(&timeout_tspec, timeout_ms);
    
    if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, &timeout_tspec) == thrd_timedout) {
        mtx_unlock(&rkq->rkq_lock);
    }
}

static RD_INLINE void rd_timeout_init_timespec (struct timespec *tspec, int timeout_ms) {
    if (timeout_ms == RD_POLL_INFINITE ||
        timeout_ms == RD_POLL_NOWAIT) {
        tspec->tv_sec = timeout_ms;
        tspec->tv_nsec = 0;
    } else {
        timespec_get(tspec, TIME_UTC); // 這里
        tspec->tv_sec  += timeout_ms / 1000;
        tspec->tv_nsec += (timeout_ms % 1000) * 1000000;
        if (tspec->tv_nsec > 1000000000) {
            tspec->tv_nsec -= 1000000000;
            tspec->tv_sec++;
        }
    }
}

函數(shù) timespec_get:

/* If TIME_UTC is missing, provide it and provide a wrapper for
   timespec_get. */
#ifndef TIME_UTC
#define TIME_UTC 1
#define _TTHREAD_EMULATE_TIMESPEC_GET_

int _tthread_timespec_get(struct timespec *ts, int base);
#define timespec_get _tthread_timespec_get
#endif

#if defined(_TTHREAD_EMULATE_TIMESPEC_GET_)
int _tthread_timespec_get(struct timespec *ts, int base)
{
#if defined(_TTHREAD_WIN32_)
  struct _timeb tb;
#elif !defined(CLOCK_REALTIME)
  struct timeval tv;
#endif

  if (base != TIME_UTC) // 約束為 UTC,即世界統(tǒng)一時間
  {
    return 0;
  }

#if defined(_TTHREAD_WIN32_)
  _ftime_s(&tb);
  ts->tv_sec = (time_t)tb.time;
  ts->tv_nsec = 1000000L * (long)tb.millitm;
#elif defined(CLOCK_REALTIME)
  base = (clock_gettime(CLOCK_REALTIME, ts) == 0) ? base : 0;
#else
  gettimeofday(&tv, NULL);
  ts->tv_sec = (time_t)tv.tv_sec;
  ts->tv_nsec = 1000L * (long)tv.tv_usec;
#endif

  return base;
}
#endif /* _TTHREAD_EMULATE_TIMESPEC_GET_ */

回過頭看函數(shù) cnd_timedwait_abs 的參數(shù) tspec 的值:

(gdb) p *tspec
$3 = {tv_sec = 1688943624, tv_nsec = 1000000000}

將 tv_sec 轉(zhuǎn)為可讀的值:

時間戳(秒)	1688943624
ISO 8601	2023-07-09T23:00:24.000Z
日期時間(UTC)	2023-07-09 23:00:24
日期時間(本地)	2023-07-10 07:00:24

而本地的實際時間為:

# date +'%Y-%m-%d %H:%M:%S'
2023-07-13 16:15:27

# date +'%s'
1689236277

# expr 1689236277 - 1688943624
292653
# expr 292653 / 3600
81

很明顯 tspec 不對,。

準備進一步分析時,遇到 gdb 的 bug 了:

(gdb) b gettimeofday
Breakpoint 1 at 0x7fddd8ba9650 (2 locations)
(gdb) b clock_gettime
Breakpoint 2 at gnu-indirect-function resolver at 0x7fddd8c08800 (3 locations)
(gdb) c
Continuing.
../../gdb/elfread.c:1052: internal-error: elf_gnu_ifunc_resolver_return_stop: Assertion `b->loc->next == NULL' failed.
A problem internal to GDB has been detected,
further debugging may prove unreliable.
Quit this debugging session? (y or n)

進一步驗證是發(fā)生在 clock_gettime,單獨斷點 gettimeofday 沒有問題,而且不會進入 gettimeofday。

$ man clock_gettime

clock_getres(), clock_gettime(), clock_settime():
      _POSIX_C_SOURCE >= 199309L
      
#  ifdef __USE_POSIX199309
/* Identifier for system-wide realtime clock.  */
#   define CLOCK_REALTIME

找了個正常的,查看:

(gdb) p timeout_tspec
$2 = {tv_sec = 1689237800, tv_nsec = 687268217}

# expr 1689237800 - 1688943624
294176
# expr 294176 / 3600
81

暫時懷疑 clock_gettime 調(diào)用出問題了,實現(xiàn)在最新的代碼中沒有變化:https://github.com/confluentinc/librdkafka/blob/master/src/tinycthread.c。文章來源地址http://www.zghlxwxcb.cn/news/detail-562385.html

到了這里,關(guān)于librdkafka的rdk:broker-1線程cpu百分百問題分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka【問題 03】Connection to node -1 (/IP:9092) could not be established. Broker may not be available.

    此問題僅出現(xiàn)在云服務(wù)器上,非云服務(wù)器未出現(xiàn)過一下報錯: 非云服務(wù)器: 云服務(wù)器: 云服務(wù)器有兩個IP,監(jiān)聽IP為云服務(wù)器IP,而advertised監(jiān)聽IP為云服務(wù)器的外網(wǎng)IP。

    2024年02月05日
    瀏覽(22)
  • kafka入門,Kafka Broker工作流程、Broker重要參數(shù)(十一)

    kafka入門,Kafka Broker工作流程、Broker重要參數(shù)(十一)

    在zookeeper的服務(wù)端存儲的Kafka相關(guān)信息 1)/kafka/brokers/ids [0,1,2] 記錄有哪些服務(wù)器 2)/kafka/brokers/topics/first/partitions/0/state 記錄誰是leader,有哪些服務(wù)器可用 3)/kafka/controller 輔助選舉leader 1)broker啟動后在zk中注冊 2)controller誰先注冊,誰說了算 3)由選舉出來的Controller監(jiān)聽bro

    2024年02月11日
    瀏覽(59)
  • Kafka 實戰(zhàn) - Kafka Broker工作流程

    Apache Kafka Broker 在 Kafka 集群中扮演著核心角色,負責接收、存儲、復(fù)制及分發(fā)消息。以下是 Kafka Broker 的工作流程概覽: 1. 啟動與初始化 加載配置 :Kafka Broker 從 server.properties 文件加載配置參數(shù),包括 Broker ID、監(jiān)聽地址、日志目錄、ZooKeeper 連接信息等。 注冊到 ZooKeeper :

    2024年04月15日
    瀏覽(19)
  • 深入Kafka broker

    深入Kafka broker

    顆粒度, PRODUCE和FETCH中支持topic,partion等層級的顆粒度; 測試友好, 基于session_id和epoch確定一條拉取鏈路的fetch session; 全量增量結(jié)合, FetchRequest中的全量拉取和增量拉取; 基本結(jié)構(gòu): header+body。 常見header: api_key, api_version, corelation_id, client_id。與網(wǎng)絡(luò)協(xié)議類似, Kafka本身的協(xié)議也是分

    2024年01月22日
    瀏覽(18)
  • 「Kafka」Broker篇

    「Kafka」Broker篇

    主要講解的是在 Kafka 中是怎么存儲數(shù)據(jù)的,以及 Kafka 和 Zookeeper 之間如何進行數(shù)據(jù)溝通的。 Zookeeper 存儲的 Kafka 信息 啟動 Zookeeper 客戶端: 通過 ls 命令可以查看 kafka 相關(guān)信息: Kafka Broker 總體工作流程 模擬 Kafka 上下線,Zookeeper 中數(shù)據(jù)變化: 查看 /kafka/brokers/ids 路徑上的節(jié)

    2024年01月18日
    瀏覽(22)
  • 【kafka】——Broker

    【kafka】——Broker

    1 /kafka/brokers/dis 存儲broker的id,記錄有哪些服務(wù)器 2 /kafka/brokers/topics 存儲topic 相關(guān)信息 3 /kafka/consumers Kafka 0.9 版本之前 用于保存offset信息 Kafka 0.9 版本之后offser存儲在Kafka主題中 4 /kafka/controller 輔助選舉Leader 1 Broker 啟動后在Zookeeper中注冊 2 每個節(jié)點中的Contoller 搶先在Zookeepe

    2024年02月09日
    瀏覽(19)
  • 四、Kafka Broker

    四、Kafka Broker

    4.1.1 Zookeeper 存儲的 Kafka 信息 4.1.2 Kafka Broker 總體工作流程 自己的理解:其實就是將kafka的分區(qū),負載到集群中的各個節(jié)點上。 1、服役新節(jié)點 2、退役舊節(jié)點 1、副本的作用 2、Leader的選舉流程 選舉規(guī)則:在isr中存活為前提,按照AR中排在前面的優(yōu)先。例如ar[1,0,2], isr [1,0,

    2024年02月11日
    瀏覽(16)
  • Kafka - Broker 詳解

    Kafka - Broker 詳解

    目錄 零、前置 一、Kafka Broker 工作流程 1.Zookeeper 存儲的 Kafka 信息 2.Kafka Broker 總體工作流程 模擬 Kafka 上下線,Zookeeper 中數(shù)據(jù)變化 3.Broker 重要參數(shù) 二、生產(chǎn)經(jīng)驗 節(jié)點服役和退役 1.服役新節(jié)點 新節(jié)點準備 執(zhí)行負載均衡操作 生成負載均衡的計劃 執(zhí)行副本存儲計劃 驗證副本存

    2024年01月24日
    瀏覽(15)
  • Kafka及Kafka消費者的消費問題及線程問題

    Topic:是 Kafka 消息發(fā)布和訂閱的基本單元,同時也是消息的容器。Topic 中的消息被分割成多個分區(qū)進行存儲和處理。 Partition:是 Topic 分區(qū),將 Topic 細分成多個分區(qū),每個分區(qū)可以獨立地存儲在不同的 Broker 中,從而增加了消息的并發(fā)性、可擴展性和吞吐量。 Broker:是 Kafka

    2024年02月14日
    瀏覽(30)
  • Kafka-Broker工作流程

    Kafka-Broker工作流程

    ?kafka集群在啟動時,會將每個broker節(jié)點注冊到zookeeper中,每個broker節(jié)點都有一個controller,哪個controller先在zookeeper中注冊,哪個controller就負責監(jiān)聽brokers節(jié)點變化,當有分區(qū)的leader掛掉時,controller會監(jiān)聽到節(jié)點變化,然后去zookeeper中獲取isr,選舉新的leader,選舉的規(guī)則是:在

    2024年02月14日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包