pquery.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 1,183 行 · 第 1/3 页

C
1,183
字号
{	bool		result;	MemoryContext savePortalContext;	MemoryContext saveQueryContext;	MemoryContext oldContext;	AssertArg(PortalIsValid(portal));	AssertState(portal->portalReady);	/* else no PortalStart */	/* Initialize completion tag to empty string */	if (completionTag)		completionTag[0] = '\0';	if (portal->strategy != PORTAL_MULTI_QUERY)	{		ereport(DEBUG3,			(errmsg_internal("PortalRun")));		/* PORTAL_MULTI_QUERY logs its own stats per query */		if (log_executor_stats)			ResetUsage();	}		if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)	/*	 * Check for improper portal use, and mark portal active.	 */	if (portal->portalDone)		ereport(ERROR,				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),		   errmsg("portal \"%s\" cannot be run anymore", portal->name)));	if (portal->portalActive)		ereport(ERROR,				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),				 errmsg("portal \"%s\" already active", portal->name)));	portal->portalActive = true;	/*	 * Set global portal context pointers.	 */	savePortalContext = PortalContext;	PortalContext = PortalGetHeapMemory(portal);	saveQueryContext = QueryContext;	QueryContext = portal->queryContext;	oldContext = MemoryContextSwitchTo(PortalContext);	switch (portal->strategy)	{		case PORTAL_ONE_SELECT:			(void) PortalRunSelect(portal, true, count, dest);			/* we know the query is supposed to set the tag */			if (completionTag && portal->commandTag)				strcpy(completionTag, portal->commandTag);			/*			 * Since it's a forward fetch, say DONE iff atEnd is now true.			 */			result = portal->atEnd;			break;		case PORTAL_UTIL_SELECT:			/*			 * If we have not yet run the utility statement, do so,			 * storing its results in the portal's tuplestore.			 */			if (!portal->portalUtilReady)			{				DestReceiver *treceiver;				PortalCreateHoldStore(portal);				treceiver = CreateDestReceiver(Tuplestore, portal);				PortalRunUtility(portal, lfirst(portal->parseTrees),								 treceiver, NULL);				(*treceiver->rDestroy) (treceiver);				portal->portalUtilReady = true;			}			/*			 * Now fetch desired portion of results.			 */			(void) PortalRunSelect(portal, true, count, dest);			/*			 * We know the query is supposed to set the tag; we assume			 * only the default tag is needed.			 */			if (completionTag && portal->commandTag)				strcpy(completionTag, portal->commandTag);			/*			 * Since it's a forward fetch, say DONE iff atEnd is now true.			 */			result = portal->atEnd;			break;		case PORTAL_MULTI_QUERY:			PortalRunMulti(portal, dest, altdest, completionTag);			/* Always complete at end of RunMulti */			result = true;			break;		default:			elog(ERROR, "unrecognized portal strategy: %d",				 (int) portal->strategy);			result = false;		/* keep compiler quiet */			break;	}	MemoryContextSwitchTo(oldContext);	/* Mark portal not active */	portal->portalActive = false;	PortalContext = savePortalContext;	QueryContext = saveQueryContext;	if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)		ShowUsage("EXECUTOR STATISTICS");	return result;}/* * PortalRunSelect *		Execute a portal's query in SELECT cases (also UTIL_SELECT). * * This handles simple N-rows-forward-or-backward cases.  For more complex * nonsequential access to a portal, see PortalRunFetch. * * count <= 0 is interpreted as a no-op: the destination gets started up * and shut down, but nothing else happens.  Also, count == FETCH_ALL is * interpreted as "all rows". * * Caller must already have validated the Portal and done appropriate * setup (cf. PortalRun). * * Returns number of rows processed (suitable for use in result tag) */static longPortalRunSelect(Portal portal,				bool forward,				long count,				DestReceiver *dest){	QueryDesc  *queryDesc;	ScanDirection direction;	uint32		nprocessed;	/*	 * NB: queryDesc will be NULL if we are fetching from a held cursor or	 * a completed utility query; can't use it in that path.	 */	queryDesc = PortalGetQueryDesc(portal);	/* Caller messed up if we have neither a ready query nor held data. */	Assert(queryDesc || portal->holdStore);	/*	 * Force the queryDesc destination to the right thing.	This supports	 * MOVE, for example, which will pass in dest = None.  This is okay to	 * change as long as we do it on every fetch.  (The Executor must not	 * assume that dest never changes.)	 */	if (queryDesc)		queryDesc->dest = dest;	/*	 * Determine which direction to go in, and check to see if we're	 * already at the end of the available tuples in that direction.  If	 * so, set the direction to NoMovement to avoid trying to fetch any	 * tuples.	(This check exists because not all plan node types are	 * robust about being called again if they've already returned NULL	 * once.)  Then call the executor (we must not skip this, because the	 * destination needs to see a setup and shutdown even if no tuples are	 * available).	Finally, update the portal position state depending on	 * the number of tuples that were retrieved.	 */	if (forward)	{		if (portal->atEnd || count <= 0)			direction = NoMovementScanDirection;		else			direction = ForwardScanDirection;		/* In the executor, zero count processes all rows */		if (count == FETCH_ALL)			count = 0;		if (portal->holdStore)			nprocessed = RunFromStore(portal, direction, count, dest);		else		{			ExecutorRun(queryDesc, direction, count);			nprocessed = queryDesc->estate->es_processed;		}		if (direction != NoMovementScanDirection)		{			long		oldPos;			if (nprocessed > 0)				portal->atStart = false;		/* OK to go backward now */			if (count == 0 ||				(unsigned long) nprocessed < (unsigned long) count)				portal->atEnd = true;	/* we retrieved 'em all */			oldPos = portal->portalPos;			portal->portalPos += nprocessed;			/* portalPos doesn't advance when we fall off the end */			if (portal->portalPos < oldPos)				portal->posOverflow = true;		}	}	else	{		if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)			ereport(ERROR,					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),					 errmsg("cursor can only scan forward"),					 errhint("Declare it with SCROLL option to enable backward scan.")));		if (portal->atStart || count <= 0)			direction = NoMovementScanDirection;		else			direction = BackwardScanDirection;		/* In the executor, zero count processes all rows */		if (count == FETCH_ALL)			count = 0;		if (portal->holdStore)			nprocessed = RunFromStore(portal, direction, count, dest);		else		{			ExecutorRun(queryDesc, direction, count);			nprocessed = queryDesc->estate->es_processed;		}		if (direction != NoMovementScanDirection)		{			if (nprocessed > 0 && portal->atEnd)			{				portal->atEnd = false;	/* OK to go forward now */				portal->portalPos++;	/* adjust for endpoint case */			}			if (count == 0 ||				(unsigned long) nprocessed < (unsigned long) count)			{				portal->atStart = true; /* we retrieved 'em all */				portal->portalPos = 0;				portal->posOverflow = false;			}			else			{				long		oldPos;				oldPos = portal->portalPos;				portal->portalPos -= nprocessed;				if (portal->portalPos > oldPos ||					portal->portalPos <= 0)					portal->posOverflow = true;			}		}	}	return nprocessed;}/* * RunFromStore *		Fetch tuples from the portal's tuple store. * * Calling conventions are similar to ExecutorRun, except that we * do not depend on having a queryDesc or estate.  Therefore we return the * number of tuples processed as the result, not in estate->es_processed. * * One difference from ExecutorRun is that the destination receiver functions * are run in the caller's memory context (since we have no estate).  Watch * out for memory leaks. */static uint32RunFromStore(Portal portal, ScanDirection direction, long count,			 DestReceiver *dest){	long		current_tuple_count = 0;	(*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);	if (direction == NoMovementScanDirection)	{		/* do nothing except start/stop the destination */	}	else	{		bool		forward = (direction == ForwardScanDirection);		for (;;)		{			MemoryContext oldcontext;			HeapTuple	tup;			bool		should_free;			oldcontext = MemoryContextSwitchTo(portal->holdContext);			tup = tuplestore_getheaptuple(portal->holdStore, forward,										  &should_free);			MemoryContextSwitchTo(oldcontext);			if (tup == NULL)				break;			(*dest->receiveTuple) (tup, portal->tupDesc, dest);			if (should_free)				pfree(tup);			/*			 * check our tuple count.. if we've processed the proper			 * number then quit, else loop again and process more tuples.			 * Zero count means no limit.			 */			current_tuple_count++;			if (count && count == current_tuple_count)				break;		}	}	(*dest->rShutdown) (dest);	return (uint32) current_tuple_count;}/* * PortalRunUtility *		Execute a utility statement inside a portal. */static voidPortalRunUtility(Portal portal, Query *query,				 DestReceiver *dest, char *completionTag){	Node	   *utilityStmt = query->utilityStmt;	ereport(DEBUG3,			(errmsg_internal("ProcessUtility")));	/*	 * Set snapshot if utility stmt needs one.	Most reliable way to do	 * this seems to be to enumerate those that do not need one; this is a	 * short list.	Transaction control, LOCK, and SET must *not* set a	 * snapshot since they need to be executable at the start of a	 * serializable transaction without freezing a snapshot.  By extension	 * we allow SHOW not to set a snapshot.  The other stmts listed are	 * just efficiency hacks.  Beware of listing anything that can modify	 * the database --- if, say, it has to update an index with	 * expressions that invoke user-defined functions, then it had better	 * have a snapshot.	 */	if (!(IsA(utilityStmt, TransactionStmt) ||		  IsA(utilityStmt, LockStmt) ||		  IsA(utilityStmt, VariableSetStmt) ||		  IsA(utilityStmt, VariableShowStmt) ||		  IsA(utilityStmt, VariableResetStmt) ||		  IsA(utilityStmt, ConstraintsSetStmt) ||	/* efficiency hacks from here down */		  IsA(utilityStmt, FetchStmt) ||		  IsA(utilityStmt, ListenStmt) ||		  IsA(utilityStmt, NotifyStmt) ||		  IsA(utilityStmt, UnlistenStmt) ||		  IsA(utilityStmt, CheckPointStmt)))		SetQuerySnapshot();	if (query->canSetTag)	{		/* utility statement can override default tag string */		ProcessUtility(utilityStmt, dest, completionTag);		if (completionTag && completionTag[0] == '\0' && portal->commandTag)			strcpy(completionTag, portal->commandTag);	/* use the default */	}	else	{		/* utility added by rewrite cannot set tag */		ProcessUtility(utilityStmt, dest, NULL);	}	/* Some utility statements may change context on us */	MemoryContextSwitchTo(PortalGetHeapMemory(portal));}/* * PortalRunMulti *		Execute a portal's queries in the general case (multi queries). */static void

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?