欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python解析MySQL Binlog日志分析情況

 更新時(shí)間:2025年01月22日 14:59:15   作者:Bing@DBA  
文章介紹了如何使用Python解析MySQL的binlog日志,并通過(guò)分析binlog來(lái)了解數(shù)據(jù)庫(kù)的變動(dòng)情況,如大事務(wù)、頻繁更新的表等,文章詳細(xì)描述了binlog的結(jié)構(gòu)、事件類(lèi)型以及如何通過(guò)解析這些事件來(lái)獲取所需的信息,最后,文章提供了一個(gè)示例代碼

Python解析MySQL Binlog日志分析情況

1. 分析目的

Binlog 中記錄了 MySQL 數(shù)據(jù)變動(dòng),經(jīng)常用于時(shí)間點(diǎn)恢復(fù)、數(shù)據(jù)閃回、分析一些 “奇怪” 的問(wèn)題。

例如是否有大事務(wù),哪張表涉及的更新最多?是否有一些事務(wù)沒(méi)有及時(shí)提交,都可以通過(guò)分析 binlog 來(lái)得到答案。

2. 代碼邏輯

收集數(shù)據(jù)的第一步就是要解析 binlog 文件,binlog 是由事件組成的,例如:GTID 事件、XID 事件、Table Map 事件、DML 事件,只要獲得所有的事件,就可以分析到很多東西。

每個(gè)事件都由兩部分組成,事件頭 - 事件體,事件頭的存儲(chǔ)格式是這樣的。

類(lèi)型占用
timestamp4 bytes
type_code1 bytes
server_id4 bytes
event_length4 bytes
next_position4 bytes
flags2 bytes

一共占 19 bytes 我們通過(guò) header 可以知道事件類(lèi)型,發(fā)生時(shí)間、事件長(zhǎng)度、下一個(gè)事件開(kāi)始位置。

讀取頭信息后,我們就可以通過(guò) next_position 跳到下一個(gè)事件開(kāi)始的位置,讀取事件頭,如果遇到 Table_map 事件則表示要開(kāi)啟一個(gè) DML 事務(wù),那么 Table Map 事件中存儲(chǔ)的是什么呢?

從 Table Map 中可以獲得 DML 要操作的數(shù)據(jù)庫(kù)與表信息,這樣我們就可以定位到 DML 操作的是哪張表,開(kāi)啟一個(gè)事務(wù)時(shí),binlog 會(huì)先記錄 Table_map 事件,涉及到多張表就會(huì)有多個(gè) Table_map 事件,然后就是 DML 事件,最后是一個(gè) XID 事件,表示事務(wù)提交。

腳本通過(guò)解析 Query_event 獲得事務(wù)的起點(diǎn),解析 Table map 事件獲得涉及的表,通過(guò) XID 事件獲得事務(wù)結(jié)束。

3. 實(shí)戰(zhàn)分析

直接上代碼吧~

需要安裝 pandas 模塊,這個(gè)模塊處理數(shù)據(jù)非常方便,如果沒(méi)有使用過(guò)的朋友,建議去把玩下,用好了提升工作效率。

# -*- coding: utf-8 -*-
import sys
import math
import time
import struct
import argparse
import pandas as pd
from datetime import datetime

binlog_quer_event_stern = 4
binlog_event_fix_part = 13
table_map_event_fix_length = 8
BINLOG_FILE_HEADER = b'\xFE\x62\x69\x6E'
binlog_event_header_len = 19


