Kafka安裝部署+go整合過程
1、Kafka的安裝
1、下載與安裝Kafka
Kafka官網(wǎng)https://Kafka.apache.org/downloads
所以這里推薦的版本是 : https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
將下載下來的安裝包直接解壓到一個路徑下即可完成Kafka的安裝,這里統(tǒng)一將Kafka安裝到/usr/local目錄下
基本操作過程如下:
mkdir -p /www/kuangstudy cd /www/kuangstudy wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz tar -zxvf kafka_2.12-2.7.2.tgz -C /usr/local/ mv /usr/local/kafka_2.12-2.7.2 /usr/local/kafka #新建存放日志和數(shù)據(jù)的文件夾 mkdir /usr/local/kafka/logs
這里我們將Kafka安裝到了/usr/local目錄下。
2、配置Kafka
這里將Kafka安裝到/usr/local目錄下
因此,Kafka的主配置文件為/usr/local/Kafka/config/server.properties,這里以節(jié)點Kafkazk1為例,重點介紹一些常用配置項的含義:
broker.id=1 listeners=PLAINTEXT://127.0.0.1:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/Kafka/logs num.partitions=6 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 #不是集群,所以可以寫成localhost #zookeeper.connect=127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true delete.topic.enable=true
每個配置項含義如下:
broker.id
:每一個broker在集群中的唯一表示,要求是正數(shù)。當(dāng)該服務(wù)器的IP地址發(fā)生改變時,broker.id沒有變化,則不會影響consumers的消息情況。listeners
:設(shè)置Kafka的監(jiān)聽地址與端口,可以將監(jiān)聽地址設(shè)置為主機(jī)名或IP地址,這里將監(jiān)聽地址設(shè)置為IP地址。log.dirs
:這個參數(shù)用于配置Kafka保存數(shù)據(jù)的位置,Kafka中所有的消息都會存在這個目錄下。可以通過逗號來指定多個路徑, Kafka會根據(jù)最少被使用的原則選擇目錄分配新的parition。需要注意的是,Kafka在分配parition的時候選擇的規(guī)則不是按照磁盤的空間大小來定的,而是根據(jù)分配的 parition的個數(shù)多小而定。num.partitions
:這個參數(shù)用于設(shè)置新創(chuàng)建的topic有多少個分區(qū),可以根據(jù)消費者實際情況配置,配置過小會影響消費性能。這里配置6個。log.retention.hours
:這個參數(shù)用于配置Kafka中消息保存的時間,還支持log.retention.minutes和 log.retention.ms配置項。這三個參數(shù)都會控制刪除過期數(shù)據(jù)的時間,推薦使用log.retention.ms。如果多個同時設(shè)置,那么會選擇最小的那個。log.segment.bytes
:配置partition中每個segment數(shù)據(jù)文件的大小,默認(rèn)是1GB,超過這個大小會自動創(chuàng)建一個新的segment file。
zookeeper.connect
:這個參數(shù)用于指定zookeeper所在的地址,它存儲了broker的元信息。 這個值可以通過逗號設(shè)置多個值,每個值的格式均為:hostname:port/path,每個部分的含義如下:
- hostname:表示zookeeper服務(wù)器的主機(jī)名或者IP地址,這里設(shè)置為IP地址。
- port: 表示是zookeeper服務(wù)器監(jiān)聽連接的端口號。
- /path:表示Kafka在zookeeper上的根目錄。如果不設(shè)置,會使用根目錄。
auto.create.topics.enable
:這個參數(shù)用于設(shè)置是否自動創(chuàng)建topic,如果請求一個topic時發(fā)現(xiàn)還沒有創(chuàng)建, Kafka會在broker上自動創(chuàng)建一個topic,如果需要嚴(yán)格的控制topic的創(chuàng)建,那么可以設(shè)置auto.create.topics.enable為false,禁止自動創(chuàng)建topic。
delete.topic.enable
:在0.8.2版本之后,Kafka提供了刪除topic的功能,但是默認(rèn)并不會直接將topic數(shù)據(jù)物理刪除。如果要從物理上刪除(即刪除topic后,數(shù)據(jù)文件也會一同刪除),就需要設(shè)置此配置項為true。
3、添加環(huán)境變量
$ vim /etc/profile export kafka_HOME=/usr/local/kafka export PATH=$PATH:$kafka_HOME/bin #生效 $ source /etc/profile
zookeeper服務(wù)的啟動
cd /usr/local/kafka/bin # 占用啟動 ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & # 后臺啟動 nohup ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
4、Kafka啟動腳本
$ vim /usr/lib/systemd/system/kafka.service [Unit] Description=Apache kafka server (broker) After=network.target zookeeper.service [Service] Type=simple User=root Group=root ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target
systemctl daemon-reload
5、啟動Kafka
在啟動Kafka集群前,需要確保ZooKeeper集群已經(jīng)正常啟動。接著,依次在Kafka各個節(jié)點上執(zhí)行如下命令即可:
$ cd /usr/local/kafka $ nohup bin/kafka-server-start.sh config/server.properties & # 或者 $ systemctl start kafka $ jps 21840 kafka 15593 Jps 15789 QuorumPeerMain
這里將Kafka放到后臺運行,啟動后,會在啟動Kafka的當(dāng)前目錄下生成一個nohup.out文件,可通過此文件查看Kafka的啟動和運行狀態(tài)。通過jps指令,可以看到有個Kafka標(biāo)識,這是Kafka進(jìn)程成功啟動的標(biāo)志。
6、測試Kafka基本命令操作
kefka提供了多個命令用于查看、創(chuàng)建、修改、刪除topic信息,也可以通過命令測試如何生產(chǎn)消息、消費消息等,這些命令位于Kafka安裝目錄的bin目錄下,這里是/usr/local/Kafka/bin。
登錄任意一臺Kafka集群節(jié)點,切換到此目錄下,即可進(jìn)行命令操作。
下面列舉Kafka的一些常用命令的使用方法。
(1)顯示topic列表
#kafka-topics.sh --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --list $ kafka-topics.sh --zookeeper 127.0.0.1:2181 --list topic123
(2)創(chuàng)建一個topic,并指定topic屬性(副本數(shù)、分區(qū)數(shù)等)
#kafka-topics.sh --create --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --replication-factor 1 --partitions 3 --topic topic123 $ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic topic123 Created topic topic123. #--replication-factor表示指定副本的個數(shù)
(3)查看某個topic的狀態(tài)
#kafka-topics.sh --describe --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123 $ kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic123 Topic: topic123 PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: topic123 Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: topic123 Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: topic123 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
(4)生產(chǎn)消息 阻塞狀態(tài)
#kafka-console-producer.sh --broker-list 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 $ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic123
(5)消費消息 阻塞狀態(tài)
#kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 $ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123 #從頭開始消費消息 #Kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123 --from-beginning $ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 --from-beginning
(6)刪除topic
#kafka-topics.sh --delete --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123 $ kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topic_
2、GO整合Kafka實現(xiàn)消息發(fā)送和訂閱
2.1 消息生產(chǎn)代碼示例
package main import ( "fmt" "github.com/IBM/sarama" ) func main() { // 配置生產(chǎn)者信息 conf := sarama.NewConfig() conf.Producer.RequiredAcks = sarama.WaitForAll // 生產(chǎn)者等待所有分區(qū)副本成功提交消息 conf.Producer.Return.Successes = true // 成功消息寫入返回 client, err := sarama.NewSyncProducer([]string{"47.115.230.36:9092"}, conf) if nil != err { fmt.Println("create Kafka sync producer failed", err) return } defer client.Close() msg := &sarama.ProducerMessage{ Topic: "topic123", // 指定消息主題 Value: sarama.StringEncoder("hello world"), // 構(gòu)造消息 } // 發(fā)送消息 _, _, err = client.SendMessage(msg) if nil != err { fmt.Println("send message to Kafka failed", err) return } fmt.Println("send message success") }
2.2 消息消費代碼示例
package main import ( "fmt" "github.com/IBM/sarama" ) /** * @desc 生產(chǎn)者 * @author feige * @date 2023-11-15 * @version 1.0 */ func main() { // 創(chuàng)建一個消費者 consumer, err := sarama.NewConsumer([]string{"47.115.230.36:9092"}, nil) if err != nil { fmt.Println("消費者kafka連接服務(wù)失敗,失敗的原因:", err) return } // 從topic123這個主題去獲取消息 partitions, err := consumer.Partitions("topic123") if err != nil { fmt.Println("主題獲取失敗,失敗的原因:", err) return } fmt.Println(partitions) // 開始遍歷分區(qū)中的消息,開始進(jìn)行消費 for _, partition := range partitions { pc, err := consumer.ConsumePartition("topic123", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Println("分區(qū)數(shù)據(jù)獲取失敗,失敗的原因:", err) return } defer pc.AsyncClose() // 開始異步獲取消息 go func(sarama.PartitionConsumer) { for message := range pc.Messages() { fmt.Printf("當(dāng)前消費的分區(qū)是:%d,offset:%d,key:%v,消息的內(nèi)容是:%v", message.Partition, message.Offset, message.Key, string(message.Value)) fmt.Println("") } }(pc) } // 阻塞讓消費一直處于監(jiān)聽狀態(tài) select {} }
2.3 創(chuàng)建主題代碼示例
package main import ( "fmt" "github.com/Shopify/sarama" ) func CreateTopic(addrs []string, topic string) bool { config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 // 設(shè)置客戶端版本 config.Admin.Timeout = 3 * time.Second // 設(shè)置Admin請求超時時間 admin, err := sarama.NewClusterAdmin(addrs, config) if err!= nil { return false } defer admin.Close() err = admin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: 3, ReplicationFactor: 2}, false) if err == nil { fmt.Println("success create topic:", topic) } else { fmt.Println("failed create topic:", topic) } return err == nil }
2.4 性能測試結(jié)果
Kafka目前已經(jīng)成為云計算領(lǐng)域中的“事件驅(qū)動”架構(gòu)、微服務(wù)架構(gòu)中的主要消息隊列,隨著越來越多的公司和組織開始采用Kafka作為基礎(chǔ)消息隊列技術(shù),越來越多的性能測試報告也陸續(xù)出來。筆者提前做了一輪性能測試,并發(fā)現(xiàn)它的消費性能比其它消息隊列還要好,甚至更好些。下面是測試結(jié)果:
測試環(huán)境:
- 操作系統(tǒng):Ubuntu 16.04
- CPU:Intel® Xeon® Gold 6148 CPU @ 2.40GHz
- 內(nèi)存:128G DDR4 ECC
- Kafka集群:3節(jié)點,每節(jié)點配置6個CPU、32G內(nèi)存、SSD
- 測試用例:生產(chǎn)者每秒鐘發(fā)送2萬條消息,消費者每秒鐘消費100條消息。
測試結(jié)果:
Kafka消費者
每秒消費100條消息,平均耗時:67毫秒
每秒消費1000條消息,平均耗時:6.7毫秒
RabbitMQ消費者
每秒消費100條消息,平均耗時:1038毫秒
每秒消費1000條消息,平均耗時:10.38毫秒
3、參考
github.com/Shopify/sarama github.com/bsm/sarama-cluster
生產(chǎn)者
import ( "fmt" "math/rand" "os" "strconv" "strings" "time" "github.com/Shopify/sarama" "github.com/golang/glog" ) //同步生產(chǎn)者 func Produce() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll //賦值為-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。 config.Producer.Partitioner = sarama.NewRandomPartitioner //寫到隨機(jī)分區(qū)中,默認(rèn)設(shè)置8個分區(qū) config.Producer.Return.Successes = true msg := &sarama.ProducerMessage{} msg.Topic = `test0` msg.Value = sarama.StringEncoder("Hello World!") client, err := sarama.NewSyncProducer([]string{"Kafka_master:9092"}, config) if err != nil { fmt.Println("producer close err, ", err) return } defer client.Close() pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed, ", err) return } fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset) } //異步生產(chǎn)者 func AsyncProducer() { var topics = "test0" config := sarama.NewConfig() config.Producer.Return.Successes = true //必須有這個選項 config.Producer.Timeout = 5 * time.Second p, err := sarama.NewAsyncProducer(strings.Split("Kafka_master:9092", ","), config) defer p.Close() if err != nil { return } //這個部分一定要寫,不然通道會被堵塞 go func(p sarama.AsyncProducer) { errors := p.Errors() success := p.Successes() for { select { case err := <-errors: if err != nil { glog.Errorln(err) } case <-success: } } }(p) for { v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) fmt.Fprintln(os.Stdout, v) msg := &sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), } p.Input() <- msg time.Sleep(time.Second * 1) } }
消費者
package consumer import ( "fmt" "strings" "sync" "time" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" "github.com/golang/glog" ) //單個消費者 func Consumer() { var wg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"Kafka_master:9092"}, nil) if err != nil { fmt.Println("Failed to start consumer: %s", err) return } partitionList, err := consumer.Partitions("test0") //獲得該topic所有的分區(qū) if err != nil { fmt.Println("Failed to get the list of partition:, ", err) return } for partition := range partitionList { pc, err := consumer.ConsumePartition("test0", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err) return } wg.Add(1) go func(sarama.PartitionConsumer) { //為每個分區(qū)開一個go協(xié)程去取值 for msg := range pc.Messages() { //阻塞直到有值發(fā)送過來,然后再繼續(xù)等待 fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } defer pc.AsyncClose() wg.Done() }(pc) } wg.Wait() } //消費組 func ConsumerGroup() { groupID := "test-consumer-group" config := cluster.NewConfig() config.Group.Return.Notifications = true config.Consumer.Offsets.CommitInterval = 1 * time.Second config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始從最新的offset開始 c, err := cluster.NewConsumer(strings.Split("Kafka_master:9092", ","), groupID, strings.Split("test0", ","), config) if err != nil { glog.Errorf("Failed open consumer: %v", err) return } defer c.Close() go func(c *cluster.Consumer) { errors := c.Errors() noti := c.Notifications() for { select { case err := <-errors: glog.Errorln(err) case <-noti: } } }(c) for msg := range c.Messages() { fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) c.MarkOffset(msg, "") //MarkOffset 并不是實時寫入Kafka,有可能在程序crash時丟掉未提交的offset } }
主函數(shù)
package main import ( "strom-huang-go/go_Kafka/consumer" ) func main() { // produce.AsyncProducer() consumer.Consumer() }
到此這篇關(guān)于Kafka安裝部署+go整合的文章就介紹到這了,更多相關(guān)Kafka安裝部署內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言map與string的相互轉(zhuǎn)換的實現(xiàn)
這篇文章主要介紹了go語言map與string的相互轉(zhuǎn)換的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04go實現(xiàn)grpc四種數(shù)據(jù)流模式
這篇文章主要為大家介紹了go實現(xiàn)grpc四種數(shù)據(jù)流模式,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04Go語言中結(jié)構(gòu)體方法副本傳參與指針傳參的區(qū)別介紹
這篇文章主要給大家介紹了關(guān)于Go語言中結(jié)構(gòu)體方法副本傳參與指針傳參的區(qū)別的相關(guān)資料,文中先對GO語言結(jié)構(gòu)體方法跟結(jié)構(gòu)體指針方法的區(qū)別進(jìn)行了一些簡單的介紹,來幫助大家理解學(xué)習(xí),需要的朋友可以參考下。2017-12-12