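How MySQL Commits a Transaction: A Source Code Walkthrough
Introduction
When a transaction is committed with MySQL's commit command, the server internally performs a two-phase commit (Prepare and Commit). This article analyzes the two-phase commit in the MySQL 8.0.33 source code and shows what a transaction goes through while it is being committed.
The overall logic is as follows: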
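I. Prepare phase
1. Binlog Prepare
Fetch the largest sequence number (logical timestamp) among the transactions committed so far.
2. InnoDB Prepare
Set the transaction state to prepared.
Release gap locks for transactions at the READ COMMITTED isolation level or lower.
Change the state of the undo log segment from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED.
Write the transaction XID into the undo log.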
II. Commit phase
1. Stage 0
Preserve the commit order on replica instances.
2. Flush Stage
Flush the redo log to disk according to innodb_flush_log_at_trx_commit
  Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues
  The storage engine layer flushes redo log in the prepare state to disk according to innodb_flush_log_at_trx_commit
  Stop blocking the replica's preserve-commit-order execution
Call get_server_sidno() and Gtid_state::get_automatic_gno() to generate the GTID
Flush binlog_cache_mngr
  Flush stmt_cache
  Flush trx_cache
    Generate last_committed and sequence_number
    Flush the GTID log event
    Flush the data in trx_cache into the binlog cache
    Prepare the binlog position that applies after the transaction commits
    Increment the prepared XID counter
Call the after_flush hook, which registers the flushed binlog file and position with the semisynchronous replication plugin
If sync_binlog != 1, update the binlog position in the flush stage and broadcast the update signal, so that the dump thread serving the replica notices the binlog update
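3. Sync Stage
Depending on sync_binlog, wait before syncing and then call fsync() to flush the binlog to disk
If sync_binlog == 1, update the binlog position in the sync stage and broadcast the update signal, so that the dump thread serving the replica notices the binlog update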
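4. Commit Stage
after_sync hook (the semisynchronous replication after_sync hook)
Update the global m_max_committed_transaction (used as last_committed by later transactions) and reinitialize the sequence number in the transaction context
Binlog-layer commit, which does nothing
Storage engine commit
  Pre-allocate an update undo segment for persisting the GTID
  Update update_time of the modified tables in the data dictionary
  Allocate a mini-transaction handle and buffer
  Update the undo state
    Insert undo changes from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE; update undo changes to TRX_UNDO_TO_PURGE
    If the transaction has update undo, also assign a trx no for the rollback segments and add them to the purge queue
  Add the update undo log header to the head of the history list
  Free some in-memory objects
  Record the binlog position in the InnoDB transaction system header
  Close the MVCC read view
  Persist the GTID
  Release the insert undo log
  Wake up background threads such as the master thread, purge thread, and page_cleaner
Update executed_gtid for the whole group of transactions
After the storage engine commit, decrement the counter of XIDs in the prepared state
after_commit hook (the semisynchronous replication after_commit hook)
Broadcast the m_stage_cond_binlog condition variable to wake up the suspended followers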
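With the overall logic in mind, read on if you are interested in the source code analysis (best read on a desktop screen).
ha_commit_trans
The ha_commit_trans function mainly decides whether GTID information needs to be written, and then starts the two-phase commit: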
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock) {
  /*
    Save transaction owned gtid into table before transaction prepare
    if binlog is disabled, or binlog is enabled and log_replica_updates
    is disabled with slave SQL thread or slave worker thread.
  */
  std::tie(error, need_clear_owned_gtid) = commit_owned_gtids(thd, all);
...
  // Prepare phase
  if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
    error = tc_log->prepare(thd, all);
...
  // Commit phase
  if (error || (error = tc_log->commit(thd, all))) {
    ha_rollback_trans(thd, all);
    error = 1;
    goto end;
  }
}
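The control flow of the excerpt can be condensed into a small model. This is only a sketch and not the server's code: TxnContext, Coordinator, and commit_transaction are hypothetical names, and the transaction coordinator is reduced to three callbacks. It illustrates that Prepare runs only when two-phase commit is possible and more than one read-write engine takes part, and that a failure in either phase ends in a rollback.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for the transaction context read by ha_commit_trans.
struct TxnContext {
  bool no_2pc = false;  // true if a participant cannot do two-phase commit
  int rw_ha_count = 0;  // number of engines holding read-write changes
};

// Hypothetical stand-in for tc_log; prepare/commit return 0 on success.
struct Coordinator {
  std::function<int()> prepare;
  std::function<int()> commit;
  std::function<void()> rollback;
};

// Returns 0 on success and 1 on failure, like the excerpt above.
int commit_transaction(const TxnContext &ctx, const Coordinator &tc) {
  int error = 0;
  // Prepare phase: only with 2PC available and more than one writer.
  if (!ctx.no_2pc && ctx.rw_ha_count > 1) error = tc.prepare();
  // Commit phase: skipped when prepare failed; any failure rolls back.
  if (error || (error = tc.commit())) {
    tc.rollback();
    return 1;
  }
  return 0;
}
```

With a single read-write participant the model goes straight to commit, which matches the rw_ha_count > 1 condition in the excerpt.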
What the Prepare phase does
The Prepare phase of the two-phase commit is relatively simple. Below is the call stack from the commit command entry point through the Prepare phase, with notes on what each step does:
|mysql_execute_command
|--trans_commit
|----ha_commit_trans
|------MYSQL_BIN_LOG::prepare
// Start binlog prepare and InnoDB prepare
|--------ha_prepare_low
// Binlog prepare: fetch the largest sequence number (logical timestamp) among the transactions committed so far
|----------binlog_prepare
// InnoDB prepare
|----------innobase_xa_prepare
|------------trx_prepare_for_mysql
// 1. Call trx_prepare_low
// 2. Set the transaction state to Prepared
// 3. Release gap locks for READ COMMITTED and lower isolation levels
// 4. Flush the redo log (now deferred to the flush stage of the Commit phase)
|--------------trx_prepare
|----------------trx_prepare_low
// 1. Change the undo log segment state from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED
// 2. Write the transaction XID into the undo log
|------------------trx_undo_set_state_at_prepare
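What the Commit phase does
The Commit phase is implemented mainly in the MYSQL_BIN_LOG::ordered_commit function.
Flush stage
Start with Stage 0 and Stage 1. Stage 0 was added in 8.0 and mainly preserves the commit order on replicas. Stage 1 is the familiar flush stage, the first of the three sub-stages of the Commit phase: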
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
  /*
    Stage #0: make sure the replica's SQL threads commit in the
    transaction order of the relay log
  */
  if (Commit_order_manager::wait_for_its_turn_before_flush_stage(thd) ||
      ending_trans(thd, all) ||
      Commit_order_manager::get_rollback_status(thd)) {
    if (Commit_order_manager::wait(thd)) {
      return thd->commit_error;
    }
  }
  /*
    Stage #1: flushing transactions to binary log
    While flushing, we allow new threads to enter and will process
    them in due time. Once the queue was empty, we cannot reap
    anything more since it is possible that a thread entered and
    appointed itself leader for the flush phase.
  */
  if (change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,
                   &LOCK_log)) {
    DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                          thd->commit_error));
    return finish_commit(thd);
  }
  THD *wait_queue = nullptr, *final_queue = nullptr;
  mysql_mutex_t *leave_mutex_before_commit_stage = nullptr;
  my_off_t flush_end_pos = 0;
  bool update_binlog_end_pos_after_sync;
  // Main processing logic of the flush stage
  flush_error =
      process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);
  if (flush_error == 0 && total_bytes > 0)
    /*
      Flush the binlog cache to the file cache
    */
    flush_error = flush_cache_to_file(&flush_end_pos);
  // sync_binlog decides in which stage the binlog position is updated
  // and the binlog update signal is broadcast
  update_binlog_end_pos_after_sync = (get_sync_period() == 1);
  /*
    If the flush finished successfully, we can call the after_flush
    hook. Being invoked here, we have the guarantee that the hook is
    executed before the before/after_send_hooks on the dump thread
    preventing race conditions among these plug-ins.
  */
  if (flush_error == 0) {
    const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
    assert(flush_end_pos != 0);
    /*
      Call the after_flush hook, which registers the flushed binlog file
      and position with the semisynchronous replication plugin so that
      they can later be compared with the binlog position acknowledged
      by the replica.
    */
    if (RUN_HOOK(binlog_storage, after_flush,
                 (thd, file_name_ptr, flush_end_pos))) {
      LogErr(ERROR_LEVEL, ER_BINLOG_FAILED_TO_RUN_AFTER_FLUSH_HOOK);
      flush_error = ER_ERROR_ON_WRITE;
    }
    // If sync_binlog != 1, update the binlog position in the flush stage and
    // broadcast the update signal so that the dump thread serving the replica
    // notices the binlog update
    if (!update_binlog_end_pos_after_sync) update_binlog_end_pos();
  }
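The only place where sync_binlog influences the flush stage is the choice of when the new binlog end position becomes visible. The following sketch isolates that decision. PublishPoint and binlog_end_pos_publish_point are hypothetical names, and sync_period stands for the value returned by get_sync_period().

```cpp
#include <cassert>

// Stage in which the leader publishes the new binlog end position.
enum class PublishPoint { AfterFlush, AfterSync };

// sync_period models get_sync_period(), i.e. the sync_binlog setting.
PublishPoint binlog_end_pos_publish_point(unsigned int sync_period) {
  const bool update_after_sync = (sync_period == 1);
  return update_after_sync ? PublishPoint::AfterSync
                           : PublishPoint::AfterFlush;
}
```

Only sync_binlog = 1 defers publication to the sync stage. Every other value, including 0, publishes right after the flush.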
The main processing logic of the flush stage lives in process_flush_stage_queue:
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
                                             bool *rotate_var,
                                             THD **out_queue_var) {
  int no_flushes = 0;
  my_off_t total_bytes = 0;
  mysql_mutex_assert_owner(&LOCK_log);
  // Flush the redo log to disk according to innodb_flush_log_at_trx_commit
  THD *first_seen = fetch_and_process_flush_stage_queue();
  // Call get_server_sidno() and Gtid_state::get_automatic_gno to generate GTIDs
  assign_automatic_gtids_to_flush_group(first_seen);
  /* Flush thread caches to binary log. */
  for (THD *head = first_seen; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(current_thd, head);
    /*
      Flush the stmt_cache and trx_cache of binlog_cache_mngr.
      Flushing trx_cache:
        - generates last_committed and sequence_number
        - flushes the GTID log event
        - flushes the data in trx_cache into the binlog cache
        - prepares the binlog position that applies after the commit
        - increments the prepared XID counter
    */
    std::pair<int, my_off_t> result = flush_thread_caches(head);
    total_bytes += result.second;
    if (flush_error == 1) flush_error = result.first;
#ifndef NDEBUG
    no_flushes++;
#endif
  }
  *out_queue_var = first_seen;
  *total_bytes_var = total_bytes;
  if (total_bytes > 0 &&
      (m_binlog_file->get_real_file_size() >= (my_off_t)max_size ||
       DBUG_EVALUATE_IF("simulate_max_binlog_size", true, false)))
    *rotate_var = true;
#ifndef NDEBUG
  DBUG_PRINT("info", ("no_flushes:= %d", no_flushes));
  no_flushes = 0;
#endif
  return flush_error;
}
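The bookkeeping in that loop (summing the bytes, keeping an error code, deciding on rotation) can be modelled without any server types. The sketch below is an illustration under assumptions: FlushGroupResult and summarize_flush_group are hypothetical names, each input pair plays the role of one flush_thread_caches() result, and the initial value 1 for flush_error is assumed because the excerpt does not show the declaration.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

struct FlushGroupResult {
  int flush_error;            // stays 1 until a session reports another code
  std::uint64_t total_bytes;  // bytes flushed by the whole group
  bool rotate;                // whether a binlog rotation should follow
};

// Each element of `results` is {error code, bytes written} for one session.
FlushGroupResult summarize_flush_group(
    const std::vector<std::pair<int, std::uint64_t>> &results,
    std::uint64_t file_size, std::uint64_t max_size) {
  int flush_error = 1;  // assumed initial value, see the lead-in
  std::uint64_t total_bytes = 0;
  for (const auto &r : results) {
    total_bytes += r.second;
    // Same test as the excerpt: overwrite only while the value is still 1.
    if (flush_error == 1) flush_error = r.first;
  }
  // Rotate only if something was written and the file reached max_size.
  const bool rotate = total_bytes > 0 && file_size >= max_size;
  return {flush_error, total_bytes, rotate};
}
```

The debug-only simulate_max_binlog_size switch from the excerpt is left out of the model.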
The call stack for flushing the redo log is as follows:
// Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues and flush the transactions to disk; stop blocking the replica's preserve-commit-order execution
|fetch_and_process_flush_stage_queue
// The storage engine layer flushes redo log in the prepare state to disk according to innodb_flush_log_at_trx_commit
|--ha_flush_logs
|----innobase_flush_logs
|------log_buffer_flush_to_disk
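What "according to innodb_flush_log_at_trx_commit" means can be summarized with the meanings the MySQL manual documents for the values 0, 1 and 2. The sketch below is a lookup table, not InnoDB code. RedoCommitAction and redo_action_for are hypothetical names, and treating any other value like 1 is a choice made only for this example.

```cpp
#include <cassert>

// What happens to the redo log at commit time.
struct RedoCommitAction {
  bool write_at_commit;  // log buffer is written to the log file
  bool fsync_at_commit;  // log file is flushed to durable storage
};

// Maps innodb_flush_log_at_trx_commit to its documented commit-time action.
RedoCommitAction redo_action_for(int flush_log_at_trx_commit) {
  switch (flush_log_at_trx_commit) {
    case 0:
      return {false, false};  // written and flushed about once per second
    case 2:
      return {true, false};   // written at commit, flushed once per second
    default:
      return {true, true};    // 1: written and flushed at every commit
  }
}
```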
Sync stage
The code of the sync stage is as follows:
/*
  Stage #2: Syncing binary log file to disk
*/
if (change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log,
                 &LOCK_sync)) {
  DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                        thd->commit_error));
  return finish_commit(thd);
}
/*
  - sync_counter: the number of commit groups
  - get_sync_period(): returns the value of sync_binlog
  - If the number of commit groups in the sync stage queue is greater than or
    equal to sync_binlog, the current leader calls fsync() to flush to disk
    (sync_binlog_file(false)). Before syncing it may wait for more commit
    groups to enter the queue; the wait is bounded by
    binlog_group_commit_sync_no_delay_count or binlog_group_commit_sync_delay,
    both of which default to 0.
  - If the number of commit groups in the sync stage queue is less than
    sync_binlog, the current leader neither calls fsync() nor waits.
  - If sync_binlog is 0, every commit group triggers the wait but no sync.
  - If sync_binlog is 1, every commit group triggers the wait and a sync.
*/
if (!flush_error && (sync_counter + 1 >= get_sync_period()))
  Commit_stage_manager::get_instance().wait_count_or_timeout(
      opt_binlog_group_commit_sync_no_delay_count,
      opt_binlog_group_commit_sync_delay, Commit_stage_manager::SYNC_STAGE);
final_queue = Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
    Commit_stage_manager::SYNC_STAGE);
if (flush_error == 0 && total_bytes > 0) {
  DEBUG_SYNC(thd, "before_sync_binlog_file");
  std::pair<bool, bool> result = sync_binlog_file(false);
  sync_error = result.first;
}
/*
  If sync_binlog == 1, update the binlog position in the sync stage and
  broadcast the update signal so that the dump thread serving the replica
  notices the binlog update.
  (The position is set in the flush stage:
     process_flush_stage_queue()
     |--flush_thread_caches()
     |-----set_trans_pos())
*/
if (update_binlog_end_pos_after_sync && flush_error == 0 && sync_error == 0) {
  THD *tmp_thd = final_queue;
  const char *binlog_file = nullptr;
  my_off_t pos = 0;
  while (tmp_thd != nullptr) {
    if (tmp_thd->commit_error == THD::CE_NONE) {
      tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
    }
    tmp_thd = tmp_thd->next_to_commit;
  }
  if (binlog_file != nullptr && pos > 0) {
    update_binlog_end_pos(binlog_file, pos);
  }
}
DEBUG_SYNC(thd, "bgc_after_sync_stage_before_commit_stage");
leave_mutex_before_commit_stage = &LOCK_sync;
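The four sync_binlog cases listed in the comment can be checked against a small counter model. This is a sketch under assumptions: SyncPolicy is a hypothetical class, should_wait() copies the condition shown in the excerpt, and the counting inside count_group_and_check_sync() (increment per group, reset after a sync, never sync when the period is 0) is inferred from the described behaviour, because sync_binlog_file itself is not part of the excerpt.

```cpp
#include <cassert>

// Hypothetical model of the per-group decisions made in the sync stage.
class SyncPolicy {
 public:
  explicit SyncPolicy(unsigned int sync_period) : sync_period_(sync_period) {}

  // Same condition as `sync_counter + 1 >= get_sync_period()`: whether the
  // leader waits for more sessions before it fetches the sync queue.
  bool should_wait() const { return sync_counter_ + 1 >= sync_period_; }

  // Counts one commit group and reports whether it triggers fsync().
  // Assumption: the counter resets after a sync and period 0 never syncs.
  bool count_group_and_check_sync() {
    if (sync_period_ != 0 && ++sync_counter_ >= sync_period_) {
      sync_counter_ = 0;
      return true;
    }
    return false;
  }

 private:
  unsigned int sync_period_;
  unsigned int sync_counter_ = 0;
};
```

With both delay settings at their default of 0, the wait returns immediately, so in the default configuration the policy only decides whether fsync() runs.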
Commit stage
The code of the commit stage is as follows:
  /*
    Stage #3: Commit all transactions in order.
  */
commit_stage:
  /* binlog_order_commits: whether to commit in order, that is, keep the
     redo (engine) commit order consistent with the binlog order */
  if ((opt_binlog_order_commits || Clone_handler::need_commit_order()) &&
      (sync_error == 0 || binlog_error_action != ABORT_SERVER)) {
    if (change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,
                     leave_mutex_before_commit_stage, &LOCK_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }
    THD *commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::COMMIT_STAGE);
    DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
                    DEBUG_SYNC(thd, "before_process_commit_stage_queue"););
    if (flush_error == 0 && sync_error == 0)
      /* after_sync hook */
      sync_error = call_after_sync_hook(commit_queue);
    /*
      Main processing logic of the commit stage
    */
    process_commit_stage_queue(thd, commit_queue);
    /**
     * After commit stage
     */
    if (change_stage(thd, Commit_stage_manager::AFTER_COMMIT_STAGE,
                     commit_queue, &LOCK_commit, &LOCK_after_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }
    THD *after_commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::AFTER_COMMIT_STAGE);
    /* after_commit hook */
    process_after_commit_stage_queue(thd, after_commit_queue);
    final_queue = after_commit_queue;
    mysql_mutex_unlock(&LOCK_after_commit);
  } else {
    if (leave_mutex_before_commit_stage)
      mysql_mutex_unlock(leave_mutex_before_commit_stage);
    if (flush_error == 0 && sync_error == 0)
      sync_error = call_after_sync_hook(final_queue);
  }
  /* Broadcast the m_stage_cond_binlog condition variable to wake up the
     suspended followers */
  Commit_stage_manager::get_instance().signal_done(final_queue);
  DBUG_EXECUTE_IF("block_leader_after_delete", {
    const char action[] = "now SIGNAL leader_proceed";
    assert(!debug_sync_set_action(thd, STRING_WITH_LEN(action)));
  };);
  /*
    Finish the commit before executing a rotate, or run the risk of a
    deadlock. We don't need the return value here since it is in
    thd->commit_error, which is returned below.
  */
  (void)finish_commit(thd);
  DEBUG_SYNC(thd, "bgc_after_commit_stage_before_rotation");
  return thd->commit_error == THD::CE_COMMIT_ERROR;
}
The main processing logic of the commit stage lives in the process_commit_stage_queue function:
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) {
  mysql_mutex_assert_owner(&LOCK_commit);
#ifndef NDEBUG
  thd->get_transaction()->m_flags.ready_preempt =
      true;  // formality by the leader
#endif
  for (THD *head = first; head; head = head->next_to_commit) {
    DBUG_PRINT("debug", ("Thread ID: %u, commit_error: %d, commit_pending: %s",
                         head->thread_id(), head->commit_error,
                         YESNO(head->tx_commit_pending)));
    DBUG_EXECUTE_IF(
        "block_leader_after_delete",
        if (thd != head) { DBUG_SET("+d,after_delete_wait"); };);
    /*
      If flushing failed, set commit_error for the session, skip the
      transaction and proceed with the next transaction instead. This
      will mark all threads as failed, since the flush failed.
      If flush succeeded, attach to the session and commit it in the
      engines.
    */
#ifndef NDEBUG
    Commit_stage_manager::get_instance().clear_preempt_status(head);
#endif
    /*
      Update the global m_max_committed_transaction (used as last_committed
      by later transactions) and reinitialize the sequence number in this
      transaction's context
    */
    if (head->get_transaction()->sequence_number != SEQ_UNINIT) {
      mysql_mutex_lock(&LOCK_replica_trans_dep_tracker);
      m_dependency_tracker.update_max_committed(head);
      mysql_mutex_unlock(&LOCK_replica_trans_dep_tracker);
    }
    /*
      Flush/Sync error should be ignored and continue
      to commit phase. And thd->commit_error cannot be
      COMMIT_ERROR at this moment.
    */
    assert(head->commit_error != THD::CE_COMMIT_ERROR);
    Thd_backup_and_restore switch_thd(thd, head);
    bool all = head->get_transaction()->m_flags.real_commit;
    assert(!head->get_transaction()->m_flags.commit_low ||
           head->get_transaction()->m_flags.ready_preempt);
    // Binlog commit and InnoDB commit
    ::finish_transaction_in_engines(head, all, false);
    DBUG_PRINT("debug", ("commit_error: %d, commit_pending: %s",
                         head->commit_error, YESNO(head->tx_commit_pending)));
  }
  /*
    Lock the sidno and update executed_gtid for the whole group
    - If the binlog is disabled, @@GLOBAL.GTID_PURGED is taken from
      executed_gtid, so @@GLOBAL.GTID_PURGED and @@GLOBAL.GTID_EXECUTED are
      always identical and lost_gtids need not be recorded.
    - If the binlog is enabled but log_replica_updates is disabled, the
      replica's SQL thread or worker thread adds its own GTID to
      executed_gtids and lost_gtids.
  */
  gtid_state->update_commit_group(first);
  for (THD *head = first; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(thd, head);
    auto all = head->get_transaction()->m_flags.real_commit;
    // External XA transactions only: mark the transaction as prepared
    // in the storage engines
    trx_coordinator::set_prepared_in_tc_in_engines(head, all);
    /*
      After the storage engine commit, decrement the counter of XIDs in
      the prepared state
    */
    if (head->get_transaction()->m_flags.xid_written) dec_prep_xids(head);
  }
}
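How last_committed and sequence_number relate across the three points mentioned in this article (binlog prepare, flush stage, commit stage) is easier to see in a toy logical clock. The sketch below is a single-threaded illustration: Txn, DependencyTracker, and can_apply_in_parallel are hypothetical names, locking and the reset of the per-transaction sequence number are omitted, and the parallel-apply rule is stated as an assumption of the example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

struct Txn {
  std::int64_t last_committed = 0;
  std::int64_t sequence_number = 0;
};

// Hypothetical model of commit-order dependency tracking.
class DependencyTracker {
 public:
  // Binlog prepare: remember the largest sequence number committed so far.
  void on_prepare(Txn &t) const { t.last_committed = max_committed_; }
  // Flush stage: hand out the next sequence number.
  void on_flush(Txn &t) { t.sequence_number = ++clock_; }
  // Commit stage: publish this transaction as committed.
  void on_commit(const Txn &t) {
    max_committed_ = std::max(max_committed_, t.sequence_number);
  }

 private:
  std::int64_t clock_ = 0;
  std::int64_t max_committed_ = 0;
};

// Assumed rule: a later transaction may run in parallel with an earlier
// one if it was prepared before the earlier one committed.
bool can_apply_in_parallel(const Txn &earlier, const Txn &later) {
  return later.last_committed < earlier.sequence_number;
}
```

Two transactions that were both prepared before either of them committed end up with the same last_committed, which is what allows a replica to apply them concurrently under this rule.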
The ::finish_transaction_in_engines function holds the main storage engine commit logic. The related call stack is as follows:
|::finish_transaction_in_engines
|--trx_coordinator::commit_in_engines
|----ha_commit_low
// Binlog-layer commit does nothing (empty function)
|------binlog_commit
// Storage engine commit
|------innobase_commit
|--------innobase_commit_low
|----------trx_commit_for_mysql
// Pre-allocate an update undo segment for persisting the GTID
|------------trx_undo_gtid_add_update_undo
// Update update_time of the modified tables in the data dictionary
|------------trx_update_mod_tables_timestamp
// Allocate a mini-transaction handle and buffer
|------------trx_commit
// Commit the mini-transaction
|--------------trx_commit_low
|----------------trx_write_serialisation_history
// Update the undo state:
// insert undo changes from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE
// update undo changes to TRX_UNDO_TO_PURGE
// if the transaction has update undo, also assign a trx no for the rollback segments and add them to the purge queue
|------------------trx_undo_set_state_at_finish
// Add the update undo log header to the head of the history list; free some in-memory objects
|------------------trx_undo_update_cleanup
// Record the binlog position in the InnoDB transaction system header
|------------------trx_sys_update_mysql_binlog_offset
|----------------trx_commit_in_memory
// - Close the MVCC read view
// - Persist the GTID
// - Release the insert undo log
// - Wake up background threads such as the master thread, purge thread, and page_cleaner
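The undo state changes named in this article (at prepare and at commit) form a very small state machine. The sketch below models only those transitions. UndoKind, UndoState, state_at_prepare, and state_at_finish are hypothetical names chosen to echo the InnoDB functions, and any undo state not mentioned in the article is deliberately left out.

```cpp
#include <cassert>

enum class UndoKind { Insert, Update };
enum class UndoState { Active, Prepared, ToFree, ToPurge };

// Prepare phase: TRX_UNDO_ACTIVE becomes TRX_UNDO_PREPARED.
UndoState state_at_prepare(UndoState current) {
  assert(current == UndoState::Active);
  return UndoState::Prepared;
}

// Commit phase: insert undo can be freed, update undo must wait for purge.
UndoState state_at_finish(UndoKind kind) {
  return kind == UndoKind::Insert ? UndoState::ToFree : UndoState::ToPurge;
}

// Only transactions with update undo are added to the purge queue.
bool joins_purge_queue(UndoKind kind) { return kind == UndoKind::Update; }
```

The asymmetry follows from what the records are used for: insert undo is needed only to roll the transaction back, while update undo may still be read by older read views until purge removes it.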
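Stage transitions
Stage transitions are implemented mainly by the enroll_for function called from change_stage:
The first thread to enter a queue becomes the leader of the whole group of transactions
Threads that enter the queue later become followers of that group
Follower threads suspend and wait to be woken up through the m_stage_cond_binlog condition variable
The leader commits the whole group and, once done, signals m_stage_cond_binlog to wake up the suspended followers
Moving between queues works as follows: a thread first enters the queue of the next stage, then releases the mutex of the previous stage, and then acquires the mutex of the next stage
  The flush stage has no previous-stage mutex to release; its leader acquires LOCK_log
  The sync stage acquires the LOCK_sync mutex
  The commit stage acquires the LOCK_commit mutex
  The after-commit stage acquires the LOCK_after_commit mutex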
bool Commit_stage_manager::enroll_for(StageID stage, THD *thd,
                                      mysql_mutex_t *stage_mutex,
                                      mysql_mutex_t *enter_mutex) {
  // If the queue is empty, this thread is the leader
  thd->rpl_thd_ctx.binlog_group_commit_ctx().assign_ticket();
  bool leader = this->append_to(stage, thd);
  /*
    If the FLUSH stage queue (BINLOG_FLUSH_STAGE or COMMIT_ORDER_FLUSH_STAGE)
    is not empty, this thread cannot become leader. The leader needs to
    acquire enter_mutex.
  */
  if (leader) {
    if (stage == COMMIT_ORDER_FLUSH_STAGE) {
      leader = m_queue[BINLOG_FLUSH_STAGE].is_empty();
      /*
        Leader hand-over logic.
        There are 5 session queues:
        - Binlog flush queue: flush redo and write the binlog file
        - Commit order flush queue: for commit-order transactions, which
          take part in the first part of the group commit, up to the
          engine-level flush
        - Sync queue: sync transactions
        - Commit queue: commit transactions
        - After commit queue: call the transactions' after_commit hook
      */
    } else if (stage == BINLOG_FLUSH_STAGE &&
               // This thread is the first one in BINLOG_FLUSH_STAGE, but
               // COMMIT_ORDER_FLUSH_STAGE already has a leader. The thread
               // suspends and waits for a signal from that leader.
               !m_queue[COMMIT_ORDER_FLUSH_STAGE].is_empty()) {
      /*
        The current transaction is the first thread in the binlog queue, but
        the commit order queue already has a leader. This thread becomes
        leader and the commit order leader becomes a follower.
        The leader changes because the commit order leader cannot lead
        binlog threads: commit order threads have to leave the commit group
        before the binlog threads have finished their work.
        The hand-over works as follows:
        1. A commit order thread enters the flush stage first and becomes
           commit order leader.
        2. The commit order leader tries to acquire the stage mutex. This may
           take some time, for instance if the mutex is held by the leader of
           the previous commit group.
        3. Meanwhile, a binlog thread enters the flush stage. It has to wait
           for a signal from the commit order leader.
        4. The commit order leader acquires the stage mutex, checks whether a
           binlog thread has entered the flush stage, and changes leader if
           it finds one.
        5. The commit order leader signals the binlog leader and becomes a
           follower, waiting for the commit to complete (like any other
           follower).
        6. The binlog leader wakes up on the signal from the commit order
           leader and performs the group commit.
      */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_binlog_leader_wait");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_leader,
                        &m_queue_lock[BINLOG_FLUSH_STAGE]);
    }
  }
  unlock_queue(stage);
  /*
    Notify the next transaction in commit order that it can enter the queue
  */
  if (stage == BINLOG_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_binlog_leader_wait");
  } else if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
  }
  /*
    When entering the first stage, there is no stage mutex to release
  */
  if (stage_mutex && need_unlock_stage_mutex) mysql_mutex_unlock(stage_mutex);
  /*
    If the queue was not empty, the current thread is a follower and waits
    for the leader to process the queue
  */
  if (!leader) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_follower_wait");
    mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
    thd->get_transaction()->m_flags.ready_preempt = true;
    if (leader_await_preempt_status) mysql_cond_signal(&m_cond_preempt);
#endif
    // tx_commit_pending: the transaction's commit has not finished yet
    while (thd->tx_commit_pending) {
      if (stage == COMMIT_ORDER_FLUSH_STAGE) {
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      } else {
        // Followers suspend here until the leader has committed the group
        // and wakes them up
        mysql_cond_wait(&m_stage_cond_binlog, &m_lock_done);
      }
    }
    mysql_mutex_unlock(&m_lock_done);
    return false;
  }
#ifndef NDEBUG
  if (stage == Commit_stage_manager::SYNC_STAGE)
    DEBUG_SYNC(thd, "bgc_between_flush_and_sync");
#endif
  bool need_lock_enter_mutex = false;
  if (leader && enter_mutex != nullptr) {
    /*
      If LOCK_log is already held because the binlog is being rotated,
      enter_mutex does not need to be acquired again.
    */
    need_lock_enter_mutex = !(mysql_bin_log.is_rotating_caused_by_incident &&
                              enter_mutex == mysql_bin_log.get_log_lock());
    if (need_lock_enter_mutex)
      mysql_mutex_lock(enter_mutex);
    else
      mysql_mutex_assert_owner(enter_mutex);
  }
  // Leader hand-over logic
  if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(
        "after_commit_order_thread_becomes_leader");
    lock_queue(stage);
    if (!m_queue[BINLOG_FLUSH_STAGE].is_empty()) {
      if (need_lock_enter_mutex) mysql_mutex_unlock(enter_mutex);
      THD *binlog_leader = m_queue[BINLOG_FLUSH_STAGE].get_leader();
      binlog_leader->tx_commit_pending = false;
      mysql_cond_signal(&m_stage_cond_leader);
      unlock_queue(stage);
      mysql_mutex_lock(&m_lock_done);
      /* wait for signal from binlog leader */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(
          "before_commit_order_leader_waits_for_binlog_leader");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      mysql_mutex_unlock(&m_lock_done);
      leader = false;
      return leader;
    }
  }
  return leader;
}
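The core rule of enroll_for, that whoever finds the queue empty becomes leader and later takes the whole group with it, fits in a few lines once mutexes and condition variables are set aside. The sketch below is a single-threaded illustration with hypothetical names (StageQueue, append, fetch). Sessions are plain integers, and the leader hand-over between the two flush queues is not modelled.

```cpp
#include <cassert>
#include <vector>

// Hypothetical single-threaded model of one stage queue.
class StageQueue {
 public:
  // Returns true if the session becomes leader, i.e. the queue was empty.
  bool append(int session_id) {
    const bool leader = sessions_.empty();
    sessions_.push_back(session_id);
    return leader;
  }

  // The leader takes the whole group; the queue is empty again afterwards,
  // so the next session to arrive starts a new group as its leader.
  std::vector<int> fetch() {
    std::vector<int> group;
    group.swap(sessions_);
    return group;
  }

 private:
  std::vector<int> sessions_;
};
```

In the server the followers block on a condition variable until the leader signals completion. Here that waiting is simply left out.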