python?實時獲取kafka消費隊列信息示例詳解
更新時間:2023年07月24日 08:40:03 作者:xiaoming0018
這篇文章主要介紹了python實時獲取kafka消費隊列信息,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
安裝 pykafka
pip install pykafka
一、消費kafka消息
#!/usr/bin/env python # -*- coding: utf-8 -*- from pykafka import KafkaClient from pykafka.common import OffsetType from vpn_data_handler import handler_data bootstrap_servers = '10.*.**.**:9092' group_id = 'test1' class KConsumer(object): """kafka 消費者; 動態(tài)傳參,非配置文件傳入; kafka 的消費者應該盡量和生產(chǎn)者保持在不同的節(jié)點上;否則容易將程序陷入死循環(huán)中; """ _encode = "UTF-8" def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None): """ 初始化kafka的消費者; 1. 設置默認 kafka 的主題, 節(jié)點地址, 消費者組 id(不傳入的時候使用默認的值) 2. 當需要設置特定參數(shù)的時候可以直接在 kwargs 直接傳入,進行解包傳入原始函數(shù); Args: topics: str; kafka 的消費主題; bootstrap_server: list; kafka 的消費者地址; group_id: str; kafka 的消費者分組 id,默認是 start_task 主要是接收并啟動任務的消費者,僅此一個消費者組id; """ if bootstrap_server is None: bootstrap_server = bootstrap_servers self.client = KafkaClient(hosts=bootstrap_server) # 選擇要消費的topic vpn_topic = self.client.topics[topics] self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id, consumer_timeout_ms=200, auto_commit_enable=True,# 自動提交偏移量 auto_offset_reset=OffsetType.LATEST) #LATEST 獲取當前偏移量最新消息 EARLIEST從頭開始獲取信息 def recv(self): """ 接收消費中的數(shù)據(jù) Returns: """ return self.consumer def main(): """ kafka消費隊列入口 :param topic: :return: """ obj = KConsumer(topics="topics_name") while True: for message in obj.recv(): data = eval(message.value.decode('utf-8')) handler_data(data) if __name__ == '__main__': main()
二、生產(chǎn)者推送消息
#!/usr/bin/python # -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts="10.XX0.XX0.XX4:9092") # 可接受多個client # 查看所有的topic # print(client.topics) topic = client.topics['test_78'] # 選擇一個topic message = "test message2 test message2" with topic.get_sync_producer() as producer: producer.produce(bytes(message, encoding='utf8')) #python3需要編碼 print(message)
到此這篇關于python 實時獲取kafka消費隊列信息的文章就介紹到這了,更多相關python kafka消費隊列信息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python3實現(xiàn)發(fā)送QQ郵件功能(附件)
這篇文章主要為大家詳細介紹了Python3實現(xiàn)發(fā)送QQ郵件功能,附件方面,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12Python數(shù)據(jù)結(jié)構(gòu)之列表與元組詳解
序列是Python中最基本的數(shù)據(jù)結(jié)構(gòu)。序列中的每個元素都分配一個數(shù)字 - 它的位置,或索引,第一個索引是0,第二個索引是1,依此類推,元組與列表類似,不同之處在于元組的元素不能修改。元組使用小括號,列表使用方括號2021-10-10Python數(shù)據(jù)分析之真實IP請求Pandas詳解
這篇文章主要給大家介紹了Python數(shù)據(jù)分析之真實IP請求Pandas,文中通過示例嗲嗎給大家介紹的很詳細,相信對大家的學習或者理解具有一定的參考借鑒價值,有需要的朋友們可以參考借鑒,下面來一起學習學習吧。2016-11-11