📄 pquery.c
字号:
PortalSetResultFormat(Portal portal, int nFormats, int16 *formats){ int natts; int i; /* Do nothing if portal won't return tuples */ if (portal->tupDesc == NULL) return; natts = portal->tupDesc->natts; portal->formats = (int16 *) MemoryContextAlloc(PortalGetHeapMemory(portal), natts * sizeof(int16)); if (nFormats > 1) { /* format specified for each column */ if (nFormats != natts) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("bind message has %d result formats but query has %d columns", nFormats, natts))); memcpy(portal->formats, formats, natts * sizeof(int16)); } else if (nFormats > 0) { /* single format specified, use for all columns */ int16 format1 = formats[0]; for (i = 0; i < natts; i++) portal->formats[i] = format1; } else { /* use default format for all columns */ for (i = 0; i < natts; i++) portal->formats[i] = 0; }}/* * PortalRun * Run a portal's query or queries. * * count <= 0 is interpreted as a no-op: the destination gets started up * and shut down, but nothing else happens. Also, count == FETCH_ALL is * interpreted as "all rows". Note that count is ignored in multi-query * situations, where we always run the portal to completion. * * dest: where to send output of primary (canSetTag) query * * altdest: where to send output of non-primary queries * * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE * in which to store a command completion status string. * May be NULL if caller doesn't want a status string. * * Returns TRUE if the portal's execution is complete, FALSE if it was * suspended due to exhaustion of the count parameter. */boolPortalRun(Portal portal, long count, DestReceiver *dest, DestReceiver *altdest, char *completionTag){ bool result; ResourceOwner saveTopTransactionResourceOwner; MemoryContext saveTopTransactionContext; Portal saveActivePortal; Snapshot saveActiveSnapshot; ResourceOwner saveResourceOwner; MemoryContext savePortalContext; MemoryContext saveQueryContext; MemoryContext saveMemoryContext; AssertArg(PortalIsValid(portal)); /* Initialize completion tag to empty string */ if (completionTag) completionTag[0] = '\0'; if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY) { ereport(DEBUG3, (errmsg_internal("PortalRun"))); /* PORTAL_MULTI_QUERY logs its own stats per query */ ResetUsage(); } /* * Check for improper portal use, and mark portal active. */ if (portal->status != PORTAL_READY) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("portal \"%s\" cannot be run", portal->name))); portal->status = PORTAL_ACTIVE; /* * Set up global portal context pointers. * * We have to play a special game here to support utility commands like * VACUUM and CLUSTER, which internally start and commit transactions. * When we are called to execute such a command, CurrentResourceOwner will * be pointing to the TopTransactionResourceOwner --- which will be * destroyed and replaced in the course of the internal commit and * restart. So we need to be prepared to restore it as pointing to the * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of * internally starting whole new transactions is not good.) * CurrentMemoryContext has a similar problem, but the other pointers we * save here will be NULL or pointing to longer-lived objects. */ saveTopTransactionResourceOwner = TopTransactionResourceOwner; saveTopTransactionContext = TopTransactionContext; saveActivePortal = ActivePortal; saveActiveSnapshot = ActiveSnapshot; saveResourceOwner = CurrentResourceOwner; savePortalContext = PortalContext; saveQueryContext = QueryContext; saveMemoryContext = CurrentMemoryContext; PG_TRY(); { ActivePortal = portal; ActiveSnapshot = NULL; /* will be set later */ CurrentResourceOwner = portal->resowner; PortalContext = PortalGetHeapMemory(portal); QueryContext = portal->queryContext; MemoryContextSwitchTo(PortalContext); switch (portal->strategy) { case PORTAL_ONE_SELECT: (void) PortalRunSelect(portal, true, count, dest); /* we know the query is supposed to set the tag */ if (completionTag && portal->commandTag) strcpy(completionTag, portal->commandTag); /* Mark portal not active */ portal->status = PORTAL_READY; /* * Since it's a forward fetch, say DONE iff atEnd is now true. */ result = portal->atEnd; break; case PORTAL_UTIL_SELECT: /* * If we have not yet run the utility statement, do so, * storing its results in the portal's tuplestore. */ if (!portal->portalUtilReady) { DestReceiver *treceiver; PortalCreateHoldStore(portal); treceiver = CreateDestReceiver(DestTuplestore, portal); PortalRunUtility(portal, linitial(portal->parseTrees), treceiver, NULL); (*treceiver->rDestroy) (treceiver); portal->portalUtilReady = true; } /* * Now fetch desired portion of results. */ (void) PortalRunSelect(portal, true, count, dest); /* * We know the query is supposed to set the tag; we assume * only the default tag is needed. */ if (completionTag && portal->commandTag) strcpy(completionTag, portal->commandTag); /* Mark portal not active */ portal->status = PORTAL_READY; /* * Since it's a forward fetch, say DONE iff atEnd is now true. */ result = portal->atEnd; break; case PORTAL_MULTI_QUERY: PortalRunMulti(portal, dest, altdest, completionTag); /* Prevent portal's commands from being re-executed */ portal->status = PORTAL_DONE; /* Always complete at end of RunMulti */ result = true; break; default: elog(ERROR, "unrecognized portal strategy: %d", (int) portal->strategy); result = false; /* keep compiler quiet */ break; } } PG_CATCH(); { /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; /* Restore global vars and propagate error */ if (saveMemoryContext == saveTopTransactionContext) MemoryContextSwitchTo(TopTransactionContext); else MemoryContextSwitchTo(saveMemoryContext); ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; if (saveResourceOwner == saveTopTransactionResourceOwner) CurrentResourceOwner = TopTransactionResourceOwner; else CurrentResourceOwner = saveResourceOwner; PortalContext = savePortalContext; QueryContext = saveQueryContext; PG_RE_THROW(); } PG_END_TRY(); if (saveMemoryContext == saveTopTransactionContext) MemoryContextSwitchTo(TopTransactionContext); else MemoryContextSwitchTo(saveMemoryContext); ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; if (saveResourceOwner == saveTopTransactionResourceOwner) CurrentResourceOwner = TopTransactionResourceOwner; else CurrentResourceOwner = saveResourceOwner; PortalContext = savePortalContext; QueryContext = saveQueryContext; if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY) ShowUsage("EXECUTOR STATISTICS"); return result;}/* * PortalRunSelect * Execute a portal's query in SELECT cases (also UTIL_SELECT). * * This handles simple N-rows-forward-or-backward cases. For more complex * nonsequential access to a portal, see PortalRunFetch. * * count <= 0 is interpreted as a no-op: the destination gets started up * and shut down, but nothing else happens. Also, count == FETCH_ALL is * interpreted as "all rows". * * Caller must already have validated the Portal and done appropriate * setup (cf. PortalRun). * * Returns number of rows processed (suitable for use in result tag) */static longPortalRunSelect(Portal portal, bool forward, long count, DestReceiver *dest){ QueryDesc *queryDesc; ScanDirection direction; uint32 nprocessed; /* * NB: queryDesc will be NULL if we are fetching from a held cursor or a * completed utility query; can't use it in that path. */ queryDesc = PortalGetQueryDesc(portal); /* Caller messed up if we have neither a ready query nor held data. */ Assert(queryDesc || portal->holdStore); /* * Force the queryDesc destination to the right thing. This supports * MOVE, for example, which will pass in dest = DestNone. This is okay to * change as long as we do it on every fetch. (The Executor must not * assume that dest never changes.) */ if (queryDesc) queryDesc->dest = dest; /* * Determine which direction to go in, and check to see if we're already * at the end of the available tuples in that direction. If so, set the * direction to NoMovement to avoid trying to fetch any tuples. (This * check exists because not all plan node types are robust about being * called again if they've already returned NULL once.) Then call the * executor (we must not skip this, because the destination needs to see a * setup and shutdown even if no tuples are available). Finally, update * the portal position state depending on the number of tuples that were * retrieved. */ if (forward) { if (portal->atEnd || count <= 0) direction = NoMovementScanDirection; else direction = ForwardScanDirection; /* In the executor, zero count processes all rows */ if (count == FETCH_ALL) count = 0; if (portal->holdStore) nprocessed = RunFromStore(portal, direction, count, dest); else { ActiveSnapshot = queryDesc->snapshot; ExecutorRun(queryDesc, direction, count); nprocessed = queryDesc->estate->es_processed; } if (direction != NoMovementScanDirection) { long oldPos; if (nprocessed > 0) portal->atStart = false; /* OK to go backward now */ if (count == 0 || (unsigned long) nprocessed < (unsigned long) count) portal->atEnd = true; /* we retrieved 'em all */ oldPos = portal->portalPos; portal->portalPos += nprocessed; /* portalPos doesn't advance when we fall off the end */ if (portal->portalPos < oldPos) portal->posOverflow = true; } } else { if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cursor can only scan forward"), errhint("Declare it with SCROLL option to enable backward scan."))); if (portal->atStart || count <= 0) direction = NoMovementScanDirection; else direction = BackwardScanDirection; /* In the executor, zero count processes all rows */ if (count == FETCH_ALL) count = 0; if (portal->holdStore) nprocessed = RunFromStore(portal, direction, count, dest); else { ActiveSnapshot = queryDesc->snapshot; ExecutorRun(queryDesc, direction, count); nprocessed = queryDesc->estate->es_processed; } if (direction != NoMovementScanDirection) { if (nprocessed > 0 && portal->atEnd) { portal->atEnd = false; /* OK to go forward now */ portal->portalPos++; /* adjust for endpoint case */ } if (count == 0 || (unsigned long) nprocessed < (unsigned long) count) { portal->atStart = true; /* we retrieved 'em all */ portal->portalPos = 0; portal->posOverflow = false; } else { long oldPos; oldPos = portal->portalPos; portal->portalPos -= nprocessed; if (portal->portalPos > oldPos || portal->portalPos <= 0) portal->posOverflow = true; } } } return nprocessed;}/* * RunFromStore * Fetch tuples from the portal's tuple store. * * Calling conventions are similar to ExecutorRun, except that we * do not depend on having a queryDesc or estate. Therefore we return the * number of tuples processed as the result, not in estate->es_processed. * * One difference from ExecutorRun is that the destination receiver functions * are run in the caller's memory context (since we have no estate). Watch * out for memory leaks. */static uint32RunFromStore(Portal portal, ScanDirection direction, long count, DestReceiver *dest){ long current_tuple_count = 0; TupleTableSlot *slot; slot = MakeSingleTupleTableSlot(portal->tupDesc); (*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc); if (direction == NoMovementScanDirection) { /* do nothing except start/stop the destination */ } else { bool forward = (direction == ForwardScanDirection); for (;;) { MemoryContext oldcontext; HeapTuple tup; bool should_free; oldcontext = MemoryContextSwitchTo(portal->holdContext); tup = tuplestore_getheaptuple(portal->holdStore, forward, &should_free); MemoryContextSwitchTo(oldcontext); if (tup == NULL) break; ExecStoreTuple(tup, slot, InvalidBuffer, should_free); (*dest->receiveSlot) (slot, dest); ExecClearTuple(slot); /* * check our tuple count.. if we've processed the proper number * then quit, else loop again and process more tuples. Zero count * means no limit. */ current_tuple_count++; if (count && count == current_tuple_count) break; } } (*dest->rShutdown) (dest); ExecDropSingleTupleTableSlot(slot); return (uint32) current_tuple_count;}/* * PortalRunUtility * Execute a utility statement inside a portal. */static voidPortalRunUtility(Portal portal, Query *query, DestReceiver *dest, char *completionTag){ Node *utilityStmt = query->utilityStmt; ereport(DEBUG3, (errmsg_internal("ProcessUtility"))); /* * Set snapshot if utility stmt needs one. Most reliable way to do this * seems to be to enumerate those that do not need one; this is a short * list. Transaction control, LOCK, and SET must *not* set a snapshot * since they need to be executable at the start of a serializable
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -