python kafka 多線程消費(fèi)者&手動(dòng)提交實(shí)例
官方文檔:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
import threading import os import sys from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import * from collections import OrderedDict threads = [] # col_dic, sql_dic = get() class MyThread(threading.Thread): def __init__(self, thread_name, topic, partition): threading.Thread.__init__(self) self.thread_name = thread_name # self.keyName = keyName self.partition = partition self.topic = topic def run(self): print("Starting " + self.name) Consumer(self.thread_name, self.topic, self.partition) def stop(self): sys.exit() def Consumer(thread_name, topic, partition): broker_list = '172.16.90.63:6667, 172.16.90.58:6667, 172.16.90.59:6667' ''' fetch_min_bytes(int) - 服務(wù)器為獲取請(qǐng)求而返回的最小數(shù)據(jù)量,否則請(qǐng)等待 fetch_max_wait_ms(int) - 如果沒(méi)有足夠的數(shù)據(jù)立即滿足fetch_min_bytes給出的要求,服務(wù)器在回應(yīng)提取請(qǐng)求之前將阻塞的最大時(shí)間量(以毫秒為單位) fetch_max_bytes(int) - 服務(wù)器應(yīng)為獲取請(qǐng)求返回的最大數(shù)據(jù)量。這不是絕對(duì)最大值,如果獲取的第一個(gè)非空分區(qū)中的第一條消息大于此值, 則仍將返回消息以確保消費(fèi)者可以取得進(jìn)展。注意:使用者并行執(zhí)行對(duì)多個(gè)代理的提取,因此內(nèi)存使用將取決于包含該主題分區(qū)的代理的數(shù)量。 支持的Kafka版本> = 0.10.1.0。默認(rèn)值:52428800(50 MB)。 enable_auto_commit(bool) - 如果為True,則消費(fèi)者的偏移量將在后臺(tái)定期提交。默認(rèn)值:True。 max_poll_records(int) - 單次調(diào)用中返回的最大記錄數(shù)poll()。默認(rèn)值:500 max_poll_interval_ms(int) - poll()使用使用者組管理時(shí)的調(diào)用之間的最大延遲 。這為消費(fèi)者在獲取更多記錄之前可以閑置的時(shí)間量設(shè)置了上限。 如果 poll()在此超時(shí)到期之前未調(diào)用,則認(rèn)為使用者失敗,并且該組將重新平衡以便將分區(qū)重新分配給另一個(gè)成員。默認(rèn)300000 ''' consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id="xiaofesi", client_id=thread_name, enable_auto_commit=False, fetch_min_bytes=1024*1024,#1M # fetch_max_bytes=1024 * 1024 * 1024 * 10, fetch_max_wait_ms=60000,#30s request_timeout_ms=305000, # consumer_timeout_ms=1, # max_poll_records=5000, # max_poll_interval_ms=60000 無(wú)該參數(shù) ) #查出數(shù)據(jù)庫(kù)上次保存的offset,此offset已經(jīng)是上次消費(fèi)最后一條的offset的offset+1,也就是這次消費(fèi)的起始位 dic = get_kafka(topic, partition) tp = TopicPartition(topic, partition) print(thread_name, tp, dic['offset']) #分配該消費(fèi)者的TopicPartition,也就是topic和partition,根據(jù)參數(shù),我是三個(gè)消費(fèi)者,三個(gè)線程,每個(gè)線程消費(fèi)者消費(fèi)一個(gè)分區(qū) consumer.assign([tp]) #重置此消費(fèi)者消費(fèi)的起始位 consumer.seek(tp, dic['offset']) print("程序首次運(yùn)行\(zhòng)t線程:", thread_name, "分區(qū):", partition, "偏移量:", dic['offset'], "\t開(kāi)始消費(fèi)...") num=0 #記錄該消費(fèi)者消費(fèi)次數(shù) # end_offset = consumer.end_offsets([tp])[tp] # print(end_offset) while True: args = OrderedDict() msg = consumer.poll(timeout_ms=60000) end_offset = consumer.end_offsets([tp])[tp] print('已保存的偏移量', consumer.committed(tp),'最新偏移量,',end_offset) if len(msg) > 0: print("線程:", thread_name, "分區(qū):", partition, "最大偏移量:", end_offset, "有無(wú)數(shù)據(jù),", len(msg)) lines=0 for data in msg.values(): for line in data: lines+=1 line = eval(line.value.decode('utf-8')) ''' do something ''' # 線程此批次消息條數(shù) print(thread_name,"lines",lines) #數(shù)據(jù)保存至數(shù)據(jù)庫(kù) is_succeed = save_to_db(args, thread_name) if is_succeed: #更新自己保存在數(shù)據(jù)庫(kù)中的各topic, partition的偏移量 is_succeed1 = update_offset(topic, partition, end_offset) #手動(dòng)提交偏移量 offsets格式:{TopicPartition:OffsetAndMetadata(offset_num,None)} consumer.commit(offsets={tp:(OffsetAndMetadata(end_offset,None))}) print(thread_name,"to db suss",num+1) if is_succeed1 == 0: #系統(tǒng)退出?這個(gè)還沒(méi)試 os.exit() ''' sys.exit() 只能退出該線程,也就是說(shuō)其它兩個(gè)線程正常運(yùn)行,主程序不退出 ''' else: os.exit() else: print(thread_name,'沒(méi)有數(shù)據(jù)') num+=1 print(thread_name,"第",num,"次") if __name__ == '__main__': try: t1 = MyThread("Thread-0", "test", 0) threads.append(t1) t2 = MyThread("Thread-1", "test", 1) threads.append(t2) t3 = MyThread("Thread-2", "test", 2) threads.append(t3) for t in threads: t.start() for t in threads: t.join() print("exit program with 0") except: print("Error: failed to run consumer program")
以上這篇python kafka 多線程消費(fèi)者&手動(dòng)提交實(shí)例就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- 在python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)傳輸?shù)姆椒?/a>
- kafka-python批量發(fā)送數(shù)據(jù)的實(shí)例
- 對(duì)python操作kafka寫入json數(shù)據(jù)的簡(jiǎn)單demo分享
- python3實(shí)現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫入到mysql中
- python消費(fèi)kafka數(shù)據(jù)批量插入到es的方法
- python 消費(fèi) kafka 數(shù)據(jù)教程
- python3連接kafka模塊pykafka生產(chǎn)者簡(jiǎn)單封裝代碼
- python每5分鐘從kafka中提取數(shù)據(jù)的例子
- python操作kafka實(shí)踐的示例代碼
- 快速上手Python Kafka庫(kù)安裝攻略
相關(guān)文章
Python推導(dǎo)式簡(jiǎn)單示例【列表推導(dǎo)式、字典推導(dǎo)式與集合推導(dǎo)式】
這篇文章主要介紹了Python推導(dǎo)式,結(jié)合簡(jiǎn)單實(shí)例形式分析了Python列表推導(dǎo)式、字典推導(dǎo)式與集合推導(dǎo)式基本使用方法,需要的朋友可以參考下2018-12-12關(guān)于Flask項(xiàng)目無(wú)法使用公網(wǎng)IP訪問(wèn)的解決方式
今天小編就為大家分享一篇關(guān)于Flask項(xiàng)目無(wú)法使用公網(wǎng)IP訪問(wèn)的解決方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-11-11利用python、tensorflow、opencv、pyqt5實(shí)現(xiàn)人臉實(shí)時(shí)簽到系統(tǒng)
這篇文章主要介紹了利用python、tensorflow、opencv、pyqt5實(shí)現(xiàn)人臉實(shí)時(shí)簽到系統(tǒng),本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-09-09Python中Jupyter notebook快捷鍵總結(jié)
在本篇文章里小編給大家整理的是一篇關(guān)于Python中Jupyter notebook快捷鍵總結(jié)內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-04-04Python 數(shù)據(jù)結(jié)構(gòu)之旋轉(zhuǎn)鏈表
這篇文章主要介紹了Python 數(shù)據(jù)結(jié)構(gòu)之旋轉(zhuǎn)鏈表的相關(guān)資料,需要的朋友可以參考下2017-02-02Python多線程編程(二):?jiǎn)?dòng)線程的兩種方法
這篇文章主要介紹了Python多線程編程(一):?jiǎn)?dòng)線程的兩種方法,本文講解了將函數(shù)傳遞進(jìn)Thread對(duì)象、繼承自threading.Thread類兩種方法,需要的朋友可以參考下2015-04-04使用python?matplotlib畫折線圖實(shí)例代碼
Matplotlib是一個(gè)Python工具箱,用于科學(xué)計(jì)算的數(shù)據(jù)可視化,下面這篇文章主要給大家介紹了關(guān)于如何使用python?matplotlib畫折線圖的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04詳解Python二維數(shù)組與三維數(shù)組切片的方法
這篇文章主要介紹了詳解Python二維數(shù)組與三維數(shù)組切片的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07