欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用Python實(shí)現(xiàn)tail的示例代碼

 更新時(shí)間:2023年03月01日 14:15:09   作者:so1n  
tail是一個(gè)常用的Linux命令, 它可以打印文件的后面n行數(shù)據(jù), 也能實(shí)時(shí)輸出文件的追加數(shù)據(jù)。本文就來(lái)用Python實(shí)現(xiàn)tail,感興趣的可以了解一下

前記

tail是一個(gè)常用的Linux命令, 它可以打印文件的后面n行數(shù)據(jù), 也能實(shí)時(shí)輸出文件的追加數(shù)據(jù). tail的實(shí)現(xiàn)很簡(jiǎn)單,但是要實(shí)現(xiàn)一個(gè)完善的tail卻需要考慮很多細(xì)節(jié),如果要注重性能,則需要引入一些其他的機(jī)制. 一開(kāi)始只是為了單純的實(shí)現(xiàn)Linux的tail的基本功能,后面隨著需要對(duì)日志文件的高性能讀取則需要的Linux的inotify機(jī)制去完善. 相關(guān)源碼見(jiàn):https://github.com/so1n/example/tree/master/example_python/example_python/tail

1.第一版--從文件尾部讀取實(shí)時(shí)數(shù)據(jù)

主要思路是: 打開(kāi)文件, 把指針移動(dòng)到文件最后, 然后有數(shù)據(jù)則輸出數(shù)據(jù), 無(wú)數(shù)據(jù)則休眠一段時(shí)間.

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # 從文件結(jié)尾處開(kāi)始seek
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都會(huì)每次都打印新的一行
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

之后只要做如下調(diào)用即可:

python xxx.py filename 

2.第二版--實(shí)現(xiàn)tail -f

tail -f默認(rèn)先讀取最后10行數(shù)據(jù),再?gòu)奈募膊孔x取實(shí)時(shí)數(shù)據(jù).如果對(duì)于小文件,可以先讀取所有文件內(nèi)容,并輸出最后10行, 但是讀取全文再獲取最后10行的性能不高, 而從后滾10行的邊界條件也很復(fù)雜, 先看先讀取全文再獲取最后10行的實(shí)現(xiàn):

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都會(huì)每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)

if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()   

可以看到實(shí)現(xiàn)很簡(jiǎn)單, 相比第一版只多了個(gè)read_last_line的函數(shù), 接下來(lái)就要解決性能的問(wèn)題了, 當(dāng)文件很大的時(shí)候, 這個(gè)邏輯是不行的, 特別是有些日志文件經(jīng)常有幾個(gè)G大, 如果全讀出來(lái)內(nèi)存就爆了. 而在Linux系統(tǒng)中, 沒(méi)有一個(gè)接口可以指定指針跳到倒數(shù)10行, 只能使用如下方法來(lái)模擬輸出倒數(shù)10行:

  • 首先游標(biāo)跳轉(zhuǎn)到最新的字符, 保存當(dāng)前游標(biāo), 然后預(yù)估一行數(shù)據(jù)的字符長(zhǎng)度, 最好偏多, 這里我按1024字符長(zhǎng)度為一行來(lái)處理
  • 然后利用seek的方法,跳轉(zhuǎn)到seek(-1024 * 10, 2)的字符, 這就是我們預(yù)估的倒數(shù)10行內(nèi)的內(nèi)容
  • 接著對(duì)內(nèi)容進(jìn)行判斷, 如果跳轉(zhuǎn)的字符長(zhǎng)度小于 10 * 1024, 則證明整個(gè)文件沒(méi)有10行, 則采用原來(lái)的read_last_line方法.
  • 如果跳轉(zhuǎn)到字符長(zhǎng)度等于1024 * 10, 則利用換行符計(jì)算已取字符長(zhǎng)度共有多少行,如果行數(shù)大于10,那只輸出最后10行,如果只讀了4行,則繼續(xù)讀6*1024,直到讀滿10行為止

通過(guò)以上步奏, 就把倒數(shù)10行的數(shù)據(jù)計(jì)算好了可以打印出來(lái), 可以進(jìn)入追加數(shù)據(jù)了, 但是這時(shí)候文件內(nèi)容可能發(fā)生改變了, 我們的游標(biāo)也發(fā)生改變了, 這時(shí)候要把游標(biāo)跳回到剛才保存的游標(biāo),防止漏打或者重復(fù)打印數(shù)據(jù).

分析完畢后, 就可以開(kāi)始重構(gòu)read_last_line函數(shù)了.

import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都會(huì)每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 跳轉(zhuǎn)游標(biāo)到最后
        file.seek(0, 2)
        # 獲取當(dāng)前結(jié)尾的游標(biāo)位置
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳轉(zhuǎn)的字符長(zhǎng)度大于原來(lái)文件長(zhǎng)度,那就把所有文件內(nèi)容打印出來(lái)
                file.seek(0) # 由于read方法是按照游標(biāo)進(jìn)行打印, 所以要重置游標(biāo)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新獲取游標(biāo)位置
                now_tell: int = file.tell()
                break
            # 跳轉(zhuǎn)到我們預(yù)估的字符位置
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果獲取的行數(shù)大于要求的行數(shù),則獲取前n行的行數(shù)
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果獲取的行數(shù)小于要求的行數(shù),則預(yù)估需要獲取的行數(shù),繼續(xù)獲取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游標(biāo),確保接下來(lái)打印的數(shù)據(jù)不重復(fù)
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3.第三版--優(yōu)雅的讀取輸出日志文件

可以發(fā)現(xiàn)實(shí)時(shí)讀取那塊的邏輯性能還是很差, 如果每秒讀一次文件,實(shí)時(shí)性就太慢了,把間隔改小了,則處理器占用太多. 性能最好的情況是如果能得知文件更新再進(jìn)行打印文件, 那性能就能得到保障了.慶幸的是,在Linux中inotify提供了這樣的功能. 此外,日志文件有一個(gè)特點(diǎn)就是會(huì)進(jìn)行l(wèi)ogrotate,如果日志被logrotate了,那我們就需要重新打開(kāi)文件,并進(jìn)一步讀取數(shù)據(jù), 這種情況也可以利用到inotify, 當(dāng)inotify獲取到文件重新打開(kāi)的事件時(shí),我們就重新打開(kāi)文件,再讀取.

import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # 監(jiān)控多個(gè)事件


