欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python操作kafka實(shí)踐的示例代碼

 更新時(shí)間:2019年06月19日 09:17:42   作者:Small_office  
這篇文章主要介紹了python操作kafka實(shí)踐的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

1、先看最簡(jiǎn)單的場(chǎng)景,生產(chǎn)者生產(chǎn)消息,消費(fèi)者接收消息,下面是生產(chǎn)者的簡(jiǎn)單代碼。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='xxxx:x')

msg_dict = {
  "sleep_time": 10,
  "db_config": {
    "database": "test_1",
    "host": "xxxx",
    "user": "root",
    "password": "root"
  },
  "table": "msg",
  "msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()

下面是消費(fèi)者的簡(jiǎn)單代碼:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

下面是結(jié)果:

2、如果想要完成負(fù)載均衡,就需要知道kafka的分區(qū)機(jī)制,同一個(gè)主題,可以為其分區(qū),在生產(chǎn)者不指定分區(qū)的情況,kafka會(huì)將多個(gè)消息分發(fā)到不同的分區(qū),消費(fèi)者訂閱時(shí)候如果不指定服務(wù)組,會(huì)收到所有分區(qū)的消息,如果指定了服務(wù)組,則同一服務(wù)組的消費(fèi)者會(huì)消費(fèi)不同的分區(qū),如果2個(gè)分區(qū)兩個(gè)消費(fèi)者的消費(fèi)者組消費(fèi),則,每個(gè)消費(fèi)者消費(fèi)一個(gè)分區(qū),如果有三個(gè)消費(fèi)者的服務(wù)組,則會(huì)出現(xiàn)一個(gè)消費(fèi)者消費(fèi)不到數(shù)據(jù);如果想要消費(fèi)同一分區(qū),則需要用不同的服務(wù)組。以此為原理,我們對(duì)消費(fèi)者做如下修改:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

然后我們開(kāi)兩個(gè)消費(fèi)者進(jìn)行消費(fèi),生產(chǎn)者分別往0分區(qū)和1分區(qū)發(fā)消息結(jié)果如下,可以看到,一個(gè)消費(fèi)者只能消費(fèi)0分區(qū),另一個(gè)只能消費(fèi)1分區(qū):


3、kafka提供了偏移量的概念,允許消費(fèi)者根據(jù)偏移量消費(fèi)之前遺漏的內(nèi)容,這基于kafka名義上的全量存儲(chǔ),可以保留大量的歷史數(shù)據(jù),歷史保存時(shí)間是可配置的,一般是7天,如果偏移量定位到了已刪除的位置那也會(huì)有問(wèn)題,但是這種情況可能很?。幻總€(gè)保存的數(shù)據(jù)文件都是以偏移量命名的,當(dāng)前要查的偏移量減去文件名就是數(shù)據(jù)在該文件的相對(duì)位置。要指定偏移量消費(fèi)數(shù)據(jù),需要指定該消費(fèi)者要消費(fèi)的分區(qū),否則代碼會(huì)找不到分區(qū)而無(wú)法消費(fèi),代碼如下:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj") # 獲取test主題的分區(qū)信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

因?yàn)橹付ǖ谋阋肆繛?,所以從一開(kāi)始插入的數(shù)據(jù)都可以查到,而且因?yàn)橹付朔謪^(qū),指定的分區(qū)結(jié)果都可以消費(fèi),結(jié)果如下:

4、有時(shí)候,我們并不需要實(shí)時(shí)獲取數(shù)據(jù),因?yàn)檫@樣可能會(huì)造成性能瓶頸,我們只需要定時(shí)去獲取隊(duì)列里的數(shù)據(jù)然后批量處理就可以,這種情況,我們可以選擇主動(dòng)拉取數(shù)據(jù)

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
  msg = consumer.poll(timeout_ms=5) # 從kafka獲取消息
  print msg
  time.sleep(2)
  index += 1
  print '--------poll index is %s----------' % index

