欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

go操作Kafka使用示例詳解

 更新時間:2022年12月05日 09:13:39   作者:qi66  
這篇文章主要為大家介紹了go操作Kafka使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

1. Kafka介紹

1.1 Kafka是什么

kafka使用scala開發(fā),支持多語言客戶端(c++、java、python、go等)

Kafka最先由LinkedIn公司開發(fā),之后成為Apache的頂級項目。

Kafka是一個分布式的、分區(qū)化、可復制提交的日志服務

LinkedIn使用Kafka實現(xiàn)了公司不同應用程序之間的松耦和,那么作為一個可擴展、高可靠的消息系統(tǒng) 支持高Throughput的應用

scale out:無需停機即可擴展機器

持久化:通過將數(shù)據(jù)持久化到硬盤以及replication防止數(shù)據(jù)丟失

支持online和offline的場景

1.2 Kafka的特點

Kafka是分布式的,其所有的構件borker(服務端集群)、producer(消息生產(chǎn))、consumer(消息消費者)都可以是分布式的。

在消息的生產(chǎn)時可以使用一個標識topic來區(qū)分,且可以進行分區(qū);每一個分區(qū)都是一個順序的、不可變的消息隊列, 并且可以持續(xù)的添加。

同時為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬消息(50 MB),每秒處理55萬消息(110 MB)。

消息被處理的狀態(tài)是在consumer端維護,而不是由server端維護。當失敗時能自動平衡

1.3 常用的場景

監(jiān)控:主機通過Kafka發(fā)送與系統(tǒng)和應用程序健康相關的指標,然后這些信息會被收集和處理從而創(chuàng)建監(jiān)控儀表盤并發(fā)送警告。

消息隊列: 應用程度使用Kafka作為傳統(tǒng)的消息系統(tǒng)實現(xiàn)標準的隊列和消息的發(fā)布—訂閱,例如搜索和內(nèi)容提要(Content Feed)。比起大多數(shù)的消息系統(tǒng)來說,Kafka有更好的吞吐量,內(nèi)置的分區(qū),冗余及容錯性,這讓Kafka成為了一個很好的大規(guī)模消息處理應用的解決方案。消息系統(tǒng) 一般吞吐量相對較低,但是需要更小的端到端延時,并嘗嘗依賴于Kafka提供的強大的持久性保障。在這個領域,Kafka足以媲美傳統(tǒng)消息系統(tǒng),如ActiveMR或RabbitMQ

站點的用戶活動追蹤: 為了更好地理解用戶行為,改善用戶體驗,將用戶查看了哪個頁面、點擊了哪些內(nèi)容等信息發(fā)送到每個數(shù)據(jù)中心的Kafka集群上,并通過Hadoop進行分析、生成日常報告。

流處理:保存收集流數(shù)據(jù),以提供之后對接的Storm或其他流式計算框架進行處理。很多用戶會將那些從原始topic來的數(shù)據(jù)進行 階段性處理,匯總,擴充或者以其他的方式轉換到新的topic下再繼續(xù)后面的處理。例如一個文章推薦的處理流程,可能是先從RSS數(shù)據(jù)源中抓取文章的內(nèi) 容,然后將其丟入一個叫做“文章”的topic中;后續(xù)操作可能是需要對這個內(nèi)容進行清理,比如回復正常數(shù)據(jù)或者刪除重復數(shù)據(jù),最后再將內(nèi)容匹配的結果返 還給用戶。這就在一個獨立的topic之外,產(chǎn)生了一系列的實時數(shù)據(jù)處理的流程。

日志聚合:使用Kafka代替日志聚合(log aggregation)。日志聚合一般來說是從服務器上收集日志文件,然后放到一個集中的位置(文件服務器或HDFS)進行處理。然而Kafka忽略掉 文件的細節(jié),將其更清晰地抽象成一個個日志或事件的消息流。這就讓Kafka處理過程延遲更低,更容易支持多數(shù)據(jù)源和分布式數(shù)據(jù)處理。比起以日志為中心的 系統(tǒng)比如Scribe或者Flume來說,Kafka提供同樣高效的性能和因為復制導致的更高的耐用性保證,以及更低的端到端延遲

持久性日志:Kafka可以為一種外部的持久性日志的分布式系統(tǒng)提供服務。這種日志可以在節(jié)點間備份數(shù)據(jù),并為故障節(jié)點數(shù)據(jù)回復提供一種重新同步的機制。Kafka中日志壓縮功能為這種用法提供了條件。在這種用法中,Kafka類似于Apache BookKeeper項目。

1.4 Kafka中包含以下基礎概念

1.Topic(話題):Kafka中用于區(qū)分不同類別信息的類別名稱。由producer指定

2.Producer(生產(chǎn)者):將消息發(fā)布到Kafka特定的Topic的對象(過程)

3.Consumers(消費者):訂閱并處理特定的Topic中的消息的對象(過程)

4.Broker(Kafka服務集群):已發(fā)布的消息保存在一組服務器中,稱之為Kafka集群。集群中的每一個服務器都是一個代理(Broker). 消費者可以訂閱一個或多個話題,并從Broker拉數(shù)據(jù),從而消費這些已發(fā)布的消息。

5.Partition(分區(qū)):Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)

Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發(fā)布一些消息。

1.5 消息

消息由一個固定大小的報頭和可變長度但不透明的字節(jié)陣列負載。報頭包含格式版本和CRC32效驗和以檢測損壞或截斷

1.6 消息格式

    1. 4 byte CRC32 of the message
    2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
    3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
       bit 0 ~ 2 : Compression codec
           0 : no compression
           1 : gzip
           2 : snappy
           3 : lz4
       bit 3 : Timestamp type
           0 : create time
           1 : log append time
       bit 4 ~ 7 : reserved
    4. (可選) 8 byte timestamp only if "magic" identifier is greater than 0
    5. 4 byte key length, containing length K
    6. K byte key
    7. 4 byte payload length, containing length V
    8. V byte payload

2. Kafka深層介紹

2.1 架構介紹

Producer:Producer即生產(chǎn)者,消息的產(chǎn)生者,是消息的?口。

kafka cluster:kafka集群,一臺或多臺服務?組成

  • Broker:Broker是指部署了Kafka實例的服務?節(jié)點。每個服務?上有一個或多個kafka的實 例,我們姑且認為每個broker對應一臺服務?。每個kafka集群內(nèi)的broker都有一個不重復的 編號,如圖中的broker-0、broker-1等……
  • Topic:消息的主題,可以理解為消息的分類,kafka的數(shù)據(jù)就保存在topic。在每個broker上 都可以創(chuàng)建多個topic。實際應用中通常是一個業(yè)務線建一個topic。
  • Partition:Topic的分區(qū),每個topic可以有多個分區(qū),分區(qū)的作用是做負載,提高kafka的吞 吐量。同一個topic在不同的分區(qū)的數(shù)據(jù)是不重復的,partition的表現(xiàn)形式就是一個一個的?件夾!
  • Replication:每一個分區(qū)都有多個副本,副本的作用是做備胎。當主分區(qū)(Leader)故障的 時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認副本的最大數(shù)量是10 個,且副本的數(shù)量不能大于Broker的數(shù)量,follower和leader絕對是在不同的機器,同一機 ?對同一個分區(qū)也只可能存放一個副本(包括自己)。

Consumer:消費者,即消息的消費方,是消息的出口。

  • Consumer Group:我們可以將多個消費組組成一個消費者組,在kafka的設計中同一個分 區(qū)的數(shù)據(jù)只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個 topic的不同分區(qū)的數(shù)據(jù),這也是為了提高kafka的吞吐量!

2.2 ?作流程

我們看上?的架構圖中,producer就是生產(chǎn)者,是數(shù)據(jù)的入口。Producer在寫入數(shù)據(jù)的時候會把數(shù)據(jù) 寫入到leader中,不會直接將數(shù)據(jù)寫入follower!那leader怎么找呢?寫入的流程又是什么樣的呢?我 們看下圖:

1.?產(chǎn)者從Kafka集群獲取分區(qū)leader信息

2.?產(chǎn)者將消息發(fā)送給leader

3.leader將消息寫入本地磁盤

4.follower從leader拉取消息數(shù)據(jù)

5.follower將消息寫入本地磁盤后向leader發(fā)送ACK

6.leader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK

2.3 選擇partition的原則

那在kafka中,如果某個topic有多個partition,producer?怎么知道該將數(shù)據(jù)發(fā)往哪個partition呢? kafka中有幾個原則:

1.partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。

2.如果沒有指定partition,但是設置了數(shù)據(jù)的key,則會根據(jù)key的值hash出一個partition。

