以go為例探究beyla從環(huán)境變量BEYLA_OPEN_PORT發(fā)現(xiàn)進(jìn)程原理
beyla源碼中,關(guān)于BEYLA_OPEN_PORT的定義
// beyla/pkg/internal/pipe/config.go type Config struct { ... Port services.PortEnum `yaml:"open_port" env:"BEYLA_OPEN_PORT"` ... } type PortEnum struct { ranges []portRange } type portRange struct { start int // if end == 0, it means this entry is not a port range but a single port end int }
可以看出,BEYLA_OPEN_PORT是個(gè)環(huán)境變量,它對(duì)應(yīng)的變量類型是ProtEnum,它是一個(gè)數(shù)值范圍的集合。
這里僅以指定單個(gè)值為例,如BEYLA_OPEN_PORT=8080,此時(shí)start=8080,end=0。
啟動(dòng)golang程序并發(fā)現(xiàn)它
啟動(dòng)beyla/exmpales下的example-http-service進(jìn)程:
- 該golang進(jìn)程會(huì)監(jiān)聽8080端口;
# curl -OL https://raw.githubusercontent.com/grafana/beyla/main/examples/example-http-service/example-http-service.go # go run ./example-http-service.go
然后啟動(dòng)beyla,指定BEYLA_OPEN_PORT=8080,通過端口發(fā)現(xiàn)進(jìn)程:
# BEYLA_PROMETHEUS_PORT=9400 BEYLA_OPEN_PORT=8080 BEYLA_LOG_LEVEL=DEBUG beyla
最后beyla的日志中,就可以發(fā)現(xiàn)成功發(fā)現(xiàn)了該進(jìn)程:
...
time=2023-12-12T21:43:42.358-05:00 level=DEBUG msg="filtering processes" component=discover.CriteriaMatcher len=337
time=2023-12-12T21:43:42.435-05:00 level=DEBUG msg="found process" component=discover.CriteriaMatcher pid=612536 comm=~/go/src/github.com/grafana/beyla/examples/example-http-service/example-http-service.go
...
源碼的整體架構(gòu)
beyla源碼中,發(fā)現(xiàn)進(jìn)程的代碼流程分為2個(gè)部分:
WatchProvier:負(fù)責(zé)監(jiān)聽端口綁定的系統(tǒng)調(diào)用,然后檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除;
監(jiān)聽端口綁定:
- 通過ebpf完成的,監(jiān)聽kprobe/security_socket_bind;
- 若監(jiān)聽到kprobe調(diào)用,則設(shè)置fetchPorts=true,該flag意味著后面在查詢進(jìn)程信息的時(shí)候,同時(shí)查詢進(jìn)程的端口信息;
檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除:
- 查詢當(dāng)前系統(tǒng)上所有進(jìn)程及進(jìn)程使用的端口信息;
- 緩存上次輪訓(xùn)的進(jìn)程,然后對(duì)比本次輪訓(xùn)的結(jié)果,得到進(jìn)程的創(chuàng)建/刪除事件;
- 進(jìn)程的創(chuàng)建/刪除信息,保存在chan[] Event中傳遞給下一個(gè)pipeline;
CriteriaMatchProvider:負(fù)責(zé)檢查過并濾滿足條件的進(jìn)程;
- 指定BEYLA_OPEN_PORT的話,就檢查chan []Event中,是否有監(jiān)聽該端口的進(jìn)程;
- 若有,則意味著發(fā)現(xiàn)了新進(jìn)程;
- 然后由后面pipeline=TraceAttacherProvider去監(jiān)控新發(fā)現(xiàn)的進(jìn)程內(nèi)的http/grpc相關(guān)的kprobe/uprobe調(diào)用;
上述兩個(gè)流程以pipeline的形式組裝起來,之間通過chan []Event進(jìn)行數(shù)據(jù)連接;
- Chan []Event中保存了距上次輪訓(xùn)以來,新增的進(jìn)程或刪除的進(jìn)程;
源碼入口:
// beyla/pkg/internal/discover/finder.go func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) { gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen)) graph.RegisterStart(gb, WatcherProvider) graph.RegisterMiddle(gb, CriteriaMatcherProvider) ... graph.RegisterTerminal(gb, TraceAttacherProvider) pipeline, err := gb.Build(pf) ... go pipeline.Run() // 啟動(dòng)執(zhí)行 return pf.DiscoveredTracers, pf.DeleteTracers, nil }
WatchProvider
WatchProvider負(fù)責(zé)監(jiān)聽端口綁定的系統(tǒng)調(diào)用,然后檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除。
WatchProvider的實(shí)例化代碼如下,其中:
- loadBPFWatcher:負(fù)責(zé)監(jiān)聽ebpf端口綁定的系統(tǒng)調(diào)用;
- fetchProcessPorts:負(fù)責(zé)查詢當(dāng)前系統(tǒng)的進(jìn)程和監(jiān)聽端口情況;
// beyla/pkg/internal/discover/watcher.go func WatcherProvider(w Watcher) (node.StartFunc[[]Event[processPorts]], error) { acc := pollAccounter{ ctx: w.Ctx, cfg: w.Cfg, interval: w.Cfg.Discovery.PollInterval, pids: map[PID]processPorts{}, pidPorts: map[pidPort]processPorts{}, listProcesses: fetchProcessPorts, executableReady: executableReady, loadBPFWatcher: loadBPFWatcher, fetchPorts: true, // must be true until we've activated the bpf watcher component bpfWatcherEnabled: false, // async set by listening on the bpfWatchEvents channel stateMux: sync.Mutex{}, findingCriteria: FindingCriteria(w.Cfg), } if acc.interval == 0 { acc.interval = defaultPollInterval } return acc.Run, nil // acc.Run開始工作 }
具體工作由pollAccounter.Run()啟動(dòng)執(zhí)行:
- 首先,加載ebpf程序,該ebpf程序會(huì)監(jiān)聽端口綁定的系統(tǒng)調(diào)用;
- 然后,消費(fèi)ebpf端口綁定的事件,設(shè)置輪訓(xùn)processPorts的flag=true;
- 再后,通過listProcesses()查詢當(dāng)前系統(tǒng)內(nèi)的所有進(jìn)程和端口;
- 最后,通過snaphost(procs)獲得:自上次輪訓(xùn)以來,本地輪訓(xùn)得到的進(jìn)程創(chuàng)建/刪除事件;
- 這些事件被放入chan []Event,由下一級(jí)的pipeline消費(fèi)處理;
// beyla/pkg/internal/discover/watcher.go func (pa *pollAccounter) Run(out chan<- []Event[processPorts]) { ... bpfWatchEvents := make(chan watcher.Event, 100) // 加載ebpf程序 if err := pa.loadBPFWatcher(pa.cfg, bpfWatchEvents); err != nil { log.Error("Unable to load eBPF watcher for process events", "error", err) } // 消費(fèi)ebpf程序的端口綁定事件 go pa.watchForProcessEvents(log, bpfWatchEvents) for { // 查詢當(dāng)前系統(tǒng)內(nèi)的所有進(jìn)程和端口 procs, err := pa.listProcesses(pa.portFetchRequired()) // 參數(shù)=true if err != nil { log.Warn("can't get system processes", "error", err) } else { // 自上次輪訓(xùn)以來,本次輪訓(xùn)得到進(jìn)程創(chuàng)建/刪除事件; if events := pa.snapshot(procs); len(events) > 0 { log.Debug("new process watching events", "events", events) out <- events } } select { case <-pa.ctx.Done(): log.Debug("context canceled. Exiting") return case <-time.After(pa.interval): // 定期輪訓(xùn),默認(rèn)interval=5s // poll event starting again } } }
ebpf監(jiān)聽端口綁定
ebpf程序:
- 監(jiān)聽系統(tǒng)調(diào)用kprobe/security_socket_bind;
// beyla/bpf/watch_helper.c SEC("kprobe/security_socket_bind") int kprobe_security_socket_bind(struct pt_regs *ctx) { struct sockaddr *addr = (struct sockaddr *)PT_REGS_PARM2(ctx); ... u16 port = get_sockaddr_port(addr); ... watch_info_t *trace = bpf_ringbuf_reserve(&watch_events, sizeof(watch_info_t), 0); if (trace) { trace->flags = WATCH_BIND; trace->payload = port; bpf_dbg_printk("New port bound %d", trace->payload); bpf_ringbuf_submit(trace, 0); } return 0; }
然后在golang程序中,讀取ebpf的ringbuf,得到NewPort監(jiān)聽的事件:
// beyla/pkg/internal/ebpf/watcher/watcher.go func (p *Watcher) processWatchEvent(record *ringbuf.Record) (request.Span, bool, error) { var flags uint64 var event BPFWatchInfo err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &flags) ... if flags == 1 { // socket bind err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event) if err == nil { p.log.Debug("New port bind event", "port", event.Payload) p.events <- Event{Type: NewPort, Payload: uint32(event.Payload)} } } return request.Span{}, true, nil }
上面p.events的消費(fèi)代碼:
- 若監(jiān)聽到目標(biāo)端口的進(jìn)程被創(chuàng)建,則執(zhí)行pa.refetchPorts();
// beyla/pkg/internal/discover/watcher.go func (pa *pollAccounter) watchForProcessEvents(log *slog.Logger, events <-chan watcher.Event) { for e := range events { switch e.Type { case watcher.Ready: pa.bpfWatcherIsReady() case watcher.NewPort: port := int(e.Payload) if pa.cfg.Port.Matches(port) || pa.findingCriteria.PortOfInterest(port) { pa.refetchPorts() } default: log.Warn("Unknown ebpf process watch event", "type", e.Type) } } }
pa.refetchPorts()僅設(shè)置了一個(gè)flag:pa.fetchPorts=true:
- 該flag=true意味著后面listProcess的時(shí)候,需要同時(shí)查詢進(jìn)程使用的端口;
func (pa *pollAccounter) refetchPorts() { pa.stateMux.Lock() defer pa.stateMux.Unlock() pa.fetchPorts = true }
檢查進(jìn)程的創(chuàng)建/刪除
首先,查詢進(jìn)程和進(jìn)程使用的ports:
- 參數(shù)scanPorts=true,意味著查詢進(jìn)程的ports;
// beyla/pkg/internal/discover/watcher.go // 參數(shù)scanPorts=true func fetchProcessPorts(scanPorts bool) (map[PID]processPorts, error) { processes := map[PID]processPorts{} pids, err := process.Pids() for _, pid := range pids { if !scanPorts { // 不查詢ports processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: []uint32{}} continue } // 查詢ports conns, err := net.ConnectionsPid("inet", pid) ... var openPorts []uint32 // TODO: Cap the size of this array, leaking client ephemeral ports will cause this to grow very long for _, conn := range conns { openPorts = append(openPorts, conn.Laddr.Port) } processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: openPorts} } return processes, nil }
然后,對(duì)比上次輪訓(xùn)的結(jié)果與本次的結(jié)果,生成進(jìn)程創(chuàng)建/刪除的events:
- events中保存進(jìn)程進(jìn)程和端口信息;
- events被放入chan,然后給下一級(jí)的pipeline使用;
// beyla/pkg/internal/discover/watcher.go func (pa *pollAccounter) snapshot(fetchedProcs map[PID]processPorts) []Event[processPorts] { var events []Event[processPorts] currentPidPorts := make(map[pidPort]processPorts, len(fetchedProcs)) reportedProcs := map[PID]struct{}{} notReadyProcs := map[PID]struct{}{} // notify processes that are new, or already existed but have a new connection for pid, proc := range fetchedProcs { // if the process does not have open ports, we might still notify it // for example, if it's a client with ephemeral connections, which might be later matched by executable name if len(proc.openPorts) == 0 { ... } else { for _, port := range proc.openPorts { if pa.checkNewProcessConnectionNotification(proc, port, currentPidPorts, reportedProcs, notReadyProcs) { events = append(events, Event[processPorts]{Type: EventCreated, Obj: proc}) // 進(jìn)程創(chuàng)建,同時(shí)保存進(jìn)程和端口 // skip checking new connections for that process continue } } } } // notify processes that are removed for pid, proc := range pa.pids { if _, ok := fetchedProcs[pid]; !ok { events = append(events, Event[processPorts]{Type: EventDeleted, Obj: proc}) // 進(jìn)程刪除,同時(shí)保存進(jìn)程和端口 } } .... pa.pids = currentProcs pa.pidPorts = currentPidPorts return events }
CriteriaMatchProvider
CriteriaMatchProvider負(fù)責(zé)檢查過濾滿足條件的進(jìn)程。
該P(yáng)ipeline的實(shí)例化代碼如下:
- 其中criteria=篩選標(biāo)準(zhǔn),從進(jìn)程配置中生成,后面的篩選均通過與criteria進(jìn)行比對(duì)完成;
// beyla/pkg/internal/discover/matcher.go func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processPorts], []Event[ProcessMatch]], error) { m := &matcher{ log: slog.With("component", "discover.CriteriaMatcher"), criteria: FindingCriteria(cm.Cfg), processHistory: map[PID]struct{}{}, } return m.run, nil }
具體檢查和篩選工作,由matcher.run()完成,該函數(shù)內(nèi):
- 針對(duì)每一批的[]Event,由m.filter進(jìn)行篩選;
- 篩選的結(jié)果最終保存到chan []Event,傳遞給下一級(jí)的Pipeline使用;
// beyla/pkg/internal/discover/matcher.go func (m *matcher) run(in <-chan []Event[processPorts], out chan<- []Event[ProcessMatch]) { m.log.Debug("starting criteria matcher node") for i := range in { m.log.Debug("filtering processes", "len", len(i)) o := m.filter(i) // 執(zhí)行篩選 m.log.Debug("processes matching selection criteria", "len", len(o)) out <- o } }
而m.fiter()在篩選時(shí):
- 針對(duì)每一個(gè)event,遍歷criteria,檢查event內(nèi)的進(jìn)程是否match criteria,若match,則意味著找到一個(gè)進(jìn)程;
- 其中一個(gè)event內(nèi)保存EventType(Create/Delete)和進(jìn)程信息(包含端口),只需滿足其中一個(gè)criteria即可;
// beyla/pkg/internal/discover/matcher.go func (m *matcher) filter(events []Event[processPorts]) []Event[ProcessMatch] { var matches []Event[ProcessMatch] // 針對(duì)每一個(gè)event for _, ev := range events { ... proc, err := processInfo(ev.Obj) ... // 遍歷m.criteria for i := range m.criteria { // 檢查是否滿足其中一個(gè)criteria if m.matchProcess(proc, &m.criteria[i]) { comm := proc.ExePath // match,找到一個(gè)符合條件的進(jìn)程 m.log.Debug("found process", "pid", proc.Pid, "comm", comm) matches = append(matches, Event[ProcessMatch]{ Type: EventCreated, Obj: ProcessMatch{Criteria: &m.criteria[i], Process: proc}, }) break } } } return matches }
具體看下m.matchProcesses()中關(guān)于port的匹配邏輯:
- 使用openPorts(即BEYLA_OPEN_PORT參數(shù))與進(jìn)程的port進(jìn)行比對(duì);
// beyla/pkg/internal/discover/matcher.go func (m *matcher) matchProcess(p *services.ProcessInfo, a *services.Attributes) bool { ... if a.OpenPorts.Len() > 0 { return m.matchByPort(p, a) // 檢查端口是否matcher } return true } func (m *matcher) matchByPort(p *services.ProcessInfo, a *services.Attributes) bool { for _, c := range p.OpenPorts { if a.OpenPorts.Matches(int(c)) { // openPorts=BEYLA_OPEN_PORT參數(shù) return true } } return false }
openPorts是PortEnum類型,一個(gè)range=[start,End],若僅指定一個(gè),則start=指定值,end=0;
// beyla/pkg/internal/discover/services/criteria.go type PortEnum struct { ranges []portRange } type portRange struct { start int // if end == 0, it means this entry is not a port range but a single port end int }
其match邏輯如下:
- 由于指定BEYLA_OPEN_PORT=8080,則start=8080,end=0,該函數(shù)返回=true;
// beyla/pkg/internal/discover/services/criteria.go func (p *PortEnum) Matches(port int) bool { for _, pr := range p.ranges { if pr.end == 0 && pr.start == port || pr.end != 0 && pr.start <= port && port <= pr.end { return true } } return false }
以上就是以go為例探究beyla從環(huán)境變量BEYLA_OPEN_PORT發(fā)現(xiàn)進(jìn)程原理的詳細(xì)內(nèi)容,更多關(guān)于go beyla BEYLA_OPEN_PORT進(jìn)程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言中的自定義函數(shù)類型的實(shí)現(xiàn)
在Go語言中,函數(shù)類型是一種將函數(shù)作為值的數(shù)據(jù)類型,本文主要介紹了Go語言中的自定義函數(shù)類型,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09golang的時(shí)區(qū)和神奇的time.Parse的使用方法
這篇文章主要介紹了golang的時(shí)區(qū)和神奇的time.Parse的使用方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04go語言規(guī)范RESTful?API業(yè)務(wù)錯(cuò)誤處理
這篇文章主要為大家介紹了go語言規(guī)范RESTful?API業(yè)務(wù)錯(cuò)誤處理方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03利用Golang解析json數(shù)據(jù)的方法示例
Go提供了原生的JSON庫,并且與語言本身有效的集成在了一起。下面這篇文章將給大家介紹關(guān)于利用Golang解析json數(shù)據(jù)的方法,文中給出了詳細(xì)的示例代碼供大家參考學(xué)習(xí),需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-07-07通過函數(shù)如何將golang?float64?保留2位小數(shù)(方法匯總)
這篇文章主要介紹了通過函數(shù)將golang?float64保留2位小數(shù),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08