📄 spi.c
字号:
/* * spi_printtup * store tuple retrieved by Executor into SPITupleTable * of current SPI procedure */voidspi_printtup(TupleTableSlot *slot, DestReceiver *self){ SPITupleTable *tuptable; MemoryContext oldcxt; /* * When called by Executor _SPI_curid expected to be equal to * _SPI_connected */ if (_SPI_curid != _SPI_connected || _SPI_connected < 0) elog(ERROR, "improper call to spi_printtup"); if (_SPI_current != &(_SPI_stack[_SPI_curid])) elog(ERROR, "SPI stack corrupted"); tuptable = _SPI_current->tuptable; if (tuptable == NULL) elog(ERROR, "improper call to spi_printtup"); oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt); if (tuptable->free == 0) { tuptable->free = 256; tuptable->alloced += tuptable->free; tuptable->vals = (HeapTuple *) repalloc(tuptable->vals, tuptable->alloced * sizeof(HeapTuple)); } tuptable->vals[tuptable->alloced - tuptable->free] = ExecCopySlotTuple(slot); (tuptable->free)--; MemoryContextSwitchTo(oldcxt);}/* * Static functions *//* * Parse and plan a querystring. * * At entry, plan->argtypes and plan->nargs must be valid. * * Query and plan lists are stored into *plan. */static void_SPI_prepare_plan(const char *src, _SPI_plan *plan){ List *raw_parsetree_list; List *query_list_list; List *plan_list; ListCell *list_item; ErrorContextCallback spierrcontext; Oid *argtypes = plan->argtypes; int nargs = plan->nargs; /* * Increment CommandCounter to see changes made by now. We must do this * to be sure of seeing any schema changes made by a just-preceding SPI * command. (But we don't bother advancing the snapshot, since the * planner generally operates under SnapshotNow rules anyway.) */ CommandCounterIncrement(); /* * Setup error traceback support for ereport() */ spierrcontext.callback = _SPI_error_callback; spierrcontext.arg = (void *) src; spierrcontext.previous = error_context_stack; error_context_stack = &spierrcontext; /* * Parse the request string into a list of raw parse trees. */ raw_parsetree_list = pg_parse_query(src); /* * Do parse analysis and rule rewrite for each raw parsetree. * * We save the querytrees from each raw parsetree as a separate sublist. * This allows _SPI_execute_plan() to know where the boundaries between * original queries fall. */ query_list_list = NIL; plan_list = NIL; foreach(list_item, raw_parsetree_list) { Node *parsetree = (Node *) lfirst(list_item); List *query_list; query_list = pg_analyze_and_rewrite(parsetree, argtypes, nargs); query_list_list = lappend(query_list_list, query_list); plan_list = list_concat(plan_list, pg_plan_queries(query_list, NULL, false)); } plan->qtlist = query_list_list; plan->ptlist = plan_list; /* * Pop the error context stack */ error_context_stack = spierrcontext.previous;}/* * Execute the given plan with the given parameter values * * snapshot: query snapshot to use, or InvalidSnapshot for the normal * behavior of taking a new snapshot for each query. * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot * read_only: TRUE for read-only execution (no CommandCounterIncrement) * tcount: execution tuple-count limit, or 0 for none */static int_SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls, Snapshot snapshot, Snapshot crosscheck_snapshot, bool read_only, long tcount){ volatile int res = 0; volatile uint32 my_processed = 0; volatile Oid my_lastoid = InvalidOid; SPITupleTable *volatile my_tuptable = NULL; Snapshot saveActiveSnapshot; /* Be sure to restore ActiveSnapshot on error exit */ saveActiveSnapshot = ActiveSnapshot; PG_TRY(); { List *query_list_list = plan->qtlist; ListCell *plan_list_item = list_head(plan->ptlist); ListCell *query_list_list_item; ErrorContextCallback spierrcontext; int nargs = plan->nargs; ParamListInfo paramLI; /* Convert parameters to form wanted by executor */ if (nargs > 0) { int k; paramLI = (ParamListInfo) palloc0((nargs + 1) * sizeof(ParamListInfoData)); for (k = 0; k < nargs; k++) { paramLI[k].kind = PARAM_NUM; paramLI[k].id = k + 1; paramLI[k].ptype = plan->argtypes[k]; paramLI[k].isnull = (Nulls && Nulls[k] == 'n'); paramLI[k].value = Values[k]; } paramLI[k].kind = PARAM_INVALID; } else paramLI = NULL; /* * Setup error traceback support for ereport() */ spierrcontext.callback = _SPI_error_callback; spierrcontext.arg = (void *) plan->query; spierrcontext.previous = error_context_stack; error_context_stack = &spierrcontext; foreach(query_list_list_item, query_list_list) { List *query_list = lfirst(query_list_list_item); ListCell *query_list_item; foreach(query_list_item, query_list) { Query *queryTree = (Query *) lfirst(query_list_item); Plan *planTree; QueryDesc *qdesc; DestReceiver *dest; planTree = lfirst(plan_list_item); plan_list_item = lnext(plan_list_item); _SPI_current->processed = 0; _SPI_current->lastoid = InvalidOid; _SPI_current->tuptable = NULL; if (queryTree->commandType == CMD_UTILITY) { if (IsA(queryTree->utilityStmt, CopyStmt)) { CopyStmt *stmt = (CopyStmt *) queryTree->utilityStmt; if (stmt->filename == NULL) { res = SPI_ERROR_COPY; goto fail; } } else if (IsA(queryTree->utilityStmt, DeclareCursorStmt) || IsA(queryTree->utilityStmt, ClosePortalStmt) || IsA(queryTree->utilityStmt, FetchStmt)) { res = SPI_ERROR_CURSOR; goto fail; } else if (IsA(queryTree->utilityStmt, TransactionStmt)) { res = SPI_ERROR_TRANSACTION; goto fail; } } if (read_only && !QueryIsReadOnly(queryTree)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), /* translator: %s is a SQL statement name */ errmsg("%s is not allowed in a non-volatile function", CreateQueryTag(queryTree)))); /* * If not read-only mode, advance the command counter before * each command. */ if (!read_only) CommandCounterIncrement(); dest = CreateDestReceiver(queryTree->canSetTag ? DestSPI : DestNone, NULL); if (snapshot == InvalidSnapshot) { /* * Default read_only behavior is to use the entry-time * ActiveSnapshot; if read-write, grab a full new snap. */ if (read_only) ActiveSnapshot = CopySnapshot(saveActiveSnapshot); else ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); } else { /* * We interpret read_only with a specified snapshot to be * exactly that snapshot, but read-write means use the * snap with advancing of command ID. */ ActiveSnapshot = CopySnapshot(snapshot); if (!read_only) ActiveSnapshot->curcid = GetCurrentCommandId(); } if (queryTree->commandType == CMD_UTILITY) { ProcessUtility(queryTree->utilityStmt, paramLI, dest, NULL); res = SPI_OK_UTILITY; } else { qdesc = CreateQueryDesc(queryTree, planTree, ActiveSnapshot, crosscheck_snapshot, dest, paramLI, false); res = _SPI_pquery(qdesc, queryTree->canSetTag ? tcount : 0); FreeQueryDesc(qdesc); } FreeSnapshot(ActiveSnapshot); ActiveSnapshot = NULL; /* * The last canSetTag query sets the auxiliary values returned * to the caller. Be careful to free any tuptables not * returned, to avoid intratransaction memory leak. */ if (queryTree->canSetTag) { my_processed = _SPI_current->processed; my_lastoid = _SPI_current->lastoid; SPI_freetuptable(my_tuptable); my_tuptable = _SPI_current->tuptable; } else { SPI_freetuptable(_SPI_current->tuptable); _SPI_current->tuptable = NULL; } /* we know that the receiver doesn't need a destroy call */ if (res < 0) goto fail; } }fail: /* * Pop the error context stack */ error_context_stack = spierrcontext.previous; } PG_CATCH(); { /* Restore global vars and propagate error */ ActiveSnapshot = saveActiveSnapshot; PG_RE_THROW(); } PG_END_TRY(); ActiveSnapshot = saveActiveSnapshot; /* Save results for caller */ SPI_processed = my_processed; SPI_lastoid = my_lastoid; SPI_tuptable = my_tuptable; return res;}static int_SPI_pquery(QueryDesc *queryDesc, long tcount){ int operation = queryDesc->operation; int res; switch (operation) { case CMD_SELECT: res = SPI_OK_SELECT; if (queryDesc->parsetree->into) /* select into table? */ { res = SPI_OK_SELINTO; queryDesc->dest = None_Receiver; /* don't output results */ } else if (queryDesc->dest->mydest != DestSPI) { /* Don't return SPI_OK_SELECT if we're discarding result */ res = SPI_OK_UTILITY; } break; case CMD_INSERT: res = SPI_OK_INSERT; break; case CMD_DELETE: res = SPI_OK_DELETE; break; case CMD_UPDATE: res = SPI_OK_UPDATE; break; default: return SPI_ERROR_OPUNKNOWN; }#ifdef SPI_EXECUTOR_STATS if (ShowExecutorStats) ResetUsage();#endif AfterTriggerBeginQuery(); ExecutorStart(queryDesc, false); ExecutorRun(queryDesc, ForwardScanDirection, tcount); _SPI_current->processed = queryDesc->estate->es_processed; _SPI_current->lastoid = queryDesc->estate->es_lastoid; if (operation == CMD_SELECT && queryDesc->dest->mydest == DestSPI) { if (_SPI_checktuples()) elog(ERROR, "consistency check on SPI tuple count failed"); } /* Take care of any queued AFTER triggers */ AfterTriggerEndQuery(queryDesc->estate); ExecutorEnd(queryDesc);#ifdef SPI_EXECUTOR_STATS if (ShowExecutorStats) ShowUsage("SPI EXECUTOR STATS");#endif return res;}/* * _SPI_error_callback * * Add context information when a query invoked via SPI fails */static void_SPI_error_callback(void *arg){ const char *query = (const char *) arg; int syntaxerrposition; /* * If there is a syntax error position, convert to internal syntax error; * otherwise treat the query as an item of context stack */ syntaxerrposition = geterrposition(); if (syntaxerrposition > 0) { errposition(0); internalerrposition(syntaxerrposition); internalerrquery(query); } else errcontext("SQL statement \"%s\"", query);}/* * _SPI_cursor_operation() * * Do a FETCH or MOVE in a cursor */static void_SPI_cursor_operation(Portal portal, bool forward, long count, DestReceiver *dest){ long nfetched; /* Check that the portal is valid */ if (!PortalIsValid(portal)) elog(ERROR, "invalid portal in SPI cursor operation"); /* Push the SPI stack */ if (_SPI_begin_call(true) < 0) elog(ERROR, "SPI cursor operation called while not connected"); /* Reset the SPI result (note we deliberately don't touch lastoid) */ SPI_processed = 0; SPI_tuptable = NULL; _SPI_current->processed = 0; _SPI_current->tuptable = NULL; /* Run the cursor */ nfetched = PortalRunFetch(portal, forward ? FETCH_FORWARD : FETCH_BACKWARD, count, dest); /* * Think not to combine this store with the preceding function call. If * the portal contains calls to functions that use SPI, then SPI_stack is * likely to move around while the portal runs. When control returns, * _SPI_current will point to the correct stack entry... but the pointer * may be different than it was beforehand. So we must be sure to re-fetch * the pointer after the function call completes. */ _SPI_current->processed = nfetched; if (dest->mydest == DestSPI && _SPI_checktuples()) elog(ERROR, "consistency check on SPI tuple count failed"); /* Put the result into place for access by caller */ SPI_processed = _SPI_current->processed; SPI_tuptable = _SPI_current->tuptable; /* Pop the SPI stack */ _SPI_end_call(true);}static MemoryContext_SPI_execmem(void){ return MemoryContextSwitchTo(_SPI_current->execCxt);}static MemoryContext_SPI_procmem(void){ return MemoryContextSwitchTo(_SPI_current->procCxt);}/* * _SPI_begin_call: begin a SPI operation within a connected procedure */static int_SPI_begin_call(bool execmem){ if (_SPI_curid + 1 != _SPI_connected) return SPI_ERROR_UNCONNECTED; _SPI_curid++; if (_SPI_current != &(_SPI_stack[_SPI_curid])) elog(ERROR, "SPI stack corrupted"); if (execmem) /* switch to the Executor memory context */ _SPI_execmem(); return 0;}/* * _SPI_end_call: end a SPI operation within a connected procedure * * Note: this currently has no failure return cases, so callers don't check */static int_SPI_end_call(bool procmem){ /* * We're returning to procedure where _SPI_curid == _SPI_connected - 1 */ _SPI_curid--; if (procmem) /* switch to the procedure memory context */ { _SPI_procmem(); /* and free Executor memory */ MemoryContextResetAndDeleteChildren(_SPI_current->execCxt); } return 0;}static bool_SPI_checktuples(void){ uint32 processed = _SPI_current->processed; SPITupleTable *tuptable = _SPI_current->tuptable; bool failed = false; if (tuptable == NULL) /* spi_dest_startup was not called */ failed = true; else if (processed != (tuptable->alloced - tuptable->free)) failed = true; return failed;}static _SPI_plan *_SPI_copy_plan(_SPI_plan *plan, int location){ _SPI_plan *newplan; MemoryContext oldcxt; MemoryContext plancxt; MemoryContext parentcxt; /* Determine correct parent for the plan's memory context */ if (location == _SPI_CPLAN_PROCXT) parentcxt = _SPI_current->procCxt; else if (location == _SPI_CPLAN_TOPCXT) parentcxt = TopMemoryContext; else /* (this case not currently used) */ parentcxt = CurrentMemoryContext; /* * Create a memory context for the plan. We don't expect the plan to be * very large, so use smaller-than-default alloc parameters. */ plancxt = AllocSetContextCreate(parentcxt, "SPI Plan", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); oldcxt = MemoryContextSwitchTo(plancxt); /* Copy the SPI plan into its own context */ newplan = (_SPI_plan *) palloc(sizeof(_SPI_plan)); newplan->plancxt = plancxt; newplan->query = pstrdup(plan->query); newplan->qtlist = (List *) copyObject(plan->qtlist); newplan->ptlist = (List *) copyObject(plan->ptlist); newplan->nargs = plan->nargs; if (plan->nargs > 0) { newplan->argtypes = (Oid *) palloc(plan->nargs * sizeof(Oid)); memcpy(newplan->argtypes, plan->argtypes, plan->nargs * sizeof(Oid)); } else newplan->argtypes = NULL; MemoryContextSwitchTo(oldcxt); return newplan;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -