Postgresql源碼分析returns?setof函數(shù)oracle管道pipelined
引言
【功能】
- Oracle的return pipelined管道函數(shù)可以使一次返回的集合類(lèi)型,變?yōu)?逐條返回pipe row(集合中的一條)給SQL層,大大減少內(nèi)存的使用。
- Postgresql的return setof函數(shù)并不能起到降低內(nèi)存使用的效果,return next 單條數(shù)據(jù)只起到了緩存的效果,并不會(huì)把數(shù)據(jù)逐條返回SQL層處理,沒(méi)有降低內(nèi)存的效果。
【代碼】
- exec_stmt_return_next中的tupledesc從執(zhí)行計(jì)劃node中取出,返回值需要滿足desc要求,緩存值也會(huì)按該desc保存。
- return next對(duì)rec類(lèi)型和row類(lèi)型處理的區(qū)別
- rec類(lèi)型本質(zhì)上就是tuple,數(shù)據(jù)和desc都以擴(kuò)展形式存放在erh中。如果需要轉(zhuǎn)換為tuple,有幾個(gè)標(biāo)準(zhǔn)函數(shù)提供轉(zhuǎn)換功能,且支持類(lèi)型轉(zhuǎn)換?!巨D(zhuǎn)換后調(diào)用tuplestore的標(biāo)準(zhǔn)接口緩存tuple】
- row類(lèi)型本質(zhì)上是一個(gè)虛擬行(由一組datum位置組成),row->varnos[i]指向某一個(gè)datum,如果想把row轉(zhuǎn)換為tuple,需要用exec_eval_datum算出varnos指向的datum的值,然后組裝成values和nulls數(shù)組,用heap_form_tuple構(gòu)造。注意這種轉(zhuǎn)換過(guò)程不會(huì)有類(lèi)型轉(zhuǎn)換,如果需要的desc和算出來(lái)的列類(lèi)型對(duì)不上,返回空。成功【轉(zhuǎn)換后調(diào)用tuplestore的標(biāo)準(zhǔn)接口緩存tuple】
- return next對(duì)var類(lèi)型的處理:var看做單列tuple,按執(zhí)行計(jì)劃給的desc轉(zhuǎn)換類(lèi)型后構(gòu)造tuple?!巨D(zhuǎn)換后調(diào)用tuplestore的標(biāo)準(zhǔn)接口緩存tuple】
【實(shí)用函數(shù)】
- 通用
- 類(lèi)型轉(zhuǎn)換:exec_cast_value(傳入的值不能是eoh真實(shí)的頭,使用前需要轉(zhuǎn)成eoh存的1be頭,1be指向真實(shí)頭)
- 數(shù)組拼接minimaltuple:heap_form_minimal_tuple
- 有一個(gè)tuple和desc轉(zhuǎn)換為另一個(gè)desc的tuple:convert_tuples_by_position、execute_attr_map_tuple
- tuplestore:
- 用values數(shù)組存tuple(用tuplestore_puttuple_common拼好后傳tuple):tuplestore_putvalues
- 用HeapTuple存tuple(直接傳tuple):tuplestore_puttuple
- 類(lèi)型
- 根據(jù)類(lèi)型id和mod找出desc:lookup_rowtype_tupdesc
- erh
- 從erh擴(kuò)展類(lèi)型拿到緊湊tuple:expanded_record_get_tuple
1 :管道函數(shù)是什么,應(yīng)用于什么場(chǎng)景
oracle支持pipelined函數(shù),可以在函數(shù)定義時(shí)指定RETURN 集合類(lèi)型 PIPELINED 來(lái)說(shuō)明當(dāng)前函數(shù)是管道函數(shù)。
管道函數(shù)最大的作用就是可以使一次返回的集合類(lèi)型,變?yōu)?逐條返回,大大減少內(nèi)存的使用。
例如:嵌套表類(lèi)型outrecset是函數(shù)f_trans的返回值,普通函數(shù)只能組裝好嵌套表outrecset(全部緩存在內(nèi)存),一次性返回。如果嵌套表內(nèi)容較多,可能會(huì)占用較大的內(nèi)存空間。
如果使用管道函數(shù),可以通過(guò)pipe row(嵌套表中的一行)來(lái)代替return
語(yǔ)句,函數(shù)把嵌套表逐行返回給上層處理,無(wú)需緩存,降低內(nèi)存使用。
ORACLE實(shí)例:
CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS TYPE refcur_t IS REF CURSOR RETURN employees%ROWTYPE; TYPE outrec_typ IS RECORD ( var_num NUMBER(6), var_char1 VARCHAR2(30), var_char2 VARCHAR2(30) ); TYPE outrecset IS TABLE OF outrec_typ; FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED; END refcur_pkg; / CREATE OR REPLACE PACKAGE BODY refcur_pkg IS FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED IS out_rec outrec_typ; in_rec p%ROWTYPE; BEGIN LOOP FETCH p INTO in_rec; -- input row EXIT WHEN p%NOTFOUND; out_rec.var_num := in_rec.employee_id; out_rec.var_char1 := in_rec.first_name; out_rec.var_char2 := in_rec.last_name; PIPE ROW(out_rec); -- first transformed output row out_rec.var_char1 := in_rec.email; out_rec.var_char2 := in_rec.phone_number; PIPE ROW(out_rec); -- second transformed output row END LOOP; CLOSE p; RETURN; END f_trans; END refcur_pkg; / SELECT * FROM TABLE ( refcur_pkg.f_trans ( CURSOR (SELECT * FROM employees WHERE department_id = 60) ) );
在PG中,普通的return語(yǔ)句也是需要一次性返回?cái)?shù)據(jù),但PG應(yīng)該是參考ORACLE實(shí)現(xiàn)了return next的功能,也希望逐條返回?cái)?shù)據(jù)(PG沒(méi)有集合類(lèi)型,已普通類(lèi)型為例):
drop function f1; create or replace function f1(in i int, out j int) returns setof int as $$ begin j := i+1; return next; j := i+2; return next; return; end$$ language plpgsql; select * from f1(42); j ---- 43 44
但在內(nèi)核實(shí)現(xiàn)中,并不是逐條返回的,return next其實(shí)只起到了緩存數(shù)據(jù)的功能,總的數(shù)據(jù)集也是一次性返回SQL層的,和直接return沒(méi)有區(qū)別(只有語(yǔ)法上的區(qū)別)。
所以PG的return setof函數(shù)并不能起到降低內(nèi)存使用的效果。下面來(lái)分析具體過(guò)程。
2 return next實(shí)現(xiàn)
return next目前支持三類(lèi)數(shù)據(jù)的返回,var、rec、rows return next也可以不加參數(shù),返回值按out參數(shù)列表拼接
具體處理函數(shù):exec_stmt_return_next
static int exec_stmt_return_next(PLpgSQL_execstate *estate, PLpgSQL_stmt_return_next *stmt) { TupleDesc tupdesc; int natts; HeapTuple tuple; MemoryContext oldcontext;
1 初始化tuple store
初始化總結(jié):
1 初始化的過(guò)程就是在構(gòu)造Tuplestorestate,主要?jiǎng)幼鳎?/p>
- 給Tuplestorestate新的內(nèi)存上下文ExecutorState
- 記錄不能隨機(jī)訪問(wèn):eflags = EXEC_FLAG_REWIND
- 記錄三個(gè)操作函數(shù):copytup_heap、writetup_heap、readtup_heap
2 給estate->tuple_store_desc添加desc,desc來(lái)源:
- 從執(zhí)行計(jì)劃節(jié)點(diǎn)中node(Tuplestorestate)拿到后,傳入ExecMakeTableFunctionResult
- ExecMakeTableFunctionResult組裝ReturnSetInfo掛到fcinfo->resultinfo上
- plpgsql_exec_function時(shí)從fcinfo中拿出ReturnSetInfo取到desc
- plpgsql_estate_setup將取到的desc存入estate->rsi = rsi
#0 plpgsql_estate_setup (estate=0x7ffd81e2f850, func=0x2419028, rsi=0x7ffd81e2fb20, simple_eval_estate=0x0, simple_eval_resowner=0x0) at pl_exec.c:3972 #1 0x00007fe0a3992064 in plpgsql_exec_function (func=0x2419028, fcinfo=0x24da5a8, simple_eval_estate=0x0, simple_eval_resowner=0x0, procedure_resowner=0x0, atomic=true) at pl_exec.c:485 #2 0x00007fe0a39ac8f9 in plpgsql_call_handler (fcinfo=0x24da5a8) at pl_handler.c:277 #3 0x0000000000738829 in ExecMakeTableFunctionResult (setexpr=0x24e0b40, econtext=0x24e0a10, argContext=0x24da490, expectedDesc=0x24e1110, randomAccess=false) at execSRF.c:235 #4 0x0000000000753eed in FunctionNext (node=0x24e0800) at nodeFunctionscan.c:95 #5 0x000000000073a081 in ExecScanFetch (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:133 #6 0x000000000073a0f6 in ExecScan (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:182 #7 0x000000000075428c in ExecFunctionScan (pstate=0x24e0800) at nodeFunctionscan.c:270 #8 0x000000000073614e in ExecProcNodeFirst (node=0x24e0800) at execProcnode.c:464 #9 0x000000000072a08a in ExecProcNode (node=0x24e0800) at ../../../src/include/executor/executor.h:262 #10 0x000000000072cb80 in ExecutePlan (estate=0x24e05d8, planstate=0x24e0800, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24d5910, execute_once=true) at execMain.c:1632 #11 0x000000000072a6d1 in standard_ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:364 #12 0x000000000072a50b in ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:308 #13 0x0000000000997ba9 in PortalRunSelect (portal=0x2474a28, forward=true, count=0, dest=0x24d5910) at pquery.c:924 #14 0x0000000000997867 in PortalRun (portal=0x2474a28, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x24d5910, altdest=0x24d5910, qc=0x7ffd81e300b0) at pquery.c:768 #15 0x0000000000991408 in exec_simple_query (query_string=0x23c9518 "select * from f1(42);") at postgres.c:1238 #16 0x0000000000995a3e in PostgresMain (dbname=0x2400998 "postgres", username=0x23c5178 "mingjie") at postgres.c:4563 #17 0x00000000008d3cfe in BackendRun (port=0x23f7220) at postmaster.c:4396 #18 0x00000000008d3697 in BackendStartup (port=0x23f7220) at postmaster.c:4124 #19 0x00000000008d00b8 in ServerLoop () at postmaster.c:1791 #20 0x00000000008cf98a in PostmasterMain (argc=1, argv=0x23c3120) at postmaster.c:1463 #21 0x00000000007ada4b in main (argc=1, argv=0x23c3120) at main.c:200
分析:
if (estate->tuple_store == NULL) exec_init_tuple_store(estate); tupdesc = estate->tuple_store_desc; natts = tupdesc->natts; if (stmt->retvarno >= 0) { PLpgSQL_datum *retvar = estate->datums[stmt->retvarno]; switch (retvar->dtype) {
初始化函數(shù)exec_init_tuple_store
static void exec_init_tuple_store(PLpgSQL_execstate *estate) { ReturnSetInfo *rsi = estate->rsi; MemoryContext oldcxt; ResourceOwner oldowner; // 從"SPI Proc"切換到"ExecutorState" oldcxt = MemoryContextSwitchTo(estate->tuple_store_cxt); // 從“Portal”切換到"Portal" oldowner = CurrentResourceOwner; CurrentResourceOwner = estate->tuple_store_owner; // 進(jìn)入tuplestore_begin_heap函數(shù) estate->tuple_store = tuplestore_begin_heap(rsi->allowedModes & SFRM_Materialize_Random, false, work_mem); CurrentResourceOwner = oldowner; MemoryContextSwitchTo(oldcxt); // 給estate添加DESC,rsi->expectedDesc的來(lái)源? estate->tuple_store_desc = rsi->expectedDesc; }
進(jìn)入tuplestore_begin_heap
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes) { // 輸入false不允許隨機(jī)訪問(wèn)、false、8192 Tuplestorestate *state; int eflags; // eflags = EXEC_FLAG_REWIND eflags = randomAccess ? (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) : (EXEC_FLAG_REWIND); // 進(jìn)入tuple store模塊開(kāi)始初始化返回Tuplestorestate,注意他會(huì)直接拿當(dāng)前的memcontext state = tuplestore_begin_common(eflags, interXact, maxKBytes); // 返回的Tuplestorestate狀態(tài): // state = {status = TSS_INMEM, eflags = 2, backward = false, interXact = false, // truncated = false, availMem = 8372200, allowedMem = 8388608, tuples = 0, // myfile = 0x0, context = "ExecutorState", resowner = "Portal", copytup = 0x0, // writetup = 0x0, readtup = 0x0, memtuples = 0x24f0d88, memtupdeleted = 0, // memtupcount = 0, memtupsize = 2048, growmemtuples = true, readptrs = 0x24e7a70, // activeptr = 0, readptrcount = 1, readptrsize = 8, writepos_file = 0,writepos_offset = 0} state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; return state; }
后面根據(jù)返回值的不同,進(jìn)入幾個(gè)分支。
在進(jìn)入前,desc已經(jīng)獲取到了: tupdesc = estate->tuple_store_desc;
natts = tupdesc->natts;
場(chǎng)景一:return next返回var類(lèi)型
case PLPGSQL_DTYPE_VAR: { PLpgSQL_var *var = (PLpgSQL_var *) retvar; Datum retval = var->value; bool isNull = var->isnull; Form_pg_attribute attr = TupleDescAttr(tupdesc, 0); if (natts != 1) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("wrong result type supplied in RETURN NEXT"))); // retval是一個(gè)eoh的頭,后續(xù)處理需要一個(gè)1be的頭(1be的data部分指向eoh) retval = MakeExpandedObjectReadOnly(retval, isNull, var->datatype->typlen); // 轉(zhuǎn)成需要的類(lèi)型 retval = exec_cast_value(estate, retval, &isNull, var->datatype->typoid, var->datatype->atttypmod, attr->atttypid, attr->atttypmod); tuplestore_putvalues(estate->tuple_store, tupdesc, &retval, &isNull); } break;
執(zhí)行tuplestore_putvalues保存元組
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull) { MinimalTuple tuple; MemoryContext oldcxt = MemoryContextSwitchTo(state->context); tuple = heap_form_minimal_tuple(tdesc, values, isnull); // 記錄使用了多少空間,修改state->availMem USEMEM(state, GetMemoryChunkSpace(tuple)); tuplestore_puttuple_common(state, (void *) tuple); MemoryContextSwitchTo(oldcxt); } static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) { TSReadPointer *readptr; int i; ResourceOwner oldowner; state->tuples++; switch (state->status) {
內(nèi)存態(tài)直接用數(shù)組緩存tuple,tuple使用的內(nèi)存是在外層函數(shù)切換上下文申請(qǐng)的。
case TSS_INMEM: readptr = state->readptrs; for (i = 0; i < state->readptrcount; readptr++, i++) { if (readptr->eof_reached && i != state->activeptr) { readptr->eof_reached = false; readptr->current = state->memtupcount; } } if (state->memtupcount >= state->memtupsize - 1) { (void) grow_memtuples(state); } state->memtuples[state->memtupcount++] = tuple; if (state->memtupcount < state->memtupsize && !LACKMEM(state)) return; PrepareTempTablespaces(); oldowner = CurrentResourceOwner; CurrentResourceOwner = state->resowner; state->myfile = BufFileCreateTemp(state->interXact); CurrentResourceOwner = oldowner; state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0; state->status = TSS_WRITEFILE; dumptuples(state); break; ...
場(chǎng)景二:return next返回record類(lèi)型
case PLPGSQL_DTYPE_REC: { PLpgSQL_rec *rec = (PLpgSQL_rec *) retvar; TupleDesc rec_tupdesc; TupleConversionMap *tupmap;
拿到record:
{dtype = PLPGSQL_DTYPE_REC, dno = 1, refname = 0x24db608 "r", lineno = 3, isconst = false, notnull = false, default_val = 0x0, datatype = {typname='foo'}, rectypeid = 17117, firstfield = -1, erh = 0x2509708}
- 數(shù)據(jù)和desc都在erh中,列名在firstfield指向的位置。
- 數(shù)據(jù)類(lèi)型在datatype中:foo
- 數(shù)據(jù)類(lèi)型oid在rectypeid中:17117->foo
if (rec->erh == NULL) instantiate_empty_record_variable(estate, rec); if (ExpandedRecordIsEmpty(rec->erh)) deconstruct_expanded_record(rec->erh); // "SPI Proc"切到"ExprContext" oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate)); // return erh->er_tupdesc; rec_tupdesc = expanded_record_get_tupdesc(rec->erh); // 從保存的desc:rec_tupdesc轉(zhuǎn)換到輸出的desc:tupdesc,第一步:生成轉(zhuǎn)換map tupmap = convert_tuples_by_position(rec_tupdesc, tupdesc, gettext_noop("wrong record type supplied in RETURN NEXT")); tuple = expanded_record_get_tuple(rec->erh); if (tupmap) // 從保存的desc:rec_tupdesc轉(zhuǎn)換到輸出的desc:tupdesc,第二步:用map生成轉(zhuǎn)換后的元組 tuple = execute_attr_map_tuple(tuple, tupmap); // 緩存元組 tuplestore_puttuple(estate->tuple_store, tuple); MemoryContextSwitchTo(oldcontext); } break;
場(chǎng)景三:return next返回row類(lèi)型
必須是兩列以上的out參數(shù),直接return next空,才會(huì)使用這段邏輯。
case PLPGSQL_DTYPE_ROW: { PLpgSQL_row *row = (PLpgSQL_row *) retvar; oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate)); // 必須嚴(yán)格匹配tupdesc的類(lèi)型,對(duì)不上則轉(zhuǎn)換失敗 tuple = make_tuple_from_row(estate, row, tupdesc); if (tuple == NULL) ereport(ERROR,...) tuplestore_puttuple(estate->tuple_store, tuple); MemoryContextSwitchTo(oldcontext); } break; default: elog(ERROR, "unrecognized dtype: %d", retvar->dtype); break; } }
3 用例
drop function f1; create or replace function f1(in i int, out j int) returns setof int as $$ begin j := i+1; return next; j := i+2; return next; return; end$$ language plpgsql; select * from f1(42); ---- CREATE TABLE foo (fooid INT, foosubid INT, fooname TEXT); INSERT INTO foo VALUES (1, 2, 'three'); INSERT INTO foo VALUES (4, 5, 'six'); CREATE OR REPLACE FUNCTION get_all_foo() RETURNS SETOF foo AS $BODY$ DECLARE r foo%rowtype; BEGIN FOR r IN SELECT * FROM foo WHERE fooid > 0 LOOP -- can do some processing here RETURN NEXT r; -- return current row of SELECT END LOOP; RETURN; END; $BODY$ LANGUAGE plpgsql; SELECT * FROM get_all_foo(); -------- drop function f1(int); create function f1(in i int, out j int, out k text) returns setof record as $$ begin j := i+1; k := 'foo'; return next; j := j+1; k := 'foot'; return next; return; end$$ language plpgsql; select * from f1(42);
以上就是Postgresql源碼分析returns setof函數(shù)oracle管道pipelined的詳細(xì)內(nèi)容,更多關(guān)于Postgresql returns setof函數(shù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決sqoop從postgresql拉數(shù)據(jù),報(bào)錯(cuò)TCP/IP連接的問(wèn)題
這篇文章主要介紹了解決sqoop從postgresql拉數(shù)據(jù),報(bào)錯(cuò)TCP/IP連接的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12postgresql 補(bǔ)齊空值、自定義查詢(xún)字段并賦值操作
這篇文章主要介紹了postgresql 補(bǔ)齊空值、自定義查詢(xún)字段并賦值操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01PostgreSQL 禁用全表掃描的實(shí)現(xiàn)
這篇文章主要介紹了PostgreSQL 禁用全表掃描的實(shí)現(xiàn)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01PostgreSQL 存儲(chǔ)過(guò)程的進(jìn)階講解(含游標(biāo)、錯(cuò)誤處理、自定義函數(shù)、事務(wù))
PL/pgSQL 游標(biāo)允許我們封裝一個(gè)查詢(xún),然后每次處理結(jié)果集中的一條記錄,這篇文章主要介紹了PostgreSQL 存儲(chǔ)過(guò)程的進(jìn)階介紹(含游標(biāo)、錯(cuò)誤處理、自定義函數(shù)、事務(wù)),需要的朋友可以參考下2023-03-03PostgreSQL中ON?CONFLICT的使用及一些擴(kuò)展用法
Postgres?ON?CONFLICT是PostgreSQL數(shù)據(jù)庫(kù)中的一個(gè)功能,用于處理插入或更新數(shù)據(jù)時(shí)的沖突情況,下面這篇文章主要給大家介紹了關(guān)于PostgreSQL中ON?CONFLICT的使用及一些擴(kuò)展用法的相關(guān)資料,需要的朋友可以參考下2024-06-06PGSQL實(shí)現(xiàn)判斷一個(gè)空值字段,并將NULL值修改為其它值
這篇文章主要介紹了PGSQL實(shí)現(xiàn)判斷一個(gè)空值字段,并將NULL值修改為其它值,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01PostgreSQL數(shù)據(jù)庫(kù)事務(wù)出現(xiàn)未知狀態(tài)的處理方法
這篇文章主要給大家介紹了PostgreSQL數(shù)據(jù)庫(kù)事務(wù)出現(xiàn)未知狀態(tài)的處理方法,需要的朋友可以參考下2017-07-07postgresql 計(jì)算時(shí)間差的秒數(shù)、天數(shù)實(shí)例
這篇文章主要介紹了postgresql 計(jì)算時(shí)間差的秒數(shù)、天數(shù)實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12Postgresql 數(shù)據(jù)庫(kù)權(quán)限功能的使用總結(jié)
這篇文章主要介紹了Postgresql 數(shù)據(jù)庫(kù)權(quán)限功能的使用總結(jié),具有很好的參考價(jià)值,對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02