GoLang日志監(jiān)控系統(tǒng)實(shí)現(xiàn)
日志監(jiān)控系統(tǒng)
Nginx(日志文件) -> log_process (實(shí)時(shí)讀取解析寫入) -> influxdb(存儲(chǔ)) ->grafana(前端日志展示器)
influxdb 屬于GO語言編寫的開源的時(shí)序型數(shù)據(jù),著力于高性能 查詢與存儲(chǔ)時(shí)序型數(shù)據(jù),influxdb 廣泛的應(yīng)用于存儲(chǔ)系統(tǒng)的監(jiān)控?cái)?shù)據(jù),IOT行業(yè)的實(shí)時(shí)數(shù)據(jù)。
目前市面上流行 TSDB(時(shí)序型處理數(shù)據(jù)庫(kù)):influxDB, TimescaleDB, QuestDBinfluxDB 類似于NOSQL體驗(yàn),自動(dòng)適合標(biāo)記集模型的技術(shù)的數(shù)據(jù)集;TimescaleDB 與 postgreSQL 兼容, 更加適合物聯(lián)網(wǎng)數(shù)據(jù),與PostgreSQL更好的兼容QuestDB: 支持InfluxDB內(nèi)聯(lián)協(xié)議和PostgreSQL, 但是生態(tài)問題比較大
項(xiàng)目簡(jiǎn)答介紹
本日志系統(tǒng) DEMO,但是可以直接使用到生產(chǎn)環(huán)境上面,使用LOG_Process 讀取Nginx ./Access.log, 使用influxDB 進(jìn)行存取
log_process -path ./access.log influxdsn http://127.0.0.1:8086@imooc@imoocpass@immoc@s
常見并發(fā)模型
- 解決C10k 的問題 采用異步非阻塞的模型(Nginx, libevent, NodeJS)-- 問題 復(fù)雜度高 大量回調(diào)函數(shù)
- 協(xié)程(Go,Erlang, lua): 協(xié)線性函數(shù)一樣寫代碼;理解根加輕量級(jí)別的線程
- 程序并行執(zhí)行 go foo() // 執(zhí)行函數(shù)
- mgs:= <- c 多個(gè)gorountine 需要進(jìn)行通信
- select 從多個(gè)channel 中讀取數(shù)據(jù) ,多個(gè) channel 隨機(jī)選擇一個(gè)進(jìn)行消費(fèi)
- 并發(fā): 一個(gè)任務(wù)通過調(diào)度器讓任務(wù)看起來運(yùn)行 屬于單核CPU(邏輯運(yùn)行)對(duì)于IO密集型比較友好
- 并行:任務(wù)真正的運(yùn)行
在go 語言中 并發(fā)執(zhí)行 ,使用三個(gè)不同 gorountine, 一個(gè)負(fù)責(zé)裝填,一個(gè)負(fù)責(zé)運(yùn)輸,一個(gè)負(fù)責(zé)處理 ,讓程序并發(fā)的運(yùn)行起來,讓任務(wù)更加的職責(zé)單一化 這種思想 也可以將 日志解析讀取,寫入模塊進(jìn)行單獨(dú)小模塊,每個(gè)模塊讓使用gorountine ,通過channel 數(shù)據(jù)交互,至于這么多gorountine 是在一個(gè)CPU調(diào)度執(zhí)行還是分配到多個(gè)CPU上進(jìn)行執(zhí)行 ,取決于系統(tǒng).
go 語言有自己的調(diào)度器, go fun() 屬于一個(gè)獨(dú)立的工作單元,go的調(diào)度器,根據(jù)每個(gè)可用的物理處理器分配一個(gè)邏輯處理器,通過這個(gè)邏輯處理器對(duì) 獨(dú)立單元進(jìn)行處理,
通過設(shè)置: runtime.GOMAXPROCS(1)//給調(diào)度器分配多小個(gè)具體的邏輯處理器
一臺(tái)服務(wù)器的 物理處理器越多 ,go 獲取到邏輯處理器也越多,導(dǎo)致器允許速度越快。 參考:傳送門
系統(tǒng)架構(gòu)
日志解析的基本流程化的偽函數(shù),如下的函數(shù)有兩個(gè)缺陷,解析介入和解析后輸出只能寫死,所以需要進(jìn)行擴(kuò)展,接口方式進(jìn)行擴(kuò)展
package main
import (
"fmt"
"strings"
"time"
)
/**
* 日志解析系統(tǒng)分為: 解析,讀取,寫入
*/
type LogProcess struct {
path string // 讀取文件路徑
influxDBDsn string // influx data source
rc chan string // read module to process
wc chan string // process to influx
}
// 返回函數(shù)使用 指針, 結(jié)構(gòu)體很大 不需要進(jìn)行拷貝 性能優(yōu)化
func (l *LogProcess) ReadFromFile() {
// 文件讀取模塊
line := "message"
l.rc <- line
}
func (l *LogProcess) Process() {
// 文件解析模塊
data := <-l.rc
l.wc <- strings.ToUpper(data)
}
func (l *LogProcess) writeToInfluxDB() {
fmt.Println(<-l.wc)
}
func main() {
// lp 引用類型
lp := &LogProcess{
path: "./tmp/access.log",
influxDBDsn: "username&password...",
rc: make(chan string),
wc: make(chan string),
}
// tree goroutine run
go lp.ReadFromFile()
go lp.Process()
// 需要定義 chan 將 Process 數(shù)據(jù) 傳遞給 influxDB
go lp.writeToInfluxDB()
time.Sleep(2 * time.Second)
}接口方式約束 輸入和輸出 進(jìn)行優(yōu)化
package main
import (
"fmt"
"strings"
"time"
)
/**
* 日志解析系統(tǒng)分為: 解析,讀取,寫入
*/
type LogProcess struct {
rc chan string // read module to process
wc chan string // process to influx
read Read
write Writer
}
func (l *LogProcess) Process() {
// 文件解析模塊
data := <-l.rc
l.wc <- strings.ToUpper(data)
}
type Writer interface {
writer(wc chan string)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
fmt.Println(<-wc)
}
type Read interface {
read(rc chan string)
}
type ReadFromFile struct {
path string // 讀取文件
}
func (r *ReadFromFile) read(rc chan string) {
// 讀取模塊
line := "message"
rc <- line
}
func main() {
// lp 引用類型
r := &ReadFromFile{
path: "./tmp/access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan string),
wc: make(chan string),
read: r,
write: w,
}
// 通過接口方式 約束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通過參數(shù)注入方式
time.Sleep(2 * time.Second)
}讀取模塊具體實(shí)現(xiàn)
從上次讀取光標(biāo)后開始逐行進(jìn)行讀取,無需每次都全部文件讀取
package main
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"time"
)
/**
* 日志解析系統(tǒng)分為: 解析,讀取,寫入
*/
type LogProcess struct {
rc chan []byte // read module to process
wc chan string // process to influx
read Read
write Writer
}
func (l *LogProcess) Process() {
// 文件解析模塊
for v := range l.rc {
l.wc <- strings.ToUpper(string(v))
}
}
type Writer interface {
writer(wc chan string)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
// wc 通道另外一種讀取方式
for x := range wc {
fmt.Println(x)
}
}
type Read interface {
read(rc chan []byte)
}
type ReadFromFile struct {
path string // 讀取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
// 實(shí)時(shí)系統(tǒng): 從文件末尾逐行進(jìn)行讀取
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintln("open file error:%s", err.Error()))
}
// 文件末尾最開始進(jìn)行讀取
f.Seek(0, 2)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
// d讀取到文件末尾, 日志還沒有寫入
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintln("ReadBytes error:%s", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func main() {
// lp 引用類型
r := &ReadFromFile{
path: "H:\\code\\goprogarm\\src\\access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan string),
read: r,
write: w,
}
// 通過接口方式 約束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通過參數(shù)注入方式
time.Sleep(100 * time.Second)
}日志解析模塊
- 沖Read Chan 中讀取每一行數(shù)據(jù)
- 正則方式提取所需要的監(jiān)控?cái)?shù)據(jù)
- 將數(shù)據(jù)寫入到influxDB
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"regexp"
"strconv"
"time"
)
/**
* 日志解析系統(tǒng)分為: 解析,讀取,寫入
*/
type LogProcess struct {
rc chan []byte // read module to process
wc chan *Message // process to influx
read Read
write Writer
}
//日志寫入結(jié)構(gòu)體
type Message struct {
TimeLocal time.Time
BytesSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}
func (l *LogProcess) Process() {
// 通過正則表達(dá)式進(jìn)行解析數(shù)據(jù)
r := regexp.MustCompile(`(\s*)`)
loc, _ := time.LoadLocation("Asia/shanghai")
// 文件解析模塊
for v := range l.rc {
ret := r.FindStringSubmatch(string(v))
if len(ret) != 13 {
log.Println("FindStringSub match fail:", string(v))
continue
}
message := &Message{
}
location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println("ParseInLocation fail:", err.Error(), ret[4])
}
message.TimeLocal = location
// 字符串類型轉(zhuǎn)換成int
atoi, err := strconv.Atoi(ret[8])
if err != nil {
log.Println("strconv.Atoi fail:", err.Error(), ret[4])
}
message.BytesSent = atoi
l.wc <- message
}
}
type Writer interface {
writer(wc chan *Message)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan *Message) {
// wc 通道另外一種讀取方式
for x := range wc {
fmt.Println(x)
}
}
type Read interface {
read(rc chan []byte)
}
type ReadFromFile struct {
path string // 讀取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
// 實(shí)時(shí)系統(tǒng): 從文件末尾逐行進(jìn)行讀取
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file error:%s\n", err.Error()))
}
// 文件末尾最開始進(jìn)行讀取
f.Seek(0, 2)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
// d讀取到文件末尾, 日志還沒有寫入
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func main() {
// lp 引用類型
r := &ReadFromFile{
path: "H:\\code\\goprogarm\\src\\access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan *Message),
read: r,
write: w,
}
// 通過接口方式 約束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通過參數(shù)注入方式
time.Sleep(100 * time.Second)
}到此這篇關(guān)于GoLang日志監(jiān)控系統(tǒng)實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)GoLang日志監(jiān)控內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理
這篇文章主要為大家介紹了go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
go?defer?return?panic?執(zhí)行順序示例詳解
這篇文章主要介紹了go?defer?return?panic?執(zhí)行順序,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-01-01
go語言中基本數(shù)據(jù)類型及應(yīng)用快速了解
這篇文章主要為大家介紹了go語言中基本數(shù)據(jù)類型應(yīng)用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
go獲取協(xié)程(goroutine)號(hào)的實(shí)例
這篇文章主要介紹了go獲取協(xié)程(goroutine)號(hào)的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-12-12

