📄 pquery.c
字号:
* transaction without freezing a snapshot. By extension we allow SHOW * not to set a snapshot. The other stmts listed are just efficiency * hacks. Beware of listing anything that can modify the database --- if, * say, it has to update an index with expressions that invoke * user-defined functions, then it had better have a snapshot. * * Note we assume that caller will take care of restoring ActiveSnapshot * on exit/error. */ if (!(IsA(utilityStmt, TransactionStmt) || IsA(utilityStmt, LockStmt) || IsA(utilityStmt, VariableSetStmt) || IsA(utilityStmt, VariableShowStmt) || IsA(utilityStmt, VariableResetStmt) || IsA(utilityStmt, ConstraintsSetStmt) || /* efficiency hacks from here down */ IsA(utilityStmt, FetchStmt) || IsA(utilityStmt, ListenStmt) || IsA(utilityStmt, NotifyStmt) || IsA(utilityStmt, UnlistenStmt) || IsA(utilityStmt, CheckPointStmt))) ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); else ActiveSnapshot = NULL; if (query->canSetTag) { /* utility statement can override default tag string */ ProcessUtility(utilityStmt, portal->portalParams, dest, completionTag); if (completionTag && completionTag[0] == '\0' && portal->commandTag) strcpy(completionTag, portal->commandTag); /* use the default */ } else { /* utility added by rewrite cannot set tag */ ProcessUtility(utilityStmt, portal->portalParams, dest, NULL); } /* Some utility statements may change context on us */ MemoryContextSwitchTo(PortalGetHeapMemory(portal)); if (ActiveSnapshot) FreeSnapshot(ActiveSnapshot); ActiveSnapshot = NULL;}/* * PortalRunMulti * Execute a portal's queries in the general case (multi queries). */static voidPortalRunMulti(Portal portal, DestReceiver *dest, DestReceiver *altdest, char *completionTag){ ListCell *querylist_item; ListCell *planlist_item; /* * If the destination is DestRemoteExecute, change to DestNone. The * reason is that the client won't be expecting any tuples, and indeed has * no way to know what they are, since there is no provision for Describe * to send a RowDescription message when this portal execution strategy is * in effect. This presently will only affect SELECT commands added to * non-SELECT queries by rewrite rules: such commands will be executed, * but the results will be discarded unless you use "simple Query" * protocol. */ if (dest->mydest == DestRemoteExecute) dest = None_Receiver; if (altdest->mydest == DestRemoteExecute) altdest = None_Receiver; /* * Loop to handle the individual queries generated from a single parsetree * by analysis and rewrite. */ forboth(querylist_item, portal->parseTrees, planlist_item, portal->planTrees) { Query *query = (Query *) lfirst(querylist_item); Plan *plan = (Plan *) lfirst(planlist_item); /* * If we got a cancel signal in prior command, quit */ CHECK_FOR_INTERRUPTS(); if (query->commandType == CMD_UTILITY) { /* * process utility functions (create, destroy, etc..) */ Assert(plan == NULL); PortalRunUtility(portal, query, query->canSetTag ? dest : altdest, completionTag); } else { /* * process a plannable query. */ if (log_executor_stats) ResetUsage(); if (query->canSetTag) { /* statement can set tag string */ ProcessQuery(query, plan, portal->portalParams, dest, completionTag); } else { /* stmt added by rewrite cannot set tag */ ProcessQuery(query, plan, portal->portalParams, altdest, NULL); } if (log_executor_stats) ShowUsage("EXECUTOR STATISTICS"); } /* * Increment command counter between queries, but not after the last * one. */ if (lnext(planlist_item) != NULL) CommandCounterIncrement(); /* * Clear subsidiary contexts to recover temporary memory. */ Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext); MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); } /* * If a command completion tag was supplied, use it. Otherwise use the * portal's commandTag as the default completion tag. * * Exception: clients will expect INSERT/UPDATE/DELETE tags to have * counts, so fake something up if necessary. (This could happen if the * original query was replaced by a DO INSTEAD rule.) */ if (completionTag && completionTag[0] == '\0') { if (portal->commandTag) strcpy(completionTag, portal->commandTag); if (strcmp(completionTag, "INSERT") == 0) strcpy(completionTag, "INSERT 0 0"); else if (strcmp(completionTag, "UPDATE") == 0) strcpy(completionTag, "UPDATE 0"); else if (strcmp(completionTag, "DELETE") == 0) strcpy(completionTag, "DELETE 0"); }}/* * PortalRunFetch * Variant form of PortalRun that supports SQL FETCH directions. * * Returns number of rows processed (suitable for use in result tag) */longPortalRunFetch(Portal portal, FetchDirection fdirection, long count, DestReceiver *dest){ long result; Portal saveActivePortal; Snapshot saveActiveSnapshot; ResourceOwner saveResourceOwner; MemoryContext savePortalContext; MemoryContext saveQueryContext; MemoryContext oldContext; AssertArg(PortalIsValid(portal)); /* * Check for improper portal use, and mark portal active. */ if (portal->status != PORTAL_READY) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("portal \"%s\" cannot be run", portal->name))); portal->status = PORTAL_ACTIVE; /* * Set up global portal context pointers. */ saveActivePortal = ActivePortal; saveActiveSnapshot = ActiveSnapshot; saveResourceOwner = CurrentResourceOwner; savePortalContext = PortalContext; saveQueryContext = QueryContext; PG_TRY(); { ActivePortal = portal; ActiveSnapshot = NULL; /* will be set later */ CurrentResourceOwner = portal->resowner; PortalContext = PortalGetHeapMemory(portal); QueryContext = portal->queryContext; oldContext = MemoryContextSwitchTo(PortalContext); switch (portal->strategy) { case PORTAL_ONE_SELECT: result = DoPortalRunFetch(portal, fdirection, count, dest); break; case PORTAL_UTIL_SELECT: /* * If we have not yet run the utility statement, do so, * storing its results in the portal's tuplestore. */ if (!portal->portalUtilReady) { DestReceiver *treceiver; PortalCreateHoldStore(portal); treceiver = CreateDestReceiver(DestTuplestore, portal); PortalRunUtility(portal, linitial(portal->parseTrees), treceiver, NULL); (*treceiver->rDestroy) (treceiver); portal->portalUtilReady = true; } /* * Now fetch desired portion of results. */ result = DoPortalRunFetch(portal, fdirection, count, dest); break; default: elog(ERROR, "unsupported portal strategy"); result = 0; /* keep compiler quiet */ break; } } PG_CATCH(); { /* Uncaught error while executing portal: mark it dead */ portal->status = PORTAL_FAILED; /* Restore global vars and propagate error */ ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; CurrentResourceOwner = saveResourceOwner; PortalContext = savePortalContext; QueryContext = saveQueryContext; PG_RE_THROW(); } PG_END_TRY(); MemoryContextSwitchTo(oldContext); /* Mark portal not active */ portal->status = PORTAL_READY; ActivePortal = saveActivePortal; ActiveSnapshot = saveActiveSnapshot; CurrentResourceOwner = saveResourceOwner; PortalContext = savePortalContext; QueryContext = saveQueryContext; return result;}/* * DoPortalRunFetch * Guts of PortalRunFetch --- the portal context is already set up * * Returns number of rows processed (suitable for use in result tag) */static longDoPortalRunFetch(Portal portal, FetchDirection fdirection, long count, DestReceiver *dest){ bool forward; Assert(portal->strategy == PORTAL_ONE_SELECT || portal->strategy == PORTAL_UTIL_SELECT); switch (fdirection) { case FETCH_FORWARD: if (count < 0) { fdirection = FETCH_BACKWARD; count = -count; } /* fall out of switch to share code with FETCH_BACKWARD */ break; case FETCH_BACKWARD: if (count < 0) { fdirection = FETCH_FORWARD; count = -count; } /* fall out of switch to share code with FETCH_FORWARD */ break; case FETCH_ABSOLUTE: if (count > 0) { /* * Definition: Rewind to start, advance count-1 rows, return * next row (if any). In practice, if the goal is less than * halfway back to the start, it's better to scan from where * we are. In any case, we arrange to fetch the target row * going forwards. */ if (portal->posOverflow || portal->portalPos == LONG_MAX || count - 1 <= portal->portalPos / 2) { DoPortalRewind(portal); if (count > 1) PortalRunSelect(portal, true, count - 1, None_Receiver); } else { long pos = portal->portalPos; if (portal->atEnd) pos++; /* need one extra fetch if off end */ if (count <= pos) PortalRunSelect(portal, false, pos - count + 1, None_Receiver); else if (count > pos + 1) PortalRunSelect(portal, true, count - pos - 1, None_Receiver); } return PortalRunSelect(portal, true, 1L, dest); } else if (count < 0) { /* * Definition: Advance to end, back up abs(count)-1 rows, * return prior row (if any). We could optimize this if we * knew in advance where the end was, but typically we won't. * (Is it worth considering case where count > half of size of * query? We could rewind once we know the size ...) */ PortalRunSelect(portal, true, FETCH_ALL, None_Receiver); if (count < -1) PortalRunSelect(portal, false, -count - 1, None_Receiver); return PortalRunSelect(portal, false, 1L, dest); } else { /* count == 0 */ /* Rewind to start, return zero rows */ DoPortalRewind(portal); return PortalRunSelect(portal, true, 0L, dest); } break; case FETCH_RELATIVE: if (count > 0) { /* * Definition: advance count-1 rows, return next row (if any). */ if (count > 1) PortalRunSelect(portal, true, count - 1, None_Receiver); return PortalRunSelect(portal, true, 1L, dest); } else if (count < 0) { /* * Definition: back up abs(count)-1 rows, return prior row (if * any). */ if (count < -1) PortalRunSelect(portal, false, -count - 1, None_Receiver); return PortalRunSelect(portal, false, 1L, dest); } else { /* count == 0 */ /* Same as FETCH FORWARD 0, so fall out of switch */ fdirection = FETCH_FORWARD; } break; default: elog(ERROR, "bogus direction"); break; } /* * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count * >= 0. */ forward = (fdirection == FETCH_FORWARD); /* * Zero count means to re-fetch the current row, if any (per SQL92) */ if (count == 0) { bool on_row; /* Are we sitting on a row? */ on_row = (!portal->atStart && !portal->atEnd); if (dest->mydest == DestNone) { /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */ return on_row ? 1L : 0L; } else { /* * If we are sitting on a row, back up one so we can re-fetch it. * If we are not sitting on a row, we still have to start up and * shut down the executor so that the destination is initialized * and shut down correctly; so keep going. To PortalRunSelect, * count == 0 means we will retrieve no row. */ if (on_row) { PortalRunSelect(portal, false, 1L, None_Receiver); /* Set up to fetch one row forward */ count = 1; forward = true; } } } /* * Optimize MOVE BACKWARD ALL into a Rewind. */ if (!forward && count == FETCH_ALL && dest->mydest == DestNone) { long result = portal->portalPos; if (result > 0 && !portal->atEnd) result--; DoPortalRewind(portal); /* result is bogus if pos had overflowed, but it's best we can do */ return result; } return PortalRunSelect(portal, forward, count, dest);}/* * DoPortalRewind - rewind a Portal to starting point */static voidDoPortalRewind(Portal portal){ if (portal->holdStore) { MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(portal->holdContext); tuplestore_rescan(portal->holdStore); MemoryContextSwitchTo(oldcontext); } if (PortalGetQueryDesc(portal)) ExecutorRewind(PortalGetQueryDesc(portal)); portal->atStart = true; portal->atEnd = false; portal->portalPos = 0; portal->posOverflow = false;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -