Golang監(jiān)聽日志文件并發(fā)送到kafka中
前言
日志收集項目的準(zhǔn)備中,本文主要講的是利用golang的tail庫,監(jiān)聽日志文件的變動,將日志信息發(fā)送到kafka中。
涉及的golang庫和可視化工具:
go-ini,sarama,tail其中:
go-ini:用于讀取配置文件,統(tǒng)一管理配置項,有利于后其的維護sarama:是一個go操作kafka的客戶端。目前我用于向kefka發(fā)送消息tail:類似于linux的tail命令了,讀取文件的后幾行。如果文件有追加數(shù)據(jù),會檢測到。就是通過它來監(jiān)聽日志文件
可視化工具:
offsetexplorer:是kafka的可視化工具,這里用來查看消息是否投遞成功
工作的流程
- 加載配置,初始化
sarama和kafka。 - 起一個的協(xié)程,利用
tail不斷去監(jiān)聽日志文件的變化。 - 主協(xié)程中一直阻塞等待
tail發(fā)送消息,兩者通過一個管道通訊。一旦主協(xié)程接收到新日志,組裝格式,然后發(fā)送到kafka中

環(huán)境準(zhǔn)備
環(huán)境的話,確保zookeeper和kafka正常運行。因為還沒有使用sarama讀取數(shù)據(jù),使用offsetexplorer來查看任務(wù)是否真的投遞成功了。
代碼分層
serve來存放寫tail服務(wù)類和sarama服務(wù)類,conf存放ini配置文件
main函數(shù)為程序入口

關(guān)鍵的代碼
main.go
main函數(shù)做的有:構(gòu)建配置結(jié)構(gòu)體,映射配置文件。調(diào)用和初始化tail,srama服務(wù)。
package main
import (
"fmt"
"sarama/serve"
"github.com/go-ini/ini"
)
type KafkaConfig struct {
Address string `ini:"address"`
ChannelSize int `ini:"chan_size"`
}
type TailConfig struct {
Path string `ini:"path"`
Filename string `ini:"fileName"`
// 如果是結(jié)構(gòu)體,則指明分區(qū)名
Children `ini:"tailfile.children"`
}
type Config struct {
KafkaConfig `ini:"kafka"`
TailConfig `ini:"tailfile"`
}
type Children struct {
Name string `ini:"name"`
}
func main() {
// 加載配置
var cfg = new(Config)
err := ini.MapTo(cfg, "./conf/go-conf.ini")
if err != nil {
fmt.Print(err)
}
// 初始化kafka
ks := &serve.KafukaServe{}
// 啟動kafka消息監(jiān)聽。異步
ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize))
// 關(guān)閉主協(xié)程時,關(guān)閉channel
defer ks.Destruct()
// 初始化tail
ts := &serve.TailServe{}
ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename)
// 阻塞
ts.Listener(ks.MsgChan)
}
kafka.go
有3個方法 :
InitKafka,組裝配置項以及初始化接收消息的管道,Listener,監(jiān)聽管道消息,收到消息后,將消息組裝,發(fā)送到kafkaDestruct, 關(guān)閉管道
package serve
import (
"fmt"
"github.com/Shopify/sarama"
)
type KafukaServe struct {
MsgChan chan string
//err error
}
func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) {
// 讀取配置
config := sarama.NewConfig()
// 1. 初始化生產(chǎn)者配置
config.Producer.RequiredAcks = sarama.WaitForAll
// 選擇分區(qū)
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的信息
config.Producer.Return.Successes = true
ks.MsgChan = make(chan string, chanSize)
go ks.Listener(addr, chanSize, config)
}
func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) {
// 連接kafka
var kafkaClient, _ = sarama.NewSyncProducer(addr, config)
defer kafkaClient.Close()
for {
select {
case content := <-ks.MsgChan:
//
msg := &sarama.ProducerMessage{
Topic: "weblog",
Value: sarama.StringEncoder(content),
}
partition, offset, err := kafkaClient.SendMessage(msg)
if err != nil {
fmt.Println(err)
}
fmt.Println("分區(qū),偏移量:")
fmt.Println(partition, offset)
fmt.Println("___")
}
}
}
func (ks *KafukaServe) Destruct() {
close(ks.MsgChan)
}
tail.go
主要包括了兩個方法:
TailInit初始化,組裝tail配置。ListenerListener,保存kafka服務(wù)類初始化之后的管道。監(jiān)聽日志文件,如果有新日志,就往管道里發(fā)送
package serve
import (
"fmt"
"github.com/hpcloud/tail"
)
type TailServe struct {
tails *tail.Tail
}
func (ts *TailServe) TailInit(filenName string) {
config := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}
// 打開文件開始讀取數(shù)據(jù)
ts.tails, _ = tail.TailFile(filenName, config)
// if err != nil {
// fmt.Println("tails %s failed,err:%v\n", filenName, err)
// return nil, err
// }
fmt.Println("啟動," + filenName + "監(jiān)聽")
}
func (ts *TailServe) Listener(MsgChan chan string) {
for {
msg, ok := <-ts.tails.Lines
if !ok {
// todo
fmt.Println("數(shù)據(jù)接收失敗")
return
}
fmt.Println(msg.Text)
MsgChan <- msg.Text
}
}
// 測試案例
func Demo() {
filename := `E:\xx.log`
config := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}
// 打開文件開始讀取數(shù)據(jù)
tails, err := tail.TailFile(filename, config)
if err != nil {
fmt.Println("tails %s failed,err:%v\n", filename, err)
return
}
var (
msg *tail.Line
ok bool
)
fmt.Println("啟動")
for {
msg, ok = <-tails.Lines
if !ok {
fmt.Println("tails file close reopen,filename:$s\n", tails.Filename)
}
fmt.Println("msg:", msg.Text)
}
}
到此這篇關(guān)于Golang監(jiān)聽日志文件并發(fā)送到kafka中的文章就介紹到這了,更多相關(guān)Golang 監(jiān)聽日志文件 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go并發(fā)數(shù)據(jù)一致性事務(wù)的保障面試應(yīng)答
這篇文章主要為大家介紹了go并發(fā)數(shù)據(jù)一致性事務(wù)的保障面試應(yīng)答,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12
GoFrame框架gredis優(yōu)雅的取值和類型轉(zhuǎn)換
這篇文章主要為大家介紹了GoFrame框架gredis優(yōu)雅的取值和類型轉(zhuǎn)換,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06
Win10系統(tǒng)下Golang環(huán)境搭建全過程
在編程語言的選取上,越來越多的人選擇了Golang,下面這篇文章主要給大家介紹了關(guān)于Win10系統(tǒng)下Golang環(huán)境搭建的相關(guān)資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-01-01
Go Uber靜態(tài)分析工具NilAway使用初體驗
這篇文章主要介紹了Go Uber靜態(tài)分析工具NilAway使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01

