欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka安裝部署+go整合過程

 更新時間:2024年08月02日 10:35:47   作者:我不是你的夢  
go語言是一種快速、分布式、并發(fā)編程的語言,它天生適合于處理高并發(fā)場景下的消息傳遞和處理,在本文中,我們將介紹Kafka安裝部署+go整合過程,感興趣的朋友一起看看吧

1、Kafka的安裝

1、下載與安裝Kafka

Kafka官網(wǎng)https://Kafka.apache.org/downloads

所以這里推薦的版本是 : https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz

將下載下來的安裝包直接解壓到一個路徑下即可完成Kafka的安裝,這里統(tǒng)一將Kafka安裝到/usr/local目錄下

基本操作過程如下:

mkdir -p /www/kuangstudy
cd /www/kuangstudy
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
tar -zxvf kafka_2.12-2.7.2.tgz -C /usr/local/
mv /usr/local/kafka_2.12-2.7.2 /usr/local/kafka
#新建存放日志和數(shù)據(jù)的文件夾
mkdir /usr/local/kafka/logs

這里我們將Kafka安裝到了/usr/local目錄下。

2、配置Kafka

這里將Kafka安裝到/usr/local目錄下

因此,Kafka的主配置文件為/usr/local/Kafka/config/server.properties,這里以節(jié)點Kafkazk1為例,重點介紹一些常用配置項的含義:

broker.id=1
listeners=PLAINTEXT://127.0.0.1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/Kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
#不是集群,所以可以寫成localhost
#zookeeper.connect=127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topic.enable=true

每個配置項含義如下:

  • broker.id:每一個broker在集群中的唯一表示,要求是正數(shù)。當(dāng)該服務(wù)器的IP地址發(fā)生改變時,broker.id沒有變化,則不會影響consumers的消息情況。
  • listeners:設(shè)置Kafka的監(jiān)聽地址與端口,可以將監(jiān)聽地址設(shè)置為主機(jī)名或IP地址,這里將監(jiān)聽地址設(shè)置為IP地址。
  • log.dirs:這個參數(shù)用于配置Kafka保存數(shù)據(jù)的位置,Kafka中所有的消息都會存在這個目錄下。可以通過逗號來指定多個路徑, Kafka會根據(jù)最少被使用的原則選擇目錄分配新的parition。需要注意的是,Kafka在分配parition的時候選擇的規(guī)則不是按照磁盤的空間大小來定的,而是根據(jù)分配的 parition的個數(shù)多小而定。
  • num.partitions:這個參數(shù)用于設(shè)置新創(chuàng)建的topic有多少個分區(qū),可以根據(jù)消費者實際情況配置,配置過小會影響消費性能。這里配置6個。
  • log.retention.hours:這個參數(shù)用于配置Kafka中消息保存的時間,還支持log.retention.minutes和 log.retention.ms配置項。這三個參數(shù)都會控制刪除過期數(shù)據(jù)的時間,推薦使用log.retention.ms。如果多個同時設(shè)置,那么會選擇最小的那個。
  • log.segment.bytes:配置partition中每個segment數(shù)據(jù)文件的大小,默認(rèn)是1GB,超過這個大小會自動創(chuàng)建一個新的segment file。
zookeeper.connect

:這個參數(shù)用于指定zookeeper所在的地址,它存儲了broker的元信息。 這個值可以通過逗號設(shè)置多個值,每個值的格式均為:hostname:port/path,每個部分的含義如下:

  • hostname:表示zookeeper服務(wù)器的主機(jī)名或者IP地址,這里設(shè)置為IP地址。
  • port: 表示是zookeeper服務(wù)器監(jiān)聽連接的端口號。
  • /path:表示Kafka在zookeeper上的根目錄。如果不設(shè)置,會使用根目錄。

