Kafka之kafka-topics.sh的使用解讀
一、kafka的基本操作
1.1、創(chuàng)建topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
參數(shù)說明:
–create
是創(chuàng)建主題的的動作指令。–zookeeper
指定kafka所連接的zookeeper服務地址。–replicator-factor
指定了副本因子(即副本數(shù)量); 表示該topic需要在不同的broker中保存幾份,這里設置成1,表示在兩個broker中保存兩份Partitions分區(qū)數(shù)。–partitions
指定分區(qū)個數(shù);多通道,類似車道。–topic
指定所要創(chuàng)建主題的名稱,比如test。
成功則顯示:
Created topic "test".
1.2、查看topic
sh kafka-topics.sh --list --zookeeper localhost:2181
顯示:
test
1.3、查看topic屬性
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
顯示:
Topic:test?? ?PartitionCount:1?? ?ReplicationFactor:1?? ?Configs: ?? ?Topic: test?? ?Partition: 0?? ?Leader: 0?? ?Replicas: 0?? ?Isr: 0
1.4、發(fā)送消息
sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
發(fā)送端輸入:
>hello >where are you >let's go
1.5、消費消息
sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
消費端顯示:
hello where are you let's go ^CProcessed a total of 3 messages
二、kafka-topics.sh 使用方式
創(chuàng)建、修改、刪除以及查看等功能。
2.1、查看幫助
/bin目錄下的每一個腳本工具,都有著眾多的參數(shù)選項,不可能所有命令都記得住,這些腳本都可以使用 --help 參數(shù)來打印列出其所需的參數(shù)信息。
$ sh kafka-topics.sh --help Command must include exactly one action: --list, --describe, --create, --alter or --delete Option ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Description ? ? ? ? ? ? ? ? ? ? ? ? ? ? ------ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ----------- ? ? ? ? ? ? ? ? ? ? ? ? ? ? --alter ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Alter the number of partitions, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?replica assignment, and/or ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?configuration for the topic. ? ? ? ?? --config <String: name=value> ? ? ? ? ? ?A topic configuration override for the? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topic being created or altered.The ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?following is a list of valid ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?configurations: ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?cleanup.policy ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?compression.type ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?delete.retention.ms ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?file.delete.delay.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?flush.messages ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?flush.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?follower.replication.throttled. ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?replicas ? ? ? ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?index.interval.bytes ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?leader.replication.throttled.replicas? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?max.message.bytes ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.downconversion.enable ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.format.version ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.timestamp.difference.max.ms ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?message.timestamp.type ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?min.cleanable.dirty.ratio ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?min.compaction.lag.ms ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?min.insync.replicas ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?preallocate ? ? ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?retention.bytes ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?retention.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.bytes ? ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.index.bytes ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.jitter.ms ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?segment.ms ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ?unclean.leader.election.enable ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?See the Kafka documentation for full ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?details on the topic configs. ? ? ? ? --create ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Create a new topic. ? ? ? ? ? ? ? ? ? ? --delete ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Delete a topic ? ? ? ? ? ? ? ? ? ? ? ?? --delete-config <String: name> ? ? ? ? ? A topic configuration override to be ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?removed for an existing topic (see ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?the list of configurations under the? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?--config option). ? ? ? ? ? ? ? ? ? ? --describe ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? List details for the given topics. ? ?? --disable-rack-aware ? ? ? ? ? ? ? ? ? ? Disable rack aware replica assignment ? --force ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Suppress console prompts ? ? ? ? ? ? ?? --help ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Print usage information. ? ? ? ? ? ? ?? --if-exists ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if set when altering or deleting ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topics, the action will only execute? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if the topic exists ? ? ? ? ? ? ? ? ? --if-not-exists ? ? ? ? ? ? ? ? ? ? ? ? ?if set when creating topics, the ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?action will only execute if the ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topic does not already exist ? ? ? ?? --list ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? List all available topics. ? ? ? ? ? ?? --partitions <Integer: # of partitions> ?The number of partitions for the topic? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?being created or altered (WARNING: ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?If partitions are increased for a ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?topic that has a key, the partition ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?logic or ordering of the messages ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?will be affected ? ? ? ? ? ? ? ? ? ?? --replica-assignment <String: ? ? ? ? ? ?A list of manual partition-to-broker ?? ? broker_id_for_part1_replica1 : ? ? ? ? ? assignments for the topic being ? ? ? ? broker_id_for_part1_replica2 , ? ? ? ? ? created or altered. ? ? ? ? ? ? ? ? ? ? broker_id_for_part2_replica1 : ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? broker_id_for_part2_replica2 , ...> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? --replication-factor <Integer: ? ? ? ? ? The replication factor for each ? ? ? ? ? replication factor> ? ? ? ? ? ? ? ? ? ? ?partition in the topic being created. --topic <String: topic> ? ? ? ? ? ? ? ? ?The topic to be create, alter or ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?describe. Can also accept a regular ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?expression except for --create option --topics-with-overrides ? ? ? ? ? ? ? ? ?if set when describing topics, only ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?show topics that have overridden ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?configs ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --unavailable-partitions ? ? ? ? ? ? ? ? if set when describing topics, only ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?show partitions whose leader is not ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?available ? ? ? ? ? ? ? ? ? ? ? ? ? ? --under-replicated-partitions ? ? ? ? ? ?if set when describing topics, only ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?show under replicated partitions ? ?? --zookeeper <String: hosts> ? ? ? ? ? ? ?REQUIRED: The connection string for ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?the zookeeper connection in the form? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?host:port. Multiple hosts can be ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?given to allow fail-over. ? ? ? ?
2.2、副本數(shù)量規(guī)則
副本數(shù)量不能大于broker的數(shù)量。
kafka 創(chuàng)建主題的時候其副本數(shù)量不能大于broker的數(shù)量,否則創(chuàng)建主題 topic 失敗。
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1
報錯:
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(kafka.admin.TopicCommand$)
注意:副本數(shù)量和分區(qū)數(shù)量的區(qū)別。
2.3、創(chuàng)建主題
創(chuàng)建主題時候,有3個參數(shù)是必填的:
–partitions
(分區(qū)數(shù)量)、–topic
(主題名) 、–replication-factor
(復制系數(shù)),
同時還需使用 --create 參數(shù)表明本次操作是想要創(chuàng)建一個主題操作。
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
返回顯示:
Created topic "test1".
另外在創(chuàng)建主題的時候,還可以附加以下兩個選項:–if-not-exists 和 --if-exists . 第一個參數(shù)表明僅當該主題不存在時候,創(chuàng)建; 第二個參數(shù)表明當修改或刪除這個主題時候,僅在該主題存在的時候去執(zhí)行操作。
2.4、查看broker上所有的主題
–list。
sh kafka-topics.sh --list --zookeeper localhost:2181
結果顯示:
__consumer_offsets
test
test1
2.5、查看指定主題 topic 的詳細信息
–describe。
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1?
結果顯示:
Topic:test1 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
2.6、修改主題信息之增加主題分區(qū)數(shù)量
–alter。
sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2
結果顯示:
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
查看主題信息:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
可以看到已經成功的將主題的分區(qū)數(shù)量從1修改為了2。
Topic:test1?? ?PartitionCount:2?? ?ReplicationFactor:1?? ?Configs: ?? ?Topic: test1?? ?Partition: 0?? ?Leader: 0?? ?Replicas: 0?? ?Isr: 0 ?? ?Topic: test1?? ?Partition: 1?? ?Leader: 0?? ?Replicas: 0?? ?Isr: 0
當去修改一個不存在的topic信息時(比如修改主題 test2,當前這主題是不存在的)。
sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2
會報錯:
Error while executing topic command : Topic test2 does not exist on ZK path localhost:2181
[2022-11-24 14:21:33,564] ERROR java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181
at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
注意:不要使用 --alter 去嘗試減少分區(qū)的數(shù)量,如果非要減少分區(qū)的數(shù)量,只能刪除整個主題 topic, 然后重新創(chuàng)建。
2.7、刪除主題
–delete。
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
日志信息提示,主題 test1已經被標記刪除狀態(tài),但是若delete.topic.enable 沒有設置為 true , 則將不會有任何作用。
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
可以測試一些:
# 一個終端啟動生產者: sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1 # 另一個終端啟動消費者: sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning
發(fā)現(xiàn)此時還是可以發(fā)送消息和接收消息。如果要支持能夠刪除主題的操作,則需要在 /bin 的同級目錄 /config目錄下的文件server.properties中,修改配置delete.topic.enable=true(如果置為false,則kafka broker 是不允許刪除主題的)。
然后就重啟kafka:
# 停止: sh kafka-server-stop.sh -daemon ../config/server.properties # 啟動: sh kafka-server-start.sh -daemon ../config/server.properties
再次刪除就可以了。
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
SpringBoot 配置文件中配置的中文,程序讀取出來是亂碼的解決
這篇文章主要介紹了SpringBoot 配置文件中配置的中文,程序讀取出來是亂碼的解決,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09java基于jcifs.smb實現(xiàn)遠程發(fā)送文件到服務器
這篇文章主要介紹了java基于jcifs.smb實現(xiàn)遠程發(fā)送文件到服務器,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01