Laravel中Kafka的使用詳解
本文并沒有kafka的安裝教程,本文是針對已經(jīng)安裝kafka及其配置好kafka的php拓展并且使用laravel框架進行開發(fā)項目,配置一個可供laravel框架使用的生產(chǎn)及消費者類.
以下代碼修改自本站的YII框架關(guān)于kafka類的代碼,經(jīng)過測試使用在本人的項目中,可正常運行,larvael版本:5.6 代碼放置larvael框架位置:app/Tools/Kafka.php
<?php namespace App\Tools; use Illuminate\Config\Repository; use Illuminate\Support\Facades\DB; use Monolog\Logger; use Monolog\Handler\StreamHandler; use Illuminate\Http\Request; class Kafka { public $broker_list = '127.0.0.1';//配置kafka,可以用逗號隔開多個kafka public $topic = 'test';//管道名稱 public $partition = 0; protected $producer = null; protected $consumer = null; public function __construct() { if (empty($this->broker_list)) { throw new InvalidConfigException("broker not config"); } $rk = new \RdKafka\Producer(); if (empty($rk)) { throw new InvalidConfigException("producer error"); } $rk->setLogLevel(LOG_DEBUG); if (!$rk->addBrokers($this->broker_list)) { throw new InvalidConfigException("producer error"); } $this->producer = $rk; } /** * 生產(chǎn)者 * @param array $messages * @return mixed */ public function send($messages = [],$topic) { $topic = $this->producer->newTopic($topic); return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages)); } /** * 消費者 */ public function consumer($object, $callback){ $conf = new \RdKafka\Conf(); $conf->set('group.id', 0); $conf->set('metadata.broker.list', $this->broker_list); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf); $consumer->subscribe([$this->topic]); echo "waiting for messages.....\n"; while(true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "message payload...."; $object->$callback($message->payload); break; } sleep(1); } } } ?>
在控制器中如何使用:
首先再頭部導(dǎo)入這個類:use App\Tools\Kafka;
下面是使用生產(chǎn)者實例:
public function test(){ $topic = 'tool';//輸入使用管道名稱 $data['shop_id'] = 58; $data['bar_code']=586; $data['goods_num'] = 1; $data['goods_unit'] = '個'; $Kafka = new Kafka(); $Error_Msg = $Kafka->send($data,$topic);//傳入數(shù)組會自動轉(zhuǎn)換json var_dump($Error_Msg); }
下面是消費者實例,消費者我這里使用了的是php腳本進行的操作:
<?php $conf = new RdKafka\Conf(); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("localhost:9092"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', sys_get_temp_dir()); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("tool", $topicConf);//讀取的管道 // Start consuming partition 0 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: //沒有錯誤打印信息 $message = json_decode(json_encode($message),true); $data = json_decode($message['payload'],true); var_dump($data); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "等待接收信息\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "超時\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } sleep(1); } ?>
到此這篇關(guān)于Laravel中Kafka的使用詳解的文章就介紹到這了,更多相關(guān)Laravel中Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
laravel使用組件實現(xiàn)微信網(wǎng)頁授權(quán)登入
這篇文章主要介紹了laravel使用組件實現(xiàn)微信網(wǎng)頁授權(quán)登入,使用laravel組件 laravel-wechat調(diào)用,使用起來很方便,有需要的同學可以學習下2021-03-03初識通用數(shù)據(jù)庫操作類——前端easyui-datagrid,form(php)
這篇文章主要介紹了初識通用數(shù)據(jù)庫操作類——前端easyui-datagrid,form(php),實現(xiàn)代碼比較簡單,有需要的小伙伴歡迎來參考2015-07-07php導(dǎo)出csv格式數(shù)據(jù)并將數(shù)字轉(zhuǎn)換成文本的思路以及代碼分享
最近接的一個項目,需要將一些統(tǒng)計結(jié)果之類的東西導(dǎo)出成CSV,以便做報表,根據(jù)往常經(jīng)驗,現(xiàn)將思路和代碼都發(fā)出來,如有更好的方法,希望高手指正2014-06-06基于php(Thinkphp)+jquery 實現(xiàn)ajax多選反選不選刪除數(shù)據(jù)功能
這篇文章主要介紹了基于php(Thinkphp)+jquery 實現(xiàn)ajax多選反選不選刪除數(shù)據(jù)功能的相關(guān)資料,需要的朋友可以參考下2017-02-02