pquery.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 1,183 行 · 第 1/3 页

C
1,183
字号
PortalRunMulti(Portal portal,			   DestReceiver *dest, DestReceiver *altdest,			   char *completionTag){	List	   *plantree_list = portal->planTrees;	List	   *querylist_item;	/*	 * If the destination is RemoteExecute, change to None.  The reason is	 * that the client won't be expecting any tuples, and indeed has no	 * way to know what they are, since there is no provision for Describe	 * to send a RowDescription message when this portal execution	 * strategy is in effect.  This presently will only affect SELECT	 * commands added to non-SELECT queries by rewrite rules: such	 * commands will be executed, but the results will be discarded unless	 * you use "simple Query" protocol.	 */	if (dest->mydest == RemoteExecute)		dest = None_Receiver;	if (altdest->mydest == RemoteExecute)		altdest = None_Receiver;	/*	 * Loop to handle the individual queries generated from a single	 * parsetree by analysis and rewrite.	 */	foreach(querylist_item, portal->parseTrees)	{		Query	   *query = (Query *) lfirst(querylist_item);		Plan	   *plan = (Plan *) lfirst(plantree_list);		plantree_list = lnext(plantree_list);		/*		 * If we got a cancel signal in prior command, quit		 */		CHECK_FOR_INTERRUPTS();		if (query->commandType == CMD_UTILITY)		{			/*			 * process utility functions (create, destroy, etc..)			 */			Assert(plan == NULL);			PortalRunUtility(portal, query,							 query->canSetTag ? dest : altdest,							 completionTag);		}		else		{			/*			 * process a plannable query.			 */			ereport(DEBUG3,					(errmsg_internal("ProcessQuery")));			/* Must always set snapshot for plannable queries */			SetQuerySnapshot();			/*			 * execute the plan			 */			if (log_executor_stats)				ResetUsage();			if (query->canSetTag)			{				/* statement can set tag string */				ProcessQuery(query, plan,							 portal->portalParams,							 dest, completionTag);			}			else			{				/* stmt added by rewrite cannot set tag */				ProcessQuery(query, plan,							 portal->portalParams,							 altdest, NULL);			}			if (log_executor_stats)				ShowUsage("EXECUTOR STATISTICS");		}		/*		 * Increment command counter between queries, but not after the		 * last one.		 */		if (plantree_list != NIL)			CommandCounterIncrement();		/*		 * Clear subsidiary contexts to recover temporary memory.		 */		Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);		MemoryContextDeleteChildren(PortalGetHeapMemory(portal));	}	/*	 * If a command completion tag was supplied, use it.  Otherwise use	 * the portal's commandTag as the default completion tag.	 *	 * Exception: clients will expect INSERT/UPDATE/DELETE tags to have	 * counts, so fake something up if necessary.  (This could happen if	 * the original query was replaced by a DO INSTEAD rule.)	 */	if (completionTag && completionTag[0] == '\0')	{		if (portal->commandTag)			strcpy(completionTag, portal->commandTag);		if (strcmp(completionTag, "INSERT") == 0)			strcpy(completionTag, "INSERT 0 0");		else if (strcmp(completionTag, "UPDATE") == 0)			strcpy(completionTag, "UPDATE 0");		else if (strcmp(completionTag, "DELETE") == 0)			strcpy(completionTag, "DELETE 0");	}	/* Prevent portal's commands from being re-executed */	portal->portalDone = true;}/* * PortalRunFetch *		Variant form of PortalRun that supports SQL FETCH directions. * * Returns number of rows processed (suitable for use in result tag) */longPortalRunFetch(Portal portal,			   FetchDirection fdirection,			   long count,			   DestReceiver *dest){	long		result;	MemoryContext savePortalContext;	MemoryContext saveQueryContext;	MemoryContext oldContext;	AssertArg(PortalIsValid(portal));	AssertState(portal->portalReady);	/* else no PortalStart */	/*	 * Check for improper portal use, and mark portal active.	 */	if (portal->portalDone)		ereport(ERROR,				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),		   errmsg("portal \"%s\" cannot be run anymore", portal->name)));	if (portal->portalActive)		ereport(ERROR,				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),				 errmsg("portal \"%s\" already active", portal->name)));	portal->portalActive = true;	/*	 * Set global portal context pointers.	 */	savePortalContext = PortalContext;	PortalContext = PortalGetHeapMemory(portal);	saveQueryContext = QueryContext;	QueryContext = portal->queryContext;	oldContext = MemoryContextSwitchTo(PortalContext);	switch (portal->strategy)	{		case PORTAL_ONE_SELECT:			result = DoPortalRunFetch(portal, fdirection, count, dest);			break;		default:			elog(ERROR, "unsupported portal strategy");			result = 0;			/* keep compiler quiet */			break;	}	MemoryContextSwitchTo(oldContext);	/* Mark portal not active */	portal->portalActive = false;	PortalContext = savePortalContext;	QueryContext = saveQueryContext;	return result;}/* * DoPortalRunFetch *		Guts of PortalRunFetch --- the portal context is already set up * * Returns number of rows processed (suitable for use in result tag) */static longDoPortalRunFetch(Portal portal,				 FetchDirection fdirection,				 long count,				 DestReceiver *dest){	bool		forward;	Assert(portal->strategy == PORTAL_ONE_SELECT);	switch (fdirection)	{		case FETCH_FORWARD:			if (count < 0)			{				fdirection = FETCH_BACKWARD;				count = -count;			}			/* fall out of switch to share code with FETCH_BACKWARD */			break;		case FETCH_BACKWARD:			if (count < 0)			{				fdirection = FETCH_FORWARD;				count = -count;			}			/* fall out of switch to share code with FETCH_FORWARD */			break;		case FETCH_ABSOLUTE:			if (count > 0)			{				/*				 * Definition: Rewind to start, advance count-1 rows,				 * return next row (if any).  In practice, if the goal is				 * less than halfway back to the start, it's better to				 * scan from where we are.	In any case, we arrange to				 * fetch the target row going forwards.				 */				if (portal->posOverflow || portal->portalPos == LONG_MAX ||					count - 1 <= portal->portalPos / 2)				{					DoPortalRewind(portal);					if (count > 1)						PortalRunSelect(portal, true, count - 1,										None_Receiver);				}				else				{					long		pos = portal->portalPos;					if (portal->atEnd)						pos++;	/* need one extra fetch if off end */					if (count <= pos)						PortalRunSelect(portal, false, pos - count + 1,										None_Receiver);					else if (count > pos + 1)						PortalRunSelect(portal, true, count - pos - 1,										None_Receiver);				}				return PortalRunSelect(portal, true, 1L, dest);			}			else if (count < 0)			{				/*				 * Definition: Advance to end, back up abs(count)-1 rows,				 * return prior row (if any).  We could optimize this if				 * we knew in advance where the end was, but typically we				 * won't. (Is it worth considering case where count > half				 * of size of query?  We could rewind once we know the				 * size ...)				 */				PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);				if (count < -1)					PortalRunSelect(portal, false, -count - 1, None_Receiver);				return PortalRunSelect(portal, false, 1L, dest);			}			else/* count == 0 */			{				/* Rewind to start, return zero rows */				DoPortalRewind(portal);				return PortalRunSelect(portal, true, 0L, dest);			}			break;		case FETCH_RELATIVE:			if (count > 0)			{				/*				 * Definition: advance count-1 rows, return next row (if				 * any).				 */				if (count > 1)					PortalRunSelect(portal, true, count - 1, None_Receiver);				return PortalRunSelect(portal, true, 1L, dest);			}			else if (count < 0)			{				/*				 * Definition: back up abs(count)-1 rows, return prior row				 * (if any).				 */				if (count < -1)					PortalRunSelect(portal, false, -count - 1, None_Receiver);				return PortalRunSelect(portal, false, 1L, dest);			}			else/* count == 0 */			{				/* Same as FETCH FORWARD 0, so fall out of switch */				fdirection = FETCH_FORWARD;			}			break;		default:			elog(ERROR, "bogus direction");			break;	}	/*	 * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and	 * count >= 0.	 */	forward = (fdirection == FETCH_FORWARD);	/*	 * Zero count means to re-fetch the current row, if any (per SQL92)	 */	if (count == 0)	{		bool		on_row;		/* Are we sitting on a row? */		on_row = (!portal->atStart && !portal->atEnd);		if (dest->mydest == None)		{			/* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */			return on_row ? 1L : 0L;		}		else		{			/*			 * If we are sitting on a row, back up one so we can re-fetch			 * it. If we are not sitting on a row, we still have to start			 * up and shut down the executor so that the destination is			 * initialized and shut down correctly; so keep going.	To			 * PortalRunSelect, count == 0 means we will retrieve no row.			 */			if (on_row)			{				PortalRunSelect(portal, false, 1L, None_Receiver);				/* Set up to fetch one row forward */				count = 1;				forward = true;			}		}	}	/*	 * Optimize MOVE BACKWARD ALL into a Rewind.	 */	if (!forward && count == FETCH_ALL && dest->mydest == None)	{		long		result = portal->portalPos;		if (result > 0 && !portal->atEnd)			result--;		DoPortalRewind(portal);		/* result is bogus if pos had overflowed, but it's best we can do */		return result;	}	return PortalRunSelect(portal, forward, count, dest);}/* * DoPortalRewind - rewind a Portal to starting point */static voidDoPortalRewind(Portal portal){	if (portal->holdStore)	{		MemoryContext oldcontext;		oldcontext = MemoryContextSwitchTo(portal->holdContext);		tuplestore_rescan(portal->holdStore);		MemoryContextSwitchTo(oldcontext);	}	if (PortalGetQueryDesc(portal))		ExecutorRewind(PortalGetQueryDesc(portal));	portal->atStart = true;	portal->atEnd = false;	portal->portalPos = 0;	portal->posOverflow = false;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?