auto.create.topics.enable:這個參數(shù)用于設(shè)置是否自動創(chuàng)建topic,如果請求一個topic時發(fā)現(xiàn)還沒有創(chuàng)建, Kafka會在broker上自動創(chuàng)建一個topic,如果需要嚴(yán)格的控制topic的創(chuàng)建,那么可以設(shè)置auto.create.topics.enable為false,禁止自動創(chuàng)建topic。

delete.topic.enable:在0.8.2版本之后,Kafka提供了刪除topic的功能,但是默認(rèn)并不會直接將topic數(shù)據(jù)物理刪除。如果要從物理上刪除(即刪除topic后,數(shù)據(jù)文件也會一同刪除),就需要設(shè)置此配置項為true。

3、添加環(huán)境變量

$ vim /etc/profile
export kafka_HOME=/usr/local/kafka
export PATH=$PATH:$kafka_HOME/bin
#生效
$ source /etc/profile

zookeeper服務(wù)的啟動

cd /usr/local/kafka/bin
# 占用啟動
./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
# 后臺啟動
nohup ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &

4、Kafka啟動腳本

$ vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache kafka server (broker)
After=network.target  zookeeper.service
[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
systemctl daemon-reload

5、啟動Kafka

在啟動Kafka集群前,需要確保ZooKeeper集群已經(jīng)正常啟動。接著,依次在Kafka各個節(jié)點上執(zhí)行如下命令即可:

$ cd /usr/local/kafka
$ nohup bin/kafka-server-start.sh config/server.properties &
# 或者
$ systemctl start kafka
$ jps
21840 kafka
15593 Jps
15789 QuorumPeerMain

這里將Kafka放到后臺運行,啟動后,會在啟動Kafka的當(dāng)前目錄下生成一個nohup.out文件,可通過此文件查看Kafka的啟動和運行狀態(tài)。通過jps指令,可以看到有個Kafka標(biāo)識,這是Kafka進(jìn)程成功啟動的標(biāo)志。

6、測試Kafka基本命令操作

kefka提供了多個命令用于查看、創(chuàng)建、修改、刪除topic信息,也可以通過命令測試如何生產(chǎn)消息、消費消息等,這些命令位于Kafka安裝目錄的bin目錄下,這里是/usr/local/Kafka/bin。

登錄任意一臺Kafka集群節(jié)點,切換到此目錄下,即可進(jìn)行命令操作。

下面列舉Kafka的一些常用命令的使用方法。
(1)顯示topic列表

#kafka-topics.sh  --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --list
$ kafka-topics.sh  --zookeeper 127.0.0.1:2181 --list
topic123

(2)創(chuàng)建一個topic,并指定topic屬性(副本數(shù)、分區(qū)數(shù)等)

#kafka-topics.sh --create --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --replication-factor 1 --partitions 3 --topic topic123 
$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic topic123
Created topic topic123.
#--replication-factor表示指定副本的個數(shù)

(3)查看某個topic的狀態(tài)

#kafka-topics.sh --describe --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
$ kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic123
Topic: topic123	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: topic123	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic123	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic123	Partition: 2	Leader: 1	Replicas: 1	Isr: 1

(4)生產(chǎn)消息 阻塞狀態(tài)

#kafka-console-producer.sh --broker-list 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic123

(5)消費消息 阻塞狀態(tài)

#kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123
#從頭開始消費消息
#Kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123 --from-beginning
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 --from-beginning

(6)刪除topic

#kafka-topics.sh --delete --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
$ kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topic_

2、GO整合Kafka實現(xiàn)消息發(fā)送和訂閱

2.1 消息生產(chǎn)代碼示例

package main
import (
	"fmt"
	"github.com/IBM/sarama"
)
func main() {
	// 配置生產(chǎn)者信息
	conf := sarama.NewConfig()
	conf.Producer.RequiredAcks = sarama.WaitForAll // 生產(chǎn)者等待所有分區(qū)副本成功提交消息
	conf.Producer.Return.Successes = true          // 成功消息寫入返回
	client, err := sarama.NewSyncProducer([]string{"47.115.230.36:9092"}, conf)
	if nil != err {
		fmt.Println("create Kafka sync producer failed", err)
		return
	}
	defer client.Close()
	msg := &sarama.ProducerMessage{
		Topic: "topic123",                          // 指定消息主題
		Value: sarama.StringEncoder("hello world"), // 構(gòu)造消息
	}
	// 發(fā)送消息
	_, _, err = client.SendMessage(msg)
	if nil != err {
		fmt.Println("send message to Kafka failed", err)
		return
	}
	fmt.Println("send message success")
}

2.2 消息消費代碼示例

package main
import (
	"fmt"
	"github.com/IBM/sarama"
)
/**
 * @desc 生產(chǎn)者
 * @author feige
 * @date 2023-11-15
 * @version 1.0
 */
func main() {
	// 創(chuàng)建一個消費者
	consumer, err := sarama.NewConsumer([]string{"47.115.230.36:9092"}, nil)
	if err != nil {
		fmt.Println("消費者kafka連接服務(wù)失敗,失敗的原因:", err)
		return
	}
	// 從topic123這個主題去獲取消息
	partitions, err := consumer.Partitions("topic123")
	if err != nil {
		fmt.Println("主題獲取失敗,失敗的原因:", err)
		return
	}
	fmt.Println(partitions)
	// 開始遍歷分區(qū)中的消息,開始進(jìn)行消費
	for _, partition := range partitions {
		pc, err := consumer.ConsumePartition("topic123", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Println("分區(qū)數(shù)據(jù)獲取失敗,失敗的原因:", err)
			return
		}
		defer pc.AsyncClose()
		// 開始異步獲取消息
		go func(sarama.PartitionConsumer) {
			for message := range pc.Messages() {
				fmt.Printf("當(dāng)前消費的分區(qū)是:%d,offset:%d,key:%v,消息的內(nèi)容是:%v", message.Partition,
					message.Offset, message.Key, string(message.Value))
				fmt.Println("")
			}
		}(pc)
	}
	// 阻塞讓消費一直處于監(jiān)聽狀態(tài)
	select {}
}

2.3 創(chuàng)建主題代碼示例

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
)
func CreateTopic(addrs []string, topic string) bool {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0         // 設(shè)置客戶端版本
	config.Admin.Timeout = 3 * time.Second // 設(shè)置Admin請求超時時間
	admin, err := sarama.NewClusterAdmin(addrs, config)
	if err!= nil {
		return false
	}
	defer admin.Close()
	err = admin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: 3, ReplicationFactor: 2}, false)
	if err == nil {
		fmt.Println("success create topic:", topic)
	} else {
		fmt.Println("failed create topic:", topic)
	}
	return err == nil
}

