Kafka之kafka-topics.sh的使用解讀
一、kafka的基本操作
1.1、創(chuàng)建topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
參數(shù)說明:
–create
是創(chuàng)建主題的的動(dòng)作指令。–zookeeper
指定kafka所連接的zookeeper服務(wù)地址。–replicator-factor
指定了副本因子(即副本數(shù)量); 表示該topic需要在不同的broker中保存幾份,這里設(shè)置成1,表示在兩個(gè)broker中保存兩份Partitions分區(qū)數(shù)。–partitions
指定分區(qū)個(gè)數(shù);多通道,類似車道。–topic
指定所要?jiǎng)?chuàng)建主題的名稱,比如test。
成功則顯示:
Created topic "test".
1.2、查看topic
sh kafka-topics.sh --list --zookeeper localhost:2181
顯示:
test
1.3、查看topic屬性
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
顯示:
Topic:test?? ?PartitionCount:1?? ?ReplicationFactor:1?? ?Configs: ?? ?Topic: test?? ?Partition: 0?? ?Leader: 0?? ?Replicas: 0?? ?Isr: 0
1.4、發(fā)送消息
sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
發(fā)送端輸入:
>hello >where are you >let's go
1.5、消費(fèi)消息
sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
消費(fèi)端顯示:
hello where are you let's go ^CProcessed a total of 3 messages
二、kafka-topics.sh 使用方式
創(chuàng)建、修改、刪除以及查看等功能。
2.1、查看幫助
/bin目錄下的每一個(gè)腳本工具,都有著眾多的參數(shù)選項(xiàng),不可能所有命令都記得住,這些腳本都可以使用 --help 參數(shù)來打印列出其所需的參數(shù)信息。
$ sh kafka-topics.sh --help Command must include exactly one action: --list, --describe, --create, --alter or --delete Option ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Description ? ? ? ? ? ? ? ? ? ? ? ? ? ? ------ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ----------- ? ? ? ? ? ? ? ? ? ? ? ? ? ? --alter ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Alter the number of partitions, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?replica assignment, and/or ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?configuration for the topic. ? ? ? ?? --config <String: name=value> ? ? ? ? ? ?A topic configuration override for the? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topic being created or altered.The ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?following is a list of valid ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?configurations: ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?cleanup.policy ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?compression.type ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?delete.retention.ms ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?file.delete.delay.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?flush.messages ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?flush.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?follower.replication.throttled. ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?replicas ? ? ? ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?index.interval.bytes ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?leader.replication.throttled.replicas? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?max.message.bytes ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.downconversion.enable ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.format.version ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.timestamp.difference.max.ms ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.timestamp.type ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?min.cleanable.dirty.ratio ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?min.compaction.lag.ms ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?min.insync.replicas ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?preallocate ? ? ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?retention.bytes ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?retention.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.bytes ? ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.index.bytes ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.jitter.ms ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?unclean.leader.election.enable ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?See the Kafka documentation for full ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?details on the topic configs. ? ? ? ? --create ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Create a new topic. ? ? ? ? ? ? ? ? ? ? --delete ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Delete a topic ? ? ? ? ? ? ? ? ? ? ? ?? --delete-config <String: name> ? ? ? ? ? A topic configuration override to be ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?removed for an existing topic (see ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?the list of configurations under the? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?--config option). ? ? ? ? ? ? ? ? ? ? --describe ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? List details for the given topics. ? ?? --disable-rack-aware ? ? ? ? ? ? ? ? ? ? Disable rack aware replica assignment ? --force ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Suppress console prompts ? ? ? ? ? ? ?? --help ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Print usage information. ? ? ? ? ? ? ?? --if-exists ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if set when altering or deleting ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topics, the action will only execute? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if the topic exists ? ? ? ? ? ? ? ? ? --if-not-exists ? ? ? ? ? ? ? ? ? ? ? ? ?if set when creating topics, the ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?action will only execute if the ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topic does not already exist ? ? ? ?? --list ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? List all available topics. ? ? ? ? ? ?? --partitions <Integer: # of partitions> ?The number of partitions for the topic? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?being created or altered (WARNING: ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?If partitions are increased for a ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topic that has a key, the partition ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?logic or ordering of the messages ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?will be affected ? ? ? ? ? ? ? ? ? ?? --replica-assignment <String: ? ? ? ? ? ?A list of manual partition-to-broker ?? ? broker_id_for_part1_replica1 : ? ? ? ? ? assignments for the topic being ? ? ? ? broker_id_for_part1_replica2 , ? ? ? ? ? created or altered. ? ? ? ? ? ? ? ? ? ? broker_id_for_part2_replica1 : ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? broker_id_for_part2_replica2 , ...> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? --replication-factor <Integer: ? ? ? ? ? The replication factor for each ? ? ? ? ? replication factor> ? ? ? ? ? ? ? ? ? ? ?partition in the topic being created. --topic <String: topic> ? ? ? ? ? ? ? ? ?The topic to be create, alter or ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?describe. Can also accept a regular ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?expression except for --create option --topics-with-overrides ? ? ? ? ? ? ? ? ?if set when describing topics, only ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?show topics that have overridden ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?configs ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --unavailable-partitions ? ? ? ? ? ? ? ? if set when describing topics, only ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?show partitions whose leader is not ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?available ? ? ? ? ? ? ? ? ? ? ? ? ? ? --under-replicated-partitions ? ? ? ? ? ?if set when describing topics, only ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?show under replicated partitions ? ?? --zookeeper <String: hosts> ? ? ? ? ? ? ?REQUIRED: The connection string for ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?the zookeeper connection in the form? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?host:port. Multiple hosts can be ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?given to allow fail-over. ? ? ? ?
2.2、副本數(shù)量規(guī)則
副本數(shù)量不能大于broker的數(shù)量。
kafka 創(chuàng)建主題的時(shí)候其副本數(shù)量不能大于broker的數(shù)量,否則創(chuàng)建主題 topic 失敗。
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1
報(bào)錯(cuò):
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(kafka.admin.TopicCommand$)
注意:副本數(shù)量和分區(qū)數(shù)量的區(qū)別。
2.3、創(chuàng)建主題
創(chuàng)建主題時(shí)候,有3個(gè)參數(shù)是必填的:
–partitions
(分區(qū)數(shù)量)、–topic
(主題名) 、–replication-factor
(復(fù)制系數(shù)),
同時(shí)還需使用 --create 參數(shù)表明本次操作是想要?jiǎng)?chuàng)建一個(gè)主題操作。
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
返回顯示:
Created topic "test1".
另外在創(chuàng)建主題的時(shí)候,還可以附加以下兩個(gè)選項(xiàng):–if-not-exists 和 --if-exists . 第一個(gè)參數(shù)表明僅當(dāng)該主題不存在時(shí)候,創(chuàng)建; 第二個(gè)參數(shù)表明當(dāng)修改或刪除這個(gè)主題時(shí)候,僅在該主題存在的時(shí)候去執(zhí)行操作。
2.4、查看broker上所有的主題
–list。
sh kafka-topics.sh --list --zookeeper localhost:2181
結(jié)果顯示:
__consumer_offsets
test
test1
2.5、查看指定主題 topic 的詳細(xì)信息
–describe。
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1?
結(jié)果顯示:
Topic:test1 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
2.6、修改主題信息之增加主題分區(qū)數(shù)量
–alter。
sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2
結(jié)果顯示:
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
查看主題信息:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
可以看到已經(jīng)成功的將主題的分區(qū)數(shù)量從1修改為了2。
Topic:test1?? ?PartitionCount:2?? ?ReplicationFactor:1?? ?Configs: ?? ?Topic: test1?? ?Partition: 0?? ?Leader: 0?? ?Replicas: 0?? ?Isr: 0 ?? ?Topic: test1?? ?Partition: 1?? ?Leader: 0?? ?Replicas: 0?? ?Isr: 0
當(dāng)去修改一個(gè)不存在的topic信息時(shí)(比如修改主題 test2,當(dāng)前這主題是不存在的)。
sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2
會(huì)報(bào)錯(cuò):
Error while executing topic command : Topic test2 does not exist on ZK path localhost:2181
[2022-11-24 14:21:33,564] ERROR java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181
at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
注意:不要使用 --alter 去嘗試減少分區(qū)的數(shù)量,如果非要減少分區(qū)的數(shù)量,只能刪除整個(gè)主題 topic, 然后重新創(chuàng)建。
2.7、刪除主題
–delete。
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
日志信息提示,主題 test1已經(jīng)被標(biāo)記刪除狀態(tài),但是若delete.topic.enable 沒有設(shè)置為 true , 則將不會(huì)有任何作用。
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
可以測(cè)試一些:
# 一個(gè)終端啟動(dòng)生產(chǎn)者: sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1 # 另一個(gè)終端啟動(dòng)消費(fèi)者: sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning
發(fā)現(xiàn)此時(shí)還是可以發(fā)送消息和接收消息。如果要支持能夠刪除主題的操作,則需要在 /bin 的同級(jí)目錄 /config目錄下的文件server.properties中,修改配置delete.topic.enable=true(如果置為false,則kafka broker 是不允許刪除主題的)。
然后就重啟kafka:
# 停止: sh kafka-server-stop.sh -daemon ../config/server.properties # 啟動(dòng): sh kafka-server-start.sh -daemon ../config/server.properties
再次刪除就可以了。
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot 圖形驗(yàn)證碼的生成和校驗(yàn)
隨著系統(tǒng)和業(yè)務(wù)的不停升級(jí),前后端代碼放在一起的項(xiàng)目越來越臃腫,已經(jīng)無法快速迭代和職責(zé)區(qū)分了,于是紛紛投入了前后端分離的懷抱,發(fā)現(xiàn)代碼和職責(zé)分離以后,開發(fā)效率越來越高了,但是以前的驗(yàn)證碼登錄方案就要更改了。本文來看一下SpringBoot 圖形驗(yàn)證碼的生成和校驗(yàn)2021-05-05SpringBoot 配置文件中配置的中文,程序讀取出來是亂碼的解決
這篇文章主要介紹了SpringBoot 配置文件中配置的中文,程序讀取出來是亂碼的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09靈活控制任務(wù)執(zhí)行時(shí)間的Cron表達(dá)式范例
這篇文章主要為大家介紹了靈活控制任務(wù)執(zhí)行時(shí)間的Cron表達(dá)式范例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10java基于jcifs.smb實(shí)現(xiàn)遠(yuǎn)程發(fā)送文件到服務(wù)器
這篇文章主要介紹了java基于jcifs.smb實(shí)現(xiàn)遠(yuǎn)程發(fā)送文件到服務(wù)器,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01SpringBoot整合rockerMQ消息隊(duì)列詳解
今天和大家一起深入生產(chǎn)級(jí)別消息中間件 - RocketMQ 的內(nèi)核實(shí)現(xiàn),來看看真正落地能支撐萬億級(jí)消息容量、低延遲的消息隊(duì)列到底是如何設(shè)計(jì)的。我會(huì)先介紹整體的架構(gòu)設(shè)計(jì),然后再深入各核心模塊的詳細(xì)設(shè)計(jì)、核心流程的剖析2022-07-07springboot?實(shí)戰(zhàn):異常與重定向問題
這篇文章主要介紹了springboot實(shí)戰(zhàn):異常與重定向問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12