Golang如何將日志以Json格式輸出到Kafka
在上一篇文章中我實(shí)現(xiàn)了一個支持Debug、Info、Error等多個級別的日志庫,并將日志寫到了磁盤文件中,代碼比較簡單,適合練手。有興趣的可以通過這個鏈接前往:https://github.com/bosima/ylog/releases/tag/v1.0.1
工程實(shí)踐中,我們往往還需要對日志進(jìn)行采集,將日志歸集到一起,然后用于各種處理分析,比如生產(chǎn)環(huán)境上的錯誤分析、異常告警等等。在日志消息系統(tǒng)領(lǐng)域,Kafka久負(fù)盛名,這篇文章就以將日志發(fā)送到Kafka來實(shí)現(xiàn)日志的采集;同時考慮到日志分析時對結(jié)構(gòu)化數(shù)據(jù)的需求,這篇文章還會提供一種輸出Json格式日志的方法。
這個升級版的日志庫還要保持向前兼容,即還能夠使用普通文本格式,以及寫日志到磁盤文件,這兩個特性和要新增的兩個功能分別屬于同類處理,因此我這里對它們進(jìn)行抽象,形成兩個接口:格式化接口、寫日志接口。
格式化接口
所謂格式化,就是日志的格式處理。這個日志庫目前要支持兩種格式:普通文本和Json。
為了在不同格式之上提供一個統(tǒng)一的抽象,ylog中定義 logEntry 來代表一條日志:
type logEntry struct { Ts time.Time `json:"ts"` File string `json:"file"` Line int `json:"line"` Level LogLevel `json:"level"` Msg string `json:"msg"` }
格式化接口的能力就是將日志從logEntry格式轉(zhuǎn)化為其它某種數(shù)據(jù)格式。ylog中對它的定義是:
type LoggerFormatter interface { Format(*logEntry, *[]byte) error }
第1個參數(shù)是一個logEntry實(shí)例,也就是要被格式化的日志,第2個參數(shù)是日志格式化之后要寫入的容器。
普通文本格式化器
其實(shí)現(xiàn)是這樣的:
type textFormatter struct { } func NewTextFormatter() *textFormatter { return &textFormatter{} } func (f *textFormatter) Format(entry *logEntry, buf *[]byte) error { formatTime(buf, entry.Ts) *buf = append(*buf, ' ') file := toShort(entry.File) *buf = append(*buf, file...) *buf = append(*buf, ':') itoa(buf, entry.Line, -1) *buf = append(*buf, ' ') *buf = append(*buf, levelNames[entry.Level]...) *buf = append(*buf, ' ') *buf = append(*buf, entry.Msg...) return nil }
可以看到它的主要功能就是將logEntry中的各個字段按照某種順序平鋪開來,中間用空格分隔。
其中的很多數(shù)據(jù)處理方法參考了Golang標(biāo)準(zhǔn)日志庫中的數(shù)據(jù)格式化處理代碼,有興趣的可以去Github中詳細(xì)查看。
這里對日期時間格式化為字符串做了特別的優(yōu)化,在標(biāo)準(zhǔn)日志庫中為了將年、月、日、時、分、秒、毫秒、微秒等格式化指定長度的字符串,使用了一個函數(shù):
func itoa(buf *[]byte, i int, wid int) { // Assemble decimal in reverse order. var b [20]byte bp := len(b) - 1 for i >= 10 || wid > 1 { wid-- q := i / 10 b[bp] = byte('0' + i - q*10) bp-- i = q } // i < 10 b[bp] = byte('0' + i) *buf = append(*buf, b[bp:]...) }
其邏輯大概就是將數(shù)字中的每一位轉(zhuǎn)換為字符并存入byte中,注意這里初始化byte數(shù)組的時候是20位,這是int64最大的數(shù)字位數(shù)。
其實(shí)時間字符串中的每個部分位數(shù)都是固定的,比如年是4位、月日時分秒都是2位,根本不需要20位,所以這個空間可以節(jié)??;還有這里用了循環(huán),這對于CPU的分支預(yù)測可能有那么點(diǎn)影響,所以我這里分別對不同位數(shù)寫了專門的格式化方法,以2位數(shù)為例:
func itoa2(buf *[]byte, i int) { q := i / 10 s := byte('0' + i - q*10) f := byte('0' + q) *buf = append(*buf, f, s) }
Json文本格式化器
其實(shí)現(xiàn)是這樣的:
type jsonFormatter struct { } func NewJsonFormatter() *jsonFormatter { return &jsonFormatter{} } func (f *jsonFormatter) Format(entry *logEntry, buf *[]byte) (err error) { entry.File = toShortFile(entry.File) jsonBuf, err := json.Marshal(entry) *buf = append(*buf, jsonBuf...) return }
代碼也很簡單,使用標(biāo)準(zhǔn)庫的json序列化方法將logEntry實(shí)例轉(zhuǎn)化為Json格式的數(shù)據(jù)。
對于Json格式,后續(xù)考慮支持用戶自定義Json字段,這里暫時先簡單處理。
寫日志接口
寫日志就是將日志輸出到別的目標(biāo),比如ylog要支持的輸出到磁盤文件、輸出到Kafka等。
前邊格式化接口將格式化后的數(shù)據(jù)封裝到了 []byte 中,寫日志接口就是將格式化處理的輸出 []byte 寫到某種輸出目標(biāo)中。參考Golang中各種Writer的定義,ylog中對它的定義是:
type LoggerWriter interface { Ensure(*logEntry) error Write([]byte) error Sync() error Close() error }
這里有4個方法:
- Ensure 確保輸出目標(biāo)已經(jīng)準(zhǔn)備好接收數(shù)據(jù),比如打開要寫入的文件、創(chuàng)建Kafka連接等等。
- Write 向輸出目標(biāo)寫數(shù)據(jù)。
- Sync 要求輸出目標(biāo)將緩存持久化,比如寫數(shù)據(jù)到磁盤時,操作系統(tǒng)會有緩存,通過這個方法要求緩存數(shù)據(jù)寫入磁盤。
- Close 寫日志結(jié)束,關(guān)閉輸出目標(biāo)。
寫日志到文件
這里定義一個名為fileWriter的類型,它需要實(shí)現(xiàn)LoggerWriter的接口。
先看類型的定義:
type fileWriter struct { file *os.File lastHour int64 Path string }
包含四個字段:
- file 要輸出的文件對象。
- lastHour 按照小時創(chuàng)建文件的需要。
- Path 日志文件的根路徑。
再看其實(shí)現(xiàn)的接口:
func (w *fileWriter) Ensure(entry *logEntry) (err error) { if w.file == nil { f, err := w.createFile(w.Path, entry.Ts) if err != nil { return err } w.lastHour = w.getTimeHour(entry.Ts) w.file = f return nil } currentHour := w.getTimeHour(entry.Ts) if w.lastHour != currentHour { _ = w.file.Close() f, err := w.createFile(w.Path, entry.Ts) if err != nil { return err } w.lastHour = currentHour w.file = f } return } func (w *fileWriter) Write(buf []byte) (err error) { buf = append(buf, '\n') _, err = w.file.Write(buf) return } func (w *fileWriter) Sync() error { return w.file.Sync() } func (w *fileWriter) Close() error { return w.file.Close() }
Ensure 中的主要邏輯是創(chuàng)建當(dāng)前要寫入的文件對象,如果小時數(shù)變了,先把之前的關(guān)閉,再創(chuàng)建一個新的文件。
Write 把數(shù)據(jù)寫入到文件對象,這里加了一個換行符,也就是說對于文件日志,其每條日志最后都會有一個換行符,這樣比較方便閱讀。
Sync 調(diào)用文件對象的Sync方法,將日志從操作系統(tǒng)緩存刷到磁盤。
Close 關(guān)閉當(dāng)前文件對象。
寫日志到Kafka
這里定義一個名為kafkaWriter的類型,它也需要實(shí)現(xiàn)LoggerWriter的接口。
先看其結(jié)構(gòu)體定義:
type kafkaWriter struct { Topic string Address string writer *kafka.Writer batchSize int }
這里包含四個字段:
Topic 寫Kafka時需要一個主題,這里默認(rèn)當(dāng)前Logger中所有日志使用同一個主題。
Address Kafka的訪問地址。
writer 向Kafka寫數(shù)據(jù)時使用的Writer,這里集成的是:github.com/segmentio/kafka-go,支持自動重試和重連。
batchSize Kafka寫日志的批次大小,批量寫可以提高日志的寫效率。
再看其實(shí)現(xiàn)的接口:
func (w *kafkaWriter) Ensure(curTime time.Time) (err error) { if w.writer == nil { w.writer = &kafka.Writer{ Addr: kafka.TCP(w.Address), Topic: w.Topic, BatchSize: w.batchSize, Async: true, } } return } func (w *kafkaWriter) Write(buf []byte) (err error) { // buf will be reused by ylog when this method return, // with aysnc write, we need copy data to a new slice kbuf := append([]byte(nil), buf...) err = w.writer.WriteMessages(context.Background(), kafka.Message{Value: kbuf}, ) return } func (w *kafkaWriter) Sync() error { return nil } func (w *kafkaWriter) Close() error { return w.writer.Close() }
這里采用的是異步發(fā)送到Kafka的方式,WriteMessages方法不會阻塞,因?yàn)閭魅氲腷uf要被ylog重用,所以這里copy了一下。異步還會存在的一個問題就是不會返回錯誤,可能丟失數(shù)據(jù),不過對于日志這種數(shù)據(jù),沒有那么嚴(yán)格的要求,也可以接受。
如果采用同步發(fā)送,因?yàn)榕堪l(fā)送比較有效率,這里可以攢幾條再發(fā),但日志比較稀疏時,可能短時間很難攢夠,就會出現(xiàn)長時間等不到日志的情況,所以還要有個超時機(jī)制,這有點(diǎn)麻煩,不過我也寫了一個版本,有興趣的可以去看看:https://github.com/bosima/ylog/blob/main/examples/kafka-writer.go
接口的組裝
有了格式化接口和寫日志接口,下一步就是將它們組裝起來,以實(shí)現(xiàn)相應(yīng)的處理能力。
首先是創(chuàng)建它們,因?yàn)槲疫@里也沒有動態(tài)配置的需求,所以就放到創(chuàng)建Logger實(shí)例的時候了,這樣比較簡單。
func NewYesLogger(opts ...Option) (logger *YesLogger) { logger = &YesLogger{} ... logger.writer = NewFileWriter("logs") logger.formatter = NewTextFormatter() for _, opt := range opts { opt(logger) } ... return }
可以看到默認(rèn)的formatter是textFormatter,默認(rèn)的writer是fileWriter。這個函數(shù)傳入的Option其實(shí)是個函數(shù),在下邊的opt(logger)中會執(zhí)行它們,所以使用其它的Formatter或者Writer可以這樣做:
logger := ylog.NewYesLogger( ... ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)), ylog.Formatter(ylog.NewJsonFormatter()), )
這里 ylog.Writer 和 ylog.Formatter 就是符合Option類型的函數(shù),調(diào)用它們可以設(shè)置不同的Formatter和Writer。
然后怎么使用它們呢?
... l.formatter.Format(entry, &buf) l.writer.Ensure(entry) err := l.writer.Write(buf) ...
當(dāng) logEntry 進(jìn)入消息處理環(huán)節(jié)后,首先調(diào)用formatter的Format方法格式化logEntry;然后調(diào)用了writer的Ensure方法確保writer已經(jīng)準(zhǔn)備好,最后調(diào)用writer的Write方法將格式化之后的數(shù)據(jù)輸出到對應(yīng)的目標(biāo)。
為什么不將Ensure方法放到Write中呢?這是因?yàn)槟壳皩懳谋救罩镜臅r候需要根據(jù)logEntry中的日志時間創(chuàng)建日志文件,這樣就需要給Writer傳遞兩個參數(shù),有點(diǎn)別扭,所以這里將它們分開了。
如何提高日志處理的吞吐量
Kafka的吞吐量是很高的,那么如果放到y(tǒng)log自身來說,如何提高它的吞吐量呢?
首先想到的就是Channel,可以使用有緩沖的Channel模擬一個隊(duì)列,生產(chǎn)者不停的向Channel發(fā)送數(shù)據(jù),如果Writer可以一直在緩沖被填滿之前將數(shù)據(jù)取走,那么理論上說生產(chǎn)者就是非阻塞的,相比同步輸出到某個Writer,沒有直接磁盤IO、網(wǎng)絡(luò)IO,日志處理的吞吐量必將大幅提升。
定義一個Channel,其容量默認(rèn)為當(dāng)前機(jī)器邏輯處理器的數(shù)量:
logger.pipe = make(chan *logEntry, runtime.NumCPU())
發(fā)送數(shù)據(jù)的代碼:
entry := &logEntry{ Level: level, Msg: s, File: file, Line: line, Ts: now, } l.pipe <- entry
接收數(shù)據(jù)的代碼:
for { select { case entry := <-l.pipe: // reuse the slice memory buf = buf[:0] l.formatter.Format(entry, &buf) l.writer.Ensure(entry.Ts) err := l.writer.Write(buf) ... } }
實(shí)際效果怎么樣呢?看下Benchmark:
goos: darwin goarch: amd64 pkg: github.com/bosima/ylog cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz BenchmarkInfo-8 1332333 871.6 ns/op 328 B/op 4 allocs/op
這個結(jié)果可以和zerolog、zap等高性能日志庫一較高下了,當(dāng)然目前可以做的事情要比它們簡單很多。
如果對Java有所了解的同學(xué)應(yīng)該聽說過log4j,在log4j2中引入了一個名為Disruptor的組件,它讓日志處理飛快了起來,受到很多Java開發(fā)者的追捧。Disruptor之所以這么厲害,是因?yàn)樗褂昧藷o鎖并發(fā)、環(huán)形隊(duì)列、緩存行填充等多種高級技術(shù)。
相比之下,Golang的Channel雖然也使用了環(huán)形緩沖,但是還是使用了鎖,作為隊(duì)列來說性能并不是最優(yōu)的。
Golang中有沒有類似的東西呢?最近出來的ZenQ可能是一個不錯的選擇,不過看似還不太穩(wěn)定,過段時間再嘗試下。有興趣的可以去看看:https://github.com/alphadose/ZenQ 。
好了,以上就是本文的主要內(nèi)容。關(guān)于ylog的介紹也告一段落了,后續(xù)會在Github上持續(xù)更新,增加更多有用的功能,并不斷優(yōu)化處理性能,歡迎關(guān)注:https://github.com/bosima/ylog 。
到此這篇關(guān)于Golang:將日志以Json格式輸出到Kafka的文章就介紹到這了,更多相關(guān)Golang日志輸出到Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang time包下定時器的實(shí)現(xiàn)方法
定時器的實(shí)現(xiàn)大家應(yīng)該都遇到過,最近在學(xué)習(xí)golang,所以下面這篇文章主要給大家介紹了關(guān)于golang time包下定時器的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。2017-12-12golang Gorm與數(shù)據(jù)庫完整性約束詳解
這篇文章主要介紹了golang Gorm與數(shù)據(jù)庫完整性約束詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12深入淺出Go:掌握基礎(chǔ)知識的關(guān)鍵要點(diǎn)
Go是一種開源的編程語言,由Google開發(fā),它具有簡潔、高效、并發(fā)性強(qiáng)的特點(diǎn),適用于構(gòu)建可靠的、高性能的軟件系統(tǒng),本文將介紹Go的基礎(chǔ)知識,需要的朋友可以參考下2023-10-10