欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

以go為例探究beyla從環(huán)境變量BEYLA_OPEN_PORT發(fā)現(xiàn)進(jìn)程原理

 更新時(shí)間:2023年12月24日 09:07:07   作者:a朋  
這篇文章主要為大家介紹了以golang進(jìn)程為例,研究beyla從環(huán)境變量BEYLA_OPEN_PORT(即通過端口)發(fā)現(xiàn)進(jìn)程的原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

beyla源碼中,關(guān)于BEYLA_OPEN_PORT的定義

// beyla/pkg/internal/pipe/config.go
type Config struct {
    ...
    Port services.PortEnum `yaml:"open_port" env:"BEYLA_OPEN_PORT"`
    ...
}
type PortEnum struct {
    ranges []portRange
}
type portRange struct {
    start int
    // if end == 0, it means this entry is not a port range but a single port
    end int
}

可以看出,BEYLA_OPEN_PORT是個(gè)環(huán)境變量,它對(duì)應(yīng)的變量類型是ProtEnum,它是一個(gè)數(shù)值范圍的集合。

這里僅以指定單個(gè)值為例,如BEYLA_OPEN_PORT=8080,此時(shí)start=8080,end=0。

啟動(dòng)golang程序并發(fā)現(xiàn)它

啟動(dòng)beyla/exmpales下的example-http-service進(jìn)程:

  • 該golang進(jìn)程會(huì)監(jiān)聽8080端口;
# curl -OL https://raw.githubusercontent.com/grafana/beyla/main/examples/example-http-service/example-http-service.go
# go run ./example-http-service.go

然后啟動(dòng)beyla,指定BEYLA_OPEN_PORT=8080,通過端口發(fā)現(xiàn)進(jìn)程:

# BEYLA_PROMETHEUS_PORT=9400 BEYLA_OPEN_PORT=8080 BEYLA_LOG_LEVEL=DEBUG beyla

最后beyla的日志中,就可以發(fā)現(xiàn)成功發(fā)現(xiàn)了該進(jìn)程:

...
time=2023-12-12T21:43:42.358-05:00 level=DEBUG msg="filtering processes" component=discover.CriteriaMatcher len=337
time=2023-12-12T21:43:42.435-05:00 level=DEBUG msg="found process" component=discover.CriteriaMatcher pid=612536 comm=~/go/src/github.com/grafana/beyla/examples/example-http-service/example-http-service.go
...

源碼的整體架構(gòu)

beyla源碼中,發(fā)現(xiàn)進(jìn)程的代碼流程分為2個(gè)部分:

  • WatchProvier:負(fù)責(zé)監(jiān)聽端口綁定的系統(tǒng)調(diào)用,然后檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除;

    • 監(jiān)聽端口綁定:

      • 通過ebpf完成的,監(jiān)聽kprobe/security_socket_bind;
      • 若監(jiān)聽到kprobe調(diào)用,則設(shè)置fetchPorts=true,該flag意味著后面在查詢進(jìn)程信息的時(shí)候,同時(shí)查詢進(jìn)程的端口信息;
    • 檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除:

      • 查詢當(dāng)前系統(tǒng)上所有進(jìn)程及進(jìn)程使用的端口信息;
      • 緩存上次輪訓(xùn)的進(jìn)程,然后對(duì)比本次輪訓(xùn)的結(jié)果,得到進(jìn)程的創(chuàng)建/刪除事件;
      • 進(jìn)程的創(chuàng)建/刪除信息,保存在chan[] Event中傳遞給下一個(gè)pipeline;
  • CriteriaMatchProvider:負(fù)責(zé)檢查過并濾滿足條件的進(jìn)程;

    • 指定BEYLA_OPEN_PORT的話,就檢查chan []Event中,是否有監(jiān)聽該端口的進(jìn)程;
    • 若有,則意味著發(fā)現(xiàn)了新進(jìn)程;
    • 然后由后面pipeline=TraceAttacherProvider去監(jiān)控新發(fā)現(xiàn)的進(jìn)程內(nèi)的http/grpc相關(guān)的kprobe/uprobe調(diào)用;
  • 上述兩個(gè)流程以pipeline的形式組裝起來,之間通過chan []Event進(jìn)行數(shù)據(jù)連接;

    • Chan []Event中保存了距上次輪訓(xùn)以來,新增的進(jìn)程或刪除的進(jìn)程;

