kafka安裝部署超詳細步驟
概述
Kafka是最初由Linkedin公司開發(fā),是一個分布式、分區(qū)的、多副本的、多訂閱者,基于zookeeper協(xié)調(diào)的分布式日志系統(tǒng)(也可以當做MQ系統(tǒng)),常見可以用于web/nginx日志、訪問日志,消息服務(wù)等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。
主要應(yīng)用場景是:日志收集系統(tǒng)和消息系統(tǒng)。
Kafka主要設(shè)計目標如下:
- 以時間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸。
- 支持Kafka Server間的消息分區(qū),及分布式消費,同時保證每個partition內(nèi)的消息順序傳輸。
- 同時支持離線數(shù)據(jù)處理和實時數(shù)據(jù)處理。
- Scale out:支持在線水平擴展
Step 1: 下載代碼
你可以登錄Apache kafka 官方下載。
http://kafka.apache.org/downloads.html
備注:2.11-1.1.0
版本才與JDK1.7
兼容,否則更高版本需要JDK1.8
Step 2: 啟動服務(wù)
運行kafka需要使用Zookeeper,所以你需要先啟動Zookeeper,如果你沒有Zookeeper,你可以使用kafka自帶打包和配置好的Zookeeper(PS:在kafka包里)。
//這是前臺啟動,啟動以后,當前就無法進行其他操作(不推薦) ./zookeeper-server-start.sh ../config/zookeeper.properties //后臺啟動(推薦) ./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &
現(xiàn)在啟動kafka
config/server1.properties: broker.id=0 listeners=PLAINTEXT://192.168.10.130:9092 log.dirs=kafka-logs zookeeper.connect=localhost:2181
//后臺啟動kafka ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
Step 3:創(chuàng)建一個主題
??創(chuàng)建一個名為“test”的Topic,只有一個分區(qū)和備份(2181是zookeeper的默認端口)
./kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
命令解析: --create: 指定創(chuàng)建topic動作 --topic:指定新建topic的名稱 --zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣 --config:指定當前topic上有效的參數(shù)值,參數(shù)列表參考文檔為: http://kafka.apache.org/082/documentation.html#brokerconfigs --partitions:指定當前創(chuàng)建的kafka分區(qū)數(shù)量,默認為1個 --replication-factor:指定每個分區(qū)的復(fù)制因子個數(shù),默認1個
創(chuàng)建好之后,可以通過運行以下命令,查看已創(chuàng)建的topic信息:
>./kafka-topics.sh --list --zookeeper localhost:2181 test
??或者,除了手工創(chuàng)建topic外,你也可以配置你的broker,當發(fā)布一個不存在的topic時自動創(chuàng)建topic。
??補充:
(1)查看對應(yīng)topic的描述信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test0
命令解析: --describe: 指定是展示詳細信息命令 --zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣 --topic:指定需要展示數(shù)據(jù)的topic名稱
??(2)Topic信息修改
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --config max.message.bytes=128000 bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --delete-config max.message.bytes bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 10 bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 3 ## Kafka分區(qū)數(shù)量只允許增加,不允許減少
??(3)Topic刪除
默認情況下Kafka的Topic是沒法直接刪除的,需要進行相關(guān)參數(shù)配置
bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181
加粗樣式
Note: This will have no impact if delete.topic.enable is not set to true
.## 默認情況下,刪除是標記刪除,沒有實際刪除這個Topic;如果運行刪除Topic,兩種方式:
方式一:通過delete命令刪除后,手動將本地磁盤以及zk上的相關(guān)topic的信息刪除即可
方式二:配置server.properties文件,給定參數(shù)delete.topic.enable=true,重啟kafka服務(wù),此時執(zhí)行delete命令表示允許進行Topic的刪除
Step 4: 發(fā)送消息
??Kafka
提供了一個命令行的工具,可以從輸入文件或者命令行中讀取消息并發(fā)送給Kafka
集群。每一行是一條消息。
??運行producer(生產(chǎn)者),然后在控制臺輸入幾條消息到服務(wù)器。
??備注:這里的localhost:9092不是固定的,需要根據(jù)server.properties中配置的地址來寫這里的地址!
[root@administrator bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test >this is a message >this is another message //按`Ctrl+C`終止輸入
Step 5: 消費消息
??Kafka也提供了一個消費消息的命令行工具,將存儲的信息輸出出來。
??備注:這里的localhost:9092不是固定的,需要根據(jù)server.properties中配置的地址來寫這里的地址!
[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning this is a message this is another message //按`Ctrl+C`終止讀取消息
??如果你有2臺不同的終端上運行上述命令,那么當你在運行生產(chǎn)者時,消費者就能消費到生產(chǎn)者發(fā)送的消息。
Step 6: 設(shè)置多個broker集群(單機偽集群的配置)
??到目前,我們只是單一的運行一個broker,沒什么意思。對于Kafka,一個broker僅僅只是一個集群的大小,所有讓我們多設(shè)幾個broker。
??首先為每個broker創(chuàng)建一個配置文件:
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
現(xiàn)在編輯這些新建的文件,設(shè)置以下屬性:
vim config/server.properties
config/server1.properties: broker.id=0 listeners=PLAINTEXT://192.168.10.130:9092 log.dirs=kafka-logs zookeeper.connect=localhost:2181 config/server-1.properties: broker.id=1 listeners=PLAINTEXT://192.168.10.130:9093 log.dirs=kafka-logs-1 zookeeper.connect=localhost:2181 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://192.168.10.130:9094 log.dirs=kafka-logs-2 zookeeper.connect=localhost:2181
??備注1:listeners
一定要配置成為IP地址;如果配置為localhost
或服務(wù)器的hostname
,在使用java
發(fā)送數(shù)據(jù)時就會拋出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。
因為在沒有配置advertised.host.name
的情況下,Kafka
并沒有像官方文檔宣稱的那樣改為廣播我們配置的host.name
,而是廣播了主機配置的hostname
。遠端的客戶端并沒有配置 hosts
,所以自然是連接不上這個hostname
的。
??備注2:當使用java客戶端訪問遠程的kafka時,一定要把集群中所有的端口打開,否則會連接超時
/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT /sbin/iptables -I INPUT -p tcp --dport 9093 -j ACCEPT /sbin/iptables -I INPUT -p tcp --dport 9094 -j ACCEPT /etc/rc.d/init.d/iptables save
??broker.id
是集群中每個節(jié)點的唯一且永久的名稱,我們修改端口和日志目錄是因為我們現(xiàn)在在同一臺機器上運行,我們要防止broker
在同一端口上注冊和覆蓋對方的數(shù)據(jù)。
??我們已經(jīng)運行了zookeeper
和剛才的一個kafka
節(jié)點,所有我們只需要在啟動2個新的kafka
節(jié)點。
./kafka-server-start.sh ../config/server-1.properties 1>/dev/null 2>&1 & ./kafka-server-start.sh ../config/server-2.properties 1>/dev/null 2>&1 &
??現(xiàn)在,我們創(chuàng)建一個新topic,把備份設(shè)置為:3
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
??好了,現(xiàn)在我們已經(jīng)有了一個集群了,我們怎么知道每個集群在做什么呢?運行命令“describe topics”
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //所有分區(qū)的摘要 Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: //提供一個分區(qū)信息,因為我們只有一個分區(qū),所以只有一行。 Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
- “l(fā)eader”:該節(jié)點負責該分區(qū)的所有的讀和寫,每個節(jié)點的leader都是隨機選擇的。
- “replicas”:備份的節(jié)點列表,無論該節(jié)點是否是leader或者目前是否還活著,只是顯示。
- “isr”:“同步備份”的節(jié)點列表,也就是活著的節(jié)點并且正在同步leader
??其中Replicas
和Isr
中的1,2,0
就對應(yīng)著3個broker
他們的broker.id
屬性!
??我們運行這個命令,看看一開始我們創(chuàng)建的那個節(jié)點:
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
??這并不奇怪,剛才創(chuàng)建的主題沒有Replicas,并且在服務(wù)器“0”上,我們創(chuàng)建它的時候,集群中只有一個服務(wù)器,所以是“0”。
Step 7: 測試集群的容錯能力
7.1發(fā)布消息到集群
[root@administrator bin]# ./kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic my-replicated-topic >cluster message 1 >cluster message 2 //Ctrl+C終止產(chǎn)生消息
7.2消費消息
[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9093 --from-beginning --topic my-replicated-topic cluster message 1 cluster message 2 //Ctrl+C終止消費消息
7.3干掉leader,測試集群容錯
首先查詢誰是leader
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //所有分區(qū)的摘要 Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: //提供一個分區(qū)信息,因為我們只有一個分區(qū),所以只有一行。 Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
??可以看到Leader
的broker.id
為1
,找到對應(yīng)的Broker
[root@administrator bin]# jps -m 5130 Kafka ../config/server.properties 4861 QuorumPeerMain ../config/zookeeper.properties 1231 Bootstrap start start 7420 Kafka ../config/server-2.properties 7111 Kafka ../config/server-1.properties 9139 Jps -m
??通過以上查詢到Leader
的PID
(Kafka ../config/server-1.properties
)為7111
,殺掉該進程
//殺掉該進程 kill -9 7111 //再查詢一下,確認新的Leader已經(jīng)產(chǎn)生,新的Leader為broker.id=0 [root@administrator bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: //備份節(jié)點之一成為新的leader,而broker1已經(jīng)不在同步備份集合里了 Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2
7.4再次消費消息,確認消息沒有丟失
[root@administrator bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic cluster message 1 cluster message 2
??消息依然存在,故障轉(zhuǎn)移成功??!
到此這篇關(guān)于kafka安裝部署的文章就介紹到這了,更多相關(guān)kafka安裝部署內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Intellij IDEA Debug調(diào)試技巧(小結(jié))
這篇文章主要介紹了Intellij IDEA Debug調(diào)試技巧(小結(jié)),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10深入理解Java SpringCloud Ribbon 負載均衡
Ribbon是一個客戶端負載均衡器,它提供了對HTTP和TCP客戶端的行為的大量控制。這篇文章主要介紹了SpringCloud Ribbon 負載均衡的實現(xiàn),感興趣的小伙伴們可以參考一下2021-09-09關(guān)于synchronized的參數(shù)及其含義
這篇文章主要介紹了synchronized的參數(shù)及其含義詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10Springboot+WebSocket+Netty實現(xiàn)在線聊天/群聊系統(tǒng)
這篇文章主要實現(xiàn)在好友添加、建群、聊天對話、群聊功能,使用Java作為后端語言進行支持,界面友好,開發(fā)簡單,文章中有詳細的代碼示例供大家參考,需要的朋友可以參考下2023-08-08