欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python?實時獲取kafka消費隊列信息示例詳解

 更新時間:2023年07月24日 08:40:03   作者:xiaoming0018  
這篇文章主要介紹了python實時獲取kafka消費隊列信息,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

安裝 pykafka

pip install pykafka

一、消費kafka消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
from vpn_data_handler import handler_data
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'
class KConsumer(object):
    """kafka 消費者; 動態(tài)傳參,非配置文件傳入;
      kafka 的消費者應該盡量和生產(chǎn)者保持在不同的節(jié)點上;否則容易將程序陷入死循環(huán)中;
     """
    _encode = "UTF-8"
    def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
        """ 初始化kafka的消費者;
           1. 設置默認 kafka 的主題, 節(jié)點地址, 消費者組 id(不傳入的時候使用默認的值)
           2. 當需要設置特定參數(shù)的時候可以直接在 kwargs 直接傳入,進行解包傳入原始函數(shù);
         Args:
           topics: str; kafka 的消費主題;
           bootstrap_server: list; kafka 的消費者地址;
           group_id: str; kafka 的消費者分組 id,默認是 start_task 主要是接收并啟動任務的消費者,僅此一個消費者組id;
         """
        if bootstrap_server is None:
            bootstrap_server = bootstrap_servers
        self.client = KafkaClient(hosts=bootstrap_server)
        # 選擇要消費的topic
        vpn_topic = self.client.topics[topics]
        self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
                                                      consumer_timeout_ms=200,
                                                      auto_commit_enable=True,# 自動提交偏移量
                                                      auto_offset_reset=OffsetType.LATEST)  #LATEST 獲取當前偏移量最新消息  EARLIEST從頭開始獲取信息
    def recv(self):
        """
         接收消費中的數(shù)據(jù)
         Returns:
         """
        return self.consumer
def main():
    """
    kafka消費隊列入口
    :param topic:
    :return:
    """
    obj = KConsumer(topics="topics_name")
    while True:
        for message in obj.recv():
            data = eval(message.value.decode('utf-8'))
            handler_data(data)
if __name__ == '__main__':
    main()

二、生產(chǎn)者推送消息

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="10.XX0.XX0.XX4:9092")  # 可接受多個client
# 查看所有的topic
# print(client.topics)
topic = client.topics['test_78'] # 選擇一個topic
message = "test message2 test message2"
with topic.get_sync_producer() as producer:
    producer.produce(bytes(message, encoding='utf8')) #python3需要編碼
    print(message)

到此這篇關于python 實時獲取kafka消費隊列信息的文章就介紹到這了,更多相關python kafka消費隊列信息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • allure結(jié)合python生成測試報告教程

    allure結(jié)合python生成測試報告教程

    這篇文章主要介紹了allure結(jié)合python生成測試報告教程,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • 使用python驗證代理ip是否可用的實現(xiàn)方法

    使用python驗證代理ip是否可用的實現(xiàn)方法

    驗證代理IP是否可用。原理是使用代理IP訪問指定網(wǎng)站,如果返回狀態(tài)為200,表示這個代理是可以使用的。這篇文章重點給大家介紹使用python驗證代理ip是否可用的實現(xiàn)方法,感興趣的朋友一起看看吧
    2018-07-07
  • pytorch中的廣播語義

    pytorch中的廣播語義

    這篇文章主要介紹了pytorch中的廣播語義,pytorch的廣播語義即broadcasting semantics,和numpy的很像,下面文章介紹更多相關內(nèi)容的介紹,需要的小伙伴可以參考一下
    2022-03-03
  • python模塊itsdangerous簡單介紹

    python模塊itsdangerous簡單介紹

    這篇文章主要介紹了python模塊itsdangerous簡單介紹,本文通過案例分析給大家詳細講解,對python模塊itsdangerous相關知識感興趣的朋友一起看看吧
    2022-11-11
  • python中使用正則表達式的后向搜索肯定模式(推薦)

    python中使用正則表達式的后向搜索肯定模式(推薦)

    這篇文章主要介紹了python里使用正則表達式的后向搜索肯定模式,本文通過代碼介紹的非常詳細,包括語法介紹,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2017-11-11
  • Python3實現(xiàn)發(fā)送QQ郵件功能(附件)

    Python3實現(xiàn)發(fā)送QQ郵件功能(附件)

    這篇文章主要為大家詳細介紹了Python3實現(xiàn)發(fā)送QQ郵件功能,附件方面,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-12-12
  • Python數(shù)據(jù)結(jié)構(gòu)之列表與元組詳解

    Python數(shù)據(jù)結(jié)構(gòu)之列表與元組詳解

    序列是Python中最基本的數(shù)據(jù)結(jié)構(gòu)。序列中的每個元素都分配一個數(shù)字 - 它的位置,或索引,第一個索引是0,第二個索引是1,依此類推,元組與列表類似,不同之處在于元組的元素不能修改。元組使用小括號,列表使用方括號
    2021-10-10
  • python實現(xiàn)尼姆游戲

    python實現(xiàn)尼姆游戲

    這篇文章通過詳細的python代碼實現(xiàn)尼姆游戲,小編覺得挺不錯的?,F(xiàn)在分享給大家,也給大家做個參考,需要的朋友可以收藏下。一起跟隨小編過來看看吧
    2021-12-12
  • Python實現(xiàn)單項鏈表的最全教程

    Python實現(xiàn)單項鏈表的最全教程

    單向鏈表也叫單鏈表,是鏈表中最簡單的一種形式,它的每個節(jié)點包含兩個域,一個信息域(元素域)和一個鏈接域,這個鏈接指向鏈表中的下一個節(jié)點,而最后一個節(jié)點的鏈接域則指向一個空值,這篇文章主要介紹了Python實現(xiàn)單項鏈表,需要的朋友可以參考下
    2023-01-01
  • Python數(shù)據(jù)分析之真實IP請求Pandas詳解

    Python數(shù)據(jù)分析之真實IP請求Pandas詳解

    這篇文章主要給大家介紹了Python數(shù)據(jù)分析之真實IP請求Pandas,文中通過示例嗲嗎給大家介紹的很詳細,相信對大家的學習或者理解具有一定的參考借鑒價值,有需要的朋友們可以參考借鑒,下面來一起學習學習吧。
    2016-11-11

最新評論