3.如果既沒指定partition,又沒有設置key,則會采用輪詢?式,即每次取一小段時間的數(shù)據(jù)寫入某partition,下一小段的時間寫入下一個partition

2.4 ACK應答機制

producer在向kafka寫入消息的時候,可以設置參數(shù)來確定是否確認kafka接收到數(shù)據(jù),這個參數(shù)可設置 的值為 0,1,all

  • 0代表producer往集群發(fā)送數(shù)據(jù)不需要等到集群的返回,不確保消息發(fā)送成功。安全性最低但是效 率最高。
  • 1代表producer往集群發(fā)送數(shù)據(jù)只要leader應答就可以發(fā)送下一條,只確保leader發(fā)送成功。
  • all代表producer往集群發(fā)送數(shù)據(jù)需要所有的follower都完成從leader的同步才會發(fā)送下一條,確保 leader發(fā)送成功和所有的副本都完成備份。安全性最?高,但是效率最低。

最后要注意的是,如果往不存在的topic寫數(shù)據(jù),kafka會?動創(chuàng)建topic,partition和replication的數(shù)量 默認配置都是1。

2.5 Topic和數(shù)據(jù)?志

topic 是同?類別的消息記錄(record)的集合。在Kafka中,?個主題通常有多個訂閱者。對于每個 主題,Kafka集群維護了?個分區(qū)數(shù)據(jù)?志?件結構如下:

每個partition都是?個有序并且不可變的消息記錄集合。當新的數(shù)據(jù)寫?時,就被追加到partition的末 尾。在每個partition中,每條消息都會被分配?個順序的唯?標識,這個標識被稱為offset,即偏移 量。注意,Kafka只保證在同?個partition內(nèi)部消息是有序的,在不同partition之間,并不能保證消息 有序。

Kafka可以配置?個保留期限,?來標識?志會在Kafka集群內(nèi)保留多?時間。Kafka集群會保留在保留 期限內(nèi)所有被發(fā)布的消息,不管這些消息是否被消費過。?如保留期限設置為兩天,那么數(shù)據(jù)被發(fā)布到 Kafka集群的兩天以內(nèi),所有的這些數(shù)據(jù)都可以被消費。當超過兩天,這些數(shù)據(jù)將會被清空,以便為后 續(xù)的數(shù)據(jù)騰出空間。由于Kafka會將數(shù)據(jù)進?持久化存儲(即寫?到硬盤上),所以保留的數(shù)據(jù)??可 以設置為?個?較?的值。

2.6 Partition結構

Partition在服務器上的表現(xiàn)形式就是?個?個的?件夾,每個partition的?件夾下?會有多組segment ?件,每組segment?件?包含 .index ?件、 .log ?件、 .timeindex ?件三個?件,其中 .log ? 件就是實際存儲message的地?,? .index 和 .timeindex ?件為索引?件,?于檢索消息。

2.7 消費數(shù)據(jù)

多個消費者實例可以組成?個消費者組,并??個標簽來標識這個消費者組。?個消費者組中的不同消 費者實例可以運?在不同的進程甚?不同的服務器上。

如果所有的消費者實例都在同?個消費者組中,那么消息記錄會被很好的均衡的發(fā)送到每個消費者實 例。

如果所有的消費者實例都在不同的消費者組,那么每?條消息記錄會被?播到每?個消費者實例。

舉個例?,如上圖所示?個兩個節(jié)點的Kafka集群上擁有?個四個partition(P0-P3)的topic。有兩個 消費者組都在消費這個topic中的數(shù)據(jù),消費者組A有兩個消費者實例,消費者組B有四個消費者實例。 從圖中我們可以看到,在同?個消費者組中,每個消費者實例可以消費多個分區(qū),但是每個分區(qū)最多只 能被消費者組中的?個實例消費。也就是說,如果有?個4個分區(qū)的主題,那么消費者組中最多只能有4 個消費者實例去消費,多出來的都不會被分配到分區(qū)。其實這也很好理解,如果允許兩個消費者實例同 時消費同?個分區(qū),那么就?法記錄這個分區(qū)被這個消費者組消費的offset了。如果在消費者組中動態(tài) 的上線或下線消費者,那么Kafka集群會?動調整分區(qū)與消費者實例間的對應關系。

3. 操作Kafka

3.1 sarama

Go語言中連接kafka使用第三方庫: github.com/Shopify/sarama。

3.2 下載及安裝

    go get github.com/Shopify/sarama