class BinlogEvent:
    UNKNOWN_EVENT = 0
    START_EVENT_V3 = 1
    QUERY_EVENT = 2
    STOP_EVENT = 3
    ROTATE_EVENT = 4
    INTVAR_EVENT = 5
    LOAD_EVENT = 6
    SLAVE_EVENT = 7
    CREATE_FILE_EVENT = 8
    APPEND_BLOCK_EVENT = 9
    EXEC_LOAD_EVENT = 10
    DELETE_FILE_EVENT = 11
    NEW_LOAD_EVENT = 12
    RAND_EVENT = 13
    USER_VAR_EVENT = 14
    FORMAT_DESCRIPTION_EVENT = 15
    XID_EVENT = 16
    BEGIN_LOAD_QUERY_EVENT = 17
    EXECUTE_LOAD_QUERY_EVENT = 18
    TABLE_MAP_EVENT = 19
    PRE_GA_WRITE_ROWS_EVENT = 20
    PRE_GA_UPDATE_ROWS_EVENT = 21
    PRE_GA_DELETE_ROWS_EVENT = 22
    WRITE_ROWS_EVENT = 23
    UPDATE_ROWS_EVENT = 24
    DELETE_ROWS_EVENT = 25
    INCIDENT_EVENT = 26
    HEARTBEAT_LOG_EVENT = 27
    IGNORABLE_LOG_EVENT = 28
    ROWS_QUERY_LOG_EVENT = 29
    WRITE_ROWS_EVENT_V2 = 30
    UPDATE_ROWS_EVENT_V2 = 31
    DELETE_ROWS_EVENT_V2 = 32
    GTID_LOG_EVENT = 33
    ANONYMOUS_GTID_LOG_EVENT = 34
    PREVIOUS_GTIDS_LOG_EVENT = 35


class BinlogEventGet(object):
    def __init__(self, binlog_path, outfile_path):
        self.file_handle = open(binlog_path, 'rb')
        # 分析文件導(dǎo)出的位置
        self.outfile_path = outfile_path

    def __del__(self):
        self.file_handle.close()

    def read_table_map_event(self, event_length, next_position):
        """
        fix_part = 8
            table_id : 6bytes
            Reserved : 2bytes
        variable_part:
            database_name_length : 1bytes
            database_name : database_name_length bytes + 1
            table_name_length : 1bytes
            table_name : table_name_length bytes + 1
            cloums_count : 1bytes
            colums_type_array : one byte per column
            mmetadata_lenth : 1bytes
            metadata : .....(only available in the variable length field,varchar:2bytes,text、blob:1bytes,time、timestamp、datetime: 1bytes
                            blob、float、decimal : 1bytes, char、enum、binary、set: 2bytes(column type id :1bytes metadatea: 1bytes))
            bit_filed : 1bytes
            crc : 4bytes
            .........
        :return:
        """
        self.read_bytes(table_map_event_fix_length)
        database_name_length, = struct.unpack('B', self.read_bytes(1))
        database_name, _a, = struct.unpack('{}ss'.format(database_name_length),
                                           self.read_bytes(database_name_length + 1))

        table_name_length, = struct.unpack('B', self.read_bytes(1))
        table_name, _a, = struct.unpack('{}ss'.format(table_name_length), self.read_bytes(table_name_length + 1))

        self.file_handle.seek(next_position, 0)
        return database_name, table_name

    def read_bytes(self, count):
        """
        讀取固定 bytes 的數(shù)據(jù)
        :param count:
        :return:
        """
        return self.file_handle.read(count)

    def main(self):
        if not self.read_bytes(4) == BINLOG_FILE_HEADER:
            print("Error: Is not a standard binlog file format.")
            sys.exit(0)

        # 事務(wù)記錄字典
        temp_transaction_dict = {
            'id': None,
            'db_name': None,
            'ld_table_name': None,
            'table_set': set(),
            'start_time': None,
            'end_time': None,
            'diff_second': None,
            'event_type': set(),
            'start_position': None,
            'end_position': None
        }

        tem_id = 0
        df = list()
        start_position, end_position = None, None
        print('loading.....')
        while True:
            type_code, event_length, timestamp, next_position = self.read_header()

            # 終止循環(huán)判斷
            if type_code is None:
                break

            # 事務(wù)信息收集邏輯判斷
            if type_code == BinlogEvent.QUERY_EVENT:
                thread_id, db_name, info = self.read_query_event(event_length)

                if info == 'BEGIN':
                    temp_transaction_dict['start_position'] = next_position - event_length
                    temp_transaction_dict['start_time'] = timestamp
                    temp_transaction_dict['db_name'] = db_name
                    # print('Time:', timestamp, 'DB:', db_name, 'SQL:', info)
                self.file_handle.seek(next_position, 0)

            elif type_code == BinlogEvent.TABLE_MAP_EVENT:
                with_database, with_table = self.read_table_map_event(event_length, next_position)
                # 只記錄最開(kāi)始的一張表
                if temp_transaction_dict['ld_table_name'] is None:
                    temp_transaction_dict['ld_table_name'] = str(with_table.decode())
                # 一個(gè)事務(wù)涉及的所有表集合
                temp_transaction_dict['table_set'].add(str(with_table.decode()))

            elif type_code in (BinlogEvent.WRITE_ROWS_EVENT, BinlogEvent.WRITE_ROWS_EVENT_V2):
                # print('INSERT:', type_code, event_length, timestamp, next_position)
                temp_transaction_dict['event_type'].add('INSERT')
                self.file_handle.seek(event_length - binlog_event_header_len, 1)

            elif type_code in (BinlogEvent.UPDATE_ROWS_EVENT, BinlogEvent.UPDATE_ROWS_EVENT_V2):
                # print('UPDATE:', type_code, event_length, timestamp, next_position)
                temp_transaction_dict['event_type'].add('UPDATE')
                self.file_handle.seek(event_length - binlog_event_header_len, 1)

            elif type_code in (BinlogEvent.DELETE_ROWS_EVENT, BinlogEvent.DELETE_ROWS_EVENT_V2):
                # print('DELETE:', type_code, event_length, timestamp, next_position)
                temp_transaction_dict['event_type'].add('DELETE')
                self.file_handle.seek(event_length - binlog_event_header_len, 1)

            elif type_code == BinlogEvent.XID_EVENT:
                # 補(bǔ)充事務(wù)結(jié)束信息
                temp_transaction_dict['id'] = tem_id
                temp_transaction_dict['end_time'] = timestamp
                temp_transaction_dict['end_position'] = next_position

                _start = datetime.strptime(temp_transaction_dict['start_time'], '%Y-%m-%d %H:%M:%S')
                _end = datetime.strptime(temp_transaction_dict['end_time'], '%Y-%m-%d %H:%M:%S')

                temp_transaction_dict['diff_second'] = (_end - _start).seconds

                df.append(temp_transaction_dict)

                # print(temp_transaction_dict)
                # 收尾
                temp_transaction_dict = {
                    'id': None,
                    'db_name': None,
                    'ld_table_name': None,
                    'table_set': set(),
                    'start_time': None,
                    'end_time': None,
                    'diff_second': None,
                    'event_type': set(),
                    'start_position': None,
                    'end_position': None
                }
                self.file_handle.seek(event_length - binlog_event_header_len, 1)
                tem_id += 1
            else:
                # 如果讀取的是一個(gè) header 事件,直接跳過(guò)即可。
                self.file_handle.seek(event_length - binlog_event_header_len, 1)

        outfile = pd.DataFrame(df)
        outfile['transaction_size_bytes'] = (outfile['end_position'] - outfile['start_position'])
        outfile["transaction_size"] = outfile["transaction_size_bytes"].map(lambda x: self.bit_conversion(x))
        outfile.to_csv(self.outfile_path, encoding='utf_8_sig')
        print('File Export directory: {}'.format(self.outfile_path))
        print('complete ok!')

    def read_header(self):
        """
        binlog_event_header_len = 19
        timestamp : 4bytes
        type_code : 1bytes
        server_id : 4bytes
        event_length : 4bytes
        next_position : 4bytes
        flags : 2bytes
        """
        read_byte = self.read_bytes(binlog_event_header_len)

        if read_byte:
            result = struct.unpack('=IBIIIH', read_byte)
            type_code, event_length, timestamp, next_position = result[1], result[3], result[0], result[4]

            return type_code, event_length, time.strftime('%Y-%m-%d %H:%M:%S',
                                                          time.localtime(
                                                              timestamp)), next_position
        else:
            return None, None, None, None

    def read_query_event(self, event_length=None):
        """
        fix_part = 13:
                thread_id : 4bytes
                execute_seconds : 4bytes
                database_length : 1bytes
                error_code : 2bytes
                variable_block_length : 2bytes
            variable_part :
                variable_block_length = fix_part.variable_block_length
                database_name = fix_part.database_length
                sql_statement = event_header.event_length - 19 - 13 - variable_block_length - database_length - 4
        """
        read_byte = self.read_bytes(binlog_event_fix_part)
        fix_result = struct.unpack('=IIBHH', read_byte)
        thread_id = fix_result[0]
        self.read_bytes(fix_result[4])
        read_byte = self.read_bytes(fix_result[2])
        database_name, = struct.unpack('{}s'.format(fix_result[2]), read_byte)
        statement_length = event_length - binlog_event_fix_part - binlog_event_header_len \
                           - fix_result[4] - fix_result[2] - binlog_quer_event_stern
        read_byte = self.read_bytes(statement_length)
        _a, sql_statement, = struct.unpack('1s{}s'.format(statement_length - 1), read_byte)
        return thread_id, database_name.decode(), sql_statement.decode()

    @staticmethod
    def bit_conversion(size, dot=2):
        size = float(size)
        if 0 <= size < 1:
            human_size = str(round(size / 0.125, dot)) + ' b'
        elif 1 <= size < 1024:
            human_size = str(round(size, dot)) + ' B'
        elif math.pow(1024, 1) <= size < math.pow(1024, 2):
            human_size = str(round(size / math.pow(1024, 1), dot)) + ' KB'
        elif math.pow(1024, 2) <= size < math.pow(1024, 3):
            human_size = str(round(size / math.pow(1024, 2), dot)) + ' MB'
        elif math.pow(1024, 3) <= size < math.pow(1024, 4):
            human_size = str(round(size / math.pow(1024, 3), dot)) + ' GB'
        elif math.pow(1024, 4) <= size < math.pow(1024, 5):
            human_size = str(round(size / math.pow(1024, 4), dot)) + ' TB'
        elif math.pow(1024, 5) <= size < math.pow(1024, 6):
            human_size = str(round(size / math.pow(1024, 5), dot)) + ' PB'
        elif math.pow(1024, 6) <= size < math.pow(1024, 7):
            human_size = str(round(size / math.pow(1024, 6), dot)) + ' EB'
        elif math.pow(1024, 7) <= size < math.pow(1024, 8):
            human_size = str(round(size / math.pow(1024, 7), dot)) + ' ZB'
        elif math.pow(1024, 8) <= size < math.pow(1024, 9):
            human_size = str(round(size / math.pow(1024, 8), dot)) + ' YB'
        elif math.pow(1024, 9) <= size < math.pow(1024, 10):
            human_size = str(round(size / math.pow(1024, 9), dot)) + ' BB'
        elif math.pow(1024, 10) <= size < math.pow(1024, 11):
            human_size = str(round(size / math.pow(1024, 10), dot)) + ' NB'
        elif math.pow(1024, 11) <= size < math.pow(1024, 12):
            human_size = str(round(size / math.pow(1024, 11), dot)) + ' DB'
        elif math.pow(1024, 12) <= size:
            human_size = str(round(size / math.pow(1024, 12), dot)) + ' CB'
        else:
            raise ValueError('bit_conversion Error')
        return human_size


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
    parser.add_argument('--binlog', type=str, help='Binlog file path.', default=None)
    parser.add_argument('--outfile', type=str, help='Analyze the file export directory.', default=None)
    args = parser.parse_args()

    if not args.binlog or not args.outfile:
        parser.print_help()
        sys.exit(0)

    binlog_show = BinlogEventGet(args.binlog, args.outfile)
    binlog_show.main()