2.4 性能測試結(jié)果

Kafka目前已經(jīng)成為云計算領(lǐng)域中的“事件驅(qū)動”架構(gòu)、微服務(wù)架構(gòu)中的主要消息隊列,隨著越來越多的公司和組織開始采用Kafka作為基礎(chǔ)消息隊列技術(shù),越來越多的性能測試報告也陸續(xù)出來。筆者提前做了一輪性能測試,并發(fā)現(xiàn)它的消費性能比其它消息隊列還要好,甚至更好些。下面是測試結(jié)果:

測試環(huán)境:

  • 操作系統(tǒng):Ubuntu 16.04
  • CPU:Intel® Xeon® Gold 6148 CPU @ 2.40GHz
  • 內(nèi)存:128G DDR4 ECC
  • Kafka集群:3節(jié)點,每節(jié)點配置6個CPU、32G內(nèi)存、SSD
  • 測試用例:生產(chǎn)者每秒鐘發(fā)送2萬條消息,消費者每秒鐘消費100條消息。

測試結(jié)果:

Kafka消費者

每秒消費100條消息,平均耗時:67毫秒

每秒消費1000條消息,平均耗時:6.7毫秒

RabbitMQ消費者

每秒消費100條消息,平均耗時:1038毫秒

每秒消費1000條消息,平均耗時:10.38毫秒

