使用Golang實(shí)現(xiàn)流式輸出
流式輸出的深度剖析
之前一直在調(diào)用openai的key,只是照著文檔進(jìn)行流式調(diào)用,也只知其確是流式與api有所不同,而未成體系深究其實(shí)現(xiàn)原理。
就以openai的官方流式輸出為切入。
概述
流式輸出(Streaming Output)是 HTTP 響應(yīng)中的一種模式,服務(wù)器可以在生成部分內(nèi)容時(shí)立即將這些內(nèi)容發(fā)送給客戶端,而無(wú)需等待整個(gè)響應(yīng)內(nèi)容生成完成。這種方式常用于實(shí)時(shí)交互、高延遲操作或長(zhǎng)時(shí)間任務(wù)中,比如 OpenAI 的 GPT 模型生成流式對(duì)話。
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
// 定義必要的數(shù)據(jù)結(jié)構(gòu)
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type RequestBody struct {
Model string `json:"model"`
Messages []Message `json:"messages"`
Temperature float64 `json:"temperature"`
Stream bool `json:"stream"`
}
type Choice struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
}
type ResponseBody struct {
Choices []Choice `json:"choices"`
}
const (
apiURL = "https://api.example.com/v1/chat/completions" // 替換為實(shí)際的 API 地址
authToken = "your-auth-token" // 替換為實(shí)際的 Token
model = "gpt-3.5-turbo"
temperature = 0.7
)
func StreamHandler(w http.ResponseWriter, r *http.Request) {
// 從查詢參數(shù)獲取輸入內(nèi)容
content := r.URL.Query().Get("content")
if content == "" {
http.Error(w, "Missing 'content' parameter", http.StatusBadRequest)
return
}
// 構(gòu)造請(qǐng)求體
message := Message{
Role: "user",
Content: content,
}
requestBody := RequestBody{
Model: model,
Messages: []Message{message},
Temperature: temperature,
Stream: true,
}
jsonData, err := json.Marshal(requestBody)
if err != nil {
http.Error(w, "Failed to marshal request body", http.StatusInternalServerError)
return
}
// 創(chuàng)建 HTTP 請(qǐng)求
req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonData))
if err != nil {
http.Error(w, "Failed to create request", http.StatusInternalServerError)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+authToken)
// 設(shè)置 HTTP 客戶端
client := &http.Client{Timeout: time.Second * 50}
resp, err := client.Do(req)
if err != nil {
http.Error(w, "Failed to get response", http.StatusInternalServerError)
return
}
defer resp.Body.Close()
// 設(shè)置響應(yīng)頭,開(kāi)啟流式輸出
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// 確保 ResponseWriter 支持 Flusher
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
// 處理流式響應(yīng)
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// 處理以 "data: " 開(kāi)頭的行
if strings.HasPrefix(line, "data: ") {
line = strings.TrimPrefix(line, "data: ")
}
if line == "[DONE]" {
break
}
if line == "" {
continue
}
// 解析響應(yīng)內(nèi)容
var chunk ResponseBody
if err := json.Unmarshal([]byte(line), &chunk); err != nil {
continue
}
// 將響應(yīng)數(shù)據(jù)逐步發(fā)送給客戶端
for _, choice := range chunk.Choices {
content := choice.Delta.Content
_, err := w.Write([]byte(content))
if err != nil {
http.Error(w, "Failed to write response", http.StatusInternalServerError)
return
}
flusher.Flush() // 刷新緩沖區(qū)
}
}
if err := scanner.Err(); err != nil {
http.Error(w, "Scanner error", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/stream", StreamHandler)
fmt.Println("Server started at :8080")
http.ListenAndServe(":8080", nil)
}核心流程
接收到用戶輸入后,將其作為 content 參數(shù)發(fā)送給目標(biāo) API。
開(kāi)啟流式輸出模式,設(shè)置 Stream: true。
使用 http.Flusher 將從遠(yuǎn)程接口接收到的內(nèi)容逐步發(fā)送給客戶端。
關(guān)鍵點(diǎn)
1.流式響應(yīng)頭設(shè)置
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
實(shí)時(shí)輸出: 通過(guò) w.Write 輸出內(nèi)容后調(diào)用 flusher.Flush() 確保數(shù)據(jù)實(shí)時(shí)發(fā)送。
啟動(dòng)服務(wù)后,通過(guò)瀏覽器訪問(wèn)類似以下 URL:
http://localhost:8080/stream?content=Hello%20world
客戶端會(huì)逐步接收內(nèi)容,類似命令行實(shí)時(shí)打印。
1. HTTP 協(xié)議中的流式響應(yīng)
流式輸出利用 HTTP 協(xié)議的特性,不關(guān)閉連接,逐步將數(shù)據(jù)發(fā)送給客戶端。典型流式響應(yīng)會(huì)設(shè)置如下 HTTP Header:
Content-Type: text/event-stream表示這是一個(gè)事件流(Event Stream),用于向客戶端連續(xù)發(fā)送數(shù)據(jù)片段。
Cache-Control: no-cache防止響應(yīng)被緩存,以確??蛻舳私邮盏綄?shí)時(shí)內(nèi)容。
Connection: keep-alive 保持連接處于活躍狀態(tài),支持多次數(shù)據(jù)傳輸。
2. 流式輸出的工作原理
客戶端發(fā)起請(qǐng)求,服務(wù)器在接收到請(qǐng)求后開(kāi)始響應(yīng)。
服務(wù)器不一次性生成完整的響應(yīng)內(nèi)容,而是將生成的部分?jǐn)?shù)據(jù)逐段發(fā)送。
客戶端收到數(shù)據(jù)后立即處理,而無(wú)需等待完整響應(yīng)結(jié)束。
在數(shù)據(jù)發(fā)送完成后,服務(wù)器可以選擇關(guān)閉連接或保持連接以發(fā)送后續(xù)數(shù)據(jù)。
流式輸出的常見(jiàn)應(yīng)用場(chǎng)景
實(shí)時(shí)聊天:聊天模型逐詞/逐句生成時(shí),可以實(shí)時(shí)傳輸數(shù)據(jù)。
日志監(jiān)控:將服務(wù)器的實(shí)時(shí)日志逐行推送到前端。
流式文件傳輸:如大文件或視頻流傳輸。
實(shí)時(shí)進(jìn)度更新:如任務(wù)進(jìn)度條更新。
到此這篇關(guān)于使用Golang實(shí)現(xiàn)流式輸出的文章就介紹到這了,更多相關(guān)Golang流式輸出內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang上下文Context的常見(jiàn)應(yīng)用場(chǎng)景
Golang?context主要用于定義超時(shí)取消,取消后續(xù)操作,在不同操作中傳遞值。本文通過(guò)簡(jiǎn)單易懂的示例進(jìn)行說(shuō)明,感興趣的可以了解一下2023-04-04
Go type關(guān)鍵字(類型定義與類型別名的使用差異)用法實(shí)例探究
這篇文章主要為大家介紹了Go type關(guān)鍵字(類型定義與類型別名的使用差異)用法實(shí)例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
gorm 結(jié)構(gòu)體中 binding 和 msg 結(jié)構(gòu)體標(biāo)簽示例詳解
文章介紹了Gin框架中binding和msg結(jié)構(gòu)體標(biāo)簽的使用,包括基本用法、常用驗(yàn)證規(guī)則、自定義驗(yàn)證器、錯(cuò)誤信息自定義、控制器使用示例、組合驗(yàn)證規(guī)則、跨字段驗(yàn)證和初始化驗(yàn)證器等,這些標(biāo)簽主要用于數(shù)據(jù)驗(yàn)證、自定義錯(cuò)誤信息、參數(shù)綁定和表單驗(yàn)證2024-11-11
go語(yǔ)言使用pipe讀取子進(jìn)程標(biāo)準(zhǔn)輸出的方法
這篇文章主要介紹了go語(yǔ)言使用pipe讀取子進(jìn)程標(biāo)準(zhǔn)輸出的方法,實(shí)例分析了Go語(yǔ)言針對(duì)進(jìn)程操作的技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03
Golang使用pprof檢查內(nèi)存泄漏的全過(guò)程
pprof 是golang提供的一款分析工具,可以分析CPU,內(nèi)存的使用情況,本篇文章關(guān)注它在分析內(nèi)存泄漏方面的應(yīng)用,本文給大家介紹了Golang使用pprof檢查內(nèi)存泄漏的全過(guò)程,文中通過(guò)代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2024-02-02
Golang構(gòu)建WebSocket服務(wù)器和客戶端的示例詳解
這篇文章主要為大家詳細(xì)介紹了如何使用Go語(yǔ)言構(gòu)建WebSocket服務(wù)器和客戶端,以實(shí)現(xiàn)雙向通信,文中的示例代碼講解詳細(xì),需要的小伙伴可以參考一下2023-11-11

