国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

python 實時獲取kafka消費隊列信息

這篇具有很好參考價值的文章主要介紹了python 實時獲取kafka消費隊列信息。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

安裝 pykafka文章來源地址http://www.zghlxwxcb.cn/news/detail-587367.html

pip install pykafka

一、消費kafka消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
from vpn_data_handler import handler_data


bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'


class KConsumer(object):
    """kafka 消費者; 動態(tài)傳參,非配置文件傳入;
      kafka 的消費者應(yīng)該盡量和生產(chǎn)者保持在不同的節(jié)點上;否則容易將程序陷入死循環(huán)中;
     """
    _encode = "UTF-8"

    def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
        """ 初始化kafka的消費者;
           1. 設(shè)置默認 kafka 的主題, 節(jié)點地址, 消費者組 id(不傳入的時候使用默認的值)
           2. 當需要設(shè)置特定參數(shù)的時候可以直接在 kwargs 直接傳入,進行解包傳入原始函數(shù);
         Args:
           topics: str; kafka 的消費主題;
           bootstrap_server: list; kafka 的消費者地址;
           group_id: str; kafka 的消費者分組 id,默認是 start_task 主要是接收并啟動任務(wù)的消費者,僅此一個消費者組id;
         """
        if bootstrap_server is None:
            bootstrap_server = bootstrap_servers
        self.client = KafkaClient(hosts=bootstrap_server)
        # 選擇要消費的topic
        vpn_topic = self.client.topics[topics]
        self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
                                                      consumer_timeout_ms=200,
                                                      auto_commit_enable=True,# 自動提交偏移量
                                                      auto_offset_reset=OffsetType.LATEST)  #LATEST 獲取當前偏移量最新消息  EARLIEST從頭開始獲取信息

    def recv(self):
        """
         接收消費中的數(shù)據(jù)
         Returns:
         """
        return self.consumer


def main():
    """
    kafka消費隊列入口
    :param topic:
    :return:
    """
    obj = KConsumer(topics="topics_name")
    while True:
        for message in obj.recv():
            data = eval(message.value.decode('utf-8'))
            handler_data(data)


if __name__ == '__main__':
    main()

二、生產(chǎn)者推送消息

#!/usr/bin/python
# -*- coding:utf-8 -*-

from pykafka import KafkaClient

client = KafkaClient(hosts="10.XX0.XX0.XX4:9092")  # 可接受多個client
# 查看所有的topic
# print(client.topics)

topic = client.topics['test_78'] # 選擇一個topic

message = "test message2 test message2"
with topic.get_sync_producer() as producer:
    producer.produce(bytes(message, encoding='utf8')) #python3需要編碼

    print(message)

到了這里,關(guān)于python 實時獲取kafka消費隊列信息的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    1. Kafka 消費者是什么? 消費者負責(zé)訂閱Kafka中的主題,并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費理念中還有一層消費組的概念,每個消費者都有一個對應(yīng)的消費組。當消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    最簡單的提交方式是讓消費者自動提交偏移量,自動提交 offset 的相關(guān)參數(shù): enable.auto.commit:是否開啟自動提交 offset 功能,默認為 true; auto.commit.interval.ms:自動提交 offset 的時間間隔,默認為5秒; 如果 enable.auto.commit 被設(shè)置為true,那么每過5秒,消費者就會自動提交 poll() 返

    2024年02月12日
    瀏覽(32)
  • 分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    01. 創(chuàng)建消費者 在讀取消息之前,需要先創(chuàng)建一個KafkaConsumer對象。創(chuàng)建KafkaConsumer對象與創(chuàng)建KafkaProducer對象非常相似——把想要傳給消費者的屬性放在Properties對象里。 為簡單起見,這里只提供4個必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • 分布式消息隊列Kafka(四)- 消費者

    分布式消息隊列Kafka(四)- 消費者

    1.Kafka消費方式 2.Kafka消費者工作流程 (1)總體工作流程 (2)消費者組工作流程 3.消費者API (1)單個消費者消費 實現(xiàn)代碼 (2)單個消費者指定分區(qū)消費 代碼實現(xiàn): (3)消費者組消費 復(fù)制上面CustomConsumer三個,同時去訂閱統(tǒng)一個主題,消費數(shù)據(jù),發(fā)現(xiàn)一個分區(qū)只能被一個

    2023年04月26日
    瀏覽(33)
  • 分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    Kafka 消費者負載均衡策略? Kafka 消費者分區(qū)分配策略? 1. 環(huán)境準備 創(chuàng)建主題 test 有5個分區(qū),準備 3 個消費者并進行消費,觀察消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。 ① 創(chuàng)建主題 test,該主題有5個分區(qū),2個副本: ② 創(chuàng)建3個消費者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 消息隊列-Kafka-消費方如何分區(qū)與分區(qū)重平衡

    消息隊列-Kafka-消費方如何分區(qū)與分區(qū)重平衡

    消費分區(qū) 資料來源于網(wǎng)絡(luò) 消費者訂閱的入口:KafkaConsumer#subscribe 消費者消費的入口:KafkaConsumer#poll 處理流程: 對元數(shù)據(jù)重平衡處理:KafkaConsumer#updateAssignmentMetadataIfNeeded 協(xié)調(diào)器的拉取處理:onsumerCoordinator#poll 執(zhí)行已完成的【消費進度】提交請求的回調(diào)函數(shù):invokeCompletedOff

    2024年04月14日
    瀏覽(23)
  • 分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    01. Kafka 消費者分區(qū)再均衡是什么? 消費者群組里的消費者共享主題分區(qū)的所有權(quán)。當一個新消費者加入群組時,它將開始讀取一部分原本由其他消費者讀取的消息。當一個消費者被關(guān)閉或發(fā)生崩潰時,它將離開群組,原本由它讀取的分區(qū)將由群組里的其他消費者讀取。 分區(qū)

    2024年02月12日
    瀏覽(31)
  • 流批一體計算引擎-4-[Flink]消費kafka實時數(shù)據(jù)

    流批一體計算引擎-4-[Flink]消費kafka實時數(shù)據(jù)

    Python3.6.9 Flink 1.15.2消費Kafaka Topic PyFlink基礎(chǔ)應(yīng)用之kafka 通過PyFlink作業(yè)處理Kafka數(shù)據(jù) PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系統(tǒng)中安裝了多個版本的python3 。 二、環(huán)境變量path作用順序 三、安裝Pyflink 1.3.2 配置Flink Kafka連接 (1)在https://mvnr

    2024年02月06日
    瀏覽(35)
  • 掌握實時數(shù)據(jù)流:使用Apache Flink消費Kafka數(shù)據(jù)

    掌握實時數(shù)據(jù)流:使用Apache Flink消費Kafka數(shù)據(jù)

    ? ? ? ? 導(dǎo)讀:使用Flink實時消費Kafka數(shù)據(jù)的案例是探索實時數(shù)據(jù)處理領(lǐng)域的絕佳方式。不僅非常實用,而且對于理解現(xiàn)代數(shù)據(jù)架構(gòu)和流處理技術(shù)具有重要意義。 ????????Apache Flink ?是一個在 有界 數(shù)據(jù)流和 無界 數(shù)據(jù)流上進行有狀態(tài)計算分布式處理引擎和框架。Flink 設(shè)計旨

    2024年02月03日
    瀏覽(31)
  • 【消息隊列】細說Kafka消費者的分區(qū)分配和重平衡

    【消息隊列】細說Kafka消費者的分區(qū)分配和重平衡

    我們直到在性能設(shè)計中異步模式,一般要么是采用pull,要么采用push。而兩種方式各有優(yōu)缺點。 pull :說白了就是通過消費端進行主動拉去數(shù)據(jù),會根據(jù)自身系統(tǒng)處理能力去獲取消息,上有Broker系統(tǒng)無需關(guān)注消費端的消費能力。kafka采用pull模式 push : Broker主動推送消息到消費端

    2024年02月12日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包