⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pquery.c

📁 PostgreSQL 8.1.4的源码 适用于Linux下的开源数据库系统
💻 C
📖 第 1 页 / 共 3 页
字号:
PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
{
	int			ncolumns;
	int			col;

	/* A portal that returns no tuples has no result formats to remember. */
	if (portal->tupDesc == NULL)
		return;
	ncolumns = portal->tupDesc->natts;
	/* The per-column format array must live as long as the portal itself. */
	portal->formats = (int16 *)
		MemoryContextAlloc(PortalGetHeapMemory(portal),
						   ncolumns * sizeof(int16));
	if (nFormats > 1)
	{
		/* One format code supplied per column; the counts must agree. */
		if (nFormats != ncolumns)
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("bind message has %d result formats but query has %d columns",
							nFormats, ncolumns)));
		memcpy(portal->formats, formats, ncolumns * sizeof(int16));
	}
	else if (nFormats > 0)
	{
		/* A single format code applies to every column. */
		int16		common_format = formats[0];

		for (col = 0; col < ncolumns; col++)
			portal->formats[col] = common_format;
	}
	else
	{
		/* No formats supplied: use the default format (0) for all columns. */
		for (col = 0; col < ncolumns; col++)
			portal->formats[col] = 0;
	}
}

/*
 * PortalRun
 *		Run a portal's query or queries.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".  Note that count is ignored in multi-query
 * situations, where we always run the portal to completion.
 *
 * dest: where to send output of primary (canSetTag) query
 *
 * altdest: where to send output of non-primary queries
 *
 * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
 *		in which to store a command completion status string.
 *		May be NULL if caller doesn't want a status string.
 *
 * Returns TRUE if the portal's execution is complete, FALSE if it was
 * suspended due to exhaustion of the count parameter.
 */
bool
PortalRun(Portal portal, long count,
		  DestReceiver *dest, DestReceiver *altdest,
		  char *completionTag)
{
	bool		result;
	ResourceOwner saveTopTransactionResourceOwner;
	MemoryContext saveTopTransactionContext;
	Portal		saveActivePortal;
	Snapshot	saveActiveSnapshot;
	ResourceOwner saveResourceOwner;
	MemoryContext savePortalContext;
	MemoryContext saveQueryContext;
	MemoryContext saveMemoryContext;

	AssertArg(PortalIsValid(portal));

	/* Initialize completion tag to empty string */
	if (completionTag)
		completionTag[0] = '\0';

	if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
	{
		ereport(DEBUG3,
				(errmsg_internal("PortalRun")));
		/* PORTAL_MULTI_QUERY logs its own stats per query */
		ResetUsage();
	}

	/*
	 * Check for improper portal use, and mark portal active.
	 */
	if (portal->status != PORTAL_READY)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("portal \"%s\" cannot be run", portal->name)));
	portal->status = PORTAL_ACTIVE;

	/*
	 * Set up global portal context pointers.
	 *
	 * We have to play a special game here to support utility commands like
	 * VACUUM and CLUSTER, which internally start and commit transactions.
	 * When we are called to execute such a command, CurrentResourceOwner will
	 * be pointing to the TopTransactionResourceOwner --- which will be
	 * destroyed and replaced in the course of the internal commit and
	 * restart.  So we need to be prepared to restore it as pointing to the
	 * exit-time TopTransactionResourceOwner.  (Ain't that ugly?  This idea of
	 * internally starting whole new transactions is not good.)
	 * CurrentMemoryContext has a similar problem, but the other pointers we
	 * save here will be NULL or pointing to longer-lived objects.
	 */
	saveTopTransactionResourceOwner = TopTransactionResourceOwner;
	saveTopTransactionContext = TopTransactionContext;
	saveActivePortal = ActivePortal;
	saveActiveSnapshot = ActiveSnapshot;
	saveResourceOwner = CurrentResourceOwner;
	savePortalContext = PortalContext;
	saveQueryContext = QueryContext;
	saveMemoryContext = CurrentMemoryContext;

	/*
	 * The globals set below are restored on both the normal and the error
	 * exit path; PG_CATCH additionally marks the portal failed before
	 * re-throwing.
	 */
	PG_TRY();
	{
		ActivePortal = portal;
		ActiveSnapshot = NULL;	/* will be set later */
		CurrentResourceOwner = portal->resowner;
		PortalContext = PortalGetHeapMemory(portal);
		QueryContext = portal->queryContext;

		MemoryContextSwitchTo(PortalContext);

		switch (portal->strategy)
		{
			case PORTAL_ONE_SELECT:
				(void) PortalRunSelect(portal, true, count, dest);

				/* we know the query is supposed to set the tag */
				if (completionTag && portal->commandTag)
					strcpy(completionTag, portal->commandTag);

				/* Mark portal not active */
				portal->status = PORTAL_READY;

				/*
				 * Since it's a forward fetch, say DONE iff atEnd is now true.
				 */
				result = portal->atEnd;
				break;

			case PORTAL_UTIL_SELECT:

				/*
				 * If we have not yet run the utility statement, do so,
				 * storing its results in the portal's tuplestore.
				 */
				if (!portal->portalUtilReady)
				{
					DestReceiver *treceiver;

					PortalCreateHoldStore(portal);
					treceiver = CreateDestReceiver(DestTuplestore, portal);
					PortalRunUtility(portal, linitial(portal->parseTrees),
									 treceiver, NULL);
					(*treceiver->rDestroy) (treceiver);
					portal->portalUtilReady = true;
				}

				/*
				 * Now fetch desired portion of results.
				 */
				(void) PortalRunSelect(portal, true, count, dest);

				/*
				 * We know the query is supposed to set the tag; we assume
				 * only the default tag is needed.
				 */
				if (completionTag && portal->commandTag)
					strcpy(completionTag, portal->commandTag);

				/* Mark portal not active */
				portal->status = PORTAL_READY;

				/*
				 * Since it's a forward fetch, say DONE iff atEnd is now true.
				 */
				result = portal->atEnd;
				break;

			case PORTAL_MULTI_QUERY:
				PortalRunMulti(portal, dest, altdest, completionTag);

				/* Prevent portal's commands from being re-executed */
				portal->status = PORTAL_DONE;

				/* Always complete at end of RunMulti */
				result = true;
				break;

			default:
				elog(ERROR, "unrecognized portal strategy: %d",
					 (int) portal->strategy);
				result = false; /* keep compiler quiet */
				break;
		}
	}
	PG_CATCH();
	{
		/* Uncaught error while executing portal: mark it dead */
		portal->status = PORTAL_FAILED;

		/* Restore global vars and propagate error */
		if (saveMemoryContext == saveTopTransactionContext)
			MemoryContextSwitchTo(TopTransactionContext);
		else
			MemoryContextSwitchTo(saveMemoryContext);
		ActivePortal = saveActivePortal;
		ActiveSnapshot = saveActiveSnapshot;
		/*
		 * Compare against the *saved* top-transaction owner: if a utility
		 * command committed internally, the saved pointer is now dead and we
		 * must substitute the current TopTransactionResourceOwner instead.
		 */
		if (saveResourceOwner == saveTopTransactionResourceOwner)
			CurrentResourceOwner = TopTransactionResourceOwner;
		else
			CurrentResourceOwner = saveResourceOwner;
		PortalContext = savePortalContext;
		QueryContext = saveQueryContext;

		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Normal exit: restore globals, with the same dead-pointer caution. */
	if (saveMemoryContext == saveTopTransactionContext)
		MemoryContextSwitchTo(TopTransactionContext);
	else
		MemoryContextSwitchTo(saveMemoryContext);
	ActivePortal = saveActivePortal;
	ActiveSnapshot = saveActiveSnapshot;
	if (saveResourceOwner == saveTopTransactionResourceOwner)
		CurrentResourceOwner = TopTransactionResourceOwner;
	else
		CurrentResourceOwner = saveResourceOwner;
	PortalContext = savePortalContext;
	QueryContext = saveQueryContext;

	if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
		ShowUsage("EXECUTOR STATISTICS");

	return result;
}