class InotifyEventHandler(pyinotify.ProcessEvent):  # 定制化事件處理類,注意繼承
    """
    執(zhí)行inotify event的封裝
    """
    f: 'open()'
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent要求不能直接繼承__init__, 而是要重寫my_init, 我們重寫這一段并進(jìn)行初始化"""

        # 獲取文件
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # 只監(jiān)控路徑,這樣就能知道文件是否移動(dòng)
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """統(tǒng)一的輸出方法"""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """必須為process_事件名稱,event表示事件對(duì)象, 這里表示監(jiān)控到文件發(fā)生變化, 進(jìn)行文件讀取"""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """必須為process_事件名稱,event表示事件對(duì)象, 這里表示監(jiān)控到文件發(fā)生重新打開(kāi), 進(jìn)行文件讀取"""
        if event.pathname == self.filename:
            # 檢測(cè)到文件被移動(dòng)重新打開(kāi)文件
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # 創(chuàng)建WatchManager對(duì)象
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # 實(shí)例化我們定制化后的事件處理類, 采用**dict傳參數(shù)
        wm.add_watch('/tmp', multi_event)  # 添加監(jiān)控的目錄,及事件
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # 在notifier實(shí)例化時(shí)傳入,notifier會(huì)自動(dòng)執(zhí)行
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        """通過(guò)inotify的with管理打開(kāi)文件"""
        with self.inotify_event_handle as i:
            # 先讀取指定的行數(shù)
            self.read_last_line(i.f, n)
            # 啟用inotify的監(jiān)聽(tīng)
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 獲取當(dāng)前結(jié)尾的游標(biāo)位置
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳轉(zhuǎn)的字符長(zhǎng)度大于原來(lái)文件長(zhǎng)度,那就把所有文件內(nèi)容打印出來(lái)
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新獲取游標(biāo)位置
                now_tell: int = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果獲取的行數(shù)大于要求的行數(shù),則獲取前n行的行數(shù)
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果獲取的行數(shù)小于要求的行數(shù),則預(yù)估需要獲取的行數(shù),繼續(xù)獲取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游標(biāo),確保接下來(lái)打印的數(shù)據(jù)不重復(fù)
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

可以看到, 從原本的open打開(kāi)文件改為用inotify打開(kāi)文件(這時(shí)候會(huì)調(diào)用my_init方法進(jìn)行初始化), 打開(kāi)后還是運(yùn)行我們打開(kāi)原來(lái)n行的代碼, 然后就交給inotify運(yùn)行. 在inotify運(yùn)行之前, 我們把重新打開(kāi)文件方法和打印文件方法都掛載在inotifiy對(duì)應(yīng)的事件里, 之后inotify運(yùn)行時(shí), 會(huì)根據(jù)對(duì)應(yīng)的事件執(zhí)行對(duì)應(yīng)的方法.

到此這篇關(guān)于使用Python實(shí)現(xiàn)tail的示例代碼的文章就介紹到這了,更多相關(guān)Python tail內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳解Python中的List 2

    詳解Python中的List 2

    這篇文章主要為大家介紹了Python中的List,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2021-12-12
  • 200行python代碼實(shí)現(xiàn)2048游戲

    200行python代碼實(shí)現(xiàn)2048游戲

    這篇文章主要為大家詳細(xì)介紹了200行Python代碼實(shí)現(xiàn)2048游戲,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-07-07
  • python中封裝token問(wèn)題

    python中封裝token問(wèn)題

    這篇文章主要介紹了python中封裝token問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • 詳解numpy.ndarray.reshape()函數(shù)的參數(shù)問(wèn)題

    詳解numpy.ndarray.reshape()函數(shù)的參數(shù)問(wèn)題

    這篇文章主要介紹了詳解numpy.ndarray.reshape()函數(shù)的參數(shù)問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • python爬蟲(chóng)解決驗(yàn)證碼的思路及示例

    python爬蟲(chóng)解決驗(yàn)證碼的思路及示例

    這篇文章主要介紹了python爬蟲(chóng)解決驗(yàn)證碼的思路及示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • Python讀取sqlite數(shù)據(jù)庫(kù)文件的方法分析

    Python讀取sqlite數(shù)據(jù)庫(kù)文件的方法分析

    這篇文章主要介紹了Python讀取sqlite數(shù)據(jù)庫(kù)文件的方法,結(jié)合實(shí)例形式分析了Python引入sqlite3模塊操作sqlite數(shù)據(jù)庫(kù)的讀取、SQL命令執(zhí)行等相關(guān)操作技巧,需要的朋友可以參考下
    2017-08-08
  • Python如何讀取PDF文檔(或TXT)

    Python如何讀取PDF文檔(或TXT)

    這篇文章主要介紹了Python如何讀取PDF文檔(或TXT),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • python對(duì)一個(gè)數(shù)向上取整的實(shí)例方法

    python對(duì)一個(gè)數(shù)向上取整的實(shí)例方法

    在本篇文章中小編給大家整理了關(guān)于python對(duì)一個(gè)數(shù)向上取整的實(shí)例方法,需要的朋友們可以跟著學(xué)習(xí)下。
    2020-06-06
  • Python中的八大核心語(yǔ)句你知道幾個(gè)呢?

    Python中的八大核心語(yǔ)句你知道幾個(gè)呢?

    Python?是一種代表簡(jiǎn)單思想的語(yǔ)言,其語(yǔ)法相對(duì)簡(jiǎn)單,很容易上手。本文精心篩選了Python中的八大核心語(yǔ)句,快來(lái)看看你都掌握了幾個(gè)呢
    2023-02-02
  • Python?clip與range函數(shù)保姆級(jí)使用教程

    Python?clip與range函數(shù)保姆級(jí)使用教程

    本文主要和大家介紹了詳解Python中clip與range函數(shù)的用法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參,希望能幫助到大家
    2022-06-06

最新評(píng)論