Kafka常用命令之kafka-console-consumer.sh解讀
kafka-console-consumer.sh解讀
kafka-console-consumer.sh 腳本是一個簡易的消費者控制臺。
該 shell 腳本的功能通過調用 kafka.tools 包下的 ConsoleConsumer 類,并將提供的命令行參數(shù)全部傳給該類實現(xiàn)。
- 注意:Kafka 從 2.2 版本開始將 kafka-topic.sh 腳本中的 −−zookeeper 參數(shù)標注為 “過時”,推薦使用 −−bootstrap-server 參數(shù)。
- 若讀者依舊使用的是 2.1 及以下版本,請將下述的 --bootstrap-server 參數(shù)及其值手動替換為 --zookeeper zk1:2181,zk2:2181,zk:2181。
- 一定要注意兩者參數(shù)值所指向的集群地址是不同的。
消息消費
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName
表示從 latest 位移位置開始消費該主題的所有分區(qū)消息,即僅消費正在寫入的消息。
從開始位置消費
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName
表示從指定主題中有效的起始位移位置開始消費所有分區(qū)的消息。
顯示key消費
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName
消費出的消息結果將打印出消息體的 key 和 value。
若還需要為你的消息添加其他屬性
請參考下述列表
| 參數(shù) | 值類型 | 說明 | 有效值 |
|---|---|---|---|
| --topic | string | 被消費的topic | |
| --whitelist | string | 正則表達式,指定要包含以供使用的主題的白名單 | |
| --partition | integer | 指定分區(qū) 除非指定’–offset’,否則從分區(qū)結束(latest)開始消費 | |
| --offset | string | 執(zhí)行消費的起始offset位置 默認值:latest | latest earliest <offset> |
| --consumer-property | string | 將用戶定義的屬性以key=value的形式傳遞給使用者 | |
| --consumer.config | string | 消費者配置屬性文件 請注意,[consumer-property]優(yōu)先于此配置 | |
| --formatter | string | 用于格式化kafka消息以供顯示的類的名稱 默認值:kafka.tools.DefaultMessageFormatter | kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter |
| --property | string | 初始化消息格式化程序的屬性 | print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> |
| --from-beginning | 從存在的最早消息開始,而不是從最新消息開始 | ||
| --max-messages | integer | 消費的最大數(shù)據(jù)量,若不指定,則持續(xù)消費下去 | |
| --timeout-ms | integer | 在指定時間間隔內沒有消息可用時退出 | |
| --skip-message-on-error | 如果處理消息時出錯,請?zhí)^它而不是暫停 | ||
| --bootstrap-server | string | 必需(除非使用舊版本的消費者),要連接的服務器 | |
| --key-deserializer | string | ||
| --value-deserializer | string | ||
| --enable-systest-events | 除記錄消費的消息外,還記錄消費者的生命周期 (用于系統(tǒng)測試) | ||
| --isolation-level | string | 設置為read_committed以過濾掉未提交的事務性消息 設置為read_uncommitted以讀取所有消息 默認值:read_uncommitted | |
| --group | string | 指定消費者所屬組的ID | |
| --blacklist | string | 要從消費中排除的主題黑名單 | |
| --csv-reporter-enabled | 如果設置,將啟用csv metrics報告器 | ||
| --delete-consumer-offsets | 如果指定,則啟動時刪除zookeeper中的消費者信息 | ||
| --metrics-dir | string | 輸出csv度量值 需與[csv-reporter-enable]配合使用 | |
| --zookeeper | string | 必需(僅當使用舊的使用者時)連接zookeeper的字符串。 可以給出多個URL以允許故障轉移 |
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Java實現(xiàn)學生信息管理系統(tǒng)(借助Array?List)
這篇文章主要為大家詳細介紹了Java實現(xiàn)學生信息管理系統(tǒng),借助Array?List,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01
Mybatis Plus使用條件構造器增刪改查功能的實現(xiàn)方法
這篇文章主要介紹了Mybatis-Plus使用條件構造器增刪改查,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-05-05
Spring?Security?基于URL的權限判斷源碼解析
這篇文章主要介紹了Spring?Security?基于URL的權限判斷問題,我們想要實現(xiàn)自己的基于請求Url的授權只需自定義一個?AccessDecisionManager即可,接下來跟隨小編一起看看實現(xiàn)代碼吧2021-12-12
Spring boot+VUE實現(xiàn)token驗證的示例代碼
本文詳細介紹了使用Vue和SpringBoot實現(xiàn)token認證的方法,包括前后端交互流程、后端依賴導入、token工具類、攔截器、跨域處理、前端路由守衛(wèi)、請求攔截器等內容,具有一定的參考價值,感興趣的可以了解一下2024-10-10

