欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解如何用Golang處理每分鐘100萬(wàn)個(gè)請(qǐng)求

 更新時(shí)間:2023年04月11日 11:40:36   作者:janrs_com  
在項(xiàng)目開(kāi)發(fā)中,我們常常會(huì)遇到處理來(lái)自數(shù)百萬(wàn)個(gè)端點(diǎn)的大量POST請(qǐng)求,本文主要介紹了Golang實(shí)現(xiàn)處理每分鐘100萬(wàn)個(gè)請(qǐng)求的方法,希望對(duì)大家有所幫助

面臨的問(wèn)題

在我設(shè)計(jì)一個(gè)分析系統(tǒng)中,我們公司的目標(biāo)是能夠處理來(lái)自數(shù)百萬(wàn)個(gè)端點(diǎn)的大量POST請(qǐng)求。web 網(wǎng)絡(luò)處理程序?qū)⑹盏揭粋€(gè)JSON文檔,其中可能包含許多有效載荷的集合,需要寫(xiě)入Amazon S3,以便我們的地圖還原系統(tǒng)隨后對(duì)這些數(shù)據(jù)進(jìn)行操作。

傳統(tǒng)上,我們會(huì)研究創(chuàng)建一個(gè)工人層架構(gòu),利用諸如以下東西:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • 還有等等其他的技術(shù)手段...

并設(shè)置 2 個(gè)不同的集群,一個(gè)用于 Web 前端,另一個(gè)用于 worker 處理進(jìn)程,這樣我們就可以擴(kuò)大我們可以處理的后臺(tái)工作量。

但從一開(kāi)始,我們的團(tuán)隊(duì)就知道我們應(yīng)該在 Go 中這樣做,因?yàn)樵谟懻撾A段我們看到這可能是一個(gè)非常大的流量系統(tǒng)。 我使用 Go 已有大約 2 年左右的時(shí)間,我們公司在處理業(yè)務(wù)時(shí)開(kāi)發(fā)了一些系統(tǒng),但沒(méi)有一個(gè)能承受如此大的負(fù)載。以下是優(yōu)化的過(guò)程。

我們首先創(chuàng)建一些結(jié)構(gòu)體來(lái)定義我們將通過(guò) POST 調(diào)用接收的 Web 請(qǐng)求負(fù)載,以及一種將其上傳到我們的 S3 存儲(chǔ)桶的方法。代碼如下:

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // ...負(fù)載字段
}

func (p *Payload) UploadToS3() error {
    // storageFolder 方法確保在我們?cè)阪I名中獲得相同時(shí)間戳?xí)r不會(huì)發(fā)生名稱(chēng)沖突
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // 我們發(fā)布到 S3 存儲(chǔ)桶的所有內(nèi)容都應(yīng)標(biāo)記為“私有”
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

使用 Go 協(xié)程

最初我們采用了一個(gè)非常簡(jiǎn)單的 POST 處理程序?qū)崿F(xiàn),只是試圖將job 處理程序并行化到一個(gè)簡(jiǎn)單的 goroutine 中:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 將body讀入字符串進(jìn)行json解碼
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }
    
    // 分別檢查每個(gè)有效負(fù)載和隊(duì)列項(xiàng)目以發(fā)布到 S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- 這是不建議的做法。這里是最開(kāi)始的做法。
    }

    w.WriteHeader(http.StatusOK)
}

對(duì)于中等負(fù)載,這可能適用于大多數(shù)公司的流量,但很快證明這在大規(guī)模情況下效果不佳。 我們期望有很多請(qǐng)求,但沒(méi)有達(dá)到我們將第一個(gè)版本部署到生產(chǎn)環(huán)境時(shí)開(kāi)始看到的數(shù)量級(jí)。 我們完全低估了流量。

上面的方法在幾個(gè)不同的方面是不好的。 無(wú)法控制我們生成了多少個(gè) go routines。 由于我們每分鐘收到 100 萬(wàn)個(gè) POST 請(qǐng)求,因此這段代碼很快崩潰了。

進(jìn)一步優(yōu)化

