python消費kafka數(shù)據(jù)批量插入到es的方法
1、es的批量插入
這是為了方便后期配置的更改,把配置信息放在logging.conf中
用elasticsearch來實現(xiàn)批量操作,先安裝依賴包,sudo pip install Elasticsearch2
from elasticsearch import Elasticsearch
class ImportEsData:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type
def set_date(self,data):
# 批量處理
# es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
self.es.index(index=self.index,doc_type=self.index,body=data)
2、使用pykafka消費kafka
1.因為kafka是0.8,pykafka不支持zk,只能用get_simple_consumer來實現(xiàn)
2.為了實現(xiàn)多個應用同時消費而且不重消費,所以一個應用消費一個partition
3. 為是確保消費數(shù)據(jù)量在不滿足10000這個批量值,能在一個時間范圍內(nèi)插入到es中,這里設置consumer_timeout_ms一個超時等待時間,退出等待消費阻塞。
4.退出等待消費阻塞后導致無法再消費數(shù)據(jù),因此在獲取self.consumer 的外層加入了while True 一個死循環(huán)
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime
class KafkaPython:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
logger_data = logging.getLogger("data")
def __init__(self):
self.server = ConfigUtil().get("kafka","kafka_server")
self.topic = ConfigUtil().get("kafka","topic")
self.group = ConfigUtil().get("kafka","group")
self.partition_id = int(ConfigUtil().get("kafka","partition"))
self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
self.consumer = None
self.hosts = ConfigUtil().get("es","hosts")
self.index_name = ConfigUtil().get("es","index_name")
self.type_name = ConfigUtil().get("es","type_name")
def getConnect(self):
client = KafkaClient(self.server)
topic = client.topics[self.topic]
p = topic.partitions
ps={p.get(self.partition_id)}
self.consumer = topic.get_simple_consumer(
consumer_group=self.group,
auto_commit_enable=True,
consumer_timeout_ms=self.consumer_timeout_ms,
# num_consumer_fetchers=1,
# consumer_id='test1',
partitions=ps
)
self.starttime = datetime.datetime.now()
def beginConsumer(self):
print("beginConsumer kafka-python")
imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
#創(chuàng)建ACTIONS
count = 0
ACTIONS = []
while True:
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
for message in self.consumer:
if message is not None:
try:
count = count + 1
# print(str(message.partition.id)+","+str(message.offset)+","+str(count))
# self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
action = {
"_index": self.index_name,
"_type": self.type_name,
"_source": message.value
}
ACTIONS.append(action)
if len(ACTIONS) >= 10000:
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
#break
except (Exception) as e:
# self.consumer.commit_offsets()
print(e)
self.logger.error(e)
self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
# self.logger_data.error(message.value+"\n")
# self.consumer.commit_offsets()
if len(ACTIONS) > 0:
self.logger.info("等待時間超過,consumer_timeout_ms,把集合數(shù)據(jù)插入es")
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()
def disConnect(self):
self.consumer.close()
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ImportEsData:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type
def set_date(self,data):
# 批量處理
success = bulk(self.es, data, index=self.index, raise_on_error=True)
self.logger.info(success)
3、運行
if __name__ == '__main__': kp = KafkaPython() kp.getConnect() kp.beginConsumer() # kp.disConnect()
注:簡單的寫了一個從kafka中讀取數(shù)據(jù)到一個list里,當數(shù)據(jù)達到一個閾值時,在批量插入到 es的插件
現(xiàn)在還在批量的壓測中。。。
以上這篇python消費kafka數(shù)據(jù)批量插入到es的方法就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
- 在python環(huán)境下運用kafka對數(shù)據(jù)進行實時傳輸?shù)姆椒?/a>
- kafka-python批量發(fā)送數(shù)據(jù)的實例
- 對python操作kafka寫入json數(shù)據(jù)的簡單demo分享
- python3實現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫入到mysql中
- python kafka 多線程消費者&手動提交實例
- python 消費 kafka 數(shù)據(jù)教程
- python3連接kafka模塊pykafka生產(chǎn)者簡單封裝代碼
- python每5分鐘從kafka中提取數(shù)據(jù)的例子
- python操作kafka實踐的示例代碼
- 快速上手Python Kafka庫安裝攻略
相關文章
剛學完怎么用Python實現(xiàn)定時任務,轉(zhuǎn)頭就跑去撩妹!
朋友問我有沒有定時任務的模塊,并且越簡單越好.剛好前今天就研究了一下定時任務模塊,于是就告訴他使用方式,令我沒有想到的是,這貨他學會了之后,居然買了一個服務器給女朋友發(fā)消息,發(fā)消息,發(fā)消息……重要的事情說三遍,需要的朋友可以參考下2021-06-06
詳解程序意外中斷自動重啟shell腳本(以Python為例)
這篇文章主要介紹了詳解程序意外中斷自動重啟shell腳本(以Python為例),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-07-07
python使用beautifulsoup4爬取酷狗音樂代碼實例
這篇文章主要介紹了python使用beautifulsoup4爬取酷狗音樂代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-12-12
對python的unittest架構公共參數(shù)token提取方法詳解
今天小編就為大家分享一篇對python的unittest架構公共參數(shù)token提取方法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12
Python函數(shù)實現(xiàn)學員管理系統(tǒng)
這篇文章主要為大家詳細介紹了Python函數(shù)實現(xiàn)學員管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-07-07

