欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PostgreSQL事務(wù)回卷實(shí)戰(zhàn)案例詳析

 更新時(shí)間:2022年03月25日 09:43:18   作者:淵渱  
前段時(shí)間在公司小范圍做了一個(gè)關(guān)于PG事務(wù)實(shí)現(xiàn)的講座,最后總結(jié)了一個(gè)摘要性的東西,分享一下,這篇文章主要給大家介紹了關(guān)于PostgreSQL事務(wù)回卷實(shí)戰(zhàn)案例的相關(guān)資料,需要的朋友可以參考下

背景

前陣子某個(gè)客戶反饋他的RDS PostgreSQL無(wú)法寫(xiě)入,報(bào)錯(cuò)信息如下:

postgres=# select * from test;
 id 
----
(0 rows)

postgres=# insert into test select 1;
ERROR:  database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT:  Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.

隨后RDS工程師介入處理以后,該問(wèn)題立馬得到了解決。

XID基礎(chǔ)原理

XID 定義

XID(Transaction ID)是 PostgreSQL 內(nèi)部的事務(wù)編號(hào),每個(gè)事務(wù)都會(huì)分配一個(gè)XID,依次遞增。PostgreSQL 數(shù)據(jù)中每個(gè)元組頭部都會(huì)保存著 插入 或者 刪除 這條元組的XID(Transaction ID),然后內(nèi)核通過(guò)這個(gè) XID 構(gòu)造數(shù)據(jù)庫(kù)的一致性讀。在事務(wù)隔離級(jí)別是 可重復(fù)讀 的情況下,假設(shè)如有兩個(gè)事務(wù),xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元組,看不到 t_xmin > 200 的元組。

typedef uint32 TransactionId;  /* 事務(wù)號(hào)定義,32位無(wú)符號(hào)整數(shù) */
typedef struct HeapTupleFields
{
  TransactionId t_xmin;    /* 插入該元組的事務(wù)號(hào) */
  TransactionId t_xmax;    /* 刪除或鎖定該元組的事務(wù)號(hào) */

    /*** 其它屬性省略 ***/
} HeapTupleFields;

struct HeapTupleHeaderData
{
  union
  {
    HeapTupleFields t_heap;
    DatumTupleFields t_datum;
  }      t_choice;

    /*** 其它屬性省略 ***/
};

XID 發(fā)行機(jī)制

從上面結(jié)構(gòu)中我們可以看到,XID 是一個(gè)32位無(wú)符號(hào)整數(shù),也就是 XID 的范圍是 0到2^32-1;那么超過(guò)了 2^32-1的事務(wù)怎么辦呢?其實(shí) XID 是一個(gè)環(huán),超過(guò)了 2^32-1 之后又會(huì)從頭開(kāi)始分配。通過(guò)源代碼也證明了上述結(jié)論:

// 無(wú)效事務(wù)號(hào)
#define InvalidTransactionId    ((TransactionId) 0)
// 引導(dǎo)事務(wù)號(hào),在數(shù)據(jù)庫(kù)初始化過(guò)程(BKI執(zhí)行)中使用
#define BootstrapTransactionId    ((TransactionId) 1)
// 凍結(jié)事務(wù)號(hào)用于表示非常陳舊的元組,它們比所有正常事務(wù)號(hào)都要早(也就是可見(jiàn))
#define FrozenTransactionId      ((TransactionId) 2)
// 第一個(gè)正常事務(wù)號(hào)
#define FirstNormalTransactionId  ((TransactionId) 3)
// 把 FullTransactionId 的低32位作為無(wú)符號(hào)整數(shù)生成 xid
#define XidFromFullTransactionId(x)    ((uint32) (x).value)

static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
  dest->value++;
  while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
    dest->value++;
}

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);
    /*** 省略 ***/
  FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
    /*** 省略 ***
  return full_xid;
}

static void
AssignTransactionId(TransactionState s)
{
    /*** 省略 ***/
  s->fullTransactionId = GetNewTransactionId(isSubXact);
  if (!isSubXact)
    XactTopFullTransactionId = s->fullTransactionId;
    /*** 省略 ***/
}

TransactionId
GetTopTransactionId(void)
{
  if (!FullTransactionIdIsValid(XactTopFullTransactionId))
    AssignTransactionId(&TopTransactionStateData);
  return XidFromFullTransactionId(XactTopFullTransactionId);
}

可以看到,新事務(wù)號(hào)保存在共享變量緩存中:ShmemVariableCache->nextFullXid,每發(fā)行一個(gè)事務(wù)號(hào)后,向上調(diào)整它的值,并跳過(guò)上述三個(gè)特殊值。三個(gè)特殊仠分別為0、1和2,作用可以看上面代碼注釋。

XID 回卷機(jī)制

前面說(shuō)到,XID 是一個(gè)環(huán),分配到 2^32-1 之后又從 3 開(kāi)始,那么內(nèi)核是怎么比較兩個(gè)事務(wù)的大小的呢?比如 xid 經(jīng)歷了這樣一個(gè)過(guò)程 3-> 2^32-1 -> 5,那么內(nèi)核怎么樣知道 5 這個(gè)事務(wù)在 2^32-1 后面呢?我們?cè)倏匆幌麓a:

/*
 * TransactionIdPrecedes --- is id1 logically < id2?
 */
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
  /*
   * If either ID is a permanent XID then we can just do unsigned
   * comparison.  If both are normal, do a modulo-2^32 comparison.
   */
  int32    diff;

  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 < id2);

  diff = (int32) (id1 - id2);
  return (diff < 0);
}

可以看到,內(nèi)核使用了一個(gè)比較取巧的方法:(int32) (id1 - id2) < 0,32位有符號(hào)整數(shù)的取值范圍是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以轉(zhuǎn)換成 int32 會(huì)變成負(fù)數(shù)。但是這里面有一個(gè)問(wèn)題,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,一旦大于就會(huì)出現(xiàn)回卷,導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見(jiàn)。

XID 回卷預(yù)防

前面講到,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,否則會(huì)發(fā)生回卷導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見(jiàn),那內(nèi)核是怎么避免這個(gè)問(wèn)題的呢??jī)?nèi)核是這樣處理的:通過(guò)定期把老事務(wù)產(chǎn)生的元組的 XID 更新為 FrozenTransactionId,即更新為2,來(lái)回收 XID,而 XID 為2 的元組對(duì)所有的事務(wù)可見(jiàn),這個(gè)過(guò)程稱(chēng)為 XID 凍結(jié),通過(guò)這個(gè)方式可以回收 XID 來(lái)保證 |最新事務(wù)號(hào)-最老事務(wù)號(hào)| < 2^31。
除了內(nèi)核自動(dòng)凍結(jié)回收XID,我們也可以通過(guò)命令或者 sql 的方式手動(dòng)進(jìn)行 xid 凍結(jié)回收

  • 查詢數(shù)據(jù)庫(kù)或表的年齡,數(shù)據(jù)庫(kù)年齡指的是:「最新事務(wù)號(hào)-數(shù)據(jù)庫(kù)中最老事務(wù)號(hào)」,表年齡指的是:「最新事務(wù)號(hào)-表中最老事務(wù)號(hào)」
# 查看每個(gè)庫(kù)的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;

# 1個(gè)庫(kù)每個(gè)表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 

# 查看1個(gè)表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱(chēng).表名稱(chēng)'::regclass::oid;

手動(dòng)凍結(jié)回收一張表的元組的 xid 的sql:

vacuum freeze 表名;

手動(dòng)凍結(jié)回收一個(gè)庫(kù)里面的所有表 xid 的命令:

vacuumdb -d 庫(kù)名 --freeze --jobs=30 -h 連接串 -p 端口號(hào) -U 庫(kù)Owner

凍結(jié)回收過(guò)程是一個(gè)重 IO 的操作,這個(gè)過(guò)程內(nèi)核會(huì)描述表的所有頁(yè)面,然后把符合要求的元組的 t_xmin 字段更新為 2,所以這個(gè)過(guò)程需要在業(yè)務(wù)低峰進(jìn)行,避免影響業(yè)務(wù)。

