欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python消費kafka數(shù)據(jù)批量插入到es的方法

 更新時間:2018年12月27日 09:52:04   作者:亮亮愛吃蝦  
今天小編就為大家分享一篇python消費kafka數(shù)據(jù)批量插入到es的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧

1、es的批量插入

這是為了方便后期配置的更改,把配置信息放在logging.conf中

用elasticsearch來實現(xiàn)批量操作,先安裝依賴包,sudo pip install Elasticsearch2

from elasticsearch import Elasticsearch 
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type


  def set_date(self,data): 
    # 批量處理 
    # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
    self.es.index(index=self.index,doc_type=self.index,body=data)

2、使用pykafka消費kafka

1.因為kafka是0.8,pykafka不支持zk,只能用get_simple_consumer來實現(xiàn)

2.為了實現(xiàn)多個應(yīng)用同時消費而且不重消費,所以一個應(yīng)用消費一個partition

3. 為是確保消費數(shù)據(jù)量在不滿足10000這個批量值,能在一個時間范圍內(nèi)插入到es中,這里設(shè)置consumer_timeout_ms一個超時等待時間,退出等待消費阻塞。

4.退出等待消費阻塞后導(dǎo)致無法再消費數(shù)據(jù),因此在獲取self.consumer 的外層加入了while True 一個死循環(huán)

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime


class KafkaPython:
  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")
  logger_data = logging.getLogger("data")

  def __init__(self):
    self.server = ConfigUtil().get("kafka","kafka_server")
    self.topic = ConfigUtil().get("kafka","topic")
    self.group = ConfigUtil().get("kafka","group")
    self.partition_id = int(ConfigUtil().get("kafka","partition"))
    self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
    self.consumer = None
    self.hosts = ConfigUtil().get("es","hosts")
    self.index_name = ConfigUtil().get("es","index_name")
    self.type_name = ConfigUtil().get("es","type_name")


  def getConnect(self):
    client = KafkaClient(self.server)
    topic = client.topics[self.topic]
    p = topic.partitions
    ps={p.get(self.partition_id)}

    self.consumer = topic.get_simple_consumer(
      consumer_group=self.group,
      auto_commit_enable=True,
      consumer_timeout_ms=self.consumer_timeout_ms,
      # num_consumer_fetchers=1,
      # consumer_id='test1',
      partitions=ps
      )
    self.starttime = datetime.datetime.now()


  def beginConsumer(self):
    print("beginConsumer kafka-python")
    imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
    #創(chuàng)建ACTIONS 
    count = 0
    ACTIONS = [] 

    while True:
      endtime = datetime.datetime.now()
      print (endtime - self.starttime).seconds
      for message in self.consumer:
        if message is not None:
          try:
            count = count + 1
            # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
            # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
            action = { 
              "_index": self.index_name, 
              "_type": self.type_name, 
              "_source": message.value
            }
            ACTIONS.append(action)
            if len(ACTIONS) >= 10000:
              imprtEsData.set_date(ACTIONS)
              ACTIONS = []
              self.consumer.commit_offsets()
              endtime = datetime.datetime.now()
              print (endtime - self.starttime).seconds
              #break
          except (Exception) as e:
            # self.consumer.commit_offsets()
            print(e)
            self.logger.error(e)
            self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
            # self.logger_data.error(message.value+"\n")
          # self.consumer.commit_offsets()


      if len(ACTIONS) > 0:
        self.logger.info("等待時間超過,consumer_timeout_ms,把集合數(shù)據(jù)插入es")
        imprtEsData.set_date(ACTIONS)
        ACTIONS = []
        self.consumer.commit_offsets()




  def disConnect(self):
    self.consumer.close()


from elasticsearch import Elasticsearch 
from elasticsearch.helpers import bulk
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type


  def set_date(self,data): 
    # 批量處理 
    success = bulk(self.es, data, index=self.index, raise_on_error=True) 
    self.logger.info(success) 

3、運行

if __name__ == '__main__':
  kp = KafkaPython()
  kp.getConnect()
  kp.beginConsumer()
  # kp.disConnect()

注:簡單的寫了一個從kafka中讀取數(shù)據(jù)到一個list里,當(dāng)數(shù)據(jù)達(dá)到一個閾值時,在批量插入到 es的插件

現(xiàn)在還在批量的壓測中。。。

以上這篇python消費kafka數(shù)據(jù)批量插入到es的方法就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 剛學(xué)完怎么用Python實現(xiàn)定時任務(wù),轉(zhuǎn)頭就跑去撩妹!

    剛學(xué)完怎么用Python實現(xiàn)定時任務(wù),轉(zhuǎn)頭就跑去撩妹!

    朋友問我有沒有定時任務(wù)的模塊,并且越簡單越好.剛好前今天就研究了一下定時任務(wù)模塊,于是就告訴他使用方式,令我沒有想到的是,這貨他學(xué)會了之后,居然買了一個服務(wù)器給女朋友發(fā)消息,發(fā)消息,發(fā)消息……重要的事情說三遍,需要的朋友可以參考下
    2021-06-06
  • 詳解程序意外中斷自動重啟shell腳本(以Python為例)

    詳解程序意外中斷自動重啟shell腳本(以Python為例)

    這篇文章主要介紹了詳解程序意外中斷自動重啟shell腳本(以Python為例),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • python3常用的數(shù)據(jù)清洗方法(小結(jié))

    python3常用的數(shù)據(jù)清洗方法(小結(jié))

    這篇文章主要介紹了python3常用的數(shù)據(jù)清洗方法(小結(jié)),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-10-10
  • python使用beautifulsoup4爬取酷狗音樂代碼實例

    python使用beautifulsoup4爬取酷狗音樂代碼實例

    這篇文章主要介紹了python使用beautifulsoup4爬取酷狗音樂代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-12-12
  • Python Tkinter實例——模擬擲骰子

    Python Tkinter實例——模擬擲骰子

    這篇文章主要介紹了Python利用tkinter模塊模擬擲骰子的實例,幫助大家更好的理解和使用python,感興趣的朋友可以了解下
    2020-10-10
  • 對python的unittest架構(gòu)公共參數(shù)token提取方法詳解

    對python的unittest架構(gòu)公共參數(shù)token提取方法詳解

    今天小編就為大家分享一篇對python的unittest架構(gòu)公共參數(shù)token提取方法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-12-12
  • Python使用進(jìn)程Process模塊管理資源

    Python使用進(jìn)程Process模塊管理資源

    這篇文章主要介紹了Python使用進(jìn)程Process模塊管理資源,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-03-03
  • 利用Python的裝飾器解決Bottle框架中用戶驗證問題

    利用Python的裝飾器解決Bottle框架中用戶驗證問題

    這篇文章主要介紹了Python的Bottle框架中解決用戶驗證問題,代碼基于Python2.x版本,需要的朋友可以參考下
    2015-04-04
  • Python函數(shù)實現(xiàn)學(xué)員管理系統(tǒng)

    Python函數(shù)實現(xiàn)學(xué)員管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Python函數(shù)實現(xiàn)學(xué)員管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • Python數(shù)據(jù)抓取爬蟲代理防封IP方法

    Python數(shù)據(jù)抓取爬蟲代理防封IP方法

    在本篇內(nèi)容里小編給大家分享了關(guān)于Python數(shù)據(jù)抓取爬蟲代理防封IP方法講解,需要的朋友們可以跟著學(xué)習(xí)下。
    2018-12-12

最新評論