對python操作kafka寫入json數(shù)據(jù)的簡單demo分享
更新時間:2018年12月27日 10:00:23 作者:Liu-YanLin
今天小編就為大家分享一篇對python操作kafka寫入json數(shù)據(jù)的簡單demo,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
如下所示:
安裝kafka支持庫pip install kafka-python
from kafka import KafkaProducer import json ''' 生產(chǎn)者demo 向test_lyl2主題中循環(huán)寫入10條json數(shù)據(jù) 注意事項:要寫入json數(shù)據(jù)需加上value_serializer參數(shù),如下代碼 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'), bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'] ) for i in range(10): data={ "name":"李四", "age":23, "gender":"男", "id":i } producer.send('test_lyl2', data) producer.close()
from kafka import KafkaConsumer import json ''' 消費者demo 消費test_lyl2主題中的數(shù)據(jù) 注意事項:如需以json格式讀取數(shù)據(jù)需加上value_deserializer參數(shù) ''' consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1", bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'], auto_offset_reset='earliest',value_deserializer=json.loads ) for message in consumer: print(message.value)
以上這篇對python操作kafka寫入json數(shù)據(jù)的簡單demo分享就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
您可能感興趣的文章:
- 在python環(huán)境下運用kafka對數(shù)據(jù)進行實時傳輸?shù)姆椒?/a>
- kafka-python批量發(fā)送數(shù)據(jù)的實例
- python3實現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫入到mysql中
- python消費kafka數(shù)據(jù)批量插入到es的方法
- python kafka 多線程消費者&手動提交實例
- python 消費 kafka 數(shù)據(jù)教程
- python3連接kafka模塊pykafka生產(chǎn)者簡單封裝代碼
- python每5分鐘從kafka中提取數(shù)據(jù)的例子
- python操作kafka實踐的示例代碼
- 快速上手Python Kafka庫安裝攻略
相關文章
Python接口自動化淺析logging封裝及實戰(zhàn)操作
本篇文章主要給大家介紹將了logging常用配置放入yaml配置文件、logging日志封裝及結合登錄用例,講解日志如何在接口測試中運用的實例操作2021-08-08Python調(diào)用Google?Bard的圖文詳解
Google?Bard?是一種開源數(shù)據(jù)可視化和探索工具,可為?開發(fā)人員?提供支持,本文主要為大家介紹了Python調(diào)用Google?Bard的方法,需要的可以參考下2023-08-08Python腳本在Appium庫上對移動應用實現(xiàn)自動化測試
這篇文章主要介紹了使用Python的Appium庫對移動應用實現(xiàn)自動化測試的教程,屬于Python腳本的一個自動化應用,需要的朋友可以參考下2015-04-04