源碼入口:

// beyla/pkg/internal/discover/finder.go
func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
    gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen))
    graph.RegisterStart(gb, WatcherProvider)
    graph.RegisterMiddle(gb, CriteriaMatcherProvider)
    ...
    graph.RegisterTerminal(gb, TraceAttacherProvider)
    pipeline, err := gb.Build(pf)
    ...
    go pipeline.Run()   // 啟動(dòng)執(zhí)行
    return pf.DiscoveredTracers, pf.DeleteTracers, nil
}

WatchProvider

WatchProvider負(fù)責(zé)監(jiān)聽端口綁定的系統(tǒng)調(diào)用,然后檢查當(dāng)前系統(tǒng)的進(jìn)程創(chuàng)建/刪除。

WatchProvider的實(shí)例化代碼如下,其中:

  • loadBPFWatcher:負(fù)責(zé)監(jiān)聽ebpf端口綁定的系統(tǒng)調(diào)用;
  • fetchProcessPorts:負(fù)責(zé)查詢當(dāng)前系統(tǒng)的進(jìn)程和監(jiān)聽端口情況;
// beyla/pkg/internal/discover/watcher.go
func WatcherProvider(w Watcher) (node.StartFunc[[]Event[processPorts]], error) {
    acc := pollAccounter{
        ctx:               w.Ctx,
        cfg:               w.Cfg,
        interval:          w.Cfg.Discovery.PollInterval,
        pids:              map[PID]processPorts{},
        pidPorts:          map[pidPort]processPorts{},
        listProcesses:     fetchProcessPorts,
        executableReady:   executableReady,
        loadBPFWatcher:    loadBPFWatcher,
        fetchPorts:        true,  // must be true until we've activated the bpf watcher component
        bpfWatcherEnabled: false, // async set by listening on the bpfWatchEvents channel
        stateMux:          sync.Mutex{},
        findingCriteria:   FindingCriteria(w.Cfg),
    }
    if acc.interval == 0 {
        acc.interval = defaultPollInterval
    }
    return acc.Run, nil    // acc.Run開始工作
}

具體工作由pollAccounter.Run()啟動(dòng)執(zhí)行:

  • 首先,加載ebpf程序,該ebpf程序會(huì)監(jiān)聽端口綁定的系統(tǒng)調(diào)用;
  • 然后,消費(fèi)ebpf端口綁定的事件,設(shè)置輪訓(xùn)processPorts的flag=true;
  • 再后,通過listProcesses()查詢當(dāng)前系統(tǒng)內(nèi)的所有進(jìn)程和端口;
  • 最后,通過snaphost(procs)獲得:自上次輪訓(xùn)以來,本地輪訓(xùn)得到的進(jìn)程創(chuàng)建/刪除事件;
  • 這些事件被放入chan []Event,由下一級(jí)的pipeline消費(fèi)處理;
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) Run(out chan<- []Event[processPorts]) {
    ...
    bpfWatchEvents := make(chan watcher.Event, 100)
    // 加載ebpf程序
    if err := pa.loadBPFWatcher(pa.cfg, bpfWatchEvents); err != nil {
        log.Error("Unable to load eBPF watcher for process events", "error", err)
    }
    // 消費(fèi)ebpf程序的端口綁定事件
    go pa.watchForProcessEvents(log, bpfWatchEvents)
    for {
        // 查詢當(dāng)前系統(tǒng)內(nèi)的所有進(jìn)程和端口
        procs, err := pa.listProcesses(pa.portFetchRequired())    // 參數(shù)=true
        if err != nil {
            log.Warn("can't get system processes", "error", err)
        } else {
            // 自上次輪訓(xùn)以來,本次輪訓(xùn)得到進(jìn)程創(chuàng)建/刪除事件;
            if events := pa.snapshot(procs); len(events) > 0 {
                log.Debug("new process watching events", "events", events)
                out <- events
            }
        }
        select {
        case <-pa.ctx.Done():
            log.Debug("context canceled. Exiting")
            return
        case <-time.After(pa.interval):     // 定期輪訓(xùn),默認(rèn)interval=5s
            // poll event starting again
        }
    }
}

ebpf監(jiān)聽端口綁定

ebpf程序:

  • 監(jiān)聽系統(tǒng)調(diào)用kprobe/security_socket_bind;
