📄 spi.c
字号:
if (_SPI_curid != _SPI_connected || _SPI_connected < 0) elog(ERROR, "improper call to spi_printtup"); if (_SPI_current != &(_SPI_stack[_SPI_curid])) elog(ERROR, "SPI stack corrupted"); tuptable = _SPI_current->tuptable; if (tuptable == NULL) elog(ERROR, "improper call to spi_printtup"); oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt); if (tuptable->free == 0) { tuptable->free = 256; tuptable->alloced += tuptable->free; tuptable->vals = (HeapTuple *) repalloc(tuptable->vals, tuptable->alloced * sizeof(HeapTuple)); } tuptable->vals[tuptable->alloced - tuptable->free] = heap_copytuple(tuple); (tuptable->free)--; MemoryContextSwitchTo(oldcxt);}/* * Static functions *//* * Plan and optionally execute a querystring. * * If plan != NULL, just prepare plan tree, else execute immediately. */static int_SPI_execute(const char *src, int tcount, _SPI_plan *plan){ List *raw_parsetree_list; List *query_list_list; List *plan_list; List *list_item; int nargs = 0; Oid *argtypes = NULL; int res = 0; if (plan) { nargs = plan->nargs; argtypes = plan->argtypes; } /* Increment CommandCounter to see changes made by now */ CommandCounterIncrement(); /* Reset state (only needed in case string is empty) */ SPI_processed = 0; SPI_lastoid = InvalidOid; SPI_tuptable = NULL; _SPI_current->tuptable = NULL; /* * Parse the request string into a list of raw parse trees. */ raw_parsetree_list = pg_parse_query(src); /* * Do parse analysis and rule rewrite for each raw parsetree. * * We save the querytrees from each raw parsetree as a separate sublist. * This allows _SPI_execute_plan() to know where the boundaries * between original queries fall. */ query_list_list = NIL; plan_list = NIL; foreach(list_item, raw_parsetree_list) { Node *parsetree = (Node *) lfirst(list_item); List *query_list; List *query_list_item; query_list = pg_analyze_and_rewrite(parsetree, argtypes, nargs); query_list_list = lappend(query_list_list, query_list); /* Reset state for each original parsetree */ /* (at most one of its querytrees will be marked canSetTag) */ SPI_processed = 0; SPI_lastoid = InvalidOid; SPI_tuptable = NULL; _SPI_current->tuptable = NULL; foreach(query_list_item, query_list) { Query *queryTree = (Query *) lfirst(query_list_item); Plan *planTree; QueryDesc *qdesc; DestReceiver *dest; planTree = pg_plan_query(queryTree); plan_list = lappend(plan_list, planTree); dest = CreateDestReceiver(queryTree->canSetTag ? SPI : None, NULL); if (queryTree->commandType == CMD_UTILITY) { if (IsA(queryTree->utilityStmt, CopyStmt)) { CopyStmt *stmt = (CopyStmt *) queryTree->utilityStmt; if (stmt->filename == NULL) return SPI_ERROR_COPY; } else if (IsA(queryTree->utilityStmt, DeclareCursorStmt) || IsA(queryTree->utilityStmt, ClosePortalStmt) || IsA(queryTree->utilityStmt, FetchStmt)) return SPI_ERROR_CURSOR; else if (IsA(queryTree->utilityStmt, TransactionStmt)) return SPI_ERROR_TRANSACTION; res = SPI_OK_UTILITY; if (plan == NULL) { ProcessUtility(queryTree->utilityStmt, dest, NULL); CommandCounterIncrement(); } } else if (plan == NULL) { qdesc = CreateQueryDesc(queryTree, planTree, dest, NULL, false); res = _SPI_pquery(qdesc, true, false, queryTree->canSetTag ? tcount : 0); if (res < 0) return res; CommandCounterIncrement(); } else { qdesc = CreateQueryDesc(queryTree, planTree, dest, NULL, false); res = _SPI_pquery(qdesc, false, false, 0); if (res < 0) return res; } } } if (plan) { plan->qtlist = query_list_list; plan->ptlist = plan_list; } return res;}static int_SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls, bool useCurrentSnapshot, int tcount){ List *query_list_list = plan->qtlist; List *plan_list = plan->ptlist; List *query_list_list_item; int nargs = plan->nargs; int res = 0; ParamListInfo paramLI; /* Increment CommandCounter to see changes made by now */ CommandCounterIncrement(); /* Convert parameters to form wanted by executor */ if (nargs > 0) { int k; paramLI = (ParamListInfo) palloc0((nargs + 1) * sizeof(ParamListInfoData)); for (k = 0; k < nargs; k++) { paramLI[k].kind = PARAM_NUM; paramLI[k].id = k + 1; paramLI[k].isnull = (Nulls && Nulls[k] == 'n'); paramLI[k].value = Values[k]; } paramLI[k].kind = PARAM_INVALID; } else paramLI = NULL; /* Reset state (only needed in case string is empty) */ SPI_processed = 0; SPI_lastoid = InvalidOid; SPI_tuptable = NULL; _SPI_current->tuptable = NULL; foreach(query_list_list_item, query_list_list) { List *query_list = lfirst(query_list_list_item); List *query_list_item; /* Reset state for each original parsetree */ /* (at most one of its querytrees will be marked canSetTag) */ SPI_processed = 0; SPI_lastoid = InvalidOid; SPI_tuptable = NULL; _SPI_current->tuptable = NULL; foreach(query_list_item, query_list) { Query *queryTree = (Query *) lfirst(query_list_item); Plan *planTree; QueryDesc *qdesc; DestReceiver *dest; planTree = lfirst(plan_list); plan_list = lnext(plan_list); dest = CreateDestReceiver(queryTree->canSetTag ? SPI : None, NULL); if (queryTree->commandType == CMD_UTILITY) { ProcessUtility(queryTree->utilityStmt, dest, NULL); res = SPI_OK_UTILITY; CommandCounterIncrement(); } else { qdesc = CreateQueryDesc(queryTree, planTree, dest, paramLI, false); res = _SPI_pquery(qdesc, true, useCurrentSnapshot, queryTree->canSetTag ? tcount : 0); if (res < 0) return res; CommandCounterIncrement(); } } } return res;}static int_SPI_pquery(QueryDesc *queryDesc, bool runit, bool useCurrentSnapshot, int tcount){ int operation = queryDesc->operation; int res; Oid save_lastoid; switch (operation) { case CMD_SELECT: res = SPI_OK_SELECT; if (queryDesc->parsetree->into != NULL) /* select into table */ { res = SPI_OK_SELINTO; queryDesc->dest = None_Receiver; /* don't output results */ } break; case CMD_INSERT: res = SPI_OK_INSERT; break; case CMD_DELETE: res = SPI_OK_DELETE; break; case CMD_UPDATE: res = SPI_OK_UPDATE; break; default: return SPI_ERROR_OPUNKNOWN; } if (!runit) /* plan preparation, don't execute */ return res;#ifdef SPI_EXECUTOR_STATS if (ShowExecutorStats) ResetUsage();#endif ExecutorStart(queryDesc, useCurrentSnapshot, false); ExecutorRun(queryDesc, ForwardScanDirection, (long) tcount); _SPI_current->processed = queryDesc->estate->es_processed; save_lastoid = queryDesc->estate->es_lastoid; if (operation == CMD_SELECT && queryDesc->dest->mydest == SPI) { if (_SPI_checktuples()) elog(ERROR, "consistency check on SPI tuple count failed"); } if (queryDesc->dest->mydest == SPI) { SPI_processed = _SPI_current->processed; SPI_lastoid = save_lastoid; SPI_tuptable = _SPI_current->tuptable; } else if (res == SPI_OK_SELECT) { /* Don't return SPI_OK_SELECT if we discarded the result */ res = SPI_OK_UTILITY; } ExecutorEnd(queryDesc); FreeQueryDesc(queryDesc);#ifdef SPI_EXECUTOR_STATS if (ShowExecutorStats) ShowUsage("SPI EXECUTOR STATS");#endif return res;}/* * _SPI_cursor_operation() * * Do a FETCH or MOVE in a cursor */static void_SPI_cursor_operation(Portal portal, bool forward, int count, DestReceiver *dest){ long nfetched; /* Check that the portal is valid */ if (!PortalIsValid(portal)) elog(ERROR, "invalid portal in SPI cursor operation"); /* Push the SPI stack */ if (_SPI_begin_call(true) < 0) elog(ERROR, "SPI cursor operation called while not connected"); /* Reset the SPI result */ SPI_processed = 0; SPI_tuptable = NULL; _SPI_current->processed = 0; _SPI_current->tuptable = NULL; /* Run the cursor */ nfetched = PortalRunFetch(portal, forward ? FETCH_FORWARD : FETCH_BACKWARD, (long) count, dest); /* * Think not to combine this store with the preceding function call. * If the portal contains calls to functions that use SPI, then * SPI_stack is likely to move around while the portal runs. When * control returns, _SPI_current will point to the correct stack * entry... but the pointer may be different than it was beforehand. * So we must be sure to re-fetch the pointer after the function call * completes. */ _SPI_current->processed = nfetched; if (dest->mydest == SPI && _SPI_checktuples()) elog(ERROR, "consistency check on SPI tuple count failed"); /* Put the result into place for access by caller */ SPI_processed = _SPI_current->processed; SPI_tuptable = _SPI_current->tuptable; /* Pop the SPI stack */ _SPI_end_call(true);}static MemoryContext_SPI_execmem(){ return MemoryContextSwitchTo(_SPI_current->execCxt);}static MemoryContext_SPI_procmem(){ return MemoryContextSwitchTo(_SPI_current->procCxt);}/* * _SPI_begin_call: begin a SPI operation within a connected procedure */static int_SPI_begin_call(bool execmem){ if (_SPI_curid + 1 != _SPI_connected) return SPI_ERROR_UNCONNECTED; _SPI_curid++; if (_SPI_current != &(_SPI_stack[_SPI_curid])) elog(ERROR, "SPI stack corrupted"); if (execmem) /* switch to the Executor memory context */ _SPI_execmem(); return 0;}/* * _SPI_end_call: end a SPI operation within a connected procedure * * Note: this currently has no failure return cases, so callers don't check */static int_SPI_end_call(bool procmem){ /* * We're returning to procedure where _SPI_curid == _SPI_connected - 1 */ _SPI_curid--; if (procmem) /* switch to the procedure memory context */ { _SPI_procmem(); /* and free Executor memory */ MemoryContextResetAndDeleteChildren(_SPI_current->execCxt); } return 0;}static bool_SPI_checktuples(void){ uint32 processed = _SPI_current->processed; SPITupleTable *tuptable = _SPI_current->tuptable; bool failed = false; if (tuptable == NULL) /* spi_dest_startup was not called */ failed = true; else if (processed != (tuptable->alloced - tuptable->free)) failed = true; return failed;}static _SPI_plan *_SPI_copy_plan(_SPI_plan *plan, int location){ _SPI_plan *newplan; MemoryContext oldcxt; MemoryContext plancxt; MemoryContext parentcxt; /* Determine correct parent for the plan's memory context */ if (location == _SPI_CPLAN_PROCXT) parentcxt = _SPI_current->procCxt; else if (location == _SPI_CPLAN_TOPCXT) parentcxt = TopMemoryContext; else/* (this case not currently used) */ parentcxt = CurrentMemoryContext; /* * Create a memory context for the plan. We don't expect the plan to * be very large, so use smaller-than-default alloc parameters. */ plancxt = AllocSetContextCreate(parentcxt, "SPI Plan", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); oldcxt = MemoryContextSwitchTo(plancxt); /* Copy the SPI plan into its own context */ newplan = (_SPI_plan *) palloc(sizeof(_SPI_plan)); newplan->plancxt = plancxt; newplan->qtlist = (List *) copyObject(plan->qtlist); newplan->ptlist = (List *) copyObject(plan->ptlist); newplan->nargs = plan->nargs; if (plan->nargs > 0) { newplan->argtypes = (Oid *) palloc(plan->nargs * sizeof(Oid)); memcpy(newplan->argtypes, plan->argtypes, plan->nargs * sizeof(Oid)); } else newplan->argtypes = NULL; MemoryContextSwitchTo(oldcxt); return newplan;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -