Golang SSE 服務(wù)器端推送事件
寫在前面(愚蠢的我犯的錯(cuò)誤)
本應(yīng)該在EventStream的怎么都在響應(yīng)這里出現(xiàn)
后面通過查找問題知道EventSream
有特殊的回復(fù)格式為:data: [返回的內(nèi)容]\n\n
示例:data: success\n\n
返回success字符串
原因
我做了一個(gè)在線點(diǎn)贊的實(shí)時(shí)更新的小玩意,我想著實(shí)時(shí)更新WS全雙工用不著。
SSE介紹
SSE(Server-Sent Event)是一種用于客戶端與服務(wù)器端實(shí)時(shí)通訊的技術(shù)。它允許服務(wù)器端發(fā)送事件到客戶端,客戶端可以通過 EventSource 接口來接收這些事件。通常情況下,SSE 是基于 HTTP 協(xié)議實(shí)現(xiàn)的,它不需要建立和維護(hù)長連接,但服務(wù)器可以長時(shí)間向客戶端推送數(shù)據(jù),而客戶端只需要等待并處理收到的數(shù)據(jù)即可。
Golang實(shí)現(xiàn)方式
SSE核心代碼
//sse Server-Sent-Events 服務(wù)事件流 http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { // 設(shè)置響應(yīng)頭,表明這是一個(gè) SSE 連接 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") //設(shè)置為刷新模式 flush, ok := w.(http.Flusher) flush.Flush() if !ok { //判斷是否轉(zhuǎn)換成功,不成功則返回錯(cuò)誤信息 responseInfo(http.StatusOK, "response cannot convert http.Flusher", w) return } //這里因?yàn)槲覄?chuàng)建了一個(gè)Map用來存儲(chǔ)響應(yīng)IO和Flush刷新, //我在其他地方可以使用遍歷進(jìn)行給各個(gè)通信端進(jìn)行發(fā)送信息 respFlushMap[&w] = &flush select { case <-r.Context().Done(): delete(respFlushMap, &w) return } })
發(fā)送事件請(qǐng)求
func main(){ //.... //點(diǎn)贊評(píng)論 http.HandleFunc("/favorite", favorite(client)) //... } // 點(diǎn)贊 func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) { var lock = sync.RWMutex{} return func(w http.ResponseWriter, r *http.Request) { /* 業(yè)務(wù)處理邏輯 ...... */ //核心代碼 將點(diǎn)贊信息發(fā)送到各個(gè)SSE for writer, flusher := range respFlushMap { //一定要是這個(gè)格式“data: [數(shù)據(jù)內(nèi)容]\n\n”不然前端不會(huì)體現(xiàn)在ServeEvent中而出現(xiàn)在response中 fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method) (*flusher).Flush() } } }
全部代碼,包含了一些處理邏輯,可能比較混亂建議還是看看之前的
package main import ( "context" "encoding/json" "fmt" "github.com/go-redis/redis/v8" "html/template" "log" "math/rand" "net" "net/http" "strconv" "sync" "time" ) var commentNodeHashRedisKey = "commentNodeHashRedisKey" var commentNodeSorterSetRedisKey = "commentNodeSorterSetRedisKey" var respFlushMap = make(map[*http.ResponseWriter]*http.Flusher) type CommentNode struct { Content string `json:"content"` //內(nèi)容 Score float64 `json:"score"` //點(diǎn)贊數(shù) IP string `json:"ip"` //IP NickName string `json:"nickName"` //昵稱 IsFavorite bool `json:"isFavorite"` //是否點(diǎn)贊 Member string `json:"member"` //唯一值 } func main() { //靜態(tài)資源文件 staticServer := http.FileServer(http.Dir("./template")) //處理靜態(tài)資源文件 http.Handle("/static/", http.StripPrefix("/static/", staticServer)) //創(chuàng)建客戶端 client := redis.NewClient(&redis.Options{ Addr: "192.168.192.170:6379", Password: "", DB: 0, }) //判斷時(shí)候連接成功 err := client.Ping(context.Background()).Err() if err != nil { log.Println("連接錯(cuò)誤: ", err.Error()) } log.Println("連接成功") //添加評(píng)論 http.HandleFunc("/addComment", addComment(client)) //點(diǎn)贊評(píng)論 http.HandleFunc("/favorite", favorite(client)) //sse Server-Sent-Events 服務(wù)事件流 http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { // 設(shè)置響應(yīng)頭,表明這是一個(gè) SSE 連接 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") //設(shè)置為刷新模式 flush, ok := w.(http.Flusher) flush.Flush() if !ok { responseInfo(http.StatusOK, "response cannot convert http.Flusher", w) return } respFlushMap[&w] = &flush ticker := time.NewTicker(time.Second) defer ticker.Stop() select { case <-r.Context().Done(): delete(respFlushMap, &w) return } }) http.HandleFunc("/commentList", commentList(client)) //主頁 http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) { //獲取模板 indexFile, err := template.ParseFiles("./template/index.html") if err != nil { log.Println(err.Error()) resp.Write([]byte("./template/index.html not found")) return } //將內(nèi)容輸出 indexFile.Execute(resp, nil) }) //啟動(dòng)服務(wù) if err := http.ListenAndServe(":80", nil); err != nil { log.Println("啟動(dòng)服務(wù)失敗!" + err.Error()) } } // 添加評(píng)論 func addComment(client *redis.Client) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() nickname := query.Get("nickName") if nickname == "" { nickname = "逸士" } //判斷內(nèi)容是否為空 content := query.Get("content") if content == "" { responseInfo(http.StatusBadRequest, "your comment content is empty", w) return } host, _, _ := net.SplitHostPort(r.RemoteAddr) //使用時(shí)間戳 member := fmt.Sprint(time.Now().UnixMilli() ^ rand.Int63()) //序列化 comment, _ := json.Marshal(CommentNode{ Member: member, IP: host, NickName: nickname, Content: content, }) //添加到隊(duì)列中 client.HSet(r.Context(), commentNodeHashRedisKey, member, string(comment)) //更新排行 client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{ Score: 0, Member: member, }) responseInfo(http.StatusOK, "add comment success", w) } } // 點(diǎn)贊 func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) { var lock = sync.RWMutex{} return func(w http.ResponseWriter, r *http.Request) { lock.Lock() defer lock.Unlock() //查詢成員(member)是否存在 query := r.URL.Query() member := query.Get("member") method := r.Method if member == "" { responseInfo(http.StatusBadRequest, "member cannot be empty", w) lock.Unlock() return } //獲取分?jǐn)?shù) score, err := client.ZScore(r.Context(), commentNodeSorterSetRedisKey, member).Result() //點(diǎn)贊減少 if method == http.MethodDelete { score -= 2 } score++ //更新排行 client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{ Score: score, Member: member, }) if err != nil { //不存在返回錯(cuò)誤 responseInfo(http.StatusBadRequest, "member does not exists", w) return } //更新分?jǐn)?shù) var commentNode CommentNode commentNodeStr, err := client.HGet(r.Context(), commentNodeHashRedisKey, member).Result() if err != nil { responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w) return } err = json.Unmarshal([]byte(commentNodeStr), &commentNode) if err != nil { responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w) return } commentNode.Score = score data, _ := json.Marshal(commentNode) if err = client.HSet(r.Context(), commentNodeHashRedisKey, member, data).Err(); err != nil { responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w) return } //返回成功 responseInfo(http.StatusOK, "favorite comment success", w) //將點(diǎn)贊信息發(fā)送到各個(gè)SSE for writer, flusher := range respFlushMap { fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method) (*flusher).Flush() } } } // 評(píng)論列表 func commentList(client *redis.Client) func(resp http.ResponseWriter, req *http.Request) { return func(resp http.ResponseWriter, req *http.Request) { query := req.URL.Query() offset, err := strconv.Atoi(query.Get("offset")) if err != nil { offset = 100 } //連接人地址 connectionAddr := req.RemoteAddr log.Printf("連接人地址: %s\n", connectionAddr) //獲取offset偏移量的排行 result, err := client.ZRevRangeWithScores(req.Context(), commentNodeSorterSetRedisKey, 0, int64(offset-1)).Result() if err != nil || result == nil { responseInfo(http.StatusOK, fmt.Sprint("錯(cuò)誤:", err), resp) return } //獲取評(píng)論詳細(xì)信息 members := make([]string, 0) scopeMap := make(map[string]float64) for _, item := range result { members = append(members, item.Member.(string)) scopeMap[item.Member.(string)] = item.Score } rlt, err := client.HMGet(req.Context(), commentNodeHashRedisKey, members...).Result() if err != nil { responseInfo(http.StatusInternalServerError, err.Error(), resp) return } data, _ := json.Marshal(rlt) responseInfo(http.StatusOK, string(data), resp) } } func responseInfo(code int, info string, w http.ResponseWriter) { w.WriteHeader(code) w.Write([]byte(info)) }
到此這篇關(guān)于Golang SSE 服務(wù)器端推送事件的文章就介紹到這了,更多相關(guān)Golang SSE推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Golang利用反射reflect動(dòng)態(tài)調(diào)用方法
這篇文章主要介紹了詳解Golang利用反射reflect動(dòng)態(tài)調(diào)用方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-11-11淺談?dòng)肎o構(gòu)建不可變的數(shù)據(jù)結(jié)構(gòu)的方法
這篇文章主要介紹了用Go構(gòu)建不可變的數(shù)據(jù)結(jié)構(gòu)的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09Go語言實(shí)現(xiàn)一個(gè)Http Server框架(二) Server的抽象
上一篇文章對(duì)http庫的基本使用做了說明,這篇文章主要介紹了如何實(shí)現(xiàn)一個(gè)簡單地httpServer,文中代碼示例非常詳細(xì),感興趣的朋友可以參考下2023-04-04Go語言實(shí)現(xiàn)類似c++中的多態(tài)功能實(shí)例
Go本身不具有多態(tài)的特性,不能夠像Java、C++那樣編寫多態(tài)類、多態(tài)方法。但是,使用Go可以編寫具有多態(tài)功能的類綁定的方法。下面來一起看看吧2016-09-09