以go為例探究beyla從環(huán)境變量BEYLA_OPEN_PORT發(fā)現(xiàn)進(jìn)程原理
beyla源碼中,關(guān)于BEYLA_OPEN_PORT的定義
// beyla/pkg/internal/pipe/config.go
type Config struct {
...
Port services.PortEnum `yaml:"open_port" env:"BEYLA_OPEN_PORT"`
...
}
type PortEnum struct {
ranges []portRange
}
type portRange struct {
start int
// if end == 0, it means this entry is not a port range but a single port
end int
}可以看出,BEYLA_OPEN_PORT是個環(huán)境變量,它對應(yīng)的變量類型是ProtEnum,它是一個數(shù)值范圍的集合。
這里僅以指定單個值為例,如BEYLA_OPEN_PORT=8080,此時start=8080,end=0。
啟動golang程序并發(fā)現(xiàn)它
啟動beyla/exmpales下的example-http-service進(jìn)程:
- 該golang進(jìn)程會監(jiān)聽8080端口;
# curl -OL https://raw.githubusercontent.com/grafana/beyla/main/examples/example-http-service/example-http-service.go # go run ./example-http-service.go
然后啟動beyla,指定BEYLA_OPEN_PORT=8080,通過端口發(fā)現(xiàn)進(jìn)程:
# BEYLA_PROMETHEUS_PORT=9400 BEYLA_OPEN_PORT=8080 BEYLA_LOG_LEVEL=DEBUG beyla
最后beyla的日志中,就可以發(fā)現(xiàn)成功發(fā)現(xiàn)了該進(jìn)程:
...
time=2023-12-12T21:43:42.358-05:00 level=DEBUG msg="filtering processes" component=discover.CriteriaMatcher len=337
time=2023-12-12T21:43:42.435-05:00 level=DEBUG msg="found process" component=discover.CriteriaMatcher pid=612536 comm=~/go/src/github.com/grafana/beyla/examples/example-http-service/example-http-service.go
...
源碼的整體架構(gòu)

