⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 async.c

📁 关系型数据库 Postgresql 6.5.2
💻 C
📖 第 1 页 / 共 2 页
字号:
	bool		isnull;	char	   *relname;	int32		listenerPID;	if (!pendingNotifies)		return;					/* no NOTIFY statements in this								 * transaction */	/*	 * NOTIFY is disabled if not normal processing mode. This test used to	 * be in xact.c, but it seems cleaner to do it here.	 */	if (!IsNormalProcessingMode())	{		ClearPendingNotifies();		return;	}	TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");	lRel = heap_openr(ListenerRelationName);	LockRelation(lRel, AccessExclusiveLock);	tdesc = RelationGetDescr(lRel);	sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);	/* preset data to update notify column to MyProcPid */	nulls[0] = nulls[1] = nulls[2] = ' ';	repl[0] = repl[1] = repl[2] = ' ';	repl[Anum_pg_listener_notify - 1] = 'r';	value[0] = value[1] = value[2] = (Datum) 0;	value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))	{		d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);		relname = (char *) DatumGetPointer(d);		if (AsyncExistsPendingNotify(relname))		{			d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);			listenerPID = DatumGetInt32(d);			if (listenerPID == MyProcPid)			{				/*				 * Self-notify: no need to bother with table update.				 * Indeed, we *must not* clear the notification field in				 * this path, or we could lose an outside notify, which'd				 * be bad for applications that ignore self-notify				 * messages.				 */				TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");				NotifyMyFrontEnd(relname, listenerPID);			}			else			{				TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",						listenerPID);				/*				 * If someone has already notified this listener, we don't				 * bother modifying the table, but we do still send a				 * SIGUSR2 signal, just in case that backend missed the				 * earlier signal for some reason.	It's OK to send the				 * signal first, because the other guy can't read				 * pg_listener until we unlock it.				 */				if (kill(listenerPID, SIGUSR2) < 0)				{					/*					 * Get rid of pg_listener entry if it refers to a PID					 * that no longer exists.  Presumably, that backend					 * crashed without deleting its pg_listener entries.					 * This code used to only delete the entry if					 * errno==ESRCH, but as far as I can see we should					 * just do it for any failure (certainly at least for					 * EPERM too...)					 */					heap_delete(lRel, &lTuple->t_self, NULL);				}				else				{					d = heap_getattr(lTuple, Anum_pg_listener_notify,									 tdesc, &isnull);					if (DatumGetInt32(d) == 0)					{						rTuple = heap_modifytuple(lTuple, lRel,												  value, nulls, repl);						heap_replace(lRel, &lTuple->t_self, rTuple, NULL);					}				}			}		}	}	heap_endscan(sRel);	/*	 * We do not do RelationUnsetLockForWrite(lRel) here, because the	 * transaction is about to be committed anyway.	 */	heap_close(lRel);	ClearPendingNotifies();	TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");}/* *-------------------------------------------------------------- * AtAbort_Notify * *		This is called at transaction abort. * *		Gets rid of pending outbound notifies that we would have executed *		if the transaction got committed. * * Results: *		XXX * *-------------------------------------------------------------- */voidAtAbort_Notify(){	ClearPendingNotifies();}/* *-------------------------------------------------------------- * Async_NotifyHandler * *		This is the signal handler for SIGUSR2. * *		If we are idle (notifyInterruptEnabled is set), we can safely invoke *		ProcessIncomingNotify directly.  Otherwise, just set a flag *		to do it later. * * Results: *		none * * Side effects: *		per above *-------------------------------------------------------------- */voidAsync_NotifyHandler(SIGNAL_ARGS){	/*	 * Note: this is a SIGNAL HANDLER.	You must be very wary what you do	 * here. Some helpful soul had this routine sprinkled with TPRINTFs,	 * which would likely lead to corruption of stdio buffers if they were	 * ever turned on.	 */	if (notifyInterruptEnabled)	{		/*		 * I'm not sure whether some flavors of Unix might allow another		 * SIGUSR2 occurrence to recursively interrupt this routine. To		 * cope with the possibility, we do the same sort of dance that		 * EnableNotifyInterrupt must do --- see that routine for		 * comments.		 */		notifyInterruptEnabled = 0;		/* disable any recursive signal */		notifyInterruptOccurred = 1;	/* do at least one iteration */		for (;;)		{			notifyInterruptEnabled = 1;			if (!notifyInterruptOccurred)				break;			notifyInterruptEnabled = 0;			if (notifyInterruptOccurred)			{				/* Here, it is finally safe to do stuff. */				TPRINTF(TRACE_NOTIFY,						"Async_NotifyHandler: perform async notify");				ProcessIncomingNotify();				TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");			}		}	}	else	{		/*		 * In this path it is NOT SAFE to do much of anything, except		 * this:		 */		notifyInterruptOccurred = 1;	}}/* * -------------------------------------------------------------- * EnableNotifyInterrupt * *		This is called by the PostgresMain main loop just before waiting *		for a frontend command.  If we are truly idle (ie, *not* inside *		a transaction block), then process any pending inbound notifies, *		and enable the signal handler to process future notifies directly. * *		NOTE: the signal handler starts out disabled, and stays so until *		PostgresMain calls this the first time. * -------------------------------------------------------------- */voidEnableNotifyInterrupt(void){	if (CurrentTransactionState->blockState != TRANS_DEFAULT)		return;					/* not really idle */	/*	 * This code is tricky because we are communicating with a signal	 * handler that could interrupt us at any point.  If we just checked	 * notifyInterruptOccurred and then set notifyInterruptEnabled, we	 * could fail to respond promptly to a signal that happens in between	 * those two steps.  (A very small time window, perhaps, but Murphy's	 * Law says you can hit it...)	Instead, we first set the enable flag,	 * then test the occurred flag.  If we see an unserviced interrupt has	 * occurred, we re-clear the enable flag before going off to do the	 * service work.  (That prevents re-entrant invocation of	 * ProcessIncomingNotify() if another interrupt occurs.) If an	 * interrupt comes in between the setting and clearing of	 * notifyInterruptEnabled, then it will have done the service work and	 * left notifyInterruptOccurred zero, so we have to check again after	 * clearing enable.  The whole thing has to be in a loop in case	 * another interrupt occurs while we're servicing the first. Once we	 * get out of the loop, enable is set and we know there is no	 * unserviced interrupt.	 *	 * NB: an overenthusiastic optimizing compiler could easily break this	 * code.  Hopefully, they all understand what "volatile" means these	 * days.	 */	for (;;)	{		notifyInterruptEnabled = 1;		if (!notifyInterruptOccurred)			break;		notifyInterruptEnabled = 0;		if (notifyInterruptOccurred)		{			TPRINTF(TRACE_NOTIFY,					"EnableNotifyInterrupt: perform async notify");			ProcessIncomingNotify();			TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");		}	}}/* * -------------------------------------------------------------- * DisableNotifyInterrupt * *		This is called by the PostgresMain main loop just after receiving *		a frontend command.  Signal handler execution of inbound notifies *		is disabled until the next EnableNotifyInterrupt call. * -------------------------------------------------------------- */voidDisableNotifyInterrupt(void){	notifyInterruptEnabled = 0;}/* * -------------------------------------------------------------- * ProcessIncomingNotify * *		Deal with arriving NOTIFYs from other backends. *		This is called either directly from the SIGUSR2 signal handler, *		or the next time control reaches the outer idle loop. *		Scan pg_listener for arriving notifies, report them to my front end, *		and clear the notification field in pg_listener until next time. * *		NOTE: since we are outside any transaction, we must create our own. * * Results: *		XXX * * -------------------------------------------------------------- */static voidProcessIncomingNotify(void){	Relation	lRel;	TupleDesc	tdesc;	ScanKeyData key[1];	HeapScanDesc sRel;	HeapTuple	lTuple,				rTuple;	Datum		d,				value[Natts_pg_listener];	char		repl[Natts_pg_listener],				nulls[Natts_pg_listener];	bool		isnull;	char	   *relname;	int32		sourcePID;	TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");	PS_SET_STATUS("async_notify");	notifyInterruptOccurred = 0;	StartTransactionCommand();	lRel = heap_openr(ListenerRelationName);	LockRelation(lRel, AccessExclusiveLock);	tdesc = RelationGetDescr(lRel);	/* Scan only entries with my listenerPID */	ScanKeyEntryInitialize(&key[0], 0,						   Anum_pg_listener_pid,						   F_INT4EQ,						   Int32GetDatum(MyProcPid));	sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);	/* Prepare data for rewriting 0 into notification field */	nulls[0] = nulls[1] = nulls[2] = ' ';	repl[0] = repl[1] = repl[2] = ' ';	repl[Anum_pg_listener_notify - 1] = 'r';	value[0] = value[1] = value[2] = (Datum) 0;	value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))	{		d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);		sourcePID = DatumGetInt32(d);		if (sourcePID != 0)		{			d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);			relname = (char *) DatumGetPointer(d);			/* Notify the frontend */			TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",					relname, (int) sourcePID);			NotifyMyFrontEnd(relname, sourcePID);			/* Rewrite the tuple with 0 in notification column */			rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);			heap_replace(lRel, &lTuple->t_self, rTuple, NULL);		}	}	heap_endscan(sRel);	/*	 * We do not do RelationUnsetLockForWrite(lRel) here, because the	 * transaction is about to be committed anyway.	 */	heap_close(lRel);	CommitTransactionCommand();	/*	 * Must flush the notify messages to ensure frontend gets them	 * promptly.	 */	pq_flush();	PS_SET_STATUS("idle");	TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");}/* Send NOTIFY message to my front end. */static voidNotifyMyFrontEnd(char *relname, int32 listenerPID){	if (whereToSendOutput == Remote)	{		StringInfoData buf;		pq_beginmessage(&buf);		pq_sendbyte(&buf, 'A');		pq_sendint(&buf, listenerPID, sizeof(int32));		pq_sendstring(&buf, relname);		pq_endmessage(&buf);		/*		 * NOTE: we do not do pq_flush() here.	For a self-notify, it will		 * happen at the end of the transaction, and for incoming notifies		 * ProcessIncomingNotify will do it after finding all the		 * notifies.		 */	}	else		elog(NOTICE, "NOTIFY for %s", relname);}/* Does pendingNotifies include the given relname? * * NB: not called unless pendingNotifies != NULL. */static intAsyncExistsPendingNotify(char *relname){	Dlelem	   *p;	for (p = DLGetHead(pendingNotifies);		 p != NULL;		 p = DLGetSucc(p))	{		/* Use NAMEDATALEN for relname comparison.	  DZ - 26-08-1996 */		if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))			return 1;	}	return 0;}/* Clear the pendingNotifies list. */static voidClearPendingNotifies(){	Dlelem	   *p;	if (pendingNotifies)	{		/*		 * Since the referenced strings are malloc'd, we have to scan the		 * list and delete them individually.  If we used palloc for the		 * strings then we could just do DLFreeList to get rid of both the		 * list nodes and the list base...		 */		while ((p = DLRemHead(pendingNotifies)) != NULL)		{			free(DLE_VAL(p));			DLFreeElem(p);		}		DLFreeList(pendingNotifies);		pendingNotifies = NULL;	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -