欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PostgreSQL事務回卷實戰(zhàn)案例詳析

 更新時間:2022年03月25日 09:43:18   作者:淵渱  
前段時間在公司小范圍做了一個關于PG事務實現(xiàn)的講座,最后總結了一個摘要性的東西,分享一下,這篇文章主要給大家介紹了關于PostgreSQL事務回卷實戰(zhàn)案例的相關資料,需要的朋友可以參考下

背景

前陣子某個客戶反饋他的RDS PostgreSQL無法寫入,報錯信息如下:

postgres=# select * from test;
 id 
----
(0 rows)

postgres=# insert into test select 1;
ERROR:  database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT:  Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.

隨后RDS工程師介入處理以后,該問題立馬得到了解決。

XID基礎原理

XID 定義

XID(Transaction ID)是 PostgreSQL 內部的事務編號,每個事務都會分配一個XID,依次遞增。PostgreSQL 數據中每個元組頭部都會保存著 插入 或者 刪除 這條元組的XID(Transaction ID),然后內核通過這個 XID 構造數據庫的一致性讀。在事務隔離級別是 可重復讀 的情況下,假設如有兩個事務,xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元組,看不到 t_xmin > 200 的元組。

typedef uint32 TransactionId;  /* 事務號定義,32位無符號整數 */
typedef struct HeapTupleFields
{
  TransactionId t_xmin;    /* 插入該元組的事務號 */
  TransactionId t_xmax;    /* 刪除或鎖定該元組的事務號 */

    /*** 其它屬性省略 ***/
} HeapTupleFields;

struct HeapTupleHeaderData
{
  union
  {
    HeapTupleFields t_heap;
    DatumTupleFields t_datum;
  }      t_choice;

    /*** 其它屬性省略 ***/
};

XID 發(fā)行機制

從上面結構中我們可以看到,XID 是一個32位無符號整數,也就是 XID 的范圍是 0到2^32-1;那么超過了 2^32-1的事務怎么辦呢?其實 XID 是一個環(huán),超過了 2^32-1 之后又會從頭開始分配。通過源代碼也證明了上述結論:

// 無效事務號
#define InvalidTransactionId    ((TransactionId) 0)
// 引導事務號,在數據庫初始化過程(BKI執(zhí)行)中使用
#define BootstrapTransactionId    ((TransactionId) 1)
// 凍結事務號用于表示非常陳舊的元組,它們比所有正常事務號都要早(也就是可見)
#define FrozenTransactionId      ((TransactionId) 2)
// 第一個正常事務號
#define FirstNormalTransactionId  ((TransactionId) 3)
// 把 FullTransactionId 的低32位作為無符號整數生成 xid
#define XidFromFullTransactionId(x)    ((uint32) (x).value)

static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
  dest->value++;
  while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
    dest->value++;
}

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);
    /*** 省略 ***/
  FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
    /*** 省略 ***
  return full_xid;
}

static void
AssignTransactionId(TransactionState s)
{
    /*** 省略 ***/
  s->fullTransactionId = GetNewTransactionId(isSubXact);
  if (!isSubXact)
    XactTopFullTransactionId = s->fullTransactionId;
    /*** 省略 ***/
}

TransactionId
GetTopTransactionId(void)
{
  if (!FullTransactionIdIsValid(XactTopFullTransactionId))
    AssignTransactionId(&TopTransactionStateData);
  return XidFromFullTransactionId(XactTopFullTransactionId);
}

可以看到,新事務號保存在共享變量緩存中:ShmemVariableCache->nextFullXid,每發(fā)行一個事務號后,向上調整它的值,并跳過上述三個特殊值。三個特殊仠分別為0、1和2,作用可以看上面代碼注釋。

XID 回卷機制

前面說到,XID 是一個環(huán),分配到 2^32-1 之后又從 3 開始,那么內核是怎么比較兩個事務的大小的呢?比如 xid 經歷了這樣一個過程 3-> 2^32-1 -> 5,那么內核怎么樣知道 5 這個事務在 2^32-1 后面呢?我們再看一下代碼:

/*
 * TransactionIdPrecedes --- is id1 logically < id2?
 */
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
  /*
   * If either ID is a permanent XID then we can just do unsigned
   * comparison.  If both are normal, do a modulo-2^32 comparison.
   */
  int32    diff;

  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 < id2);

  diff = (int32) (id1 - id2);
  return (diff < 0);
}

可以看到,內核使用了一個比較取巧的方法:(int32) (id1 - id2) < 0,32位有符號整數的取值范圍是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以轉換成 int32 會變成負數。但是這里面有一個問題,「最新事務號-最老事務號」 必須小于 2^31,一旦大于就會出現(xiàn)回卷,導致老事務產生的數據對新事務不可見。

XID 回卷預防

前面講到,「最新事務號-最老事務號」 必須小于 2^31,否則會發(fā)生回卷導致老事務產生的數據對新事務不可見,那內核是怎么避免這個問題的呢?內核是這樣處理的:通過定期把老事務產生的元組的 XID 更新為 FrozenTransactionId,即更新為2,來回收 XID,而 XID 為2 的元組對所有的事務可見,這個過程稱為 XID 凍結,通過這個方式可以回收 XID 來保證 |最新事務號-最老事務號| < 2^31。
除了內核自動凍結回收XID,我們也可以通過命令或者 sql 的方式手動進行 xid 凍結回收

  • 查詢數據庫或表的年齡,數據庫年齡指的是:「最新事務號-數據庫中最老事務號」,表年齡指的是:「最新事務號-表中最老事務號」
# 查看每個庫的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;

# 1個庫每個表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 

# 查看1個表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;

手動凍結回收一張表的元組的 xid 的sql:

vacuum freeze 表名;

手動凍結回收一個庫里面的所有表 xid 的命令:

vacuumdb -d 庫名 --freeze --jobs=30 -h 連接串 -p 端口號 -U 庫Owner

凍結回收過程是一個重 IO 的操作,這個過程內核會描述表的所有頁面,然后把符合要求的元組的 t_xmin 字段更新為 2,所以這個過程需要在業(yè)務低峰進行,避免影響業(yè)務。

與凍結回收相關的內核參數有三個:vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于筆者對于這三個參數理解不深,就不在這里班門弄斧了,感興趣的同學可以自行找資料了解一下。

解決方案

問題分析

基于上面的原理分析,我們知道,「最新事務號-最老事務號」 =  2^31-1000000,即當前可用的 xid 僅剩下一百萬的時候,內核就會禁止實例寫入并報錯:database is not accepting commands to avoid wraparound data loss in database, 這個時候必須連到提示中的 "xxxx" 對表進行 freeze 回收更多的 XID。