結(jié)果如下,可以看到,每次拉取到的都是前面生產(chǎn)的數(shù)據(jù),可能是多條的列表,也可能沒(méi)有數(shù)據(jù),如果沒(méi)有數(shù)據(jù),則拉取到的為空:

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • python 5個(gè)頂級(jí)異步框架推薦

    python 5個(gè)頂級(jí)異步框架推薦

    這篇文章主要介紹了python5個(gè)頂級(jí)的異步框架,幫助大家更好的利用python進(jìn)行web開(kāi)發(fā),感興趣的朋友可以了解下
    2020-09-09
  • Mac版Python3安裝/升級(jí)的方式

    Mac版Python3安裝/升級(jí)的方式

    這篇文章主要介紹了Mac版Python3安裝/升級(jí)的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 使用Python的matplotlib庫(kù)繪制柱狀圖

    使用Python的matplotlib庫(kù)繪制柱狀圖

    這篇文章主要介紹了使用Python的matplotlib庫(kù)繪制柱狀圖,Matplotlib是Python中最常用的可視化工具之一,可以非常方便地創(chuàng)建海量類型地2D圖表和一些基本的3D圖表,可根據(jù)數(shù)據(jù)集自行定義x,y軸,繪制圖形,需要的朋友可以參考下
    2023-07-07
  • python四個(gè)坐標(biāo)點(diǎn)對(duì)圖片區(qū)域最小外接矩形進(jìn)行裁剪

    python四個(gè)坐標(biāo)點(diǎn)對(duì)圖片區(qū)域最小外接矩形進(jìn)行裁剪

    在圖像裁剪操作中,opencv和pillow兩個(gè)庫(kù)都具有相應(yīng)的函數(shù),如果想要對(duì)目標(biāo)的最小外接矩形進(jìn)行裁剪該如何操作呢?本文就來(lái)詳細(xì)的介紹一下
    2021-06-06
  • python使用pika庫(kù)調(diào)用rabbitmq參數(shù)使用詳情

    python使用pika庫(kù)調(diào)用rabbitmq參數(shù)使用詳情

    這篇文章主要介紹了python使用pika庫(kù)調(diào)用rabbitmq參數(shù)使用詳情,文章通過(guò)展開(kāi)文章主題分享了三種方式,具有一定的參考價(jià)值,需要的朋友可以參考一下
    2022-08-08
  • python dataframe向下向上填充,fillna和ffill的方法

    python dataframe向下向上填充,fillna和ffill的方法

    今天小編就為大家分享一篇python dataframe向下向上填充,fillna和ffill的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2018-11-11
  • python實(shí)現(xiàn)列表推導(dǎo)式與生成器

    python實(shí)現(xiàn)列表推導(dǎo)式與生成器

    列表推導(dǎo)式和生成器都是Python中處理集合的強(qiáng)大工具,列表推導(dǎo)式用于快速生成列表,而生成器表達(dá)式則提供了一種節(jié)約內(nèi)存的方式來(lái)處理大型數(shù)據(jù)集,下面就來(lái)介紹一下python實(shí)現(xiàn)列表推導(dǎo)式與生成器,感興趣的可以了解一下
    2024-09-09
  • 詳解python的super()的作用和原理

    詳解python的super()的作用和原理

    這篇文章主要介紹了python的super()的作用和原理,super(), 在類的繼承里面super()非常常用, 它解決了子類調(diào)用父類方法的一些問(wèn)題, 父類多次被調(diào)用時(shí)只執(zhí)行一次, 優(yōu)化了執(zhí)行邏輯,下面我們就來(lái)詳細(xì)看一下
    2020-10-10
  • Python?pandas找出、刪除重復(fù)的數(shù)據(jù)實(shí)例

    Python?pandas找出、刪除重復(fù)的數(shù)據(jù)實(shí)例

    在面試中很可能遇到給定一個(gè)含有重復(fù)元素的列表,刪除其中重復(fù)的元素,下面這篇文章主要給大家介紹了關(guān)于Python?pandas找出、刪除重復(fù)數(shù)據(jù)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-07-07
  • 使用Python讀寫(xiě)及壓縮和解壓縮文件的示例

    使用Python讀寫(xiě)及壓縮和解壓縮文件的示例

    Python的os模塊中提供了基本的文件讀寫(xiě)方法,而zipfile模塊則針對(duì)文件的壓縮和解壓縮操作,這里我們就來(lái)看一下使用Python讀寫(xiě)及壓縮和解壓縮文件的示例:
    2016-07-07

最新評(píng)論