利用擴(kuò)展的方式在PHP中使用Kafka的教程分享
前言
由于之前在 PHP 中使用 Kafka 是通過(guò) composer 包的方式,由于 nmred/kafka-php 很久沒(méi)有維護(hù),并且網(wǎng)上相關(guān)問(wèn)題的文章也比較少。所以我這次換成 PHP 擴(kuò)展 RdKafka 繼續(xù)使用,主要介紹擴(kuò)展安裝和這種方式的基本操作。
安裝
1. 下載
地址(找到與自己環(huán)境匹配的就可以)
2. 目錄
由于 php-rdkafka 依賴(lài) librdkafka,linux 就需要先安裝 librdkafka 后安裝 php-rdkafka,而 windows 版本是如下幾個(gè)文件,安裝方法如下:
(1). 將 librdkafka.dll 和 librdkafka.pdb 放入 PHP 安裝的根目錄下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 安裝目錄的 ext 下。
(2). php.ini 配置文件添加 extension=php_rdkafka.dll,最后重啟 PHP。
(3). php-m 或這 phpinfo (); 就可以查看到擴(kuò)展了。
通過(guò) get_declared_classes() 也可以查看到擴(kuò)展里預(yù)設(shè)的函數(shù)了。
使用
1. 生產(chǎn)
public function kafkaTest() { $rk = new \RdKafka\Producer(); $rk->addBrokers("127.0.0.1:9092"); $topic = $rk->newTopic("shop"); $ret = []; for ($i = 0; $i < 5; $i++) { $content = "第" . $i . "次發(fā)送失敗"; $message = ["mobile" => "15623652142", "content" => $content]; $payload = json_encode($message); // 指定向0號(hào)partition生產(chǎn)數(shù)據(jù) $ret[]['produce_res'] = $topic->produce(0, 0, $payload, "sms_$i"); // 隨機(jī)選擇partition //$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload); if ($rk->getOutQLen() > 0) { $ret[]['produce_poll'] = $rk->poll(500); } else { $ret[]['produce_poll'] = $rk->poll(0); } } dump($ret); }
2. 消費(fèi)(從指定的 partition 消費(fèi))
protected function execute(Input $input, Output $output) { $output->writeln("!!!hello kafka!!!"); $conf = new \RdKafka\Conf(); $conf->set('group.id', 'sms-consumer-group'); $rk = new \RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1:9092"); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', sys_get_temp_dir()); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("shop", $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while(true) { // 設(shè)置消費(fèi)時(shí)的時(shí)間間隔,單位毫秒,以下表示5秒消費(fèi)一個(gè) $message = $topic->consume(0, 5000); if ($message) { echo "讀取到消息\n\r"; // 消息對(duì)象,包括消息主題,消息創(chuàng)建時(shí)間戳,消息分區(qū)編號(hào),消息主體,消息鍵名,消息長(zhǎng)度等 var_dump($message); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "讀取消息成功:\n\r"; var_dump($message->payload); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "讀取消息失敗\n\r"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "請(qǐng)求超時(shí)\n\r"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } else { echo "未讀取到消息\n\r"; } } $output->writeln("!!!the end!!!"); }
其他
在執(zhí)行消費(fèi)過(guò)程中,發(fā)現(xiàn) kafka 停止服務(wù),拋出的異常:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。
解決方法
刪除 kafka-logs 下的所有日志,再重新啟動(dòng) Kafaka, kafka-server-start.bat ....\config\server.properties &
到此這篇關(guān)于利用擴(kuò)展的方式在PHP中使用Kafka的教程分享的文章就介紹到這了,更多相關(guān)PHP使用Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
php超快高效率統(tǒng)計(jì)大文件行數(shù)
這篇文章主要介紹了php超快高效率統(tǒng)計(jì)大文件行數(shù)的相關(guān)資料,需要的朋友可以參考下2015-07-07Json_encode防止?jié)h字轉(zhuǎn)義成unicode的方法
json_encode通常會(huì)把json中的漢字轉(zhuǎn)義成unicode,但是有些時(shí)候不是我們想要的,下面小編給大家介紹json_encode防止?jié)h字轉(zhuǎn)義成unicode的方法,需要的朋友參考下吧2016-02-02php使用遞歸函數(shù)實(shí)現(xiàn)數(shù)字累加的方法
這篇文章主要介紹了php使用遞歸函數(shù)實(shí)現(xiàn)數(shù)字累加的方法,涉及php遞歸操作的技巧,需要的朋友可以參考下2015-03-03php魔術(shù)函數(shù)__call()用法實(shí)例分析
這篇文章主要介紹了php魔術(shù)函數(shù)__call()用法,實(shí)例分析了__call()函數(shù)的功能及使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-02-02PHP超級(jí)全局變量、魔術(shù)變量和魔術(shù)函數(shù)匯總整理
這篇文章主要介紹了PHP超級(jí)全局變量、魔術(shù)變量和魔術(shù)函數(shù)匯總整理,有需要的同學(xué)可以看下2021-02-02