我們需要找到一種不同的方式。 從一開(kāi)始我們就開(kāi)始討論我們需要如何保持請(qǐng)求處理程序的生命周期非常短,并在后臺(tái)進(jìn)行生成處理。 當(dāng)然,這是你在使用 Ruby on Rails 時(shí)必須做的,否則你將阻止所有可用的 worker web 處理器,無(wú)論你使用的是 puma、unicorn 還是 passenger(請(qǐng)不要進(jìn)入 JRuby 討論)。 然后我們需要利用常見(jiàn)的解決方案來(lái)做到這一點(diǎn),例如 Resque、Sidekiq、SQS 等等,有很多方法可以實(shí)現(xiàn)這一點(diǎn)。

所以第二次迭代是創(chuàng)建一個(gè)緩沖通道,我們可以創(chuàng)建一些隊(duì)列,然后把 job push到隊(duì)列并將它們上傳到 S3,并且由于我們可以控制job 隊(duì)列中的最大數(shù)數(shù)量并且我們有足夠的內(nèi)存來(lái)處理隊(duì)列中的 job。在這個(gè)方案中,我們認(rèn)為只需要在通道隊(duì)列中緩沖需要處理的 job 就可以了。

代碼如下:

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // 分別檢查每個(gè)有效負(fù)載和隊(duì)列項(xiàng)目以發(fā)布到 S3
    for _, payload := range content.Payloads {
        Queue <- payload // <----- 這是建議的做法。
    }
    ...
}

然后為了實(shí)際出列作業(yè)并處理它們,我們使用了類(lèi)似的東西:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- 這里雖然優(yōu)化了,但還不是最好的。
        }
    }
}

在上面的代碼中,我們用一個(gè)緩沖隊(duì)列來(lái)交換有缺陷的并發(fā)性,而緩沖隊(duì)列只是推遲了問(wèn)題。 我們的同步處理器一次只將一個(gè)有效負(fù)載上傳到 S3,并且由于傳入請(qǐng)求的速率遠(yuǎn)遠(yuǎn)大于單個(gè)處理器上傳到 S3 的能力,我們的 job 緩沖通道很快達(dá)到了極限并阻止了請(qǐng)求處理程序的能力,隊(duì)列很快就阻塞滿(mǎn)了。

我們只是在避免這個(gè)問(wèn)題,并開(kāi)始倒計(jì)時(shí),直到我們的系統(tǒng)最終死亡。 在我們部署這個(gè)有缺陷的版本后,我們的延遲率在幾分鐘內(nèi)以恒定的速度持續(xù)增加。以下是延遲率增長(zhǎng)圖:

更好的解決方案

我們決定在使用 Go 通道時(shí)使用一種通用模式,以創(chuàng)建一個(gè) 2 層通道系統(tǒng),一個(gè)用于 Job 隊(duì)列,另一個(gè)用于控制同時(shí)在 Job 隊(duì)列上操作的 Worker 的數(shù)量。

這個(gè)想法是將上傳到 S3 的數(shù)據(jù)并行化到某種程度上可持續(xù)的速度,這種速度既不會(huì)削弱機(jī)器也不會(huì)開(kāi)始從 S3 生成連接錯(cuò)誤。 所以我們選擇創(chuàng)建 Job/Worker 模式。 對(duì)于那些熟悉 Java、C# 等的人來(lái)說(shuō),可以將其視為 Golang 使用通道實(shí)現(xiàn) Worker 線程池的方式。

代碼如下:

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job 表示要運(yùn)行的作業(yè)
type Job struct {
    Payload Payload
}

// 我們可以在 Job 隊(duì)列上發(fā)送工作請(qǐng)求的緩沖通道。
var JobQueue chan Job

// Worker 代表執(zhí)行作業(yè)的 Worker。
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start 方法為 Worker 啟動(dòng)循環(huán)監(jiān)聽(tīng)。監(jiān)聽(tīng)退出信號(hào)以防我們需要停止它。
func (w Worker) Start() {
    go func() {
        for {
            // 將當(dāng)前 woker 注冊(cè)到工作隊(duì)列中。
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // 接收 work 請(qǐng)求。
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // 接收一個(gè)退出的信號(hào)。
                return
            }
        }
    }()
}

// 將退出信號(hào)傳遞給 Worker 進(jìn)程以停止處理清理。
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

我們已經(jīng)修改了我們的 Web 請(qǐng)求處理程序,以創(chuàng)建一個(gè)帶有有效負(fù)載的 Job 結(jié)構(gòu)實(shí)例,并將其發(fā)送到 JobQueue 通道以供 Worker 提取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 將body讀入字符串進(jìn)行json解碼
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // 分別檢查每個(gè)有效負(fù)載和隊(duì)列項(xiàng)目以發(fā)布到 S3
    for _, payload := range content.Payloads {

        // 創(chuàng)建一個(gè)有效負(fù)載的job
        work := Job{Payload: payload}

        // 將 work push 到隊(duì)列。
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在我們的 Web 服務(wù)器初始化期間,我們創(chuàng)建一個(gè) Dispatcher 調(diào)度器并調(diào)用 Run() 來(lái)創(chuàng)建 Woker 工作池并開(kāi)始偵聽(tīng)將出現(xiàn)在 Job 隊(duì)列中的 Job。

dispatcher := NewDispatcher(MaxWorker) 
dispatcher.Run()

下面是我們的調(diào)度程序?qū)崿F(xiàn)的代碼:

type Dispatcher struct {
    // 通過(guò)調(diào)度器注冊(cè)一個(gè) Worker 通道池
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // 啟動(dòng)指定數(shù)量的 Worker
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.pool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // 接收一個(gè) job 請(qǐng)求
            go func(job Job) {
                // 嘗試獲取可用的 worker job 通道
                // 這將阻塞 worker 直到空閑
                jobChannel := <-d.WorkerPool

                // 調(diào)度一個(gè) job 到 worker job 通道
                jobChannel <- job
            }(job)
        }
    }
}

請(qǐng)注意,我們提供了要實(shí)例化并添加到我們的 Worker 池中的最大worker 數(shù)量。 由于我們?cè)谶@個(gè)項(xiàng)目中使用了 Amazon Elasticbeanstalk 和 dockerized Go 環(huán)境,因此我們從環(huán)境變量中讀取這些值。 這樣我們就可以控制 Job 隊(duì)列的數(shù)量和最大大小,因此我們可以快速調(diào)整這些值而無(wú)需重新部署集群。

var ( 
  MaxWorker = os.Getenv("MAX_WORKERS")
  MaxQueue  = os.Getenv("MAX_QUEUE")
)

在我們部署它之后,我們立即看到我們所有的延遲率都下降到極低的延遲,并且我們處理請(qǐng)求的能力急劇上升。以下是流量截圖:

在我們的彈性負(fù)載均衡器完全預(yù)熱幾分鐘后,我們看到我們的 ElasticBeanstalk 應(yīng)用程序每分鐘處理近 100 萬(wàn)個(gè)請(qǐng)求。 我們通常在早上有幾個(gè)小時(shí)的流量會(huì)飆升至每分鐘超過(guò)一百萬(wàn)。

一旦我們部署了新代碼,服務(wù)器數(shù)量就從 100 臺(tái)服務(wù)器大幅下降到大約 20 臺(tái)服務(wù)器。以下是服務(wù)器數(shù)量變化截圖:

在正確配置集群和自動(dòng)縮放設(shè)置后,我們能夠?qū)⑵溥M(jìn)一步降低到僅 4x EC2 c4.Large 實(shí)例,并且如果 CPU 使用率超過(guò) 90% 持續(xù) 5 天,Elastic Auto-Scaling 將生成一個(gè)新實(shí)例 分鐘值。以下是截圖:

總結(jié)

可以看出利用 Elasticbeanstalk 自動(dòng)縮放的強(qiáng)大功能以及 Golang 提供的開(kāi)箱即用的高效和簡(jiǎn)單的并發(fā)方法,就可以構(gòu)建出一個(gè)高性能的處理程序。

以上就是詳解如何用Golang處理每分鐘100萬(wàn)個(gè)請(qǐng)求的詳細(xì)內(nèi)容,更多關(guān)于Golang處理請(qǐng)求的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Golang實(shí)現(xiàn)Java虛擬機(jī)之解析class文件詳解

    Golang實(shí)現(xiàn)Java虛擬機(jī)之解析class文件詳解

    這篇文章主要為大家詳細(xì)介紹了Golang實(shí)現(xiàn)Java虛擬機(jī)之解析class文件的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-01-01
  • Go 常量基礎(chǔ)概念(聲明更改只讀)

    Go 常量基礎(chǔ)概念(聲明更改只讀)

    這篇文章主要為大家介紹了Go常量基礎(chǔ)概念包括常量的聲明更改只讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • 超全講解Golang中defer關(guān)鍵字的用法

    超全講解Golang中defer關(guān)鍵字的用法

    本文將從一個(gè)資源回收問(wèn)題引入,引出defer關(guān)鍵字,并對(duì)其進(jìn)行基本介紹,從而讓大家對(duì)Go語(yǔ)言中的defer有更深入的了解,需要的小伙伴可以學(xué)習(xí)一下
    2023-05-05
  • Go語(yǔ)言如何實(shí)現(xiàn)TCP通信詳解

    Go語(yǔ)言如何實(shí)現(xiàn)TCP通信詳解

    go里面實(shí)現(xiàn)tcp沒(méi)有像之前寫(xiě)的C++那些那么麻煩,在C++里面要先創(chuàng)建套接字,然后綁定ip地址,go里面直接就一個(gè)函數(shù)建立套接字,然后在進(jìn)行通信就可以了,下面這篇文章主要給大家介紹了關(guān)于Go語(yǔ)言如何實(shí)現(xiàn)TCP通信的相關(guān)資料,需要的朋友可以參考下
    2023-01-01
  • 利用Golang解析json數(shù)據(jù)的方法示例

    利用Golang解析json數(shù)據(jù)的方法示例

    Go提供了原生的JSON庫(kù),并且與語(yǔ)言本身有效的集成在了一起。下面這篇文章將給大家介紹關(guān)于利用Golang解析json數(shù)據(jù)的方法,文中給出了詳細(xì)的示例代碼供大家參考學(xué)習(xí),需要的朋友們下面跟著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-07-07
  • 一文帶你了解Go語(yǔ)言中time包的時(shí)間常用操作

    一文帶你了解Go語(yǔ)言中time包的時(shí)間常用操作

    在日常開(kāi)發(fā)中,我們避免不了時(shí)間的使用,我們可能需要獲取當(dāng)前時(shí)間,然后格式化保存,也可能需要在時(shí)間類(lèi)型與字符串類(lèi)型之間相互轉(zhuǎn)換等。本文將會(huì)對(duì)?Go?time?包里面的常用函數(shù)和方法進(jìn)行介紹,需要的可以參考一下
    2022-12-12
  • 一文帶你深入了解Go語(yǔ)言中的事務(wù)

    一文帶你深入了解Go語(yǔ)言中的事務(wù)

    事務(wù)中止時(shí),你結(jié)束事務(wù)了嗎?在開(kāi)發(fā)時(shí)有可能就會(huì)犯這樣的錯(cuò)誤,其問(wèn)題就是你在提交事務(wù)時(shí),如果中間有其他業(yè)務(wù)就取消操作,那么事務(wù)也關(guān)閉了嗎?本文就來(lái)詳細(xì)講講
    2023-04-04
  • go語(yǔ)言實(shí)現(xiàn)十大常見(jiàn)的排序算法示例

    go語(yǔ)言實(shí)現(xiàn)十大常見(jiàn)的排序算法示例

    這篇文章主要為大家介紹了go語(yǔ)言實(shí)現(xiàn)十大常見(jiàn)的排序算法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • go結(jié)構(gòu)體嵌套的切片數(shù)組操作

    go結(jié)構(gòu)體嵌套的切片數(shù)組操作

    這篇文章主要介紹了go結(jié)構(gòu)體嵌套的切片數(shù)組操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-04-04
  • 如何讓shell終端和goland控制臺(tái)輸出彩色的文字

    如何讓shell終端和goland控制臺(tái)輸出彩色的文字

    這篇文章主要介紹了如何讓shell終端和goland控制臺(tái)輸出彩色的文字的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-05-05

最新評(píng)論