python?實(shí)時(shí)獲取kafka消費(fèi)隊(duì)列信息示例詳解
安裝 pykafka
pip install pykafka
一、消費(fèi)kafka消息
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
from vpn_data_handler import handler_data
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'
class KConsumer(object):
"""kafka 消費(fèi)者; 動(dòng)態(tài)傳參,非配置文件傳入;
kafka 的消費(fèi)者應(yīng)該盡量和生產(chǎn)者保持在不同的節(jié)點(diǎn)上;否則容易將程序陷入死循環(huán)中;
"""
_encode = "UTF-8"
def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
""" 初始化kafka的消費(fèi)者;
1. 設(shè)置默認(rèn) kafka 的主題, 節(jié)點(diǎn)地址, 消費(fèi)者組 id(不傳入的時(shí)候使用默認(rèn)的值)
2. 當(dāng)需要設(shè)置特定參數(shù)的時(shí)候可以直接在 kwargs 直接傳入,進(jìn)行解包傳入原始函數(shù);
Args:
topics: str; kafka 的消費(fèi)主題;
bootstrap_server: list; kafka 的消費(fèi)者地址;
group_id: str; kafka 的消費(fèi)者分組 id,默認(rèn)是 start_task 主要是接收并啟動(dòng)任務(wù)的消費(fèi)者,僅此一個(gè)消費(fèi)者組id;
"""
if bootstrap_server is None:
bootstrap_server = bootstrap_servers
self.client = KafkaClient(hosts=bootstrap_server)
# 選擇要消費(fèi)的topic
vpn_topic = self.client.topics[topics]
self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
consumer_timeout_ms=200,
auto_commit_enable=True,# 自動(dòng)提交偏移量
auto_offset_reset=OffsetType.LATEST) #LATEST 獲取當(dāng)前偏移量最新消息 EARLIEST從頭開(kāi)始獲取信息
def recv(self):
"""
接收消費(fèi)中的數(shù)據(jù)
Returns:
"""
return self.consumer
def main():
"""
kafka消費(fèi)隊(duì)列入口
:param topic:
:return:
"""
obj = KConsumer(topics="topics_name")
while True:
for message in obj.recv():
data = eval(message.value.decode('utf-8'))
handler_data(data)
if __name__ == '__main__':
main()二、生產(chǎn)者推送消息
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="10.XX0.XX0.XX4:9092") # 可接受多個(gè)client
# 查看所有的topic
# print(client.topics)
topic = client.topics['test_78'] # 選擇一個(gè)topic
message = "test message2 test message2"
with topic.get_sync_producer() as producer:
producer.produce(bytes(message, encoding='utf8')) #python3需要編碼
print(message)到此這篇關(guān)于python 實(shí)時(shí)獲取kafka消費(fèi)隊(duì)列信息的文章就介紹到這了,更多相關(guān)python kafka消費(fèi)隊(duì)列信息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
allure結(jié)合python生成測(cè)試報(bào)告教程
這篇文章主要介紹了allure結(jié)合python生成測(cè)試報(bào)告教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
使用python驗(yàn)證代理ip是否可用的實(shí)現(xiàn)方法
驗(yàn)證代理IP是否可用。原理是使用代理IP訪問(wèn)指定網(wǎng)站,如果返回狀態(tài)為200,表示這個(gè)代理是可以使用的。這篇文章重點(diǎn)給大家介紹使用python驗(yàn)證代理ip是否可用的實(shí)現(xiàn)方法,感興趣的朋友一起看看吧2018-07-07
python模塊itsdangerous簡(jiǎn)單介紹
這篇文章主要介紹了python模塊itsdangerous簡(jiǎn)單介紹,本文通過(guò)案例分析給大家詳細(xì)講解,對(duì)python模塊itsdangerous相關(guān)知識(shí)感興趣的朋友一起看看吧2022-11-11
python中使用正則表達(dá)式的后向搜索肯定模式(推薦)
這篇文章主要介紹了python里使用正則表達(dá)式的后向搜索肯定模式,本文通過(guò)代碼介紹的非常詳細(xì),包括語(yǔ)法介紹,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-11-11
Python3實(shí)現(xiàn)發(fā)送QQ郵件功能(附件)
這篇文章主要為大家詳細(xì)介紹了Python3實(shí)現(xiàn)發(fā)送QQ郵件功能,附件方面,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-12-12
Python數(shù)據(jù)結(jié)構(gòu)之列表與元組詳解
序列是Python中最基本的數(shù)據(jù)結(jié)構(gòu)。序列中的每個(gè)元素都分配一個(gè)數(shù)字 - 它的位置,或索引,第一個(gè)索引是0,第二個(gè)索引是1,依此類推,元組與列表類似,不同之處在于元組的元素不能修改。元組使用小括號(hào),列表使用方括號(hào)2021-10-10
Python實(shí)現(xiàn)單項(xiàng)鏈表的最全教程
單向鏈表也叫單鏈表,是鏈表中最簡(jiǎn)單的一種形式,它的每個(gè)節(jié)點(diǎn)包含兩個(gè)域,一個(gè)信息域(元素域)和一個(gè)鏈接域,這個(gè)鏈接指向鏈表中的下一個(gè)節(jié)點(diǎn),而最后一個(gè)節(jié)點(diǎn)的鏈接域則指向一個(gè)空值,這篇文章主要介紹了Python實(shí)現(xiàn)單項(xiàng)鏈表,需要的朋友可以參考下2023-01-01
Python數(shù)據(jù)分析之真實(shí)IP請(qǐng)求Pandas詳解
這篇文章主要給大家介紹了Python數(shù)據(jù)分析之真實(shí)IP請(qǐng)求Pandas,文中通過(guò)示例嗲嗎給大家介紹的很詳細(xì),相信對(duì)大家的學(xué)習(xí)或者理解具有一定的參考借鑒價(jià)值,有需要的朋友們可以參考借鑒,下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2016-11-11

