📄 async.c

PostgreSQL 8.1.4 source code, an open-source database system for Linux
Language: C
Page 1 of 3
/*-------------------------------------------------------------------------
 *
 * async.c
 *	  Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.127.2.1 2005/11/22 18:23:06 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * New Async Notification Model:
 * 1. Multiple backends on same machine.  Multiple backends listening on
 *	  one relation.  (Note: "listening on a relation" is not really the
 *	  right way to think about it, since the notify names need not have
 *	  anything to do with the names of relations actually in the database.
 *	  But this terminology is all over the code and docs, and I don't feel
 *	  like trying to replace it.)
 *
 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
 *	  ie, each relname/listenerPID pair.  The "notification" field of the
 *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
 *	  of the originating backend when a cross-backend NOTIFY is pending.
 *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
 *	  notification field should never be equal to the listenerPID field.)
 *
 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
 *	  relname to a list of outstanding NOTIFY requests.  Actual processing
 *	  happens if and only if we reach transaction commit.  At that time (in
 *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
 *	  If the listenerPID in a matching tuple is ours, we just send a notify
 *	  message to our own front end.  If it is not ours, and "notification"
 *	  is not already nonzero, we set notification to our own PID and send a
 *	  SIGUSR2 signal to the receiving process (indicated by listenerPID).
 *	  BTW: if the signal operation fails, we presume that the listener backend
 *	  crashed without removing this tuple, and remove the tuple for it.
 *
 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
 *	  notify processing immediately if this backend is idle (ie, it is
 *	  waiting for a frontend command and is not within a transaction block).
 *	  Otherwise the handler may only set a flag, which will cause the
 *	  processing to occur just before we next go idle.
 *
 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
 *	  matching our own listenerPID and having nonzero notification fields.
 *	  For each such tuple, we send a message to our frontend and clear the
 *	  notification field.  BTW: this routine has to start/commit its own
 *	  transaction, since by assumption it is only called from outside any
 *	  transaction.
 *
 * Although we grab ExclusiveLock on pg_listener for any operation,
 * the lock is never held very long, so it shouldn't cause too much of
 * a performance problem.  (Previously we used AccessExclusiveLock, but
 * there's no real reason to forbid concurrent reads.)
 *
 * An application that listens on the same relname it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.  Note,
 * however, that we do *not* guarantee that a separate frontend message will
 * be sent for every outside NOTIFY.  Since there is only room for one
 * originating PID in pg_listener, outside notifies occurring at about the
 * same time may be collapsed into a single message bearing the PID of the
 * first outside backend to perform the NOTIFY.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <unistd.h>
#include <signal.h>
#include <netinet/in.h>

#include "access/heapam.h"
#include "access/twophase_rmgr.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/syscache.h"


/*
 * State for outbound notifies consists of a list of all relnames NOTIFYed
 * in the current transaction.	We do not actually perform a NOTIFY until
 * and unless the transaction commits.	pendingNotifies is NIL if no
 * NOTIFYs have been done in the current transaction.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
static List *pendingNotifies = NIL;

static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */

/*
 * State for inbound notifies consists of two flags: one saying whether
 * the signal handler is currently allowed to call ProcessIncomingNotify
 * directly, and one saying whether the signal has occurred but the handler
 * was not allowed to call ProcessIncomingNotify at the time.
 *
 * NB: the "volatile" on these declarations is critical!  If your compiler
 * does not grok "volatile", you'd be best advised to compile this file
 * with all optimization turned off.
 */
static volatile int notifyInterruptEnabled = 0;
static volatile int notifyInterruptOccurred = 0;

/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;

bool		Trace_notify = false;


static void Async_UnlistenAll(void);
static void Async_UnlistenOnExit(int code, Datum arg);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
static bool AsyncExistsPendingNotify(const char *relname);
static void ClearPendingNotifies(void);


/*
 *--------------------------------------------------------------
 * Async_Notify
 *
 *		This is executed by the SQL notify command.
 *
 *		Adds the relation to the list of pending notifies.
 *		Actual notification happens during transaction commit.
 *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 *
 *--------------------------------------------------------------
 */
void
Async_Notify(const char *relname)
{
	if (Trace_notify)
		elog(DEBUG1, "Async_Notify(%s)", relname);

	/* no point in making duplicate entries in the list ... */
	if (!AsyncExistsPendingNotify(relname))
	{
		/*
		 * The name list needs to live until end of transaction, so store it
		 * in the transaction context.
		 */
		MemoryContext oldcontext;

		oldcontext = MemoryContextSwitchTo(CurTransactionContext);

		pendingNotifies = lcons(pstrdup(relname), pendingNotifies);

		MemoryContextSwitchTo(oldcontext);
	}
}

/*
 *--------------------------------------------------------------
 * Async_Listen
 *
 *		This is executed by the SQL listen command.
 *
 *		Register the current backend as listening on the specified
 *		relation.
 *
 * Side effects:
 *		pg_listener is updated.
 *
 *--------------------------------------------------------------
 */
void
Async_Listen(const char *relname)
{
	Relation	lRel;
	HeapScanDesc scan;
	HeapTuple	tuple;
	Datum		values[Natts_pg_listener];
	char		nulls[Natts_pg_listener];
	int			i;
	bool		alreadyListener = false;

	if (Trace_notify)
		elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);

	lRel = heap_open(ListenerRelationId, ExclusiveLock);

	/* Detect whether we are already listening on this relname */
	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);

		if (listener->listenerpid == MyProcPid &&
			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
		{
			alreadyListener = true;
			/* No need to scan the rest of the table */
			break;
		}
	}
	heap_endscan(scan);

	if (alreadyListener)
	{
		heap_close(lRel, ExclusiveLock);
		return;
	}

	/*
	 * OK to insert a new tuple
	 */

	for (i = 0; i < Natts_pg_listener; i++)
	{
		nulls[i] = ' ';
		values[i] = PointerGetDatum(NULL);
	}

	i = 0;
	values[i++] = (Datum) relname;
	values[i++] = (Datum) MyProcPid;
	values[i++] = (Datum) 0;	/* no notifies pending */

	tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);

	simple_heap_insert(lRel, tuple);

#ifdef NOT_USED					/* currently there are no indexes */
	CatalogUpdateIndexes(lRel, tuple);
#endif

	heap_freetuple(tuple);

	heap_close(lRel, ExclusiveLock);

	/*
	 * now that we are listening, make sure we will unlisten before dying.
	 */
	if (!unlistenExitRegistered)
	{
		on_shmem_exit(Async_UnlistenOnExit, 0);
		unlistenExitRegistered = true;
	}
}

/*
 *--------------------------------------------------------------
 * Async_Unlisten
 *
 *		This is executed by the SQL unlisten command.
 *
 *		Remove the current backend from the list of listening backends
 *		for the specified relation.
 *
 * Side effects:
 *		pg_listener is updated.
 *
 *--------------------------------------------------------------
 */
void
Async_Unlisten(const char *relname)
{
	Relation	lRel;
	HeapScanDesc scan;
	HeapTuple	tuple;

	/* Handle specially the `unlisten "*"' command */
	if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
	{
		Async_UnlistenAll();
		return;
	}

	if (Trace_notify)
		elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);

	lRel = heap_open(ListenerRelationId, ExclusiveLock);

	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);

		if (listener->listenerpid == MyProcPid &&
			strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
		{
			/* Found the matching tuple, delete it */
			simple_heap_delete(lRel, &tuple->t_self);

			/*
			 * We assume there can be only one match, so no need to scan the
			 * rest of the table
			 */
			break;
		}
	}
	heap_endscan(scan);

	heap_close(lRel, ExclusiveLock);

	/*
	 * We do not complain about unlistening something not being listened;
	 * should we?
	 */
}

/*
 *--------------------------------------------------------------
 * Async_UnlistenAll
 *
 *		Unlisten all relations for this backend.
 *
 *		This is invoked by UNLISTEN "*" command, and also at backend exit.
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		pg_listener is updated.
 *
 *--------------------------------------------------------------
 */
static void
Async_UnlistenAll(void)
{
	Relation	lRel;
	TupleDesc	tdesc;
	HeapScanDesc scan;
	HeapTuple	lTuple;
	ScanKeyData key[1];

	if (Trace_notify)
		elog(DEBUG1, "Async_UnlistenAll");

	lRel = heap_open(ListenerRelationId, ExclusiveLock);
	tdesc = RelationGetDescr(lRel);

	/* Find and delete all entries with my listenerPID */
	ScanKeyInit(&key[0],
				Anum_pg_listener_pid,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(MyProcPid));
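
The two-flag inbound-notify state described in the file's comments (one flag saying whether the handler may process immediately, one recording that a signal arrived while it could not) can be illustrated with a small standalone sketch. This is not the PostgreSQL implementation: every name below is hypothetical, the handler is called directly instead of being installed for SIGUSR2, `sig_atomic_t` stands in for the file's `volatile int`, and the race conditions the real code has to guard against are ignored.

```c
#include <assert.h>
#include <signal.h>

/* Hypothetical stand-ins for notifyInterruptEnabled / notifyInterruptOccurred. */
static volatile sig_atomic_t notify_enabled = 0;
static volatile sig_atomic_t notify_occurred = 0;

/* Counts how often the (stubbed) inbound processing ran. */
static volatile sig_atomic_t processed = 0;

/* Stub for inbound-notify processing: clears the pending flag. */
void process_incoming_notify(void)
{
	notify_occurred = 0;
	processed++;
}

/* Handler body: process at once if idle, otherwise only set a flag. */
void handle_notify_signal(int signo)
{
	(void) signo;
	if (notify_enabled)
		process_incoming_notify();
	else
		notify_occurred = 1;
}

/* Called just before going idle: catch up on any deferred signal. */
void enable_notify_interrupt(void)
{
	notify_enabled = 1;
	if (notify_occurred)
		process_incoming_notify();
}

/* Called when leaving the idle state. */
void disable_notify_interrupt(void)
{
	notify_enabled = 0;
}

/* Walks through the busy -> idle -> signal sequence; returns the run count. */
int notify_flags_demo(void)
{
	notify_enabled = 0;
	notify_occurred = 0;
	processed = 0;

	/* SIGINT is used only because standard C defines it; the file uses SIGUSR2. */
	handle_notify_signal(SIGINT);	/* busy: processing is deferred */
	assert(processed == 0 && notify_occurred == 1);

	enable_notify_interrupt();		/* going idle: deferred work runs */
	assert(processed == 1 && notify_occurred == 0);

	handle_notify_signal(SIGINT);	/* idle: processed immediately */
	assert(processed == 2);

	disable_notify_interrupt();
	return (int) processed;
}
```

Calling `notify_flags_demo()` exercises the three cases in order: a signal while busy, catching up when going idle, and a signal while idle.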