3、參考

github.com/Shopify/sarama
github.com/bsm/sarama-cluster

生產(chǎn)者

import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"time"
	"github.com/Shopify/sarama"
	"github.com/golang/glog"
)
//同步生產(chǎn)者
func Produce() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          //賦值為-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。
	config.Producer.Partitioner = sarama.NewRandomPartitioner //寫到隨機(jī)分區(qū)中,默認(rèn)設(shè)置8個分區(qū)
	config.Producer.Return.Successes = true
	msg := &sarama.ProducerMessage{}
	msg.Topic = `test0`
	msg.Value = sarama.StringEncoder("Hello World!")
	client, err := sarama.NewSyncProducer([]string{"Kafka_master:9092"}, config)
	if err != nil {
		fmt.Println("producer close err, ", err)
		return
	}
	defer client.Close()
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message failed, ", err)
		return
	}
	fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)
}
//異步生產(chǎn)者
func AsyncProducer() {
	var topics = "test0"
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true //必須有這個選項
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewAsyncProducer(strings.Split("Kafka_master:9092", ","), config)
	defer p.Close()
	if err != nil {
		return
	}
	//這個部分一定要寫,不然通道會被堵塞
	go func(p sarama.AsyncProducer) {
		errors := p.Errors()
		success := p.Successes()
		for {
			select {
			case err := <-errors:
				if err != nil {
					glog.Errorln(err)
				}
			case <-success:
			}
		}
	}(p)
	for {
		v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
		fmt.Fprintln(os.Stdout, v)
		msg := &sarama.ProducerMessage{
			Topic: topics,
			Value: sarama.ByteEncoder(v),
		}
		p.Input() <- msg
		time.Sleep(time.Second * 1)
	}
}

消費者