注意事項: sarama v1.20之后的版本加入了zstd壓縮算法,需要用到cgo,在Windows平臺編譯時會提示類似如下錯誤: github.com/DataDog/zstd exec: "gcc":executable file not found in %PATH% 所以在Windows平臺請使用v1.19版本的sarama。(如果不會版本控制請查看博客里面的go module章節(jié))

3.3 連接kafka發(fā)送消息

package main
import (
    "fmt"
    "github.com/Shopify/sarama"
)
// 基于sarama第三方庫開發(fā)的kafka client
func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // 發(fā)送完數(shù)據(jù)需要leader和follow都確認
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
    config.Producer.Return.Successes = true                   // 成功交付的消息將在success channel返回
    // 構造一個消息
    msg := &sarama.ProducerMessage{}
    msg.Topic = "web_log"
    msg.Value = sarama.StringEncoder("this is a test log")
    // 連接kafka
    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    defer client.Close()
    // 發(fā)送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

3.4 連接kafka消費消息

package main
import (
    "fmt"
    "github.com/Shopify/sarama"
)
// kafka consumer
func main() {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū)
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList { // 遍歷所有的分區(qū)
        // 針對每個分區(qū)創(chuàng)建一個對應的分區(qū)消費者
        pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // 異步從每個分區(qū)消費信息
        go func(sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }
}

以上就是go操作Kfaka使用示例詳解的詳細內(nèi)容,更多關于go操作Kfaka的資料請關注腳本之家其它相關文章!

相關文章

  • Golang內(nèi)存管理之內(nèi)存逃逸分析

    Golang內(nèi)存管理之內(nèi)存逃逸分析

    逃逸分析是指由編譯器決定內(nèi)存分配的位置,不需要程序員指定,這篇文章主要為大家詳細介紹了Golang中內(nèi)存逃逸分析的幾種方法,需要的可以參考一下
    2023-07-07
  • 詳解Golang中Context的三個常見應用場景

    詳解Golang中Context的三個常見應用場景

    Golang?context主要用于定義超時取消,取消后續(xù)操作,在不同操作中傳遞值。本文通過簡單易懂的示例進行說明,感興趣的可以了解一下
    2022-12-12
  • go值賦值和引用賦值的使用

    go值賦值和引用賦值的使用

    本文將介紹Go語言中的值賦值和引用賦值,并比較它們之間的差異,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-10-10
  • Golang的md5 hash計算操作

    Golang的md5 hash計算操作

    這篇文章主要介紹了Golang的md5 hash計算操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Go?為什么不支持可重入鎖原理解析

    Go?為什么不支持可重入鎖原理解析

    這篇文章主要為大家介紹了Go?為什么不支持可重入鎖原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • Golang必知必會之Go?Mod命令詳解

    Golang必知必會之Go?Mod命令詳解

    go mod可以使項目從GOPATH的強制依賴中獨立出來,也就是說你的項目依賴不再需要放在在GOPATH下面了,下面這篇文章主要給大家介紹了關于Golang必知必會之Go?Mod命令的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-07-07
  • Go語言函數(shù)的延遲調用(Deferred Code)詳解

    Go語言函數(shù)的延遲調用(Deferred Code)詳解

    本文將介紹Go語言函數(shù)和方法中的延遲調用,正如名稱一樣,這部分定義不會立即執(zhí)行,一般會在函數(shù)返回前再被調用,我們通過一些示例來了解一下延遲調用的使用場景
    2022-07-07
  • go語言實現(xiàn)將重要數(shù)據(jù)寫入圖片中

    go語言實現(xiàn)將重要數(shù)據(jù)寫入圖片中

    本文給大家分享的是go語言實現(xiàn)將數(shù)據(jù)的二進制形式寫入圖像紅色通道數(shù)據(jù)二進制的低位,從而實現(xiàn)將重要數(shù)據(jù)隱藏,有需要的小伙伴參考下吧。
    2015-03-03
  • Go java 算法之括號生成示例詳解

    Go java 算法之括號生成示例詳解

    這篇文章主要為大家介紹了Go java 算法之括號生成示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-08-08
  • 淺析Go語言bitset的實現(xiàn)原理

    淺析Go語言bitset的實現(xiàn)原理

    bitset包是一個將非負整數(shù)映射到布爾值的位的集合,這篇文章主要通過開源包bitset來為大家分析一下位集合的設計和實現(xiàn),感興趣的可以學習一下
    2023-08-08

最新評論