// beyla/bpf/watch_helper.c
SEC("kprobe/security_socket_bind")
int kprobe_security_socket_bind(struct pt_regs *ctx) {
    struct sockaddr *addr = (struct sockaddr *)PT_REGS_PARM2(ctx);
    ...
    u16 port = get_sockaddr_port(addr);
    ...
    watch_info_t *trace = bpf_ringbuf_reserve(&watch_events, sizeof(watch_info_t), 0);
    if (trace) {
        trace->flags = WATCH_BIND;
        trace->payload = port; 
        bpf_dbg_printk("New port bound %d", trace->payload);
        bpf_ringbuf_submit(trace, 0);
    }
    return 0;
}

然后在golang程序中,讀取ebpf的ringbuf,得到NewPort監(jiān)聽的事件:

// beyla/pkg/internal/ebpf/watcher/watcher.go
func (p *Watcher) processWatchEvent(record *ringbuf.Record) (request.Span, bool, error) {
    var flags uint64
    var event BPFWatchInfo
    err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &flags)
    ...
    if flags == 1 { // socket bind
        err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
        if err == nil {
            p.log.Debug("New port bind event", "port", event.Payload)
            p.events <- Event{Type: NewPort, Payload: uint32(event.Payload)}
        }
    }
    return request.Span{}, true, nil
}

上面p.events的消費(fèi)代碼:

  • 若監(jiān)聽到目標(biāo)端口的進(jìn)程被創(chuàng)建,則執(zhí)行pa.refetchPorts();
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) watchForProcessEvents(log *slog.Logger, events <-chan watcher.Event) {
    for e := range events {
        switch e.Type {
        case watcher.Ready:
            pa.bpfWatcherIsReady()
        case watcher.NewPort:
            port := int(e.Payload)
            if pa.cfg.Port.Matches(port) || pa.findingCriteria.PortOfInterest(port) {
                pa.refetchPorts()
            }
        default:
            log.Warn("Unknown ebpf process watch event", "type", e.Type)
        }
    }
}

pa.refetchPorts()僅設(shè)置了一個(gè)flag:pa.fetchPorts=true:

  • 該flag=true意味著后面listProcess的時(shí)候,需要同時(shí)查詢進(jìn)程使用的端口;
func (pa *pollAccounter) refetchPorts() {
   pa.stateMux.Lock()
   defer pa.stateMux.Unlock()
   pa.fetchPorts = true
}

檢查進(jìn)程的創(chuàng)建/刪除

首先,查詢進(jìn)程和進(jìn)程使用的ports:

  • 參數(shù)scanPorts=true,意味著查詢進(jìn)程的ports;
// beyla/pkg/internal/discover/watcher.go
// 參數(shù)scanPorts=true
func fetchProcessPorts(scanPorts bool) (map[PID]processPorts, error) {
    processes := map[PID]processPorts{}
    pids, err := process.Pids()
    for _, pid := range pids {
        if !scanPorts { // 不查詢ports
            processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: []uint32{}}
            continue
        }
        // 查詢ports
        conns, err := net.ConnectionsPid("inet", pid)
        ...
        var openPorts []uint32
        // TODO: Cap the size of this array, leaking client ephemeral ports will cause this to grow very long
        for _, conn := range conns {
            openPorts = append(openPorts, conn.Laddr.Port)
        }
        processes[PID(pid)] = processPorts{pid: PID(pid), openPorts: openPorts}
    }
    return processes, nil
}

然后,對(duì)比上次輪訓(xùn)的結(jié)果與本次的結(jié)果,生成進(jìn)程創(chuàng)建/刪除的events:

  • events中保存進(jìn)程進(jìn)程和端口信息;
  • events被放入chan,然后給下一級(jí)的pipeline使用;
// beyla/pkg/internal/discover/watcher.go
func (pa *pollAccounter) snapshot(fetchedProcs map[PID]processPorts) []Event[processPorts] {
    var events []Event[processPorts]
    currentPidPorts := make(map[pidPort]processPorts, len(fetchedProcs))
    reportedProcs := map[PID]struct{}{}
    notReadyProcs := map[PID]struct{}{}
    // notify processes that are new, or already existed but have a new connection
    for pid, proc := range fetchedProcs {
        // if the process does not have open ports, we might still notify it
        // for example, if it's a client with ephemeral connections, which might be later matched by executable name
        if len(proc.openPorts) == 0 {
            ...
        } else {
            for _, port := range proc.openPorts {
                if pa.checkNewProcessConnectionNotification(proc, port, currentPidPorts, reportedProcs, notReadyProcs) {
                    events = append(events, Event[processPorts]{Type: EventCreated, Obj: proc})         // 進(jìn)程創(chuàng)建,同時(shí)保存進(jìn)程和端口
                    // skip checking new connections for that process
                    continue
                }
            }
        }
    }
    // notify processes that are removed
    for pid, proc := range pa.pids {
        if _, ok := fetchedProcs[pid]; !ok {
            events = append(events, Event[processPorts]{Type: EventDeleted, Obj: proc})     // 進(jìn)程刪除,同時(shí)保存進(jìn)程和端口
        }
    }
    ....
    pa.pids = currentProcs
    pa.pidPorts = currentPidPorts
    return events
}

CriteriaMatchProvider

CriteriaMatchProvider負(fù)責(zé)檢查過濾滿足條件的進(jìn)程。

該P(yáng)ipeline的實(shí)例化代碼如下:

  • 其中criteria=篩選標(biāo)準(zhǔn),從進(jìn)程配置中生成,后面的篩選均通過與criteria進(jìn)行比對(duì)完成;
// beyla/pkg/internal/discover/matcher.go
func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processPorts], []Event[ProcessMatch]], error) {
   m := &matcher{
      log:            slog.With("component", "discover.CriteriaMatcher"),
      criteria:       FindingCriteria(cm.Cfg),
      processHistory: map[PID]struct{}{},
   }
   return m.run, nil
}

具體檢查和篩選工作,由matcher.run()完成,該函數(shù)內(nèi):

  • 針對(duì)每一批的[]Event,由m.filter進(jìn)行篩選;
  • 篩選的結(jié)果最終保存到chan []Event,傳遞給下一級(jí)的Pipeline使用;
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) run(in <-chan []Event[processPorts], out chan<- []Event[ProcessMatch]) {
    m.log.Debug("starting criteria matcher node")
    for i := range in {
        m.log.Debug("filtering processes", "len", len(i))
        o := m.filter(i)        // 執(zhí)行篩選
        m.log.Debug("processes matching selection criteria", "len", len(o))
        out <- o
    }
}

而m.fiter()在篩選時(shí):

  • 針對(duì)每一個(gè)event,遍歷criteria,檢查event內(nèi)的進(jìn)程是否match criteria,若match,則意味著找到一個(gè)進(jìn)程;
  • 其中一個(gè)event內(nèi)保存EventType(Create/Delete)和進(jìn)程信息(包含端口),只需滿足其中一個(gè)criteria即可;
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) filter(events []Event[processPorts]) []Event[ProcessMatch] {
    var matches []Event[ProcessMatch]
    // 針對(duì)每一個(gè)event
    for _, ev := range events {
        ...
        proc, err := processInfo(ev.Obj)
        ...
        // 遍歷m.criteria
        for i := range m.criteria {
            // 檢查是否滿足其中一個(gè)criteria
            if m.matchProcess(proc, &m.criteria[i]) {
                comm := proc.ExePath
                // match,找到一個(gè)符合條件的進(jìn)程
                m.log.Debug("found process", "pid", proc.Pid, "comm", comm)
                matches = append(matches, Event[ProcessMatch]{
                    Type: EventCreated,
                    Obj:  ProcessMatch{Criteria: &m.criteria[i], Process: proc},
                })
                break
            }
        }
    }
    return matches
}

具體看下m.matchProcesses()中關(guān)于port的匹配邏輯:

  • 使用openPorts(即BEYLA_OPEN_PORT參數(shù))與進(jìn)程的port進(jìn)行比對(duì);
// beyla/pkg/internal/discover/matcher.go
func (m *matcher) matchProcess(p *services.ProcessInfo, a *services.Attributes) bool {
    ...
    if a.OpenPorts.Len() > 0 {
        return m.matchByPort(p, a)    // 檢查端口是否matcher
    }
    return true
}
func (m *matcher) matchByPort(p *services.ProcessInfo, a *services.Attributes) bool {
    for _, c := range p.OpenPorts {
        if a.OpenPorts.Matches(int(c)) {    // openPorts=BEYLA_OPEN_PORT參數(shù)
            return true
        }
    }
    return false
}

openPorts是PortEnum類型,一個(gè)range=[start,End],若僅指定一個(gè),則start=指定值,end=0;

// beyla/pkg/internal/discover/services/criteria.go
type PortEnum struct {
   ranges []portRange
}
type portRange struct {
   start int
   // if end == 0, it means this entry is not a port range but a single port
   end int
}

其match邏輯如下:

  • 由于指定BEYLA_OPEN_PORT=8080,則start=8080,end=0,該函數(shù)返回=true;
// beyla/pkg/internal/discover/services/criteria.go
func (p *PortEnum) Matches(port int) bool {
    for _, pr := range p.ranges {
        if pr.end == 0 && pr.start == port ||
            pr.end != 0 && pr.start <= port && port <= pr.end {
            return true
        }
    }
    return false
}

以上就是以go為例探究beyla從環(huán)境變量BEYLA_OPEN_PORT發(fā)現(xiàn)進(jìn)程原理的詳細(xì)內(nèi)容,更多關(guān)于go beyla BEYLA_OPEN_PORT進(jìn)程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Go語言中的自定義函數(shù)類型的實(shí)現(xiàn)

    Go語言中的自定義函數(shù)類型的實(shí)現(xiàn)

    在Go語言中,函數(shù)類型是一種將函數(shù)作為值的數(shù)據(jù)類型,本文主要介紹了Go語言中的自定義函數(shù)類型,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-09-09
  • 解決goland新建項(xiàng)目文件名為紅色的問題

    解決goland新建項(xiàng)目文件名為紅色的問題

    這篇文章主要介紹了解決goland新建項(xiàng)目文件名為紅色的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • golang的時(shí)區(qū)和神奇的time.Parse的使用方法

    golang的時(shí)區(qū)和神奇的time.Parse的使用方法

    這篇文章主要介紹了golang的時(shí)區(qū)和神奇的time.Parse的使用方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • Go語言常用的打log方式詳解

    Go語言常用的打log方式詳解

    Golang的log包短小精悍,可以非常輕松的實(shí)現(xiàn)日志打印轉(zhuǎn)存功能,下面這篇文章主要給大家介紹了關(guān)于Go語言常用的打log方式的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-10-10
  • Go語言中的switch用法實(shí)例分析

    Go語言中的switch用法實(shí)例分析

    這篇文章主要介紹了Go語言中的switch用法,實(shí)例分析了switch的功能及使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-02-02
  • 詳解Go 并發(fā)

    詳解Go 并發(fā)

    這篇文章主要介紹了Go 并發(fā)的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)go語言,感興趣的朋友可以了解下
    2020-09-09
  • go語言規(guī)范RESTful?API業(yè)務(wù)錯(cuò)誤處理

    go語言規(guī)范RESTful?API業(yè)務(wù)錯(cuò)誤處理

    這篇文章主要為大家介紹了go語言規(guī)范RESTful?API業(yè)務(wù)錯(cuò)誤處理方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-03-03
  • 利用Golang解析json數(shù)據(jù)的方法示例

    利用Golang解析json數(shù)據(jù)的方法示例

    Go提供了原生的JSON庫,并且與語言本身有效的集成在了一起。下面這篇文章將給大家介紹關(guān)于利用Golang解析json數(shù)據(jù)的方法,文中給出了詳細(xì)的示例代碼供大家參考學(xué)習(xí),需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-07-07
  • 詳解golang各種類型是如何進(jìn)行比較的

    詳解golang各種類型是如何進(jìn)行比較的

    在日常開發(fā)中,比較操作是最常用的基本操作之一,可以用來判斷變量之間是否相等或者對(duì)應(yīng)的大小關(guān)系,比較操作對(duì)于排序、查找和集合數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)至關(guān)重要,本文將深入解析golang各種類型是如何進(jìn)行比較的,需要的朋友可以參考下
    2024-01-01
  • 通過函數(shù)如何將golang?float64?保留2位小數(shù)(方法匯總)

    通過函數(shù)如何將golang?float64?保留2位小數(shù)(方法匯總)

    這篇文章主要介紹了通過函數(shù)將golang?float64保留2位小數(shù),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-08-08

最新評(píng)論