欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

在python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)傳輸?shù)姆椒?/h1>
 更新時(shí)間:2018年12月27日 10:37:06   作者:真夢(mèng)行路  
今天小編就為大家分享一篇在python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)傳輸?shù)姆椒?,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧

背景:

為了滿足各個(gè)平臺(tái)間數(shù)據(jù)的傳輸,以及能確保歷史性和實(shí)時(shí)性。先選用kafka作為不同平臺(tái)數(shù)據(jù)傳輸?shù)闹修D(zhuǎn)站,來滿足我們對(duì)跨平臺(tái)數(shù)據(jù)發(fā)送與接收的需要。

kafka簡(jiǎn)介:

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個(gè)kafka實(shí)例組成,每個(gè)實(shí)例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統(tǒng)可用性集群保存一些meta信息。

總之:kafka做為中轉(zhuǎn)站有以下功能:

1.生產(chǎn)者(產(chǎn)生數(shù)據(jù)或者說是從外部接收數(shù)據(jù))

2.消費(fèi)著(將接收到的數(shù)據(jù)轉(zhuǎn)花為自己所需用的格式)

環(huán)境:

1.python3.5.x

2.kafka1.4.3

3.pandas

準(zhǔn)備開始:

1.kafka的安裝

pip install kafka-python

python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行傳輸

2.檢驗(yàn)kafka是否安裝成功

python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行傳輸

3.pandas的安裝

pip install pandas

4.kafka數(shù)據(jù)的傳輸

直接擼代碼:

# -*- coding: utf-8 -*-
'''
@author: 真夢(mèng)行路
@file: kafka.py
@time: 2018/9/3 10:20
'''
import sys
import json
import pandas as pd
import os
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
 
KAFAKA_HOST = "xxx.xxx.x.xxx" #服務(wù)器端口地址
KAFAKA_PORT = 9092    #端口號(hào)
KAFAKA_TOPIC = "topic0"  #topic
 
data=pd.read_csv(os.getcwd()+'\\data\\1.csv')
key_value=data.to_json()
class Kafka_producer():
 '''
 生產(chǎn)模塊:根據(jù)不同的key,區(qū)分消息
 '''
 
 def __init__(self, kafkahost, kafkaport, kafkatopic, key):
  self.kafkaHost = kafkahost
  self.kafkaPort = kafkaport
  self.kafkatopic = kafkatopic
  self.key = key
  self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
   kafka_host=self.kafkaHost,
   kafka_port=self.kafkaPort)
  )
 
 def sendjsondata(self, params):
  try:
   parmas_message = params  #注意dumps
   producer = self.producer
   producer.send(self.kafkatopic, key=self.key, value=parmas_message.encode('utf-8'))
   producer.flush()
  except KafkaError as e:
   print(e)
 
 
class Kafka_consumer():
 
 
 def __init__(self, kafkahost, kafkaport, kafkatopic, groupid,key):
  self.kafkaHost = kafkahost
  self.kafkaPort = kafkaport
  self.kafkatopic = kafkatopic
  self.groupid = groupid
  self.key = key
  self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
          bootstrap_servers='{kafka_host}:{kafka_port}'.format(
           kafka_host=self.kafkaHost,
           kafka_port=self.kafkaPort)
          )
 
 def consume_data(self):
  try:
   for message in self.consumer:
    yield message
  except KeyboardInterrupt as e:
   print(e)
 
def sortedDictValues(adict):
 items = adict.items()
 items=sorted(items,reverse=False)
 return [value for key, value in items]
 
def main(xtype, group, key):
 '''
 測(cè)試consumer和producer
 '''
 if xtype == "p":
  # 生產(chǎn)模塊
  producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
  print("===========> producer:", producer)
  params =key_value
  producer.sendjsondata(params)
 
 
 if xtype == 'c':
  # 消費(fèi)模塊
  consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group,key)
  print("===========> consumer:", consumer)
 
  message = consumer.consume_data()
  for msg in message:
   msg=msg.value.decode('utf-8')
   python_data=json.loads(msg) ##這是一個(gè)字典
   key_list=list(python_data)
   test_data=pd.DataFrame()
   for index in key_list:
    print(index)
    if index=='Month':
     a1=python_data[index]
     data1 = sortedDictValues(a1)
     test_data[index]=data1
    else:
     a2 = python_data[index]
     data2 = sortedDictValues(a2)
     test_data[index] = data2
     print(test_data)
 
 
 
   # print('value---------------->', python_data)
   # print('msg---------------->', msg)
   # print('key---------------->', msg.kry)
   # print('offset---------------->', msg.offset)
 
 
 