與凍結(jié)回收相關(guān)的內(nèi)核參數(shù)有三個(gè):vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于筆者對(duì)于這三個(gè)參數(shù)理解不深,就不在這里班門(mén)弄斧了,感興趣的同學(xué)可以自行找資料了解一下。

解決方案

問(wèn)題分析

基于上面的原理分析,我們知道,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 =  2^31-1000000,即當(dāng)前可用的 xid 僅剩下一百萬(wàn)的時(shí)候,內(nèi)核就會(huì)禁止實(shí)例寫(xiě)入并報(bào)錯(cuò):database is not accepting commands to avoid wraparound data loss in database, 這個(gè)時(shí)候必須連到提示中的 "xxxx" 對(duì)表進(jìn)行 freeze 回收更多的 XID。

void
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
{
  TransactionId xidVacLimit;
  TransactionId xidWarnLimit;
  TransactionId xidStopLimit;
  TransactionId xidWrapLimit;
  TransactionId curXid;

  Assert(TransactionIdIsNormal(oldest_datfrozenxid));

  /*
     * xidWrapLimit = 最老的事務(wù)號(hào) + 0x7FFFFFFF,當(dāng)前事務(wù)號(hào)一旦到達(dá)xidWrapLimit將發(fā)生回卷
   */
  xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
  if (xidWrapLimit < FirstNormalTransactionId)
    xidWrapLimit += FirstNormalTransactionId;

  /*
     * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidStopLimit,實(shí)例將不可寫(xiě)入,保留 1000000 的xid用于vacuum
     * 每 vacuum 一張表需要占用一個(gè)xid
   */
  xidStopLimit = xidWrapLimit - 1000000;
  if (xidStopLimit < FirstNormalTransactionId)
    xidStopLimit -= FirstNormalTransactionId;

  /*
     * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidWarnLimit,將不停地收到
     * WARNING:  database "xxxx" must be vacuumed within 2740112 transactions
   */
  xidWarnLimit = xidStopLimit - 10000000;
  if (xidWarnLimit < FirstNormalTransactionId)
    xidWarnLimit -= FirstNormalTransactionId;

  /*
     * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidVacLimit將觸發(fā)force autovacuums
   */
  xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
  if (xidVacLimit < FirstNormalTransactionId)
    xidVacLimit += FirstNormalTransactionId;

  /* Grab lock for just long enough to set the new limit values */
  LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
  ShmemVariableCache->oldestXid = oldest_datfrozenxid;
  ShmemVariableCache->xidVacLimit = xidVacLimit;
  ShmemVariableCache->xidWarnLimit = xidWarnLimit;
  ShmemVariableCache->xidStopLimit = xidStopLimit;
  ShmemVariableCache->xidWrapLimit = xidWrapLimit;
  ShmemVariableCache->oldestXidDB = oldest_datoid;
  curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
  LWLockRelease(XidGenLock);

  /* Log the info */
  ereport(DEBUG1,
      (errmsg("transaction ID wrap limit is %u, limited by database with OID %u",
          xidWrapLimit, oldest_datoid)));

  /*
     * 如果 當(dāng)前事務(wù)號(hào)>=最老事務(wù)號(hào)+autovacuum_freeze_max_age
     * 觸發(fā) autovacuum 對(duì)年齡最老的數(shù)據(jù)庫(kù)進(jìn)行清理,如果有多個(gè)數(shù)據(jù)庫(kù)達(dá)到要求,按年齡最老的順序依次清理
   * 通過(guò)設(shè)置標(biāo)志位標(biāo)記當(dāng)前 autovacuum 結(jié)束之后再來(lái)一次 autovacuum
     */
  if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
    IsUnderPostmaster && !InRecovery)
    SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

  /* Give an immediate warning if past the wrap warn point */
  if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
  {
    char     *oldest_datname;

    if (IsTransactionState())
      oldest_datname = get_database_name(oldest_datoid);
    else
      oldest_datname = NULL;

    if (oldest_datname)
      ereport(WARNING,
          (errmsg("database \"%s\" must be vacuumed within %u transactions",
              oldest_datname,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
    else
      ereport(WARNING,
          (errmsg("database with OID %u must be vacuumed within %u transactions",
              oldest_datoid,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
  }
}

bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
  int32    diff;
  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 >= id2);

  diff = (int32) (id1 - id2);
  return (diff >= 0);
}

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);

  if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
  {
    TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
    TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
    TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
    Oid      oldest_datoid = ShmemVariableCache->oldestXidDB;

        /*** 省略 ***/
    if (IsUnderPostmaster &&
      TransactionIdFollowsOrEquals(xid, xidStopLimit))
    {
      char     *oldest_datname = get_database_name(oldest_datoid);

      /* complain even if that DB has disappeared */
      if (oldest_datname)
        ereport(ERROR,
            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
             errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
                oldest_datname),
             errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
            /*** 省略 ***/
    }
        /*** 省略 ***/
  }
    /*** 省略 ***/
}

問(wèn)題定位

# 查看每個(gè)庫(kù)的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;

# 1個(gè)庫(kù)每個(gè)表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 

# 查看1個(gè)表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱(chēng).表名稱(chēng)'::regclass::oid;

問(wèn)題解決

  1. 通過(guò)上面的第一個(gè) sql,查找年齡最大的數(shù)據(jù)庫(kù),數(shù)據(jù)庫(kù)年齡指的是:|最新事務(wù)號(hào)-數(shù)據(jù)庫(kù)中最老事務(wù)號(hào)|
  2. 通過(guò)上面第二個(gè) sql,查找年齡最大的表,然后對(duì)表依次執(zhí)行:vacuum freeze 表名,把表中的老事務(wù)號(hào)凍結(jié)回收,表年齡指的是:|最新事務(wù)號(hào)-表中最老事務(wù)號(hào)|
  3. 運(yùn)維腳本

單進(jìn)程 Shell 腳本

# 對(duì)指定數(shù)據(jù)庫(kù)中年齡最大的前 50 張表進(jìn)行 vacuum freeze

for cmd in `psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫(kù)名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd  | grep -v row | grep vacuum`; do
    psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫(kù)名 -c "$cmd"
done

多進(jìn)程 Python 腳本

from multiprocessing import Pool
import psycopg2

args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='數(shù)據(jù)庫(kù)名',
            user='用戶名', password='密碼')

def vacuum_handler(sql):
    sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; "
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        cur = conn.cursor()
        cur.execute(sql_str)
        print cur.fetchall()
        conn.close()
    except Exception as e:
        print str(e)

# 對(duì)指定數(shù)據(jù)庫(kù)中年齡最大的前 1000 張表進(jìn)行 vacuum freeze,32 個(gè)進(jìn)程并發(fā)執(zhí)行
def multi_vacuum():
    pool = Pool(processes=32)
    sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;";
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql_str)
        rows = cur.fetchall()
        for row in rows:
            cmd = row['vacuum_cmd']
            pool.apply_async(vacuum_handler, (cmd, ))
        conn.close()
        pool.close()
        pool.join()
    except Exception as e:
        print str(e)


multi_vacuum()

友情提示

vacuum freeze 會(huì)掃描表的所有頁(yè)面并更新,是一個(gè)重 IO 的操作,操作過(guò)程中一定要控制好并發(fā)數(shù),否則非常容易把實(shí)例打掛。

作者信息

謝桂起(花名:淵渱) 2020年畢業(yè)后加入阿里云,一直從事RDS PostgreSQL相關(guān)工作,善于解決線上各類(lèi)RDS PostgreSQL運(yùn)維管控相關(guān)問(wèn)題。

總結(jié)

到此這篇關(guān)于PostgreSQL事務(wù)回卷的文章就介紹到這了,更多相關(guān)PostgreSQL事務(wù)回卷內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論