/*
 * PortalRunSelect
 *		Execute a portal's query in SELECT cases (also UTIL_SELECT).
 *
 * This handles simple N-rows-forward-or-backward cases.  For more complex
 * nonsequential access to a portal, see PortalRunFetch.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.
Also, count == FETCH_ALL is
 * interpreted as "all rows".
 *
 * Caller must already have validated the Portal and done appropriate
 * setup (cf. PortalRun).
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
static long
PortalRunSelect(Portal portal,
				bool forward,
				long count,
				DestReceiver *dest)
{
	QueryDesc  *queryDesc;
	ScanDirection direction;
	uint32		nprocessed;

	/*
	 * NB: queryDesc will be NULL if we are fetching from a held cursor or a
	 * completed utility query; can't use it in that path.
	 */
	queryDesc = PortalGetQueryDesc(portal);

	/* Caller messed up if we have neither a ready query nor held data. */
	Assert(queryDesc || portal->holdStore);

	/*
	 * Force the queryDesc destination to the right thing.	This supports
	 * MOVE, for example, which will pass in dest = DestNone.  This is okay to
	 * change as long as we do it on every fetch.  (The Executor must not
	 * assume that dest never changes.)
	 */
	if (queryDesc)
		queryDesc->dest = dest;

	/*
	 * Determine which direction to go in, and check to see if we're already
	 * at the end of the available tuples in that direction.  If so, set the
	 * direction to NoMovement to avoid trying to fetch any tuples.  (This
	 * check exists because not all plan node types are robust about being
	 * called again if they've already returned NULL once.)  Then call the
	 * executor (we must not skip this, because the destination needs to see a
	 * setup and shutdown even if no tuples are available).  Finally, update
	 * the portal position state depending on the number of tuples that were
	 * retrieved.
	 */
	if (forward)
	{
		if (portal->atEnd || count <= 0)
			direction = NoMovementScanDirection;
		else
			direction = ForwardScanDirection;

		/* In the executor, zero count processes all rows */
		if (count == FETCH_ALL)
			count = 0;

		/* Held cursors fetch from the tuplestore; live ones run the plan. */
		if (portal->holdStore)
			nprocessed = RunFromStore(portal, direction, count, dest);
		else
		{
			ActiveSnapshot = queryDesc->snapshot;
			ExecutorRun(queryDesc, direction, count);
			nprocessed = queryDesc->estate->es_processed;
		}

		if (direction != NoMovementScanDirection)
		{
			long		oldPos;

			if (nprocessed > 0)
				portal->atStart = false;		/* OK to go backward now */
			/* fewer rows than requested (or "all") means we hit the end */
			if (count == 0 ||
				(unsigned long) nprocessed < (unsigned long) count)
				portal->atEnd = true;	/* we retrieved 'em all */
			oldPos = portal->portalPos;
			portal->portalPos += nprocessed;
			/* portalPos doesn't advance when we fall off the end */
			if (portal->portalPos < oldPos)
				portal->posOverflow = true;
		}
	}
	else
	{
		if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cursor can only scan forward"),
					 errhint("Declare it with SCROLL option to enable backward scan.")));

		if (portal->atStart || count <= 0)
			direction = NoMovementScanDirection;
		else
			direction = BackwardScanDirection;

		/* In the executor, zero count processes all rows */
		if (count == FETCH_ALL)
			count = 0;

		if (portal->holdStore)
			nprocessed = RunFromStore(portal, direction, count, dest);
		else
		{
			ActiveSnapshot = queryDesc->snapshot;
			ExecutorRun(queryDesc, direction, count);
			nprocessed = queryDesc->estate->es_processed;
		}

		if (direction != NoMovementScanDirection)
		{
			if (nprocessed > 0 && portal->atEnd)
			{
				portal->atEnd = false;	/* OK to go forward now */
				portal->portalPos++;	/* adjust for endpoint case */
			}
			if (count == 0 ||
				(unsigned long) nprocessed < (unsigned long) count)
			{
				portal->atStart = true; /* we retrieved 'em all */
				portal->portalPos = 0;
				portal->posOverflow = false;
			}
			else
			{
				long		oldPos;

				oldPos = portal->portalPos;
				portal->portalPos -= nprocessed;
				/* underflow or wraparound makes the position untrustworthy */
				if (portal->portalPos > oldPos ||
					portal->portalPos <= 0)
					portal->posOverflow = true;
			}
		}
	}

	return nprocessed;
}

