欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

kafka消費者kafka-console-consumer接收不到數(shù)據(jù)的解決

 更新時間:2023年03月07日 09:36:45   作者:k55  
這篇文章主要介紹了kafka消費者kafka-console-consumer接收不到數(shù)據(jù)的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

kafka消費者kafka-console-consumer接收不到數(shù)據(jù)

發(fā)送端

接收端

問題

采用內(nèi)置的zookeeper,發(fā)送端發(fā)送數(shù)據(jù),接收端能夠接收數(shù)據(jù)

但是采用外置的zookeeper,發(fā)送端發(fā)送數(shù)據(jù),接收端一直接收不到數(shù)據(jù)

解決

先判斷主題是否一致,如果一致就在關(guān)閉kafka

./kafka-server-stop.sh ../config/server.properties

修改一下配置,確保這些配置已加上,不要用localhost,在listeners的ip地址和端口號要和消費者,生產(chǎn)者的的地址端口號一直

vim ../config/server.propertiesst3

最后把log.dirs后面的文件刪除或者重新?lián)Q個地址

rm -rf /tmp/kafka

重新在前臺啟動kafka,注意查看打印在桌面的日志,有無報錯信息

./kafka-server-start.sh ../config/server.properties

如果沒有報錯信息,啟動正常,那么就可以在后臺啟動了

./kafka-server-start.sh -daemon ../config/server.properties

創(chuàng)建生產(chǎn)者

./kafka-console-producer.sh --broker-list 172.16.193.175:9092 --topic test3

創(chuàng)建消費者

./kafka-console-consumer.sh --bootstrap-server 172.16.193.175:9092 --topic test3 --from-beginning

關(guān)于kafka-console-consumer.sh消費者的一些思考

(人物設(shè)定初步了解kafka的我)

我司現(xiàn)在有三臺kafka服務(wù)器作為一個集群

需求是我寫了一個監(jiān)聽器去監(jiān)聽活動失敗的情況,如果活動失敗則調(diào)用一個統(tǒng)計接口 做數(shù)據(jù)統(tǒng)計

我需要從失敗事件的隨路數(shù)據(jù)中取一些數(shù)據(jù),做一些判斷.

現(xiàn)在我想從集群中看一下失敗事件中的隨路數(shù)據(jù)是否完整正確

于是,我xshell連接上了三臺服務(wù)器并且運行以下命令

./kafka-console-consumer.sh --bootstrap-server broker1IP:9092 --topic topicname
?
./kafka-console-consumer.sh --bootstrap-server broker2IP:9092 --topic topicname
?
./kafka-console-consumer.sh --bootstrap-server broker3IP:9092 --topic topicname

發(fā)現(xiàn)只要發(fā)送一個事件三個服務(wù)器都可以收到事件中的消息

怪了,為什么三臺都會顯示.

我第一反應(yīng)是:這是否是傳說中的leader和follower 同步策略

我問了一下我的leader ,

leader:.....,你知道你這個命令是什么意思嗎?

這個命令就是相當(dāng)于創(chuàng)建了一個消費者去消費了隊列中的消息!

你這個3個服務(wù)器相當(dāng)于啟動了3個消費者去消費了,同一個消息三次!

我:不對啊,同一個消息不能被消費三次啊!?

leader:........,你知道什么是消費者組嗎?你這相當(dāng)于三個消費者組 不信你看看

./kafka-console-consumer.sh -help
?
...
?
--group <String: consumer group id> ? ? ?The consumer group id of the consumer.?
?
...

看到了么?這里可以指定消費者組,你不指定他就默認(rèn)是一個新的消費者組

我: 牛啊!

kafka-console-consumer.sh相關(guān)知識拓展

kafka-console-consumer.sh腳本是一個簡易的消費者控制臺。該 shell 腳本的功能通過調(diào)用 kafka.tools 包下的 ConsoleConsumer 類,并將提供的命令行參數(shù)全部傳給該類實現(xiàn)。

./kafka-console-consumer.sh --bootstrap-server node:9092 --topic topicName
 
//表示從 latest 位移位置開始消費該主題的所有分區(qū)消息,即僅消費正在寫入的消息。
  
 
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName
 
//?表示從指定主題中有效的起始位移位置開始消費所有分區(qū)的消息。
 
 
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName
 
//?消費出的消息結(jié)果將打印出消息體的 key 和 value。
參數(shù)值類型說明有效值
--topicstring被消費的topic
--whiteliststring正則表達式,指定要包含以供使用的主題的白名單
--partitioninteger指定分區(qū)
除非指定’–offset’,否則從分區(qū)結(jié)束(latest)開始消費
--offsetstring執(zhí)行消費的起始o(jì)ffset位置
默認(rèn)值:latest
--from-beginning從存在的最早消息開始,而不是從最新消息開始
--max-messagesinteger消費的最大數(shù)據(jù)量,若不指定,則持續(xù)消費下去
--timeout-msinteger在指定時間間隔內(nèi)沒有消息可用時退出
--bootstrap-serverstring必需(除非使用舊版本的消費者),要連接的服務(wù)器
--key-deserializerstring
--value-deserializerstring
--groupstring指定消費者所屬組的ID
--zookeeperstring必需(僅當(dāng)使用舊的使用者時)連接zookeeper的字符串。
可以給出多個URL以允許故障轉(zhuǎn)移

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論