if __name__ == '__main__':
 main(xtype='p',group='py_test',key=None)
 main(xtype='c',group='py_test',key=None)

python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行傳輸

數(shù)據(jù)1.csv如下所示:

python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行傳輸

幾點(diǎn)注意:

1、一定要有一個(gè)服務(wù)器的端口地址,不要用本機(jī)的ip或者亂寫一個(gè)ip不然程序會(huì)報(bào)錯(cuò)。(我開始就是拿本機(jī)ip懟了半天,總是報(bào)錯(cuò))

2、注意數(shù)據(jù)的傳輸格式以及編碼問題(二進(jìn)制傳輸),數(shù)據(jù)先轉(zhuǎn)成json數(shù)據(jù)格式傳輸,然后將json格式轉(zhuǎn)為需要格式。(不是json格式的注意dumps)

例中,dataframe->json->dataframe

3、例中dict轉(zhuǎn)dataframe,也可以用簡(jiǎn)單方法直接轉(zhuǎn)。

eg: type(data) ==>dict,data=pd.Dateframe(data)

以上這篇在python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)傳輸?shù)姆椒ň褪切【幏窒斫o大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • OpenCV-Python模板匹配人眼的實(shí)例

    OpenCV-Python模板匹配人眼的實(shí)例

    模板匹配是指在當(dāng)前圖像A內(nèi)尋找與圖像B最相似的部分,本文詳細(xì)的介紹了OpenCV-Python模板匹配人眼的實(shí)例,感興趣的可以了解一下
    2021-06-06
  • Python 實(shí)現(xiàn)PS濾鏡中的徑向模糊特效

    Python 實(shí)現(xiàn)PS濾鏡中的徑向模糊特效

    這篇文章主要介紹了Python 實(shí)現(xiàn) PS 濾鏡中的徑向模糊特效,幫助大家更好的利用python處理圖片,感興趣的朋友可以了解下
    2020-12-12
  • pytorch加載訓(xùn)練好的模型用來測(cè)試或者處理方式

    pytorch加載訓(xùn)練好的模型用來測(cè)試或者處理方式

    這篇文章主要介紹了pytorch加載訓(xùn)練好的模型用來測(cè)試或者處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Python讀寫TOML文件的示例代碼

    Python讀寫TOML文件的示例代碼

    TOML?文件表示(Tom's?Obvious,最小語言),它的語法主要由鍵=值對(duì)組成,本文主要為大家詳細(xì)介紹了如何使用Python讀寫TOML文件,感興趣的小伙伴可以了解下
    2023-08-08
  • Python驗(yàn)證的50個(gè)常見正則表達(dá)式

    Python驗(yàn)證的50個(gè)常見正則表達(dá)式

    這篇文章主要給大家介紹了關(guān)于利用Python驗(yàn)證的50個(gè)常見正則表達(dá)式的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • pandas將DataFrame的幾列數(shù)據(jù)合并成為一列

    pandas將DataFrame的幾列數(shù)據(jù)合并成為一列

    本文主要介紹了pandas將DataFrame的幾列數(shù)據(jù)合并成為一列,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • python中的字符串類型解讀

    python中的字符串類型解讀

    這篇文章主要介紹了python中的字符串類型,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-06-06
  • Python?ORM框架之SQLAlchemy?的基礎(chǔ)用法

    Python?ORM框架之SQLAlchemy?的基礎(chǔ)用法

    這篇文章主要介紹了Python?ORM框架之SQLAlchemy?的基礎(chǔ)用法,ORM全稱?Object?Relational?Mapping對(duì)象關(guān)系映射,更多詳細(xì)內(nèi)容需要的小伙伴課題參考下面文章介紹。希望對(duì)你的學(xué)習(xí)有所幫助
    2022-03-03
  • Python利用openpyxl類實(shí)現(xiàn)在Excel中繪制樂高圖案

    Python利用openpyxl類實(shí)現(xiàn)在Excel中繪制樂高圖案

    在商場(chǎng)看到一個(gè)超級(jí)瑪麗的樂高圖,感覺使用excel的顏色填充也能畫出來。所以本文將借助openpyxl類實(shí)現(xiàn)在Excel中繪制樂高圖案,需要的可以參考一下
    2022-12-12
  • django修改models重建數(shù)據(jù)庫的操作

    django修改models重建數(shù)據(jù)庫的操作

    這篇文章主要介紹了django修改models重建數(shù)據(jù)庫的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-03-03

最新評(píng)論