How MySQL's COMMIT Commits: A Source Code Walkthrough

Updated: 2023-06-02 12:02:36   Author: ActionSky Open Source Community (爱可生开源社区)
This article walks through the MySQL source code to show how the commit command commits a transaction.

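Introduction

When the MySQL commit command commits a transaction, the server internally runs a two-phase commit (Prepare and Commit). This article analyzes the two-phase commit in the MySQL 8.0.33 source code to show what happens while a transaction is being committed.

The overall logic is as follows:

I. Prepare phase

1. Binlog Prepare

Read the largest sequence number (a logical timestamp) among the transactions committed so far.

2. InnoDB Prepare

  • Set the transaction state to prepared.

  • Release gap locks for isolation levels READ COMMITTED and below.

  • Change the undo log segment state from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED.

  • Write the transaction XID to the undo log.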

II. Commit phase

1. Stage 0

Preserve the commit order on a replica.

2. Flush Stage

  • Flush the redo log to disk according to innodb_flush_log_at_trx_commit

    • Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues

    • The storage engine flushes the redo log of prepared transactions to disk according to innodb_flush_log_at_trx_commit

    • Stop blocking replica transactions that wait on preserve commit order

  • Call get_server_sidno() and Gtid_state::get_automatic_gno() to generate the GTID

  • Flush binlog_cache_mngr

    • Flush stmt_cache

    • Flush trx_cache

      • Generate last_committed and sequence_number

      • Flush the GTID log event

      • Flush the data in trx_cache to the binlog cache

      • Prepare the binlog position for after the transaction commits

      • Increment the prepared XID counter

  • Call the after_flush hook, which registers the flushed binlog file and position with the semisynchronous replication plugin

  • If sync_binlog != 1, update the binlog end position in the flush stage and broadcast the update signal, so that the dump thread serving the replica notices the new binlog data

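3. Sync Stage

  • Depending on sync_binlog, wait before syncing and then call fsync() to flush the binlog to disk

  • If sync_binlog == 1, update the binlog end position in the sync stage and broadcast the update signal, so that the dump thread serving the replica notices the new binlog data

4. Commit Stage

  • after_sync hook (the semisynchronous replication after_sync hook)

  • Update the global m_max_committed_transaction (used as last_committed by later transactions) and initialize the sequence number in the transaction context

  • Binlog-layer commit, which does nothing

  • Storage engine commit

    • Allocate an update undo segment in advance for persisting the GTID

    • Update update_time in the data dictionary for the modified tables

    • Allocate the mini-transaction handle and buffer

    • Update the undo state

      • Insert undo changes from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE; update undo changes to TRX_UNDO_TO_PURGE

      • For an update transaction, also assign a trx no to the rollback segment and add it to the purge queue

    • Add the update undo log header to the head of the history list and release some in-memory objects

    • Record the binlog position in the system transaction table

    • Close the MVCC read view

    • Persist the GTID

    • Release the insert undo log

    • Wake background threads such as the master thread, purge thread, and page_cleaner

  • Update executed_gtid for the whole group of transactions

  • After the storage engine commit, decrement the counter of XIDs in prepared state

  • after_commit hook (the semisynchronous replication after_commit hook)

  • Broadcast the m_stage_cond_binlog condition variable to wake the suspended followers

With the overall logic covered, read on if you are interested in the source code analysis (best read on a desktop screen).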

The ha_commit_trans function mainly decides whether GTID information must be written, and then starts the two-phase commit:

int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock) {
  /*
    Save transaction owned gtid into table before transaction prepare
    if binlog is disabled, or binlog is enabled and log_replica_updates
    is disabled with slave SQL thread or slave worker thread.
  */
  std::tie(error, need_clear_owned_gtid) = commit_owned_gtids(thd, all);
...
  // Prepare phase
  if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
    error = tc_log->prepare(thd, all);
...
  // Commit phase
  if (error || (error = tc_log->commit(thd, all))) {
    ha_rollback_trans(thd, all);
    error = 1;
    goto end;
  }
}
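The control flow above (prepare first, commit only if prepare succeeded, roll back otherwise) can be sketched in isolation. The block below is a minimal model under stated assumptions: `Participant` and `two_phase_commit` are hypothetical names invented for this illustration and are not MySQL types or functions.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical participant in a two-phase commit, for example the binlog
// or a storage engine. This is not a MySQL type.
struct Participant {
  explicit Participant(std::string n, bool ok = true)
      : name(std::move(n)), prepare_ok(ok) {}

  bool prepare() { calls.push_back("prepare"); return prepare_ok; }
  void commit() { calls.push_back("commit"); }
  void rollback() { calls.push_back("rollback"); }

  std::string name;
  bool prepare_ok;                 // whether prepare() should succeed
  std::vector<std::string> calls;  // calls received, in order
};

// Returns true if every participant prepared and was committed,
// false if the transaction was rolled back.
bool two_phase_commit(std::vector<Participant> &participants) {
  assert(!participants.empty());
  // Phase 1: every participant must prepare successfully.
  for (Participant &p : participants) {
    if (!p.prepare()) {
      // Any failure rolls back all participants.
      for (Participant &q : participants) q.rollback();
      return false;
    }
  }
  // Phase 2: commit everywhere.
  for (Participant &p : participants) p.commit();
  return true;
}
```

The sketch leaves out everything that makes the real function interesting (GTID handling, the single-engine shortcut that skips prepare, group commit); it only shows the ordering guarantee that the rest of the article builds on.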

What the Prepare phase does

The Prepare phase of the two-phase commit is relatively simple. Below are the entry point of the commit command, the call stack of the Prepare phase, and what each call does:

|mysql_execute_command
|--trans_commit
|----ha_commit_trans
|------MYSQL_BIN_LOG::prepare
// Start binlog prepare and InnoDB prepare
|--------ha_prepare_low
// Binlog prepare: read the largest sequence number (logical timestamp) among the transactions committed so far
|----------binlog_prepare
// InnoDB prepare
|----------innobase_xa_prepare
|------------trx_prepare_for_mysql
// 1. Call trx_prepare_low
// 2. Set the transaction state to prepared
// 3. Release gap locks for isolation levels READ COMMITTED and below
// 4. Flush redo to disk (deferred to the flush stage of the Commit phase)
|--------------trx_prepare
|----------------trx_prepare_low
// 1. Change the undo log segment state from TRX_UNDO_ACTIVE to TRX_UNDO_PREPARED
// 2. Write the transaction XID to the undo log
|------------------trx_undo_set_state_at_prepare
What the Commit phase does

The Commit phase is implemented mainly in the MYSQL_BIN_LOG::ordered_commit function.

Flush stage

First look at Stage 0 and Stage 1. Stage 0 is new in 8.0 and exists to preserve the commit order on replicas. Stage 1 is the flush stage, the first of the three well-known sub-stages of the Commit phase:

int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
  /*
    Stage #0: make the replica's SQL threads commit in the order of the
    transactions in the relay log
  */
  if (Commit_order_manager::wait_for_its_turn_before_flush_stage(thd) ||
      ending_trans(thd, all) ||
      Commit_order_manager::get_rollback_status(thd)) {
    if (Commit_order_manager::wait(thd)) {
      return thd->commit_error;
    }
  }
  /*
    Stage #1: flushing transactions to binary log
    While flushing, we allow new threads to enter and will process
    them in due time. Once the queue was empty, we cannot reap
    anything more since it is possible that a thread entered and
    appointed itself leader for the flush phase.
  */
  if (change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,
                   &LOCK_log)) {
    DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                          thd->commit_error));
    return finish_commit(thd);
  }
  THD *wait_queue = nullptr, *final_queue = nullptr;
  mysql_mutex_t *leave_mutex_before_commit_stage = nullptr;
  my_off_t flush_end_pos = 0;
  bool update_binlog_end_pos_after_sync;
  // Main processing logic of the flush stage
  flush_error =
      process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);
  if (flush_error == 0 && total_bytes > 0)
    /*
      Flush the binlog cache to the file cache
    */
    flush_error = flush_cache_to_file(&flush_end_pos);
  // sync_binlog decides later where the binlog end position is updated
  // and where the binlog update signal is broadcast
  update_binlog_end_pos_after_sync = (get_sync_period() == 1);
  /*
    If the flush finished successfully, we can call the after_flush
    hook. Being invoked here, we have the guarantee that the hook is
    executed before the before/after_send_hooks on the dump thread
    preventing race conditions among these plug-ins.
  */
  if (flush_error == 0) {
    const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
    assert(flush_end_pos != 0);
    /*
      Call the after_flush hook, which registers the flushed binlog file and
      position with the semisynchronous replication plugin. They are compared
      later with the binlog position acknowledged by the replica.
    */
    if (RUN_HOOK(binlog_storage, after_flush,
                 (thd, file_name_ptr, flush_end_pos))) {
      LogErr(ERROR_LEVEL, ER_BINLOG_FAILED_TO_RUN_AFTER_FLUSH_HOOK);
      flush_error = ER_ERROR_ON_WRITE;
    }
    // If sync_binlog != 1, update the binlog end position in the flush stage
    // and broadcast the update signal, so that the dump thread serving the
    // replica notices the new binlog data
    if (!update_binlog_end_pos_after_sync) update_binlog_end_pos();
  }

The main processing logic of the flush stage is in process_flush_stage_queue:

int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
                                             bool *rotate_var,
                                             THD **out_queue_var) {
  int no_flushes = 0;
  my_off_t total_bytes = 0;
  // 1 means "not set yet"; the first flushed session overwrites it below
  int flush_error = 1;
  mysql_mutex_assert_owner(&LOCK_log);
  // Flush the redo log to disk according to innodb_flush_log_at_trx_commit
  THD *first_seen = fetch_and_process_flush_stage_queue();
  // Call get_server_sidno() and Gtid_state::get_automatic_gno() to generate the GTID
  assign_automatic_gtids_to_flush_group(first_seen);
  /* Flush thread caches to binary log. */
  for (THD *head = first_seen; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(current_thd, head);
    /*
      Flush the stmt_cache and trx_cache of binlog_cache_mngr.
      Flushing trx_cache:
        - generates last_committed and sequence_number
        - flushes the GTID log event
        - flushes the data in trx_cache to the binlog cache
        - prepares the binlog position for after the transaction commits
        - increments the prepared XID counter
    */
    std::pair<int, my_off_t> result = flush_thread_caches(head);
    total_bytes += result.second;
    if (flush_error == 1) flush_error = result.first;
#ifndef NDEBUG
    no_flushes++;
#endif
  }
  *out_queue_var = first_seen;
  *total_bytes_var = total_bytes;
  if (total_bytes > 0 &&
      (m_binlog_file->get_real_file_size() >= (my_off_t)max_size ||
       DBUG_EVALUATE_IF("simulate_max_binlog_size", true, false)))
    *rotate_var = true;
#ifndef NDEBUG
  DBUG_PRINT("info", ("no_flushes:= %d", no_flushes));
  no_flushes = 0;
#endif
  return flush_error;
}

The call stack of the redo log flush is as follows:

// Fetch and empty the BINLOG_FLUSH_STAGE and COMMIT_ORDER_FLUSH_STAGE queues and flush the transactions to disk; stop blocking replica transactions that wait on preserve commit order
|fetch_and_process_flush_stage_queue
// The storage engine flushes the redo log of prepared transactions to disk according to innodb_flush_log_at_trx_commit
|--ha_flush_logs
|----innobase_flush_logs
|------log_buffer_flush_to_disk
Sync stage

The code of the sync stage is as follows:

/*
  Stage #2: Syncing binary log file to disk
*/
if (change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log,
                 &LOCK_sync)) {
  DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                        thd->commit_error));
  return finish_commit(thd);
}
/*
  - sync_counter: the number of commit groups since the last sync
  - get_sync_period(): returns the value of sync_binlog
  - If the number of commit groups has reached sync_binlog, the current leader
    calls fsync() to flush to disk (sync_binlog_file(false)). Before syncing it
    may wait for more sessions to join the queue. The wait ends when
    binlog_group_commit_sync_no_delay_count transactions are queued or
    binlog_group_commit_sync_delay microseconds have passed; both default to 0.
  - If fewer commit groups than sync_binlog have accumulated, the current leader
    neither calls fsync() nor waits
  - If sync_binlog is 0, every commit group triggers the wait but no sync
  - If sync_binlog is 1, every commit group triggers the wait and is synced
*/
if (!flush_error && (sync_counter + 1 >= get_sync_period()))
  Commit_stage_manager::get_instance().wait_count_or_timeout(
      opt_binlog_group_commit_sync_no_delay_count,
      opt_binlog_group_commit_sync_delay, Commit_stage_manager::SYNC_STAGE);
final_queue = Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
    Commit_stage_manager::SYNC_STAGE);
if (flush_error == 0 && total_bytes > 0) {
  DEBUG_SYNC(thd, "before_sync_binlog_file");
  std::pair<bool, bool> result = sync_binlog_file(false);
  sync_error = result.first;
}
/*
  If sync_binlog == 1, update the binlog end position in the sync stage and
  broadcast the update signal, so that the dump thread serving the replica
  notices the new binlog data.
  (The position is set in the flush stage:
     process_flush_stage_queue()
     |--flush_thread_caches()
     |-----set_trans_pos())
*/
if (update_binlog_end_pos_after_sync && flush_error == 0 && sync_error == 0) {
  THD *tmp_thd = final_queue;
  const char *binlog_file = nullptr;
  my_off_t pos = 0;
  while (tmp_thd != nullptr) {
    if (tmp_thd->commit_error == THD::CE_NONE) {
      tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
    }
    tmp_thd = tmp_thd->next_to_commit;
  }
  if (binlog_file != nullptr && pos > 0) {
    update_binlog_end_pos(binlog_file, pos);
  }
}
DEBUG_SYNC(thd, "bgc_after_sync_stage_before_commit_stage");
leave_mutex_before_commit_stage = &LOCK_sync;
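The effect of sync_binlog on this stage can be modelled with a small counter. This is a sketch under assumptions: `BinlogSyncPolicy` and its members are hypothetical names, and the counting rule (one increment per commit group, reset after each fsync, never sync when the period is 0) is a simplified reading of the logic described above, not a copy of sync_binlog_file().

```cpp
#include <cassert>

// Hypothetical model of the sync_binlog counting logic; not MySQL code.
class BinlogSyncPolicy {
 public:
  explicit BinlogSyncPolicy(unsigned sync_period)
      : sync_period_(sync_period) {}

  // Called once per commit group. Returns true if this group triggers fsync.
  bool on_commit_group() {
    if (sync_period_ == 0) return false;  // sync_binlog=0: leave it to the OS
    if (++sync_counter_ >= sync_period_) {
      sync_counter_ = 0;  // start counting towards the next fsync
      return true;
    }
    return false;
  }

  // With sync_binlog=1 the binlog end position is published after the sync;
  // for any other value it is published at the end of the flush stage.
  bool update_end_pos_after_sync() const { return sync_period_ == 1; }

 private:
  unsigned sync_period_;
  unsigned sync_counter_ = 0;
};
```

For example, a policy built with a period of 3 reports an fsync on every third commit group, while a period of 1 syncs every group and is the only setting where the dump thread sees new events only after they are durable.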

Commit stage

The code of the commit stage is as follows:

  /*
    Stage #3: Commit all transactions in order.
  */
commit_stage:
  /* binlog_order_commits: whether to commit in order, that is, keep the commit order of redo and binlog the same */
  if ((opt_binlog_order_commits || Clone_handler::need_commit_order()) &&
      (sync_error == 0 || binlog_error_action != ABORT_SERVER)) {
    if (change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,
                     leave_mutex_before_commit_stage, &LOCK_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }
    THD *commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::COMMIT_STAGE);
    DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
                    DEBUG_SYNC(thd, "before_process_commit_stage_queue"););
    if (flush_error == 0 && sync_error == 0)
      /* after_sync hook */
      sync_error = call_after_sync_hook(commit_queue);
    /*
      Main processing logic of the commit stage
    */
    process_commit_stage_queue(thd, commit_queue);
    /**
     * After commit stage
     */
    if (change_stage(thd, Commit_stage_manager::AFTER_COMMIT_STAGE,
                     commit_queue, &LOCK_commit, &LOCK_after_commit)) {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d", thd->thread_id(),
                            thd->commit_error));
      return finish_commit(thd);
    }
    THD *after_commit_queue =
        Commit_stage_manager::get_instance().fetch_queue_acquire_lock(
            Commit_stage_manager::AFTER_COMMIT_STAGE);
    /* after_commit hook */
    process_after_commit_stage_queue(thd, after_commit_queue);
    final_queue = after_commit_queue;
    mysql_mutex_unlock(&LOCK_after_commit);
  } else {
    if (leave_mutex_before_commit_stage)
      mysql_mutex_unlock(leave_mutex_before_commit_stage);
    if (flush_error == 0 && sync_error == 0)
      sync_error = call_after_sync_hook(final_queue);
  }
  /* Broadcast the m_stage_cond_binlog condition variable to wake the suspended followers */
  Commit_stage_manager::get_instance().signal_done(final_queue);
  DBUG_EXECUTE_IF("block_leader_after_delete", {
    const char action[] = "now SIGNAL leader_proceed";
    assert(!debug_sync_set_action(thd, STRING_WITH_LEN(action)));
  };);
  /*
    Finish the commit before executing a rotate, or run the risk of a
    deadlock. We don't need the return value here since it is in
    thd->commit_error, which is returned below.
  */
  (void)finish_commit(thd);
  DEBUG_SYNC(thd, "bgc_after_commit_stage_before_rotation");
  return thd->commit_error == THD::CE_COMMIT_ERROR;
}

The main processing logic of the commit stage is in the process_commit_stage_queue function:

void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) {
  mysql_mutex_assert_owner(&LOCK_commit);
#ifndef NDEBUG
  thd->get_transaction()->m_flags.ready_preempt =
      true;  // formality by the leader
#endif
  for (THD *head = first; head; head = head->next_to_commit) {
    DBUG_PRINT("debug", ("Thread ID: %u, commit_error: %d, commit_pending: %s",
                         head->thread_id(), head->commit_error,
                         YESNO(head->tx_commit_pending)));
    DBUG_EXECUTE_IF(
        "block_leader_after_delete",
        if (thd != head) { DBUG_SET("+d,after_delete_wait"); };);
    /*
      If flushing failed, set commit_error for the session, skip the
      transaction and proceed with the next transaction instead. This
      will mark all threads as failed, since the flush failed.
      If flush succeeded, attach to the session and commit it in the
      engines.
    */
#ifndef NDEBUG
    Commit_stage_manager::get_instance().clear_preempt_status(head);
#endif
    /*
      Update the global m_max_committed_transaction (used as last_committed
      by later transactions) and initialize the sequence number in this
      transaction's context
    */
    if (head->get_transaction()->sequence_number != SEQ_UNINIT) {
      mysql_mutex_lock(&LOCK_replica_trans_dep_tracker);
      m_dependency_tracker.update_max_committed(head);
      mysql_mutex_unlock(&LOCK_replica_trans_dep_tracker);
    }
    /*
      Flush/Sync error should be ignored and continue
      to commit phase. And thd->commit_error cannot be
      COMMIT_ERROR at this moment.
    */
    assert(head->commit_error != THD::CE_COMMIT_ERROR);
    Thd_backup_and_restore switch_thd(thd, head);
    bool all = head->get_transaction()->m_flags.real_commit;
    assert(!head->get_transaction()->m_flags.commit_low ||
           head->get_transaction()->m_flags.ready_preempt);
    // Binlog commit and InnoDB commit
    ::finish_transaction_in_engines(head, all, false);
    DBUG_PRINT("debug", ("commit_error: %d, commit_pending: %s",
                         head->commit_error, YESNO(head->tx_commit_pending)));
  }
  /*
    Lock sidno and update executed_gtid for the whole group of transactions
    - If binlog is disabled, @@GLOBAL.GTID_PURGED is taken from executed_gtid,
      so @@GLOBAL.GTID_PURGED always equals @@GLOBAL.GTID_EXECUTED and
      lost_gtids does not need to be recorded
    - If binlog is enabled but log_replica_updates is disabled, the replica's
      SQL thread or worker thread adds its own GTID to executed_gtids and
      lost_gtids
  */
  gtid_state->update_commit_group(first);
  for (THD *head = first; head; head = head->next_to_commit) {
    Thd_backup_and_restore switch_thd(thd, head);
    auto all = head->get_transaction()->m_flags.real_commit;
    // Only for external XA transactions: mark the transaction as prepared
    // in the storage engine
    trx_coordinator::set_prepared_in_tc_in_engines(head, all);
    /*
      After the storage engine commit, decrement the counter of XIDs in
      prepared state
    */
    if (head->get_transaction()->m_flags.xid_written) dec_prep_xids(head);
  }
}
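The interplay of last_committed, sequence_number, and the maximum committed transaction can be sketched as a small logical clock. This is a simplified model under assumptions: `TxnStamp`, `CommitOrderClock`, and `can_apply_in_parallel` are hypothetical names, and the real dependency tracker handles details (per-file numbering, writeset-based tracking) that are ignored here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Logical timestamps carried by one transaction. Hypothetical type.
struct TxnStamp {
  std::int64_t last_committed = 0;
  std::int64_t sequence_number = 0;
};

// Hypothetical model of commit-order dependency tracking; not MySQL code.
class CommitOrderClock {
 public:
  // Prepare: remember the newest transaction committed so far.
  void on_prepare(TxnStamp &t) const { t.last_committed = max_committed_; }

  // Flush stage: hand out the next sequence number.
  void on_flush(TxnStamp &t) { t.sequence_number = ++next_; }

  // Commit stage: advance the maximum committed sequence number.
  void on_commit(const TxnStamp &t) {
    assert(t.sequence_number > 0);  // must have passed the flush stage
    max_committed_ = std::max(max_committed_, t.sequence_number);
  }

 private:
  std::int64_t next_ = 0;
  std::int64_t max_committed_ = 0;
};

// A replica may apply `later` without waiting for `earlier` when `later`
// prepared before `earlier` had committed on the source.
bool can_apply_in_parallel(const TxnStamp &earlier, const TxnStamp &later) {
  return later.last_committed < earlier.sequence_number;
}
```

Two transactions that were both prepared before either committed end up with the same last_committed and can run in parallel on the replica; a transaction prepared after they committed must wait for them.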

The ::finish_transaction_in_engines function contains the main storage engine commit logic. Its call stack is as follows:

|::finish_transaction_in_engines
|--trx_coordinator::commit_in_engines
|----ha_commit_low
// The binlog-layer commit does nothing (empty function)
|------binlog_commit
// Storage engine commit
|------innobase_commit
|--------innobase_commit_low
|----------trx_commit_for_mysql
// Allocate an update undo segment in advance for persisting the GTID
|------------trx_undo_gtid_add_update_undo
// Update update_time in the data dictionary for the modified tables
|------------trx_update_mod_tables_timestamp
// Allocate the mini-transaction handle and buffer
|------------trx_commit
// Commit the mini-transaction
|--------------trx_commit_low
|----------------trx_write_serialisation_history
// Update the undo state:
// insert undo changes from TRX_UNDO_ACTIVE to TRX_UNDO_TO_FREE
// update undo changes to TRX_UNDO_TO_PURGE
// for an update transaction, also assign a trx no to the rollback segment and add it to the purge queue
|------------------trx_undo_set_state_at_finish
// Add the update undo log header to the head of the history list and release some in-memory objects
|------------------trx_undo_update_cleanup
// Record the binlog position in the system transaction table
|------------------trx_sys_update_mysql_binlog_offset
|----------------trx_commit_in_memory
// - Close the MVCC read view
// - Persist the GTID
// - Release the insert undo log
// - Wake background threads such as the master thread, purge thread, and page_cleaner
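The undo segment state changes mentioned in the Prepare and Commit stacks can be summarized as a tiny state machine. The enum and function names below are hypothetical and only mirror the transitions described in this article; the sketch deliberately ignores other states InnoDB uses, such as caching a small undo segment for reuse.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical mirror of the undo states named in the article.
enum class UndoType { Insert, Update };
enum class UndoState { Active, Prepared, ToFree, ToPurge };

// Prepare phase: TRX_UNDO_ACTIVE -> TRX_UNDO_PREPARED.
UndoState undo_state_at_prepare(UndoState current) {
  if (current != UndoState::Active)
    throw std::logic_error("only an active undo segment can be prepared");
  return UndoState::Prepared;
}

// Commit phase: insert undo can be freed right away because no other
// transaction needs it for MVCC; update undo must wait for purge.
UndoState undo_state_at_finish(UndoType type) {
  return type == UndoType::Insert ? UndoState::ToFree : UndoState::ToPurge;
}
```

The asymmetry is the point of the sketch: old versions produced by updates and deletes may still be visible to open read views, so they go through the history list and purge, while insert undo has no older version to preserve.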

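Stage transitions

Stage transitions are implemented mainly by the enroll_for function called from change_stage:

  • The first thread to enter a queue becomes the leader of the whole group of transactions

  • Threads that enter the queue later become followers of the group

  • Follower threads suspend and wait to be woken through the m_stage_cond_binlog condition variable

  • The leader commits the whole group and, once done, signals m_stage_cond_binlog to wake the suspended followers

  • To move between stages, a thread first joins the queue of the next stage, then releases the mutex of the previous stage, and then acquires the mutex of the next stage

  • The Flush Stage has no previous-stage mutex to release (stage_mutex is nullptr); its leader acquires LOCK_log

  • The Sync Stage requires the LOCK_sync mutex

  • The Commit Stage requires the LOCK_commit mutex

  • The After Commit Stage requires the LOCK_after_commit mutex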

bool Commit_stage_manager::enroll_for(StageID stage, THD *thd,
                                      mysql_mutex_t *stage_mutex,
                                      mysql_mutex_t *enter_mutex) {
  // If the queue is empty, this thread is the leader
  thd->rpl_thd_ctx.binlog_group_commit_ctx().assign_ticket();
  bool leader = this->append_to(stage, thd);
  /*
    If the FLUSH stage queue (BINLOG_FLUSH_STAGE or COMMIT_ORDER_FLUSH_STAGE)
    is not empty, this thread cannot become the leader. The leader has to
    acquire enter_mutex.
  */
  if (leader) {
    if (stage == COMMIT_ORDER_FLUSH_STAGE) {
      leader = m_queue[BINLOG_FLUSH_STAGE].is_empty();
      /*
        Leader handover logic.
        There are five session queues:
          - Binlog flush queue: flush redo and write the binlog file
          - Commit order flush queue: for commit order transactions, which take
            part in the first part of the group commit, up to the engine flush
          - Sync queue: sync transactions
          - Commit queue: commit transactions
          - After commit queue: call the after_commit hook of the transactions
      */
    } else if (stage == BINLOG_FLUSH_STAGE &&
               !m_queue[COMMIT_ORDER_FLUSH_STAGE].is_empty()) {
      // The current thread is the first one in BINLOG_FLUSH_STAGE, but
      // COMMIT_ORDER_FLUSH_STAGE already has a leader. The current thread
      // suspends and waits for the signal from that leader.
      /*
        The current transaction is the first thread in the binlog queue, but
        the commit order queue already has a leader. The current thread becomes
        the leader and the commit order leader turns into a follower.
        The leader changes because a commit order leader cannot lead binlog
        threads: commit order threads must leave the commit group before the
        binlog threads have finished.
        The leader is turned into a follower as follows:
        1. A commit order thread enters the flush stage first and becomes the
           commit order leader.
        2. The commit order leader tries to acquire the stage mutex. This can
           take some time, for example when the mutex is held by the leader of
           the previous commit group.
        3. Meanwhile a binlog thread enters the flush stage. It has to wait for
           a signal from the commit order leader.
        4. The commit order leader acquires the stage mutex, then checks whether
           a binlog thread has entered the flush stage; if so, it hands over
           leadership.
        5. The commit order leader signals the binlog leader and becomes a
           follower, waiting for the commit to finish (like any other follower).
        6. The binlog leader is woken by the signal from the commit order leader
           and performs the group commit.
      */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_binlog_leader_wait");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_leader,
                        &m_queue_lock[BINLOG_FLUSH_STAGE]);
    }
  }
  unlock_queue(stage);
  /*
    Notify the next group-commit transaction that it can enter the queue
  */
  if (stage == BINLOG_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("after_binlog_leader_wait");
  } else if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    Commit_order_manager::finish_one(thd);
  }
  /*
    When entering the first stage there is no stage mutex to release
  */
  if (stage_mutex && need_unlock_stage_mutex) mysql_mutex_unlock(stage_mutex);
  /*
    If the queue was not empty, the current thread waits as a follower for the
    leader to process the queue
  */
  if (!leader) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_follower_wait");
    mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
    thd->get_transaction()->m_flags.ready_preempt = true;
    if (leader_await_preempt_status) mysql_cond_signal(&m_cond_preempt);
#endif
    // tx_commit_pending: the commit of this transaction has not finished yet
    while (thd->tx_commit_pending) {
      if (stage == COMMIT_ORDER_FLUSH_STAGE) {
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      } else {
        // The follower suspends here until the leader has committed the
        // group and wakes it up
        mysql_cond_wait(&m_stage_cond_binlog, &m_lock_done);
      }
    }
    mysql_mutex_unlock(&m_lock_done);
    return false;
  }
#ifndef NDEBUG
  if (stage == Commit_stage_manager::SYNC_STAGE)
    DEBUG_SYNC(thd, "bgc_between_flush_and_sync");
#endif
  bool need_lock_enter_mutex = false;
  if (leader && enter_mutex != nullptr) {
    /*
      If LOCK_log is already held because the binlog is being rotated,
      enter_mutex does not need to be acquired again.
    */
    need_lock_enter_mutex = !(mysql_bin_log.is_rotating_caused_by_incident &&
                              enter_mutex == mysql_bin_log.get_log_lock());
    if (need_lock_enter_mutex)
      mysql_mutex_lock(enter_mutex);
    else
      mysql_mutex_assert_owner(enter_mutex);
  }
  // Leader handover logic
  if (stage == COMMIT_ORDER_FLUSH_STAGE) {
    CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(
        "after_commit_order_thread_becomes_leader");
    lock_queue(stage);
    if (!m_queue[BINLOG_FLUSH_STAGE].is_empty()) {
      if (need_lock_enter_mutex) mysql_mutex_unlock(enter_mutex);
      THD *binlog_leader = m_queue[BINLOG_FLUSH_STAGE].get_leader();
      binlog_leader->tx_commit_pending = false;
      mysql_cond_signal(&m_stage_cond_leader);
      unlock_queue(stage);
      mysql_mutex_lock(&m_lock_done);
      /* wait for signal from binlog leader */
      CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP(
          "before_commit_order_leader_waits_for_binlog_leader");
      while (thd->tx_commit_pending)
        mysql_cond_wait(&m_stage_cond_commit_order, &m_lock_done);
      mysql_mutex_unlock(&m_lock_done);
      leader = false;
      return leader;
    }
  }
  return leader;
}
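The core rule of enroll_for (the first session to enter an empty queue leads the group, and the leader later takes the whole queue at once) can be sketched with a mutex-protected queue. `StageQueue`, `enroll`, and `fetch` are hypothetical names for this illustration; the sketch leaves out the condition variables that followers wait on and the leader handover between the two flush queues.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical stage queue; not the Commit_stage_manager implementation.
class StageQueue {
 public:
  // Appends a session. Returns true if the queue was empty, in which case
  // the caller becomes the leader of this group; otherwise it is a follower.
  bool enroll(int session_id) {
    std::lock_guard<std::mutex> guard(lock_);
    const bool leader = queue_.empty();
    queue_.push_back(session_id);
    return leader;
  }

  // Called by the leader: takes the whole group and leaves the queue empty,
  // so the next session to arrive becomes the leader of the next group.
  std::vector<int> fetch() {
    std::lock_guard<std::mutex> guard(lock_);
    std::vector<int> group;
    group.swap(queue_);
    return group;
  }

 private:
  std::mutex lock_;
  std::vector<int> queue_;
};
```

Because the leader empties the queue in one step, sessions that arrive while a group is being flushed or synced accumulate into the next group, which is what lets one fsync cover many transactions.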