void
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
{
  TransactionId xidVacLimit;
  TransactionId xidWarnLimit;
  TransactionId xidStopLimit;
  TransactionId xidWrapLimit;
  TransactionId curXid;

  Assert(TransactionIdIsNormal(oldest_datfrozenxid));

  /*
     * xidWrapLimit = 最老的事務號 + 0x7FFFFFFF,當前事務號一旦到達xidWrapLimit將發(fā)生回卷
   */
  xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
  if (xidWrapLimit < FirstNormalTransactionId)
    xidWrapLimit += FirstNormalTransactionId;

  /*
     * 一旦當前事務號到達xidStopLimit,實例將不可寫入,保留 1000000 的xid用于vacuum
     * 每 vacuum 一張表需要占用一個xid
   */
  xidStopLimit = xidWrapLimit - 1000000;
  if (xidStopLimit < FirstNormalTransactionId)
    xidStopLimit -= FirstNormalTransactionId;

  /*
     * 一旦當前事務號到達xidWarnLimit,將不停地收到
     * WARNING:  database "xxxx" must be vacuumed within 2740112 transactions
   */
  xidWarnLimit = xidStopLimit - 10000000;
  if (xidWarnLimit < FirstNormalTransactionId)
    xidWarnLimit -= FirstNormalTransactionId;

  /*
     * 一旦當前事務號到達xidVacLimit將觸發(fā)force autovacuums
   */
  xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
  if (xidVacLimit < FirstNormalTransactionId)
    xidVacLimit += FirstNormalTransactionId;

  /* Grab lock for just long enough to set the new limit values */
  LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
  ShmemVariableCache->oldestXid = oldest_datfrozenxid;
  ShmemVariableCache->xidVacLimit = xidVacLimit;
  ShmemVariableCache->xidWarnLimit = xidWarnLimit;
  ShmemVariableCache->xidStopLimit = xidStopLimit;
  ShmemVariableCache->xidWrapLimit = xidWrapLimit;
  ShmemVariableCache->oldestXidDB = oldest_datoid;
  curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
  LWLockRelease(XidGenLock);

  /* Log the info */
  ereport(DEBUG1,
      (errmsg("transaction ID wrap limit is %u, limited by database with OID %u",
          xidWrapLimit, oldest_datoid)));

  /*
     * 如果 當前事務號>=最老事務號+autovacuum_freeze_max_age
     * 觸發(fā) autovacuum 對年齡最老的數據庫進行清理,如果有多個數據庫達到要求,按年齡最老的順序依次清理
   * 通過設置標志位標記當前 autovacuum 結束之后再來一次 autovacuum
     */
  if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
    IsUnderPostmaster && !InRecovery)
    SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

  /* Give an immediate warning if past the wrap warn point */
  if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
  {
    char     *oldest_datname;

    if (IsTransactionState())
      oldest_datname = get_database_name(oldest_datoid);
    else
      oldest_datname = NULL;

    if (oldest_datname)
      ereport(WARNING,
          (errmsg("database \"%s\" must be vacuumed within %u transactions",
              oldest_datname,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
    else
      ereport(WARNING,
          (errmsg("database with OID %u must be vacuumed within %u transactions",
              oldest_datoid,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
  }
}

bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
  int32    diff;
  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 >= id2);

  diff = (int32) (id1 - id2);
  return (diff >= 0);
}

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);

  if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
  {
    TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
    TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
    TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
    Oid      oldest_datoid = ShmemVariableCache->oldestXidDB;

        /*** 省略 ***/
    if (IsUnderPostmaster &&
      TransactionIdFollowsOrEquals(xid, xidStopLimit))
    {
      char     *oldest_datname = get_database_name(oldest_datoid);

      /* complain even if that DB has disappeared */
      if (oldest_datname)
        ereport(ERROR,
            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
             errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
                oldest_datname),
             errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
            /*** 省略 ***/
    }
        /*** 省略 ***/
  }
    /*** 省略 ***/
}

問題定位

# 查看每個庫的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;

# 1個庫每個表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 

# 查看1個表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;

問題解決

  1. 通過上面的第一個 sql,查找年齡最大的數據庫,數據庫年齡指的是:|最新事務號-數據庫中最老事務號|
  2. 通過上面第二個 sql,查找年齡最大的表,然后對表依次執(zhí)行:vacuum freeze 表名,把表中的老事務號凍結回收,表年齡指的是:|最新事務號-表中最老事務號|
  3. 運維腳本

單進程 Shell 腳本

# 對指定數據庫中年齡最大的前 50 張表進行 vacuum freeze

for cmd in `psql -U用戶名 -p端口號 -h連接串 -d數據庫名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd  | grep -v row | grep vacuum`; do
    psql -U用戶名 -p端口號 -h連接串 -d數據庫名 -c "$cmd"
done

多進程 Python 腳本

from multiprocessing import Pool
import psycopg2

args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='數據庫名',
            user='用戶名', password='密碼')

def vacuum_handler(sql):
    sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; "
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        cur = conn.cursor()
        cur.execute(sql_str)
        print cur.fetchall()
        conn.close()
    except Exception as e:
        print str(e)

# 對指定數據庫中年齡最大的前 1000 張表進行 vacuum freeze,32 個進程并發(fā)執(zhí)行
def multi_vacuum():
    pool = Pool(processes=32)
    sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;";
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql_str)
        rows = cur.fetchall()
        for row in rows:
            cmd = row['vacuum_cmd']
            pool.apply_async(vacuum_handler, (cmd, ))
        conn.close()
        pool.close()
        pool.join()
    except Exception as e:
        print str(e)


multi_vacuum()

友情提示

vacuum freeze 會掃描表的所有頁面并更新,是一個重 IO 的操作,操作過程中一定要控制好并發(fā)數,否則非常容易把實例打掛。

作者信息

謝桂起(花名:淵渱) 2020年畢業(yè)后加入阿里云,一直從事RDS PostgreSQL相關工作,善于解決線上各類RDS PostgreSQL運維管控相關問題。

總結

到此這篇關于PostgreSQL事務回卷的文章就介紹到這了,更多相關PostgreSQL事務回卷內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

最新評論