欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

go-zero數(shù)據(jù)的流處理利器fx使用詳解

 更新時間:2023年05月29日 14:02:44   作者:Keson  
這篇文章主要為大家介紹了go-zero數(shù)據(jù)的流處理利器fx使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

數(shù)據(jù)的流處理利器

go-zero微服務庫地址https://github.com/tal-tech/go-zero

流處理(Stream processing)是一種計算機編程范式,其允許給定一個數(shù)據(jù)序列(流處理數(shù)據(jù)源),一系列數(shù)據(jù)操作(函數(shù))被應用到流中的每個元素。同時流處理工具可以顯著提高程序員的開發(fā)效率,允許他們編寫有效、干凈和簡潔的代碼。

流數(shù)據(jù)處理在我們的日常工作中非常常見,舉個例子,我們在業(yè)務開發(fā)中往往會記錄許多業(yè)務日志,這些日志一般是先發(fā)送到Kafka,然后再由Job消費Kafaka寫到elasticsearch,在進行日志流處理的過程中,往往還會對日志做一些處理,比如過濾無效的日志,做一些計算以及重新組合日志等等,示意圖如下:

流處理工具fx

gozero是一個功能完備的微服務框架,框架中內置了很多非常實用的工具,其中就包含流數(shù)據(jù)處理工具fx,下面我們通過一個簡單的例子來認識下該工具:

package main
import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
    "github.com/tal-tech/go-zero/core/fx"
)
func main() {
    ch := make(chan int)
    go inputStream(ch)
    go outputStream(ch)
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
    <-c
}
func inputStream(ch chan int) {
    count := 0
    for {
        ch <- count
        time.Sleep(time.Millisecond * 500)
        count++
    }
}
func outputStream(ch chan int) {
    fx.From(func(source chan<- interface{}) {
        for c := range ch {
            source <- c
        }
    }).Walk(func(item interface{}, pipe chan<- interface{}) {
        count := item.(int)
        pipe <- count
    }).Filter(func(item interface{}) bool {
        itemInt := item.(int)
        if itemInt%2 == 0 {
            return true
        }
        return false
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}

inputStream函數(shù)模擬了流數(shù)據(jù)的產(chǎn)生,outputStream函數(shù)模擬了流數(shù)據(jù)的處理過程,其中From函數(shù)為流的輸入,Walk函數(shù)并發(fā)的作用在每一個item上,F(xiàn)ilter函數(shù)對item進行過濾為true保留為false不保留,F(xiàn)orEach函數(shù)遍歷輸出每一個item元素。

流數(shù)據(jù)處理中間操作

一個流的數(shù)據(jù)處理可能存在許多的中間操作,每個中間操作都可以作用在流上。就像流水線上的工人一樣,每個工人操作完零件后都會返回處理完成的新零件,同理流處理中間操作完成后也會返回一個新的流。

fx的流處理中間操作:

操作函數(shù)功能輸入
Distinct去除重復的itemKeyFunc,返回需要去重的key
Filter過濾不滿足條件的itemFilterFunc,Option控制并發(fā)量
Group對item進行分組KeyFunc,以key進行分組
Head取出前n個item,返回新streamint64保留數(shù)量
Map對象轉換MapFunc,Option控制并發(fā)量
Merge合并item到slice并生成新stream
Reverse反轉item
Sort對item進行排序LessFunc實現(xiàn)排序算法
Tail與Head功能類似,取出后n個item組成新streamint64保留數(shù)量
Walk作用在每個item上WalkFunc,Option控制并發(fā)量

下圖展示了每個步驟和每個步驟的結果:

用法與原理分析

From

通過From函數(shù)構建流并返回Stream,流數(shù)據(jù)通過channel進行存儲:

// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
})
// 源碼
func From(generate GenerateFunc) Stream {
    source := make(chan interface{})
    go func() {
        defer close(source)
    // 構造流數(shù)據(jù)寫入channel
        generate(source)
    }()
    return Range(source)
}

Filter

Filter函數(shù)提供過濾item的功能,F(xiàn)ilterFunc定義過濾邏輯true保留item,false則不保留:

// 例子 保留偶數(shù)
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
}).Filter(func(item interface{}) bool {
  if item.(int)%2 == 0 {
    return true
  }
  return false
})
// 源碼
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
    return p.Walk(func(item interface{}, pipe chan<- interface{}) {
    // 執(zhí)行過濾函數(shù)true保留,false丟棄
        if fn(item) {
            pipe <- item
        }
    }, opts...)
}

Group

Group對流數(shù)據(jù)進行分組,需定義分組的key,數(shù)據(jù)分組后以slice存入channel:

// 例子 按照首字符"g"或者"p"分組,沒有則分到另一組
    ss := []string{"golang", "google", "php", "python", "java", "c++"}
    fx.From(func(source chan<- interface{}) {
        for _, s := range ss {
            source <- s
        }
    }).Group(func(item interface{}) interface{} {
        if strings.HasPrefix(item.(string), "g") {
            return "g"
        } else if strings.HasPrefix(item.(string), "p") {
            return "p"
        }
        return ""
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}
// 源碼
func (p Stream) Group(fn KeyFunc) Stream {
  // 定義分組存儲map
    groups := make(map[interface{}][]interface{})
    for item := range p.source {
    // 用戶自定義分組key
        key := fn(item)
    // key相同分到一組
        groups[key] = append(groups[key], item)
    }
    source := make(chan interface{})
    go func() {
        for _, group := range groups {
      // 相同key的一組數(shù)據(jù)寫入到channel
            source <- group
        }
        close(source)
    }()
    return Range(source)
}

Reverse

reverse可以對流中元素進行反轉處理:

// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 源碼
func (p Stream) Reverse() Stream {
    var items []interface{}
  // 獲取流中數(shù)據(jù)
    for item := range p.source {
        items = append(items, item)
    }
    // 反轉算法
    for i := len(items)/2 - 1; i >= 0; i-- {
        opp := len(items) - 1 - i
        items[i], items[opp] = items[opp], items[i]
    }
  // 寫入流
    return Just(items...)
}

Distinct

distinct對流中元素進行去重,去重在業(yè)務開發(fā)中比較常用,經(jīng)常需要對用戶id等做去重操作:

// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
  return item
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 結果為 1,2,3,4,5,6
// 源碼
func (p Stream) Distinct(fn KeyFunc) Stream {
    source := make(chan interface{})
    threading.GoSafe(func() {
        defer close(source)
        // 通過key進行去重,相同key只保留一個
        keys := make(map[interface{}]lang.PlaceholderType)
        for item := range p.source {
            key := fn(item)
      // key存在則不保留
            if _, ok := keys[key]; !ok {
                source <- item
                keys[key] = lang.Placeholder
            }
        }
    })
    return Range(source)
}

Walk

Walk函數(shù)并發(fā)的作用在流中每一個item上,可以通過WithWorkers設置并發(fā)數(shù),默認并發(fā)數(shù)為16,最小并發(fā)數(shù)為1,如設置unlimitedWorkers為true則并發(fā)數(shù)無限制,但并發(fā)寫入流中的數(shù)據(jù)由defaultWorkers限制,WalkFunc中用戶可以自定義后續(xù)寫入流中的元素,可以不寫入也可以寫入多個元素:

// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
  newItem := strings.ToUpper(item.(string))
  pipe <- newItem
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 源碼
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
    pipe := make(chan interface{}, option.workers)
    go func() {
        var wg sync.WaitGroup
        pool := make(chan lang.PlaceholderType, option.workers)
        for {
      // 控制并發(fā)數(shù)量
            pool <- lang.Placeholder
            item, ok := <-p.source
            if !ok {
                <-pool
                break
            }
            wg.Add(1)
            go func() {
                defer func() {
                    wg.Done()
                    <-pool
                }()
                // 作用在每個元素上
                fn(item, pipe)
            }()
        }
    // 等待處理完成
        wg.Wait()
        close(pipe)
    }()
    return Range(pipe)
}

并發(fā)處理