beyla源碼中,發(fā)現(xiàn)進(jìn)程的代碼流程分為2個部分:
WatchProvier:負(fù)責(zé)監(jiān)聽端口綁定的系統(tǒng)調(diào)用,然后檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除;
監(jiān)聽端口綁定:
- 通過ebpf完成的,監(jiān)聽kprobe/security_socket_bind;
- 若監(jiān)聽到kprobe調(diào)用,則設(shè)置fetchPorts=true,該flag意味著后面在查詢進(jìn)程信息的時候,同時查詢進(jìn)程的端口信息;
檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除:
- 查詢當(dāng)前系統(tǒng)上所有進(jìn)程及進(jìn)程使用的端口信息;
- 緩存上次輪訓(xùn)的進(jìn)程,然后對比本次輪訓(xùn)的結(jié)果,得到進(jìn)程的創(chuàng)建/刪除事件;
- 進(jìn)程的創(chuàng)建/刪除信息,保存在chan[] Event中傳遞給下一個pipeline;
CriteriaMatchProvider:負(fù)責(zé)檢查過并濾滿足條件的進(jìn)程;
- 指定BEYLA_OPEN_PORT的話,就檢查chan []Event中,是否有監(jiān)聽該端口的進(jìn)程;
- 若有,則意味著發(fā)現(xiàn)了新進(jìn)程;
- 然后由后面pipeline=TraceAttacherProvider去監(jiān)控新發(fā)現(xiàn)的進(jìn)程內(nèi)的http/grpc相關(guān)的kprobe/uprobe調(diào)用;
上述兩個流程以pipeline的形式組裝起來,之間通過chan []Event進(jìn)行數(shù)據(jù)連接;
- Chan []Event中保存了距上次輪訓(xùn)以來,新增的進(jìn)程或刪除的進(jìn)程;
源碼入口:
// beyla/pkg/internal/discover/finder.go
func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen))
graph.RegisterStart(gb, WatcherProvider)
graph.RegisterMiddle(gb, CriteriaMatcherProvider)
...
graph.RegisterTerminal(gb, TraceAttacherProvider)
pipeline, err := gb.Build(pf)
...
go pipeline.Run() // 啟動執(zhí)行
return pf.DiscoveredTracers, pf.DeleteTracers, nil
}WatchProvider
WatchProvider負(fù)責(zé)監(jiān)聽端口綁定的系統(tǒng)調(diào)用,然后檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除。
WatchProvider的實例化代碼如下,其中:
- loadBPFWatcher:負(fù)責(zé)監(jiān)聽ebpf端口綁定的系統(tǒng)調(diào)用;
- fetchProcessPorts:負(fù)責(zé)查詢當(dāng)前系統(tǒng)的進(jìn)程和監(jiān)聽端口情況;
// beyla/pkg/internal/discover/watcher.go
func WatcherProvider(w Watcher) (node.StartFunc[[]Event[processPorts]], error) {
acc := pollAccounter{
ctx: w.Ctx,
cfg: w.Cfg,
interval: w.Cfg.Discovery.PollInterval,
pids: map[PID]processPorts{},
pidPorts: map[pidPort]processPorts{},
listProcesses: fetchProcessPorts,
executableReady: executableReady,
loadBPFWatcher: loadBPFWatcher,
fetchPorts: true, // must be true until we've activated the bpf watcher component
bpfWatcherEnabled: false, // async set by listening on the bpfWatchEvents channel
stateMux: sync.Mutex{},
findingCriteria: FindingCriteria(w.Cfg),
}
if acc.interval == 0 {
acc.interval = defaultPollInterval
}
return acc.Run, nil // acc.Run開始工作
}具體工作由pollAccounter.Run()啟動執(zhí)行:
- 首先,加載ebpf程序,該ebpf程序會監(jiān)聽端口綁定的系統(tǒng)調(diào)用;
- 然后,消費ebpf端口綁定的事件,設(shè)置輪訓(xùn)processPorts的flag=true;
- 再后,通過listProcesses()查詢當(dāng)前系統(tǒng)內(nèi)的所有進(jìn)程和端口;
- 最后,通過snaphost(procs)獲得:自上次輪訓(xùn)以來,本地輪訓(xùn)得到的進(jìn)程創(chuàng)建/刪除事件;
- 這些事件被放入chan []Event,由下一級的pipeline消費處理;
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) Run(out chan<- []Event[processPorts]) {
...
bpfWatchEvents := make(chan watcher.Event, 100)
// 加載ebpf程序
if err := pa.loadBPFWatcher(pa.cfg, bpfWatchEvents); err != nil {
log.Error("Unable to load eBPF watcher for process events", "error", err)
}
// 消費ebpf程序的端口綁定事件
go pa.watchForProcessEvents(log, bpfWatchEvents)
for {
// 查詢當(dāng)前系統(tǒng)內(nèi)的所有進(jìn)程和端口
procs, err := pa.listProcesses(pa.portFetchRequired()) // 參數(shù)=true
if err != nil {
log.Warn("can't get system processes", "error", err)
} else {
// 自上次輪訓(xùn)以來,本次輪訓(xùn)得到進(jìn)程創(chuàng)建/刪除事件;
if events := pa.snapshot(procs); len(events) > 0 {
log.Debug("new process watching events", "events", events)
out <- events
}
}
select {
case <-pa.ctx.Done():
log.Debug("context canceled. Exiting")
return
case <-time.After(pa.interval): // 定期輪訓(xùn),默認(rèn)interval=5s
// poll event starting again
}
}
}ebpf監(jiān)聽端口綁定
ebpf程序:
- 監(jiān)聽系統(tǒng)調(diào)用kprobe/security_socket_bind;
// beyla/bpf/watch_helper.c
SEC("kprobe/security_socket_bind")
int kprobe_security_socket_bind(struct pt_regs *ctx) {
struct sockaddr *addr = (struct sockaddr *)PT_REGS_PARM2(ctx);
...
u16 port = get_sockaddr_port(addr);
...
watch_info_t *trace = bpf_ringbuf_reserve(&watch_events, sizeof(watch_info_t), 0);
if (trace) {
trace->flags = WATCH_BIND;
trace->payload = port;
bpf_dbg_printk("New port bound %d", trace->payload);
bpf_ringbuf_submit(trace, 0);
}
return 0;
}然后在golang程序中,讀取ebpf的ringbuf,得到NewPort監(jiān)聽的事件:
// beyla/pkg/internal/ebpf/watcher/watcher.go
func (p *Watcher) processWatchEvent(record *ringbuf.Record) (request.Span, bool, error) {
var flags uint64
var event BPFWatchInfo
err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &flags)
...
if flags == 1 { // socket bind
err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
if err == nil {
p.log.Debug("New port bind event", "port", event.Payload)
p.events <- Event{Type: NewPort, Payload: uint32(event.Payload)}
}
}
return request.Span{}, true, nil
}上面p.events的消費代碼:
- 若監(jiān)聽到目標(biāo)端口的進(jìn)程被創(chuàng)建,則執(zhí)行pa.refetchPorts();
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) watchForProcessEvents(log *slog.Logger, events <-chan watcher.Event) {
for e := range events {
switch e.Type {
case watcher.Ready:
pa.bpfWatcherIsReady()
case watcher.NewPort:
port := int(e.Payload)
if pa.cfg.Port.Matches(port) || pa.findingCriteria.PortOfInterest(port) {
pa.refetchPorts()
}
default:
log.Warn("Unknown ebpf process watch event", "type", e.Type)
}
}
}pa.refetchPorts()僅設(shè)置了一個flag:pa.fetchPorts=true:
- 該flag=true意味著后面listProcess的時候,需要同時查詢進(jìn)程使用的端口;
func (pa *pollAccounter) refetchPorts() {
pa.stateMux.Lock()
defer pa.stateMux.Unlock()
pa.fetchPorts = true
}檢查進(jìn)程的創(chuàng)建/刪除
首先,查詢進(jìn)程和進(jìn)程使用的ports:
- 參數(shù)scanPorts=true,意味著查詢進(jìn)程的ports;
// beyla/pkg/internal/discover/watcher.go
// 參數(shù)scanPorts=true
func fetchProcessPorts(scanPorts bool) (map[PID]processPorts, error) {
processes := map[PID]processPorts{}
pids, err := process.Pids()
for _, pid := range pids {
if !scanPorts { // 不查詢ports
processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: []uint32{}}
continue
}
// 查詢ports
conns, err := net.ConnectionsPid("inet", pid)
...
var openPorts []uint32
// TODO: Cap the size of this array, leaking client ephemeral ports will cause this to grow very long
for _, conn := range conns {
openPorts = append(openPorts, conn.Laddr.Port)
}
processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: openPorts}
}
return processes, nil
}然后,對比上次輪訓(xùn)的結(jié)果與本次的結(jié)果,生成進(jìn)程創(chuàng)建/刪除的events:
- events中保存進(jìn)程進(jìn)程和端口信息;
- events被放入chan,然后給下一級的pipeline使用;
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) snapshot(fetchedProcs map[PID]processPorts) []Event[processPorts] {
var events []Event[processPorts]
currentPidPorts := make(map[pidPort]processPorts, len(fetchedProcs))
reportedProcs := map[PID]struct{}{}
notReadyProcs := map[PID]struct{}{}
// notify processes that are new, or already existed but have a new connection
for pid, proc := range fetchedProcs {
// if the process does not have open ports, we might still notify it
// for example, if it's a client with ephemeral connections, which might be later matched by executable name
if len(proc.openPorts) == 0 {
...
} else {
for _, port := range proc.openPorts {
if pa.checkNewProcessConnectionNotification(proc, port, currentPidPorts, reportedProcs, notReadyProcs) {
events = append(events, Event[processPorts]{Type: EventCreated, Obj: proc}) // 進(jìn)程創(chuàng)建,同時保存進(jìn)程和端口
// skip checking new connections for that process
continue
}
}
}
}
// notify processes that are removed
for pid, proc := range pa.pids {
if _, ok := fetchedProcs[pid]; !ok {
events = append(events, Event[processPorts]{Type: EventDeleted, Obj: proc}) // 進(jìn)程刪除,同時保存進(jìn)程和端口
}
}
....
pa.pids = currentProcs
pa.pidPorts = currentPidPorts
return events
}CriteriaMatchProvider
CriteriaMatchProvider負(fù)責(zé)檢查過濾滿足條件的進(jìn)程。
該Pipeline的實例化代碼如下:
- 其中criteria=篩選標(biāo)準(zhǔn),從進(jìn)程配置中生成,后面的篩選均通過與criteria進(jìn)行比對完成;
// beyla/pkg/internal/discover/matcher.go
func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processPorts], []Event[ProcessMatch]], error) {
m := &matcher{
log: slog.With("component", "discover.CriteriaMatcher"),
criteria: FindingCriteria(cm.Cfg),
processHistory: map[PID]struct{}{},
}
return m.run, nil
}具體檢查和篩選工作,由matcher.run()完成,該函數(shù)內(nèi):
- 針對每一批的[]Event,由m.filter進(jìn)行篩選;
- 篩選的結(jié)果最終保存到chan []Event,傳遞給下一級的Pipeline使用;
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) run(in <-chan []Event[processPorts], out chan<- []Event[ProcessMatch]) {
m.log.Debug("starting criteria matcher node")
for i := range in {
m.log.Debug("filtering processes", "len", len(i))
o := m.filter(i) // 執(zhí)行篩選
m.log.Debug("processes matching selection criteria", "len", len(o))
out <- o
}
}而m.fiter()在篩選時:
- 針對每一個event,遍歷criteria,檢查event內(nèi)的進(jìn)程是否match criteria,若match,則意味著找到一個進(jìn)程;
- 其中一個event內(nèi)保存EventType(Create/Delete)和進(jìn)程信息(包含端口),只需滿足其中一個criteria即可;
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) filter(events []Event[processPorts]) []Event[ProcessMatch] {
var matches []Event[ProcessMatch]
// 針對每一個event
for _, ev := range events {
...
proc, err := processInfo(ev.Obj)
...
// 遍歷m.criteria
for i := range m.criteria {
// 檢查是否滿足其中一個criteria
if m.matchProcess(proc, &m.criteria[i]) {
comm := proc.ExePath
// match,找到一個符合條件的進(jìn)程
m.log.Debug("found process", "pid", proc.Pid, "comm", comm)
matches = append(matches, Event[ProcessMatch]{
Type: EventCreated,
Obj: ProcessMatch{Criteria: &m.criteria[i], Process: proc},
})
break
}
}
}
return matches
}具體看下m.matchProcesses()中關(guān)于port的匹配邏輯:
- 使用openPorts(即BEYLA_OPEN_PORT參數(shù))與進(jìn)程的port進(jìn)行比對;
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) matchProcess(p *services.ProcessInfo, a *services.Attributes) bool {
...
if a.OpenPorts.Len() > 0 {
return m.matchByPort(p, a) // 檢查端口是否matcher
}
return true
}
func (m *matcher) matchByPort(p *services.ProcessInfo, a *services.Attributes) bool {
for _, c := range p.OpenPorts {
if a.OpenPorts.Matches(int(c)) { // openPorts=BEYLA_OPEN_PORT參數(shù)
return true
}
}
return false
}openPorts是PortEnum類型,一個range=[start,End],若僅指定一個,則start=指定值,end=0;
// beyla/pkg/internal/discover/services/criteria.go
type PortEnum struct {
ranges []portRange
}
type portRange struct {
start int
// if end == 0, it means this entry is not a port range but a single port
end int
}其match邏輯如下:
- 由于指定BEYLA_OPEN_PORT=8080,則start=8080,end=0,該函數(shù)返回=true;
// beyla/pkg/internal/discover/services/criteria.go
func (p *PortEnum) Matches(port int) bool {
for _, pr := range p.ranges {
if pr.end == 0 && pr.start == port ||
pr.end != 0 && pr.start <= port && port <= pr.end {
return true
}
}
return false
}以上就是以go為例探究beyla從環(huán)境變量BEYLA_OPEN_PORT發(fā)現(xiàn)進(jìn)程原理的詳細(xì)內(nèi)容,更多關(guān)于go beyla BEYLA_OPEN_PORT進(jìn)程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang的時區(qū)和神奇的time.Parse的使用方法
這篇文章主要介紹了golang的時區(qū)和神奇的time.Parse的使用方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
go語言規(guī)范RESTful?API業(yè)務(wù)錯誤處理
這篇文章主要為大家介紹了go語言規(guī)范RESTful?API業(yè)務(wù)錯誤處理方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
利用Golang解析json數(shù)據(jù)的方法示例
Go提供了原生的JSON庫,并且與語言本身有效的集成在了一起。下面這篇文章將給大家介紹關(guān)于利用Golang解析json數(shù)據(jù)的方法,文中給出了詳細(xì)的示例代碼供大家參考學(xué)習(xí),需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-07-07
通過函數(shù)如何將golang?float64?保留2位小數(shù)(方法匯總)
這篇文章主要介紹了通過函數(shù)將golang?float64保留2位小數(shù),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-08-08