package consumer
import (
	"fmt"
	"strings"
	"sync"
	"time"
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"github.com/golang/glog"
)
//單個消費者
func Consumer() {
	var wg sync.WaitGroup
	consumer, err := sarama.NewConsumer([]string{"Kafka_master:9092"}, nil)
	if err != nil {
		fmt.Println("Failed to start consumer: %s", err)
		return
	}
	partitionList, err := consumer.Partitions("test0") //獲得該topic所有的分區(qū)
	if err != nil {
		fmt.Println("Failed to get the list of partition:, ", err)
		return
	}
	for partition := range partitionList {
		pc, err := consumer.ConsumePartition("test0", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
			return
		}
		wg.Add(1)
		go func(sarama.PartitionConsumer) { //為每個分區(qū)開一個go協(xié)程去取值
			for msg := range pc.Messages() { //阻塞直到有值發(fā)送過來,然后再繼續(xù)等待
				fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
			defer pc.AsyncClose()
			wg.Done()
		}(pc)
	}
	wg.Wait()
}
//消費組
func ConsumerGroup() {
	groupID := "test-consumer-group"
	config := cluster.NewConfig()
	config.Group.Return.Notifications = true
	config.Consumer.Offsets.CommitInterval = 1 * time.Second
	config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始從最新的offset開始
	c, err := cluster.NewConsumer(strings.Split("Kafka_master:9092", ","), groupID, strings.Split("test0", ","), config)
	if err != nil {
		glog.Errorf("Failed open consumer: %v", err)
		return
	}
	defer c.Close()
	go func(c *cluster.Consumer) {
		errors := c.Errors()
		noti := c.Notifications()
		for {
			select {
			case err := <-errors:
				glog.Errorln(err)
			case <-noti:
			}
		}
	}(c)
	for msg := range c.Messages() {
		fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
		c.MarkOffset(msg, "") //MarkOffset 并不是實時寫入Kafka,有可能在程序crash時丟掉未提交的offset
	}
}

主函數(shù)

package main
import (
	"strom-huang-go/go_Kafka/consumer"
)
func main() {
	// produce.AsyncProducer()
	consumer.Consumer()
}

到此這篇關(guān)于Kafka安裝部署+go整合的文章就介紹到這了,更多相關(guān)Kafka安裝部署內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • GO語言中的常量

    GO語言中的常量

    go語言支持的常量有字符型,字符串型,布爾型和數(shù)字型。本文實例講述了Go語言中常量定義方法。分享給大家供大家參考。
    2015-04-04
  • Go語言實戰(zhàn)之實現(xiàn)均衡器功能

    Go語言實戰(zhàn)之實現(xiàn)均衡器功能

    這篇文章主要為大家詳細(xì)介紹了如何利用Golang?實現(xiàn)一個簡單的流浪均衡器,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-04-04
  • go語言對文件按照指定塊大小進(jìn)行分割的方法

    go語言對文件按照指定塊大小進(jìn)行分割的方法

    這篇文章主要介紹了go語言對文件按照指定塊大小進(jìn)行分割的方法,實例分析了Go語言文件操作的技巧,需要的朋友可以參考下
    2015-03-03
  • go語言map與string的相互轉(zhuǎn)換的實現(xiàn)

    go語言map與string的相互轉(zhuǎn)換的實現(xiàn)

    這篇文章主要介紹了go語言map與string的相互轉(zhuǎn)換的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • go實現(xiàn)grpc四種數(shù)據(jù)流模式

    go實現(xiàn)grpc四種數(shù)據(jù)流模式

    這篇文章主要為大家介紹了go實現(xiàn)grpc四種數(shù)據(jù)流模式,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪
    2022-04-04
  • 詳解Go語言如何解析帶注釋的json

    詳解Go語言如何解析帶注釋的json

    標(biāo)準(zhǔn)的json格式是不帶注釋,但是有時候為了方便理解json中各字段的含義,需要支持帶注釋的json,這篇文章主要介紹了Go語言解析帶注釋json的相關(guān)方法,希望對大家有所幫助
    2024-03-03
  • Go語言中結(jié)構(gòu)體方法副本傳參與指針傳參的區(qū)別介紹

    Go語言中結(jié)構(gòu)體方法副本傳參與指針傳參的區(qū)別介紹

    這篇文章主要給大家介紹了關(guān)于Go語言中結(jié)構(gòu)體方法副本傳參與指針傳參的區(qū)別的相關(guān)資料,文中先對GO語言結(jié)構(gòu)體方法跟結(jié)構(gòu)體指針方法的區(qū)別進(jìn)行了一些簡單的介紹,來幫助大家理解學(xué)習(xí),需要的朋友可以參考下。
    2017-12-12
  • 利用Go語言實現(xiàn)簡單Ping過程的方法

    利用Go語言實現(xiàn)簡單Ping過程的方法

    相信利用各種語言實現(xiàn)Ping已經(jīng)是大家喜聞樂見的事情了,網(wǎng)絡(luò)上利用Golang實現(xiàn)Ping已經(jīng)有比較詳細(xì)的代碼示例,但大多是僅僅是實現(xiàn)了Request過程,而對Response的回顯內(nèi)容并沒有做接收。而Ping程序不僅僅是發(fā)送一個ICMP,更重要的是如何接收并進(jìn)行統(tǒng)計。
    2016-09-09
  • Go調(diào)用鏈可視化工具使用實例探究

    Go調(diào)用鏈可視化工具使用實例探究

    本文介紹一款工具?go-callvis,它能夠?qū)?Go?代碼的調(diào)用關(guān)系可視化出來,并提供了可交互式的?web?服務(wù),在接手他人代碼或調(diào)研一些開源項目時,如果能夠理清其中的代碼調(diào)用鏈路,這將加速我們對實現(xiàn)的理解
    2024-01-01
  • 使用go mod導(dǎo)入本地自定義包問題

    使用go mod導(dǎo)入本地自定義包問題

    這篇文章主要介紹了使用go mod導(dǎo)入本地自定義包問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-07-07

最新評論