安裝 pykafka文章來源地址http://www.zghlxwxcb.cn/news/detail-587367.html
pip install pykafka
一、消費kafka消息
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
from vpn_data_handler import handler_data
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'
class KConsumer(object):
"""kafka 消費者; 動態(tài)傳參,非配置文件傳入;
kafka 的消費者應(yīng)該盡量和生產(chǎn)者保持在不同的節(jié)點上;否則容易將程序陷入死循環(huán)中;
"""
_encode = "UTF-8"
def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
""" 初始化kafka的消費者;
1. 設(shè)置默認 kafka 的主題, 節(jié)點地址, 消費者組 id(不傳入的時候使用默認的值)
2. 當需要設(shè)置特定參數(shù)的時候可以直接在 kwargs 直接傳入,進行解包傳入原始函數(shù);
Args:
topics: str; kafka 的消費主題;
bootstrap_server: list; kafka 的消費者地址;
group_id: str; kafka 的消費者分組 id,默認是 start_task 主要是接收并啟動任務(wù)的消費者,僅此一個消費者組id;
"""
if bootstrap_server is None:
bootstrap_server = bootstrap_servers
self.client = KafkaClient(hosts=bootstrap_server)
# 選擇要消費的topic
vpn_topic = self.client.topics[topics]
self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
consumer_timeout_ms=200,
auto_commit_enable=True,# 自動提交偏移量
auto_offset_reset=OffsetType.LATEST) #LATEST 獲取當前偏移量最新消息 EARLIEST從頭開始獲取信息
def recv(self):
"""
接收消費中的數(shù)據(jù)
Returns:
"""
return self.consumer
def main():
"""
kafka消費隊列入口
:param topic:
:return:
"""
obj = KConsumer(topics="topics_name")
while True:
for message in obj.recv():
data = eval(message.value.decode('utf-8'))
handler_data(data)
if __name__ == '__main__':
main()
二、生產(chǎn)者推送消息
#!/usr/bin/python # -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts="10.XX0.XX0.XX4:9092") # 可接受多個client # 查看所有的topic # print(client.topics) topic = client.topics['test_78'] # 選擇一個topic message = "test message2 test message2" with topic.get_sync_producer() as producer: producer.produce(bytes(message, encoding='utf8')) #python3需要編碼 print(message)
文章來源:http://www.zghlxwxcb.cn/news/detail-587367.html
到了這里,關(guān)于python 實時獲取kafka消費隊列信息的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!