?  Desktop python3 BinlogShow.py --help
usage: BinlogShow.py [-h] [--binlog BINLOG] [--outfile OUTFILE]

A piece of binlog analysis code.

optional arguments:
  -h, --help         show this help message and exit
  --binlog BINLOG    Binlog file path.
  --outfile OUTFILE  Analyze the file export directory.
?  Desktop

指定 binlog 文件目錄和導(dǎo)出分析文件目錄即可。

?  Desktop python3 BinlogShow.py --binlog=/Users/cooh/Desktop/mysql-bin.009549 --outfile=/Users/cooh/Desktop/binlogshow.csv
loading.....
File Export directory: /Users/cooh/Desktop/binlogshow.csv
complete ok!

運(yùn)行完成后就會(huì)得到程序解析后的信息,我們根據(jù)這份文件,寫(xiě)一些分析代碼即可。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Ubuntu權(quán)限不足無(wú)法創(chuàng)建文件夾解決方案

    Ubuntu權(quán)限不足無(wú)法創(chuàng)建文件夾解決方案

    這篇文章主要介紹了Ubuntu權(quán)限不足無(wú)法創(chuàng)建文件夾解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-11-11
  • Python列表推導(dǎo)式詳解

    Python列表推導(dǎo)式詳解

    列表推導(dǎo)式是Python構(gòu)建列表(list)的一種快捷方式,可以使用簡(jiǎn)潔的代碼就創(chuàng)建出一個(gè)列表.本文通過(guò)代碼示例詳細(xì)介紹了python列表推導(dǎo)式,感興趣的同學(xué)可以參考閱讀
    2023-04-04
  • 詳解Python利用APScheduler框架實(shí)現(xiàn)定時(shí)任務(wù)

    詳解Python利用APScheduler框架實(shí)現(xiàn)定時(shí)任務(wù)

    在做一些python工具的時(shí)候,常常會(huì)碰到定時(shí)器問(wèn)題,總覺(jué)著使用threading.timer或者schedule模塊非常不優(yōu)雅。所以本文將利用APScheduler框架實(shí)現(xiàn)定時(shí)任務(wù),需要的可以參考一下
    2022-03-03
  • python之yield和Generator深入解析

    python之yield和Generator深入解析

    這篇文章主要介紹了python之yield和Generator深入解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09
  • 深度學(xué)習(xí)中shape[0]、shape[1]、shape[2]的區(qū)別詳解

    深度學(xué)習(xí)中shape[0]、shape[1]、shape[2]的區(qū)別詳解

    本文主要介紹了深度學(xué)習(xí)中shape[0]、shape[1]、shape[2]的區(qū)別詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • python裝飾器原理與用法深入詳解

    python裝飾器原理與用法深入詳解

    這篇文章主要介紹了python裝飾器原理與用法,結(jié)合實(shí)例形式深入分析了Python裝飾器的概念、原理、使用方法及相關(guān)操作注意事項(xiàng),需要的朋友可以參考下
    2019-12-12
  • 一步步教你用Python畫(huà)五彩氣球

    一步步教你用Python畫(huà)五彩氣球

    這篇文章主要給大家介紹了關(guān)于如何用Python畫(huà)五彩氣球的相關(guān)資料,主要是用turtle庫(kù)自帶的畫(huà)筆turtle.Turtle()來(lái)繪制氣球,文中給出了詳細(xì)的實(shí)例代碼,需要的朋友可以參考下
    2023-06-06
  • Pytorch中的數(shù)據(jù)集劃分&正則化方法

    Pytorch中的數(shù)據(jù)集劃分&正則化方法

    這篇文章主要介紹了Pytorch中的數(shù)據(jù)集劃分&正則化方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-05-05
  • Python virtualenv虛擬環(huán)境實(shí)現(xiàn)過(guò)程解析

    Python virtualenv虛擬環(huán)境實(shí)現(xiàn)過(guò)程解析

    這篇文章主要介紹了Python virtualenv虛擬環(huán)境實(shí)現(xiàn)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-04-04
  • Tensorflow之MNIST CNN實(shí)現(xiàn)并保存、加載模型

    Tensorflow之MNIST CNN實(shí)現(xiàn)并保存、加載模型

    這篇文章主要為大家詳細(xì)介紹了Tensorflow之MNIST CNN實(shí)現(xiàn)并保存、加載模型,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-06-06

最新評(píng)論