python消費kafka數(shù)據(jù)批量插入到es的方法
1、es的批量插入
這是為了方便后期配置的更改,把配置信息放在logging.conf中
用elasticsearch來實現(xiàn)批量操作,先安裝依賴包,sudo pip install Elasticsearch2
from elasticsearch import Elasticsearch class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") def __init__(self,hosts,index,type): self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000) self.index = index self.type = type def set_date(self,data): # 批量處理 # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()}) self.es.index(index=self.index,doc_type=self.index,body=data)
2、使用pykafka消費kafka
1.因為kafka是0.8,pykafka不支持zk,只能用get_simple_consumer來實現(xiàn)
2.為了實現(xiàn)多個應(yīng)用同時消費而且不重消費,所以一個應(yīng)用消費一個partition
3. 為是確保消費數(shù)據(jù)量在不滿足10000這個批量值,能在一個時間范圍內(nèi)插入到es中,這里設(shè)置consumer_timeout_ms一個超時等待時間,退出等待消費阻塞。
4.退出等待消費阻塞后導(dǎo)致無法再消費數(shù)據(jù),因此在獲取self.consumer 的外層加入了while True 一個死循環(huán)
#!/usr/bin/python # -*- coding: UTF-8 -*- from pykafka import KafkaClient import logging import logging.config from ConfigUtil import ConfigUtil import datetime class KafkaPython: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") logger_data = logging.getLogger("data") def __init__(self): self.server = ConfigUtil().get("kafka","kafka_server") self.topic = ConfigUtil().get("kafka","topic") self.group = ConfigUtil().get("kafka","group") self.partition_id = int(ConfigUtil().get("kafka","partition")) self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms")) self.consumer = None self.hosts = ConfigUtil().get("es","hosts") self.index_name = ConfigUtil().get("es","index_name") self.type_name = ConfigUtil().get("es","type_name") def getConnect(self): client = KafkaClient(self.server) topic = client.topics[self.topic] p = topic.partitions ps={p.get(self.partition_id)} self.consumer = topic.get_simple_consumer( consumer_group=self.group, auto_commit_enable=True, consumer_timeout_ms=self.consumer_timeout_ms, # num_consumer_fetchers=1, # consumer_id='test1', partitions=ps ) self.starttime = datetime.datetime.now() def beginConsumer(self): print("beginConsumer kafka-python") imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name) #創(chuàng)建ACTIONS count = 0 ACTIONS = [] while True: endtime = datetime.datetime.now() print (endtime - self.starttime).seconds for message in self.consumer: if message is not None: try: count = count + 1 # print(str(message.partition.id)+","+str(message.offset)+","+str(count)) # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count)) action = { "_index": self.index_name, "_type": self.type_name, "_source": message.value } ACTIONS.append(action) if len(ACTIONS) >= 10000: imprtEsData.set_date(ACTIONS) ACTIONS = [] self.consumer.commit_offsets() endtime = datetime.datetime.now() print (endtime - self.starttime).seconds #break except (Exception) as e: # self.consumer.commit_offsets() print(e) self.logger.error(e) self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n") # self.logger_data.error(message.value+"\n") # self.consumer.commit_offsets() if len(ACTIONS) > 0: self.logger.info("等待時間超過,consumer_timeout_ms,把集合數(shù)據(jù)插入es") imprtEsData.set_date(ACTIONS) ACTIONS = [] self.consumer.commit_offsets() def disConnect(self): self.consumer.close() from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg") def __init__(self,hosts,index,type): self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000) self.index = index self.type = type def set_date(self,data): # 批量處理 success = bulk(self.es, data, index=self.index, raise_on_error=True) self.logger.info(success)
3、運行
if __name__ == '__main__': kp = KafkaPython() kp.getConnect() kp.beginConsumer() # kp.disConnect()
注:簡單的寫了一個從kafka中讀取數(shù)據(jù)到一個list里,當(dāng)數(shù)據(jù)達(dá)到一個閾值時,在批量插入到 es的插件
現(xiàn)在還在批量的壓測中。。。
以上這篇python消費kafka數(shù)據(jù)批量插入到es的方法就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
- 在python環(huán)境下運用kafka對數(shù)據(jù)進(jìn)行實時傳輸?shù)姆椒?/a>
- kafka-python批量發(fā)送數(shù)據(jù)的實例
- 對python操作kafka寫入json數(shù)據(jù)的簡單demo分享
- python3實現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫入到mysql中
- python kafka 多線程消費者&手動提交實例
- python 消費 kafka 數(shù)據(jù)教程
- python3連接kafka模塊pykafka生產(chǎn)者簡單封裝代碼
- python每5分鐘從kafka中提取數(shù)據(jù)的例子
- python操作kafka實踐的示例代碼
- 快速上手Python Kafka庫安裝攻略
相關(guān)文章
剛學(xué)完怎么用Python實現(xiàn)定時任務(wù),轉(zhuǎn)頭就跑去撩妹!
朋友問我有沒有定時任務(wù)的模塊,并且越簡單越好.剛好前今天就研究了一下定時任務(wù)模塊,于是就告訴他使用方式,令我沒有想到的是,這貨他學(xué)會了之后,居然買了一個服務(wù)器給女朋友發(fā)消息,發(fā)消息,發(fā)消息……重要的事情說三遍,需要的朋友可以參考下2021-06-06詳解程序意外中斷自動重啟shell腳本(以Python為例)
這篇文章主要介紹了詳解程序意外中斷自動重啟shell腳本(以Python為例),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07python3常用的數(shù)據(jù)清洗方法(小結(jié))
這篇文章主要介紹了python3常用的數(shù)據(jù)清洗方法(小結(jié)),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10python使用beautifulsoup4爬取酷狗音樂代碼實例
這篇文章主要介紹了python使用beautifulsoup4爬取酷狗音樂代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12對python的unittest架構(gòu)公共參數(shù)token提取方法詳解
今天小編就為大家分享一篇對python的unittest架構(gòu)公共參數(shù)token提取方法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12Python函數(shù)實現(xiàn)學(xué)員管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Python函數(shù)實現(xiàn)學(xué)員管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-07-07