Parsing MySQL Binlog Files with Python for Transaction Analysis
1. Purpose of the Analysis
The binlog records every data change in MySQL, so it is commonly used for point-in-time recovery, data flashback, and investigating "strange" problems.
For example: are there any large transactions? Which table receives the most updates? Are there transactions that are slow to commit? All of these questions can be answered by analyzing the binlog.
2. Code Logic
The first step in collecting the data is parsing the binlog file. A binlog is a sequence of events: GTID events, XID events, Table Map events, DML row events, and so on. Once we can read every event, we can derive a lot of information.
Each event consists of two parts, an event header and an event body. The event header is laid out as follows:
類(lèi)型 | 占用 |
---|---|
timestamp | 4 bytes |
type_code | 1 bytes |
server_id | 4 bytes |
event_length | 4 bytes |
next_position | 4 bytes |
flags | 2 bytes |
The header occupies 19 bytes in total. From it we learn the event type, when the event occurred, the event length, and where the next event begins.
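The header layout above can be exercised directly with Python's `struct` module. Here is a minimal sketch, assuming little-endian field order; the sample header bytes are fabricated purely for illustration:

```python
import struct

# 19-byte event header: timestamp(4) type_code(1) server_id(4)
# event_length(4) next_position(4) flags(2), little-endian.
HEADER_FMT = '<IBIIIH'

def parse_event_header(raw):
    """Unpack one 19-byte binlog event header into a dict."""
    timestamp, type_code, server_id, event_length, next_position, flags = \
        struct.unpack(HEADER_FMT, raw)
    return {'timestamp': timestamp, 'type_code': type_code,
            'server_id': server_id, 'event_length': event_length,
            'next_position': next_position, 'flags': flags}

# Fabricated sample: a QUERY_EVENT (type 2), 100 bytes long,
# with the next event starting at offset 223.
sample = struct.pack(HEADER_FMT, 1600000000, 2, 1, 100, 223, 0)
header = parse_event_header(sample)
print(header['type_code'], header['event_length'], header['next_position'])  # 2 100 223
```

Note that `struct.calcsize('<IBIIIH')` is exactly 19, matching the header size in the table.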
After reading the header we can seek to next_position and read the next event's header. When we encounter a Table_map event, a DML transaction is in progress. So what does a Table Map event store?
The Table Map event tells us which database and table the DML will touch, so we can pinpoint the table each DML statement operates on. When a transaction starts, the binlog first records a Table_map event (a transaction touching several tables produces several of them), then the DML row events, and finally an XID event marking the commit.
The script finds the start of a transaction by parsing the Query_event (the BEGIN statement), collects the tables involved from the Table Map events, and detects the end of the transaction from the XID event.
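The bookkeeping just described boils down to a small state machine over the event stream. The following simplified sketch (the event names and tuple format are invented for illustration; the real script works on raw bytes) groups a flat stream of events into transactions:

```python
def group_transactions(events):
    """Group (event_type, payload) records into transactions:
    QUERY 'BEGIN' opens one, TABLE_MAP adds tables, XID commits it."""
    transactions, current = [], None
    for etype, payload in events:
        if etype == 'QUERY' and payload == 'BEGIN':
            current = {'tables': set(), 'ops': []}
        elif etype == 'TABLE_MAP' and current is not None:
            current['tables'].add(payload)
        elif etype in ('WRITE_ROWS', 'UPDATE_ROWS', 'DELETE_ROWS') and current is not None:
            current['ops'].append(etype)
        elif etype == 'XID' and current is not None:
            transactions.append(current)
            current = None
    return transactions

# One transaction touching two tables:
stream = [
    ('QUERY', 'BEGIN'),
    ('TABLE_MAP', 'db.t1'),
    ('WRITE_ROWS', None),
    ('TABLE_MAP', 'db.t2'),
    ('UPDATE_ROWS', None),
    ('XID', None),
]
txns = group_transactions(stream)
print(len(txns), sorted(txns[0]['tables']))  # 1 ['db.t1', 'db.t2']
```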
3. Hands-on Analysis
Let's go straight to the code.
The pandas module must be installed. It makes data wrangling very convenient; if you haven't used it before, it is well worth exploring and can noticeably boost your productivity.
```python
# -*- coding: utf-8 -*-
import sys
import math
import time
import struct
import argparse
import pandas as pd
from datetime import datetime

# trailing bytes of a Query_event excluded from the SQL statement
binlog_query_event_stern = 4
# fixed-length part of a Query_event body
binlog_event_fix_part = 13
# fixed-length part of a Table_map event body (table_id + reserved)
table_map_event_fix_length = 8
# magic bytes at the start of every binlog file
BINLOG_FILE_HEADER = b'\xFE\x62\x69\x6E'
# every event starts with a 19-byte header
binlog_event_header_len = 19


class BinlogEvent:
    UNKNOWN_EVENT = 0
    START_EVENT_V3 = 1
    QUERY_EVENT = 2
    STOP_EVENT = 3
    ROTATE_EVENT = 4
    INTVAR_EVENT = 5
    LOAD_EVENT = 6
    SLAVE_EVENT = 7
    CREATE_FILE_EVENT = 8
    APPEND_BLOCK_EVENT = 9
    EXEC_LOAD_EVENT = 10
    DELETE_FILE_EVENT = 11
    NEW_LOAD_EVENT = 12
    RAND_EVENT = 13
    USER_VAR_EVENT = 14
    FORMAT_DESCRIPTION_EVENT = 15
    XID_EVENT = 16
    BEGIN_LOAD_QUERY_EVENT = 17
    EXECUTE_LOAD_QUERY_EVENT = 18
    TABLE_MAP_EVENT = 19
    PRE_GA_WRITE_ROWS_EVENT = 20
    PRE_GA_UPDATE_ROWS_EVENT = 21
    PRE_GA_DELETE_ROWS_EVENT = 22
    WRITE_ROWS_EVENT = 23
    UPDATE_ROWS_EVENT = 24
    DELETE_ROWS_EVENT = 25
    INCIDENT_EVENT = 26
    HEARTBEAT_LOG_EVENT = 27
    IGNORABLE_LOG_EVENT = 28
    ROWS_QUERY_LOG_EVENT = 29
    WRITE_ROWS_EVENT_V2 = 30
    UPDATE_ROWS_EVENT_V2 = 31
    DELETE_ROWS_EVENT_V2 = 32
    GTID_LOG_EVENT = 33
    ANONYMOUS_GTID_LOG_EVENT = 34
    PREVIOUS_GTIDS_LOG_EVENT = 35


class BinlogEventGet(object):
    def __init__(self, binlog_path, outfile_path):
        self.file_handle = open(binlog_path, 'rb')
        # path the analysis file is exported to
        self.outfile_path = outfile_path

    def __del__(self):
        self.file_handle.close()

    def read_table_map_event(self, event_length, next_position):
        """
        fix_part = 8
            table_id : 6 bytes
            reserved : 2 bytes
        variable_part:
            database_name_length : 1 byte
            database_name : database_name_length bytes + 1
            table_name_length : 1 byte
            table_name : table_name_length bytes + 1
            column_count : 1 byte
            column_type_array : one byte per column
            metadata_length : 1 byte
            metadata : only present for variable-length types
                       (varchar / char / enum / binary / set: 2 bytes;
                        text / blob / float / decimal / time / timestamp / datetime: 1 byte)
            bit_field : 1 byte
            crc : 4 bytes
        :return: (database_name, table_name)
        """
        self.read_bytes(table_map_event_fix_length)
        database_name_length, = struct.unpack('B', self.read_bytes(1))
        database_name, _a = struct.unpack('{}ss'.format(database_name_length),
                                          self.read_bytes(database_name_length + 1))
        table_name_length, = struct.unpack('B', self.read_bytes(1))
        table_name, _a = struct.unpack('{}ss'.format(table_name_length),
                                       self.read_bytes(table_name_length + 1))
        self.file_handle.seek(next_position, 0)
        return database_name, table_name

    def read_bytes(self, count):
        """Read a fixed number of bytes from the binlog file."""
        return self.file_handle.read(count)

    @staticmethod
    def new_transaction_dict():
        """Return a fresh per-transaction record."""
        return {
            'id': None, 'db_name': None, 'ld_table_name': None,
            'table_set': set(), 'start_time': None, 'end_time': None,
            'diff_second': None, 'event_type': set(),
            'start_position': None, 'end_position': None
        }

    def main(self):
        if not self.read_bytes(4) == BINLOG_FILE_HEADER:
            print("Error: not a standard binlog file format.")
            sys.exit(0)
        temp_transaction_dict = self.new_transaction_dict()
        tem_id = 0
        df = list()
        print('loading.....')
        while True:
            type_code, event_length, timestamp, next_position = self.read_header()
            if type_code is None:
                break
            if type_code == BinlogEvent.QUERY_EVENT:
                thread_id, db_name, info = self.read_query_event(event_length)
                if info == 'BEGIN':
                    temp_transaction_dict['start_position'] = next_position - event_length
                    temp_transaction_dict['start_time'] = timestamp
                    temp_transaction_dict['db_name'] = db_name
                self.file_handle.seek(next_position, 0)
            elif type_code == BinlogEvent.TABLE_MAP_EVENT:
                with_database, with_table = self.read_table_map_event(event_length, next_position)
                # remember the first table the transaction touches
                if temp_transaction_dict['ld_table_name'] is None:
                    temp_transaction_dict['ld_table_name'] = str(with_table.decode())
                # and the set of all tables involved in the transaction
                temp_transaction_dict['table_set'].add(str(with_table.decode()))
            elif type_code in (BinlogEvent.WRITE_ROWS_EVENT, BinlogEvent.WRITE_ROWS_EVENT_V2):
                temp_transaction_dict['event_type'].add('INSERT')
                self.file_handle.seek(event_length - binlog_event_header_len, 1)
            elif type_code in (BinlogEvent.UPDATE_ROWS_EVENT, BinlogEvent.UPDATE_ROWS_EVENT_V2):
                temp_transaction_dict['event_type'].add('UPDATE')
                self.file_handle.seek(event_length - binlog_event_header_len, 1)
            elif type_code in (BinlogEvent.DELETE_ROWS_EVENT, BinlogEvent.DELETE_ROWS_EVENT_V2):
                temp_transaction_dict['event_type'].add('DELETE')
                self.file_handle.seek(event_length - binlog_event_header_len, 1)
            elif type_code == BinlogEvent.XID_EVENT:
                # fill in the end-of-transaction information
                temp_transaction_dict['id'] = tem_id
                temp_transaction_dict['end_time'] = timestamp
                temp_transaction_dict['end_position'] = next_position
                _start = datetime.strptime(temp_transaction_dict['start_time'], '%Y-%m-%d %H:%M:%S')
                _end = datetime.strptime(temp_transaction_dict['end_time'], '%Y-%m-%d %H:%M:%S')
                temp_transaction_dict['diff_second'] = (_end - _start).seconds
                df.append(temp_transaction_dict)
                # start collecting the next transaction
                temp_transaction_dict = self.new_transaction_dict()
                self.file_handle.seek(event_length - binlog_event_header_len, 1)
                tem_id += 1
            else:
                # any other event: just skip over its body
                self.file_handle.seek(event_length - binlog_event_header_len, 1)
        outfile = pd.DataFrame(df)
        outfile['transaction_size_bytes'] = outfile['end_position'] - outfile['start_position']
        outfile['transaction_size'] = outfile['transaction_size_bytes'].map(lambda x: self.bit_conversion(x))
        outfile.to_csv(self.outfile_path, encoding='utf_8_sig')
        print('File Export directory: {}'.format(self.outfile_path))
        print('complete ok!')

    def read_header(self):
        """
        binlog_event_header_len = 19
            timestamp     : 4 bytes
            type_code     : 1 byte
            server_id     : 4 bytes
            event_length  : 4 bytes
            next_position : 4 bytes
            flags         : 2 bytes
        """
        read_byte = self.read_bytes(binlog_event_header_len)
        if read_byte:
            result = struct.unpack('=IBIIIH', read_byte)
            type_code, event_length, timestamp, next_position = result[1], result[3], result[0], result[4]
            return (type_code, event_length,
                    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp)),
                    next_position)
        else:
            return None, None, None, None

    def read_query_event(self, event_length=None):
        """
        fix_part = 13:
            thread_id             : 4 bytes
            execute_seconds       : 4 bytes
            database_length       : 1 byte
            error_code            : 2 bytes
            variable_block_length : 2 bytes
        variable_part:
            variable_block = fix_part.variable_block_length
            database_name  = fix_part.database_length
            sql_statement  = event_length - 19 - 13 - variable_block_length - database_length - 4
        """
        read_byte = self.read_bytes(binlog_event_fix_part)
        fix_result = struct.unpack('=IIBHH', read_byte)
        thread_id = fix_result[0]
        self.read_bytes(fix_result[4])
        read_byte = self.read_bytes(fix_result[2])
        database_name, = struct.unpack('{}s'.format(fix_result[2]), read_byte)
        statement_length = (event_length - binlog_event_fix_part - binlog_event_header_len
                            - fix_result[4] - fix_result[2] - binlog_query_event_stern)
        read_byte = self.read_bytes(statement_length)
        _a, sql_statement = struct.unpack('1s{}s'.format(statement_length - 1), read_byte)
        return thread_id, database_name.decode(), sql_statement.decode()

    @staticmethod
    def bit_conversion(size, dot=2):
        """Convert a byte count into a human-readable size string."""
        size = float(size)
        if size < 0:
            raise ValueError('bit_conversion Error')
        if size < 1:
            return str(round(size / 0.125, dot)) + ' b'
        units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB', 'BB', 'NB', 'DB', 'CB']
        for power, unit in enumerate(units):
            if size < math.pow(1024, power + 1) or unit == units[-1]:
                return str(round(size / math.pow(1024, power), dot)) + ' ' + unit


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
    parser.add_argument('--binlog', type=str, help='Binlog file path.', default=None)
    parser.add_argument('--outfile', type=str, help='Analyze the file export directory.', default=None)
    args = parser.parse_args()
    if not args.binlog or not args.outfile:
        parser.print_help()
        sys.exit(0)
    binlog_show = BinlogEventGet(args.binlog, args.outfile)
    binlog_show.main()
```
```shell
➜  Desktop python3 BinlogShow.py --help
usage: BinlogShow.py [-h] [--binlog BINLOG] [--outfile OUTFILE]

A piece of binlog analysis code.

optional arguments:
  -h, --help         show this help message and exit
  --binlog BINLOG    Binlog file path.
  --outfile OUTFILE  Analyze the file export directory.
➜  Desktop
```
Just point --binlog at the binlog file and --outfile at the path the analysis file should be exported to.
```shell
➜  Desktop python3 BinlogShow.py --binlog=/Users/cooh/Desktop/mysql-bin.009549 --outfile=/Users/cooh/Desktop/binlogshow.csv
loading.....
File Export directory: /Users/cooh/Desktop/binlogshow.csv
complete ok!
```
Once the run completes we have the parsed per-transaction information. From this file we can write whatever analysis code we need.
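As a sketch of that last step, the exported CSV can be sliced with pandas to answer the questions from section 1. The column names below match what the script writes; the sample rows are made up so the example is self-contained (in real use you would start with `pd.read_csv('/Users/cooh/Desktop/binlogshow.csv')`):

```python
import pandas as pd

# Made-up sample with the same columns the script exports.
df = pd.DataFrame({
    'ld_table_name': ['orders', 'users', 'orders'],
    'diff_second': [0, 35, 2],
    'transaction_size_bytes': [1200, 80, 5000000],
})

# Which table is touched by the most transactions?
busiest_table = df['ld_table_name'].value_counts().idxmax()

# Large transactions: more than 1 MB between start and end position.
big_txns = df[df['transaction_size_bytes'] > 1024 * 1024]

# Transactions with a long gap between BEGIN and commit.
slow_txns = df[df['diff_second'] > 10]

print(busiest_table, len(big_txns), len(slow_txns))  # orders 1 1
```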
Summary
The above reflects my personal experience; I hope it gives you a useful reference.