/*
 * RunFromStore
 *		Fetch tuples from the portal's tuple store.
 *
 * Calling conventions are similar to ExecutorRun, except that we
 * do not depend on having a queryDesc or estate.  Therefore we return the
 * number of tuples processed as the result, not in estate->es_processed.
 *
 * One difference from ExecutorRun is that the destination receiver functions
 * are run in the caller's memory context (since we have no estate).  Watch
 * out for memory leaks.
 */
static uint32
RunFromStore(Portal portal, ScanDirection direction, long count,
			 DestReceiver *dest)
{
	long		current_tuple_count = 0;
	TupleTableSlot *slot;

	slot = MakeSingleTupleTableSlot(portal->tupDesc);

	(*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);

	if (direction == NoMovementScanDirection)
	{
		/* do nothing except start/stop the destination */
	}
	else
	{
		bool		forward = (direction == ForwardScanDirection);

		for (;;)
		{
			MemoryContext oldcontext;
			HeapTuple	tup;
			bool		should_free;

			/* read the tuplestore in its own long-lived context */
			oldcontext = MemoryContextSwitchTo(portal->holdContext);

			tup = tuplestore_getheaptuple(portal->holdStore, forward,
										  &should_free);

			MemoryContextSwitchTo(oldcontext);

			if (tup == NULL)
				break;

			ExecStoreTuple(tup, slot, InvalidBuffer, should_free);

			(*dest->receiveSlot) (slot, dest);

			ExecClearTuple(slot);

			/*
			 * check our tuple count.. if we've processed the proper number
			 * then quit, else loop again and process more tuples. Zero count
			 * means no limit.
			 */
			current_tuple_count++;
			if (count && count == current_tuple_count)
				break;
		}
	}

	(*dest->rShutdown) (dest);

	ExecDropSingleTupleTableSlot(slot);

	return (uint32) current_tuple_count;
}

/*
 * PortalRunUtility
 *		Execute a utility statement inside a portal.
*/static voidPortalRunUtility(Portal portal, Query *query,				 DestReceiver *dest, char *completionTag){	Node	   *utilityStmt = query->utilityStmt;	ereport(DEBUG3,			(errmsg_internal("ProcessUtility")));	/*	 * Set snapshot if utility stmt needs one.	Most reliable way to do this	 * seems to be to enumerate those that do not need one; this is a short	 * list.  Transaction control, LOCK, and SET must *not* set a snapshot	 * since they need to be executable at the start of a serializable

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -