async.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 948 行 · 第 1/2 页

C
948
字号
		{			/*			 * Self-notify: no need to bother with table update. Indeed,			 * we *must not* clear the notification field in this path, or			 * we could lose an outside notify, which'd be bad for			 * applications that ignore self-notify messages.			 */			if (Trace_notify)				elog(DEBUG1, "AtCommit_Notify: notifying self");			NotifyMyFrontEnd(relname, listenerPID);		}		else		{			if (Trace_notify)				elog(DEBUG1, "AtCommit_Notify: notifying pid %d",					 listenerPID);			/*			 * If someone has already notified this listener, we don't			 * bother modifying the table, but we do still send a SIGUSR2			 * signal, just in case that backend missed the earlier signal			 * for some reason.  It's OK to send the signal first, because			 * the other guy can't read pg_listener until we unlock it.			 */			if (kill(listenerPID, SIGUSR2) < 0)			{				/*				 * Get rid of pg_listener entry if it refers to a PID that				 * no longer exists.  Presumably, that backend crashed				 * without deleting its pg_listener entries. This code				 * used to only delete the entry if errno==ESRCH, but as				 * far as I can see we should just do it for any failure				 * (certainly at least for EPERM too...)				 */				simple_heap_delete(lRel, &lTuple->t_self);			}			else if (listener->notification == 0)			{				ItemPointerData ctid;				int			result;				rTuple = heap_modifytuple(lTuple, lRel,										  value, nulls, repl);				/*				 * We cannot use simple_heap_update here because the tuple				 * could have been modified by an uncommitted transaction;				 * specifically, since UNLISTEN releases exclusive lock on				 * the table before commit, the other guy could already have				 * tried to unlisten.  There are no other cases where we				 * should be able to see an uncommitted update or delete.				 * Therefore, our response to a HeapTupleBeingUpdated result				 * is just to ignore it.  We do *not* wait for the other				 * guy to commit --- that would risk deadlock, and we don't				 * want to block while holding the table lock anyway for				 * performance reasons.  We also ignore HeapTupleUpdated,				 * which could occur if the other guy commits between our				 * heap_getnext and heap_update calls.				 */				result = heap_update(lRel, &lTuple->t_self, rTuple,									 &ctid,									 GetCurrentCommandId(), SnapshotAny,									 false /* no wait for commit */);				switch (result)				{					case HeapTupleSelfUpdated:						/* Tuple was already updated in current command? */						elog(ERROR, "tuple already updated by self");						break;					case HeapTupleMayBeUpdated:						/* done successfully */#ifdef NOT_USED					/* currently there are no indexes */						CatalogUpdateIndexes(lRel, rTuple);#endif						break;					case HeapTupleBeingUpdated:						/* ignore uncommitted tuples */						break;					case HeapTupleUpdated:						/* ignore just-committed tuples */						break;					default:						elog(ERROR, "unrecognized heap_update status: %u",							 result);						break;				}			}		}	}	heap_endscan(scan);	/*	 * We do NOT release the lock on pg_listener here; we need to hold it	 * until end of transaction (which is about to happen, anyway) to	 * ensure that notified backends see our tuple updates when they look.	 * Else they might disregard the signal, which would make the	 * application programmer very unhappy.	 */	heap_close(lRel, NoLock);	ClearPendingNotifies();	if (Trace_notify)		elog(DEBUG1, "AtCommit_Notify: done");}/* *-------------------------------------------------------------- * AtAbort_Notify * *		This is called at transaction abort. * *		Gets rid of pending outbound notifies that we would have executed *		if the transaction got committed. * * Results: *		XXX * *-------------------------------------------------------------- */voidAtAbort_Notify(void){	ClearPendingNotifies();}/* *-------------------------------------------------------------- * Async_NotifyHandler * *		This is the signal handler for SIGUSR2. * *		If we are idle (notifyInterruptEnabled is set), we can safely invoke *		ProcessIncomingNotify directly.  Otherwise, just set a flag *		to do it later. * * Results: *		none * * Side effects: *		per above *-------------------------------------------------------------- */voidAsync_NotifyHandler(SIGNAL_ARGS){	int			save_errno = errno;	/*	 * Note: this is a SIGNAL HANDLER.	You must be very wary what you do	 * here. Some helpful soul had this routine sprinkled with TPRINTFs,	 * which would likely lead to corruption of stdio buffers if they were	 * ever turned on.	 */	/* Don't joggle the elbow of proc_exit */	if (proc_exit_inprogress)		return;	if (notifyInterruptEnabled)	{		bool		save_ImmediateInterruptOK = ImmediateInterruptOK;		/*		 * We may be called while ImmediateInterruptOK is true; turn it		 * off while messing with the NOTIFY state.  (We would have to		 * save and restore it anyway, because PGSemaphore operations		 * inside ProcessIncomingNotify() might reset it.)		 */		ImmediateInterruptOK = false;		/*		 * I'm not sure whether some flavors of Unix might allow another		 * SIGUSR2 occurrence to recursively interrupt this routine. To		 * cope with the possibility, we do the same sort of dance that		 * EnableNotifyInterrupt must do --- see that routine for		 * comments.		 */		notifyInterruptEnabled = 0;		/* disable any recursive signal */		notifyInterruptOccurred = 1;	/* do at least one iteration */		for (;;)		{			notifyInterruptEnabled = 1;			if (!notifyInterruptOccurred)				break;			notifyInterruptEnabled = 0;			if (notifyInterruptOccurred)			{				/* Here, it is finally safe to do stuff. */				if (Trace_notify)					elog(DEBUG1, "Async_NotifyHandler: perform async notify");				ProcessIncomingNotify();				if (Trace_notify)					elog(DEBUG1, "Async_NotifyHandler: done");			}		}		/*		 * Restore ImmediateInterruptOK, and check for interrupts if		 * needed.		 */		ImmediateInterruptOK = save_ImmediateInterruptOK;		if (save_ImmediateInterruptOK)			CHECK_FOR_INTERRUPTS();	}	else	{		/*		 * In this path it is NOT SAFE to do much of anything, except		 * this:		 */		notifyInterruptOccurred = 1;	}	errno = save_errno;}/* * -------------------------------------------------------------- * EnableNotifyInterrupt * *		This is called by the PostgresMain main loop just before waiting *		for a frontend command.  If we are truly idle (ie, *not* inside *		a transaction block), then process any pending inbound notifies, *		and enable the signal handler to process future notifies directly. * *		NOTE: the signal handler starts out disabled, and stays so until *		PostgresMain calls this the first time. * -------------------------------------------------------------- */voidEnableNotifyInterrupt(void){	if (IsTransactionOrTransactionBlock())		return;					/* not really idle */	/*	 * This code is tricky because we are communicating with a signal	 * handler that could interrupt us at any point.  If we just checked	 * notifyInterruptOccurred and then set notifyInterruptEnabled, we	 * could fail to respond promptly to a signal that happens in between	 * those two steps.  (A very small time window, perhaps, but Murphy's	 * Law says you can hit it...)	Instead, we first set the enable flag,	 * then test the occurred flag.  If we see an unserviced interrupt has	 * occurred, we re-clear the enable flag before going off to do the	 * service work.  (That prevents re-entrant invocation of	 * ProcessIncomingNotify() if another interrupt occurs.) If an	 * interrupt comes in between the setting and clearing of	 * notifyInterruptEnabled, then it will have done the service work and	 * left notifyInterruptOccurred zero, so we have to check again after	 * clearing enable.  The whole thing has to be in a loop in case	 * another interrupt occurs while we're servicing the first. Once we	 * get out of the loop, enable is set and we know there is no	 * unserviced interrupt.	 *	 * NB: an overenthusiastic optimizing compiler could easily break this	 * code.  Hopefully, they all understand what "volatile" means these	 * days.	 */	for (;;)	{		notifyInterruptEnabled = 1;		if (!notifyInterruptOccurred)			break;		notifyInterruptEnabled = 0;		if (notifyInterruptOccurred)		{			if (Trace_notify)				elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");			ProcessIncomingNotify();			if (Trace_notify)				elog(DEBUG1, "EnableNotifyInterrupt: done");		}	}}/* * -------------------------------------------------------------- * DisableNotifyInterrupt * *		This is called by the PostgresMain main loop just after receiving *		a frontend command.  Signal handler execution of inbound notifies *		is disabled until the next EnableNotifyInterrupt call. * -------------------------------------------------------------- */voidDisableNotifyInterrupt(void){	notifyInterruptEnabled = 0;}/* * -------------------------------------------------------------- * ProcessIncomingNotify * *		Deal with arriving NOTIFYs from other backends. *		This is called either directly from the SIGUSR2 signal handler, *		or the next time control reaches the outer idle loop. *		Scan pg_listener for arriving notifies, report them to my front end, *		and clear the notification field in pg_listener until next time. * *		NOTE: since we are outside any transaction, we must create our own. * * Results: *		XXX * * -------------------------------------------------------------- */static voidProcessIncomingNotify(void){	Relation	lRel;	TupleDesc	tdesc;	ScanKeyData key[1];	HeapScanDesc scan;	HeapTuple	lTuple,				rTuple;	Datum		value[Natts_pg_listener];	char		repl[Natts_pg_listener],				nulls[Natts_pg_listener];	if (Trace_notify)		elog(DEBUG1, "ProcessIncomingNotify");	set_ps_display("async_notify");	notifyInterruptOccurred = 0;	StartTransactionCommand();	lRel = heap_openr(ListenerRelationName, ExclusiveLock);	tdesc = RelationGetDescr(lRel);	/* Scan only entries with my listenerPID */	ScanKeyEntryInitialize(&key[0], 0,						   Anum_pg_listener_pid,						   F_INT4EQ,						   Int32GetDatum(MyProcPid));	scan = heap_beginscan(lRel, SnapshotNow, 1, key);	/* Prepare data for rewriting 0 into notification field */	nulls[0] = nulls[1] = nulls[2] = ' ';	repl[0] = repl[1] = repl[2] = ' ';	repl[Anum_pg_listener_notify - 1] = 'r';	value[0] = value[1] = value[2] = (Datum) 0;	value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)	{		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);		char	   *relname = NameStr(listener->relname);		int32		sourcePID = listener->notification;		if (sourcePID != 0)		{			/* Notify the frontend */			if (Trace_notify)				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",					 relname, (int) sourcePID);			NotifyMyFrontEnd(relname, sourcePID);			/*			 * Rewrite the tuple with 0 in notification column.			 *			 * simple_heap_update is safe here because no one else would			 * have tried to UNLISTEN us, so there can be no uncommitted			 * changes.			 */			rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);			simple_heap_update(lRel, &lTuple->t_self, rTuple);#ifdef NOT_USED					/* currently there are no indexes */			CatalogUpdateIndexes(lRel, rTuple);#endif		}	}	heap_endscan(scan);	/*	 * We do NOT release the lock on pg_listener here; we need to hold it	 * until end of transaction (which is about to happen, anyway) to	 * ensure that other backends see our tuple updates when they look.	 * Otherwise, a transaction started after this one might mistakenly	 * think it doesn't need to send this backend a new NOTIFY.	 */	heap_close(lRel, NoLock);	CommitTransactionCommand();	/*	 * Must flush the notify messages to ensure frontend gets them	 * promptly.	 */	pq_flush();	set_ps_display("idle");	if (Trace_notify)		elog(DEBUG1, "ProcessIncomingNotify: done");}/* * Send NOTIFY message to my front end. */static voidNotifyMyFrontEnd(char *relname, int32 listenerPID){	if (whereToSendOutput == Remote)	{		StringInfoData buf;		pq_beginmessage(&buf, 'A');		pq_sendint(&buf, listenerPID, sizeof(int32));		pq_sendstring(&buf, relname);		if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)		{			/* XXX Add parameter string here later */			pq_sendstring(&buf, "");		}		pq_endmessage(&buf);		/*		 * NOTE: we do not do pq_flush() here.	For a self-notify, it will		 * happen at the end of the transaction, and for incoming notifies		 * ProcessIncomingNotify will do it after finding all the		 * notifies.		 */	}	else		elog(INFO, "NOTIFY for %s", relname);}/* Does pendingNotifies include the given relname? */static boolAsyncExistsPendingNotify(const char *relname){	List	   *p;	foreach(p, pendingNotifies)	{		/* Use NAMEDATALEN for relname comparison.	  DZ - 26-08-1996 */		if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)			return true;	}	return false;}/* Clear the pendingNotifies list. */static voidClearPendingNotifies(void){	/*	 * We used to have to explicitly deallocate the list members and	 * nodes, because they were malloc'd.  Now, since we know they are	 * palloc'd in TopTransactionContext, we need not do that --- they'll	 * go away automatically at transaction exit.  We need only reset the	 * list head pointer.	 */	pendingNotifies = NIL;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?