使用Golang簡(jiǎn)單實(shí)現(xiàn)七牛圖片處理API
之前一直在用qiniu的存儲(chǔ)服務(wù),生成圖片的縮略圖,模糊圖,視頻的webp,現(xiàn)在需要把存儲(chǔ)移到s3上,那么這些圖片,視頻處理就要自己動(dòng)手寫了,本文梳理一下大致的思路。
分析需求
先看一下qiniu的接口是如何處理圖片的,例如先截取視頻第一秒的圖片,再把圖片縮略,最后存儲(chǔ)到一個(gè)新的key,命令可以這么寫 vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/xxx, 可以看到三個(gè)操作之間用 | 符號(hào)分割,類似unix 的 pipe 操作。
上面的操作算作一個(gè)cmd, 一次API請(qǐng)求可以同時(shí)處理多個(gè)cmd,cmd之間用分號(hào)分割, 處理完畢后,在回調(diào)中把處理結(jié)果返回,例如
{
"id": "xxxxx",
"pipeline": "xxx",
"code": 0,
"desc": "The fop was completed successfully",
"reqid": "xTsAAFnxUbR5J10U",
"inputBucket": "xxx",
"inputKey": "xxxxx",
"items": [
{
"cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzA=",
"code": 0,
"desc": "The fop was completed successfully",
"hash": "FhdN6V8EI4vW4XJGALSfxutvMEIv",
"key": "xx",
"returnOld": 0
},
{
"cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|imageMogr2/blur/45x8|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzBfYmx1cg==",
"code": 0,
"desc": "The fop was completed successfully",
"hash": "FgNiRzrCsa7TZx1xVSb_4d5TiaK3",
"key": "xxx",
"returnOld": 0
}
]
}
分解需求
這個(gè)程序大致需要這么幾個(gè)部分:
一個(gè)http接口,接受任務(wù),接受后把任務(wù)扔到隊(duì)列,返回一個(gè)job ID。 worker異步處理任務(wù),worker的個(gè)數(shù) 和 每個(gè)worker 并行的處理的個(gè)數(shù) 能夠配置,worker有重試機(jī)制。
從 job payload 中解析出需要做的任務(wù),解析出每個(gè)cmd, 最好能并行執(zhí)行每一個(gè) cmd, 記錄每一個(gè)cmd的結(jié)果
每個(gè)cmd中有多個(gè) operation, 并且用 pipe 連接,前一個(gè)operaion的輸出是后一個(gè)operation的輸入
可以把 1 和 2,3 分開(kāi)來(lái)看,1 比較獨(dú)立,之前寫過(guò)一個(gè)worker的模型,參考的是這篇文章 Handling 1 Million Requests per Minute with Go,比較詳細(xì),是用 go channel 作為queue的,我加了一個(gè) beanstalk 作為 queue的 providor。還有一點(diǎn)改進(jìn)是,文章中只提供了worker數(shù)量的設(shè)置,我再加了一個(gè)參數(shù),設(shè)定每個(gè)worker可以并行執(zhí)行的協(xié)程數(shù)。所以下面主要講講3, 2的解決辦法
Pipe
可以參考這個(gè)庫(kù) pipe, 用法如下:
p := pipe.Line(
pipe.ReadFile("test.png"),
resize(300, 300),
blur(0.5),
)
output, err := pipe.CombinedOutput(p)
if err != nil {
fmt.Printf("%v\n", err)
}
buf := bytes.NewBuffer(output)
img, _ := imaging.Decode(buf)
imaging.Save(img, "test_a.png")
還是比較方便的,建一個(gè) Cmd struct, 利用正則匹配一下每個(gè) Operation 的參數(shù),放入一個(gè) []Op slice, 最后執(zhí)行,struct和方法如下:
type Cmd struct {
cmd string
saveas string
ops []Op
err error
}
type Op interface {
getPipe() pipe.Pipe
}
type ResizeOp struct {
width, height int
}
func (c ResizeOp) getPipe() pipe.Pipe {
return resize(c.width, c.height)
}
//使用方法
cmdStr := `file/test.png|thumbnail/x300|blur/20x8`
cmd := Cmd{cmdStr, "test_b.png", nil, nil}
cmd.parse()
cmd.doOps()
sync.WaitGroup
單個(gè)cmd處理解決后,就是多個(gè)cmd的并行問(wèn)題,沒(méi)啥好想的,直接用 sync.WaitGroup 就可以完美解決。一步一步來(lái),我們先看看這個(gè)struct的使用方法:
func main() {
cmds := []string{}
for i := 0; i < 10000; i++ {
cmds = append(cmds, fmt.Sprintf("cmd-%d", i))
}
results := handleCmds(cmds)
fmt.Println(len(results)) // 10000
}
func doCmd(cmd string) string {
return fmt.Sprintf("cmd=%s", cmd)
}
func handleCmds(cmds []string) (results []string) {
fmt.Println(len(cmds)) //10000
var count uint64
group := sync.WaitGroup{}
lock := sync.Mutex{}
for _, item := range cmds {
// 計(jì)數(shù)加一
group.Add(1)
go func(cmd string) {
result := doCmd(cmd)
atomic.AddUint64(&count, 1)
lock.Lock()
results = append(results, result)
lock.Unlock()
// 計(jì)數(shù)減一
group.Done()
}(item)
}
// 阻塞
group.Wait()
fmt.Printf("count=%d \n", count) // 10000
return
}
group本質(zhì)大概是一個(gè)計(jì)數(shù)器,計(jì)數(shù) > 0時(shí), group.Wait() 會(huì)阻塞,直到 計(jì)數(shù) == 0. 這里還有一點(diǎn)要注意,就是 results = append(results, result) 的操作是線程不安全的,清楚這里 results 是共享的,需要加鎖來(lái)保證同步,否則最后 len(results) 不為 10000。
我們建一個(gè)BenchCmd, 來(lái)存放 cmds. 如下:
type BenchCmd struct {
cmds []Cmd
waitGroup sync.WaitGroup
errs []error
lock sync.Mutex
}
func (b *BenchCmd) doCmds() {
for _, item := range b.cmds {
b.waitGroup.Add(1)
go func(cmd Cmd) {
cmd.parse()
err := cmd.doOps()
b.lock.Lock()
b.errs = append(b.errs, err)
b.lock.Unlock()
b.waitGroup.Done()
}(item)
}
b.waitGroup.Wait()
}
最后的調(diào)用就像這樣:
var cmds []Cmd
cmd_a := Cmd{`file/test.png|thumbnail/x300|blur/20x8`, "test_a.png", nil, nil}
cmd_b := Cmd{`file/test.png|thumbnail/500x1000|blur/20x108`, "test_b.png", nil, nil}
cmd_c := Cmd{`file/test.png|thumbnail/300x300`, "test_c.png", nil, nil}
cmds = append(cmds, cmd_a)
cmds = append(cmds, cmd_b)
cmds = append(cmds, cmd_c)
bench := BenchCmd{
cmds: cmds,
waitGroup: sync.WaitGroup{},
lock: sync.Mutex{},
}
bench.doCmds()
fmt.Println(bench.errs)
這只是一個(gè)初級(jí)的實(shí)驗(yàn),思考還不夠全面,并且只是模仿API,qiniu應(yīng)該不是這么做的,耦合更低,可能各個(gè)Cmd都有各自處理的集群,那pipe這個(gè)庫(kù)就暫時(shí)沒(méi)法解決了,目前的局限在于 每個(gè)Cmd必須都在一個(gè)進(jìn)程中。
相關(guān)文章
Go語(yǔ)言實(shí)現(xiàn)讀取文件的方式總結(jié)
這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言實(shí)現(xiàn)讀取文件的幾個(gè)方式,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)Go語(yǔ)言有一定的幫助,感興趣的小伙伴可以收藏一下2023-04-04
使用gorm.Scopes函數(shù)實(shí)現(xiàn)復(fù)用查詢邏輯示例
這篇文章主要為大家介紹了使用gorm.Scopes函數(shù)實(shí)現(xiàn)復(fù)用查詢邏輯示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12
golang編程開(kāi)發(fā)使用sort排序示例詳解
這篇文章主要為大家介紹了go語(yǔ)言編程開(kāi)發(fā)使用sort來(lái)排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2021-11-11
Golang?手寫一個(gè)簡(jiǎn)單的并發(fā)任務(wù)?manager
這篇文章主要介紹了Golang?手寫一個(gè)簡(jiǎn)單的并發(fā)任務(wù)?manager,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-08-08
Go語(yǔ)言中比較兩個(gè)map[string]interface{}是否相等
本文主要介紹了Go語(yǔ)言中比較兩個(gè)map[string]interface{}是否相等,我們可以將其轉(zhuǎn)化成順序一樣的 slice ,然后再轉(zhuǎn)化未json,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08