fx工具除了進行流數(shù)據(jù)處理以外還提供了函數(shù)并發(fā)功能,在微服務中實現(xiàn)某個功能往往需要依賴多個服務,并發(fā)的處理依賴可以有效的降低依賴耗時,提升服務的性能。

fx.Parallel(func() {
  userRPC() // 依賴1
}, func() {
  accountRPC() // 依賴2
}, func() {
  orderRPC() // 依賴3
})

注意fx.Parallel進行依賴并行處理的時候不會有error返回,如需有error返回或者有一個依賴報錯需要立馬結束依賴請求請使用MapReduce工具進行處理。

總結

本篇文章介紹了流處理的基本概念和gozero中的流處理工具fx,在實際的生產(chǎn)中流處理場景應用也非常多,希望本篇文章能給大家?guī)硪欢ǖ膯l(fā),更好的應對工作中的流處理場景。

以上就是go-zero數(shù)據(jù)的流處理利器fx使用詳解的詳細內容,更多關于go-zero數(shù)據(jù)流處理fx的資料請關注腳本之家其它相關文章!

相關文章

  • Golang time.Sleep()用法及示例講解

    Golang time.Sleep()用法及示例講解

    Go語言中的Sleep()函數(shù)用于在至少規(guī)定的持續(xù)時間d內停止最新的go-routine,這篇文章主要介紹了Golang time.Sleep()用法及示例講解,需要的朋友可以參考下
    2023-02-02
  • 淺析Golang中類型嵌入的簡介與使用

    淺析Golang中類型嵌入的簡介與使用

    類型嵌入指的就是在一個類型的定義中嵌入了其他類型,Go?語言支持兩種類型嵌入,分別是接口類型的類型嵌入和結構體類型的類型嵌入,下面我們就來詳細一下類型嵌入的使用吧
    2023-11-11
  • 詳解go?mod?使用方法

    詳解go?mod?使用方法

    golang 提供了 go mod命令來管理包,是go的一個模塊管理工具,用來代替?zhèn)鹘y(tǒng)的GOPATH方案,本文給大家介紹go?mod?使用方法,感興趣的朋友一起看看吧
    2022-05-05
  • golang套接字的實現(xiàn)

    golang套接字的實現(xiàn)

    Go語言中通過標準庫net實現(xiàn)套接字編程,涵蓋了TCP和UDP兩種網(wǎng)絡類型,通過這些基本概念和實際代碼示例,可以幫助理解和掌握Go語言中的套接字編程
    2024-10-10
  • golang sql語句超時控制方案及原理

    golang sql語句超時控制方案及原理

    一般應用程序在執(zhí)行一條sql語句時,都會給這條sql設置一個超時時間,本文主要介紹了golang sql語句超時控制方案及原理,具有一定的參考價值,感興趣的可以了解一下
    2023-12-12
  • Golang比較兩個slice是否相等的問題

    Golang比較兩個slice是否相等的問題

    本文主要介紹了Golang比較兩個slice是否相等的問題,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-03-03
  • gin項目部署到服務器并后臺啟動的步驟

    gin項目部署到服務器并后臺啟動的步驟

    本文主要介紹了gin項目部署到服務器并后臺啟動的步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-02-02
  • GO語言實現(xiàn)的端口掃描器分享

    GO語言實現(xiàn)的端口掃描器分享

    這篇文章主要介紹了GO語言實現(xiàn)的端口掃描器分享,本文直接給出實現(xiàn)代碼,代碼中包含大量注釋,需要的朋友可以參考下
    2014-10-10
  • Golang生成Excel文檔的方法步驟

    Golang生成Excel文檔的方法步驟

    生成Excel是一個很常見的需求,本文將介紹如何使用Go的 Excelize庫去生成Excel文檔,以及一些具體場景下的代碼實現(xiàn),感興趣的可以參考一下
    2021-06-06
  • Go Java算法之累加數(shù)示例詳解

    Go Java算法之累加數(shù)示例詳解

    這篇文章主要為大家介紹了Go Java算法之累加數(shù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-08-08

最新評論