
File: async.c

Source: PostgreSQL 6.5.2 (relational database)
Language: C
Page 1 of 2
/*-------------------------------------------------------------------------
 *
 * async.c
 *	  Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /usr/local/cvsroot/pgsql/src/backend/commands/async.c,v 1.47.2.2 1999/09/14 22:33:35 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * New Async Notification Model:
 * 1. Multiple backends on same machine.  Multiple backends listening on
 *	  one relation.  (Note: "listening on a relation" is not really the
 *	  right way to think about it, since the notify names need not have
 *	  anything to do with the names of relations actually in the database.
 *	  But this terminology is all over the code and docs, and I don't feel
 *	  like trying to replace it.)
 *
 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
 *	  ie, each relname/listenerPID pair.  The "notification" field of the
 *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
 *	  of the originating backend when a cross-backend NOTIFY is pending.
 *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
 *	  notification field should never be equal to the listenerPID field.)
 *
 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
 *	  relname to a list of outstanding NOTIFY requests.  Actual processing
 *	  happens if and only if we reach transaction commit.  At that time (in
 *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
 *	  If the listenerPID in a matching tuple is ours, we just send a notify
 *	  message to our own front end.  If it is not ours, and "notification"
 *	  is not already nonzero, we set notification to our own PID and send a
 *	  SIGUSR2 signal to the receiving process (indicated by listenerPID).
 *	  BTW: if the signal operation fails, we presume that the listener backend
 *	  crashed without removing this tuple, and remove the tuple for it.
 *
 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
 *	  notify processing immediately if this backend is idle (ie, it is
 *	  waiting for a frontend command and is not within a transaction block).
 *	  Otherwise the handler may only set a flag, which will cause the
 *	  processing to occur just before we next go idle.
 *
 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
 *	  matching our own listenerPID and having nonzero notification fields.
 *	  For each such tuple, we send a message to our frontend and clear the
 *	  notification field.  BTW: this routine has to start/commit its own
 *	  transaction, since by assumption it is only called from outside any
 *	  transaction.
 *
 * Note that the system's use of pg_listener is confined to very short
 * intervals at the end of a transaction that contains NOTIFY statements,
 * or during the transaction caused by an inbound SIGUSR2.	So the fact that
 * pg_listener is a global resource shouldn't cause too much performance
 * problem.  But application authors ought to be discouraged from doing
 * LISTEN or UNLISTEN near the start of a long transaction --- that would
 * result in holding the pg_listener write lock for a long time, possibly
 * blocking unrelated activity.  It could even lead to deadlock against another
 * transaction that touches the same user tables and then tries to NOTIFY.
 * Probably best to do LISTEN or UNLISTEN outside of transaction blocks.
 *
 * An application that listens on the same relname it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends will never be missed by ignoring self-notifies.  Note,
 * however, that we do *not* guarantee that a separate frontend message will
 * be sent for every outside NOTIFY.  Since there is only room for one
 * originating PID in pg_listener, outside notifies occurring at about the
 * same time may be collapsed into a single message bearing the PID of the
 * first outside backend to perform the NOTIFY.
 *-------------------------------------------------------------------------
 */

#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <sys/types.h>
#include <netinet/in.h>

#include "postgres.h"

#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "lib/dllist.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "utils/ps_status.h"
#include "utils/syscache.h"
#include "utils/trace.h"

/* stuff that we really ought not be touching directly :-( */
extern TransactionState CurrentTransactionState;
extern CommandDest whereToSendOutput;

/*
 * State for outbound notifies consists of a list of all relnames NOTIFYed
 * in the current transaction.	We do not actually perform a NOTIFY until
 * and unless the transaction commits.	pendingNotifies is NULL if no
 * NOTIFYs have been done in the current transaction.
 */
static Dllist *pendingNotifies = NULL;

/*
 * State for inbound notifies consists of two flags: one saying whether
 * the signal handler is currently allowed to call ProcessIncomingNotify
 * directly, and one saying whether the signal has occurred but the handler
 * was not allowed to call ProcessIncomingNotify at the time.
 *
 * NB: the "volatile" on these declarations is critical!  If your compiler
 * does not grok "volatile", you'd be best advised to compile this file
 * with all optimization turned off.
 */
static volatile int notifyInterruptEnabled = 0;
static volatile int notifyInterruptOccurred = 0;

/* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
static int	unlistenExitRegistered = 0;


static void Async_UnlistenAll(void);
static void Async_UnlistenOnExit(void);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
static int	AsyncExistsPendingNotify(char *relname);
static void ClearPendingNotifies(void);


/*
 *--------------------------------------------------------------
 * Async_Notify
 *
 *		This is executed by the SQL notify command.
 *
 *		Adds the relation to the list of pending notifies.
 *		Actual notification happens during transaction commit.
 *		^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 *
 * Results:
 *		XXX
 *
 *--------------------------------------------------------------
 */
void
Async_Notify(char *relname)
{
	char	   *notifyName;

	TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);

	/*
	 * We allocate list memory from the global malloc pool to ensure that
	 * it will live until we want to use it.  This is probably not
	 * necessary any longer, since we will use it before the end of the
	 * transaction. DLList only knows how to use malloc() anyway, but we
	 * could probably palloc() the strings...
	 */
	if (!pendingNotifies)
		pendingNotifies = DLNewList();

	notifyName = strdup(relname);
	DLAddHead(pendingNotifies, DLNewElem(notifyName));

	/*
	 * NOTE: we could check to see if pendingNotifies already has an entry
	 * for relname, and thus avoid making duplicate entries.  However,
	 * most apps probably don't notify the same name multiple times per
	 * transaction, so we'd likely just be wasting cycles to make such a
	 * check. AsyncExistsPendingNotify() doesn't really care whether the
	 * list contains duplicates...
	 */
}

/*
 *--------------------------------------------------------------
 * Async_Listen
 *
 *		This is executed by the SQL listen command.
 *
 *		Register a backend (identified by its Unix PID) as listening
 *		on the specified relation.
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		pg_listener is updated.
 *
 *--------------------------------------------------------------
 */
void
Async_Listen(char *relname, int pid)
{
	Relation	lRel;
	TupleDesc	tdesc;
	HeapScanDesc scan;
	HeapTuple	tuple,
				newtup;
	Datum		values[Natts_pg_listener];
	char		nulls[Natts_pg_listener];
	Datum		d;
	int			i;
	bool		isnull;
	int			alreadyListener = 0;
	TupleDesc	tupDesc;

	TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);

	lRel = heap_openr(ListenerRelationName);
	LockRelation(lRel, AccessExclusiveLock);
	tdesc = RelationGetDescr(lRel);

	/* Detect whether we are already listening on this relname */
	scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
	while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
	{
		d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
		if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
		{
			d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
			if (DatumGetInt32(d) == pid)
			{
				alreadyListener = 1;
				/* No need to scan the rest of the table */
				break;
			}
		}
	}
	heap_endscan(scan);

	if (alreadyListener)
	{
		elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
		UnlockRelation(lRel, AccessExclusiveLock);
		heap_close(lRel);
		return;
	}

	/*
	 * OK to insert a new tuple
	 */

	for (i = 0; i < Natts_pg_listener; i++)
	{
		nulls[i] = ' ';
		values[i] = PointerGetDatum(NULL);
	}

	i = 0;
	values[i++] = (Datum) relname;
	values[i++] = (Datum) pid;
	values[i++] = (Datum) 0;	/* no notifies pending */

	tupDesc = lRel->rd_att;
	newtup = heap_formtuple(tupDesc, values, nulls);
	heap_insert(lRel, newtup);
	pfree(newtup);

	UnlockRelation(lRel, AccessExclusiveLock);
	heap_close(lRel);

	/*
	 * now that we are listening, make sure we will unlisten before dying.
	 */
	if (!unlistenExitRegistered)
	{
		if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
			elog(NOTICE, "Async_Listen: out of shmem_exit slots");
		unlistenExitRegistered = 1;
	}
}

/*
 *--------------------------------------------------------------
 * Async_Unlisten
 *
 *		This is executed by the SQL unlisten command.
 *
 *		Remove the backend from the list of listening backends
 *		for the specified relation.
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		pg_listener is updated.
 *
 *--------------------------------------------------------------
 */
void
Async_Unlisten(char *relname, int pid)
{
	Relation	lRel;
	HeapTuple	lTuple;

	/* Handle specially the `unlisten "*"' command */
	if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
	{
		Async_UnlistenAll();
		return;
	}

	TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);

	/* Note we assume there can be only one matching tuple. */
	lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
								 Int32GetDatum(pid),
								 0, 0);
	if (lTuple != NULL)
	{
		lRel = heap_openr(ListenerRelationName);
		LockRelation(lRel, AccessExclusiveLock);
		heap_delete(lRel, &lTuple->t_self, NULL);
		UnlockRelation(lRel, AccessExclusiveLock);
		heap_close(lRel);
	}

	/*
	 * We do not complain about unlistening something not being listened;
	 * should we?
	 */
}

/*
 *--------------------------------------------------------------
 * Async_UnlistenAll
 *
 *		Unlisten all relations for this backend.
 *
 *		This is invoked by UNLISTEN "*" command, and also at backend exit.
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		pg_listener is updated.
 *
 *--------------------------------------------------------------
 */
static void
Async_UnlistenAll()
{
	Relation	lRel;
	TupleDesc	tdesc;
	HeapScanDesc sRel;
	HeapTuple	lTuple;
	ScanKeyData key[1];

	TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");

	lRel = heap_openr(ListenerRelationName);
	LockRelation(lRel, AccessExclusiveLock);
	tdesc = RelationGetDescr(lRel);

	/* Find and delete all entries with my listenerPID */
	ScanKeyEntryInitialize(&key[0], 0,
						   Anum_pg_listener_pid,
						   F_INT4EQ,
						   Int32GetDatum(MyProcPid));
	sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);

	while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
		heap_delete(lRel, &lTuple->t_self, NULL);

	heap_endscan(sRel);
	UnlockRelation(lRel, AccessExclusiveLock);
	heap_close(lRel);
}

/*
 *--------------------------------------------------------------
 * Async_UnlistenOnExit
 *
 *		Clean up the pg_listener table at backend exit.
 *
 *		This is executed if we have done any LISTENs in this backend.
 *		It might not be necessary anymore, if the user UNLISTENed everything,
 *		but we don't try to detect that case.
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		pg_listener is updated if necessary.
 *
 *--------------------------------------------------------------
 */
static void
Async_UnlistenOnExit()
{

	/*
	 * We need to start/commit a transaction for the unlisten, but if
	 * there is already an active transaction we had better abort that one
	 * first.  Otherwise we'd end up committing changes that probably
	 * ought to be discarded.
	 */
	AbortOutOfAnyTransaction();
	/* Now we can do the unlisten */
	StartTransactionCommand();
	Async_UnlistenAll();
	CommitTransactionCommand();
}

/*
 *--------------------------------------------------------------
 * AtCommit_Notify
 *
 *		This is called at transaction commit.
 *
 *		If there are outbound notify requests in the pendingNotifies list,
 *		scan pg_listener for matching tuples, and either signal the other
 *		backend or send a message to our own frontend.
 *
 *		NOTE: we are still inside the current transaction, therefore can
 *		piggyback on its committing of changes.
 *
 * Results:
 *		XXX
 *
 * Side effects:
 *		Tuples in pg_listener that have matching relnames and other peoples'
 *		listenerPIDs are updated with a nonzero notification field.
 *
 *--------------------------------------------------------------
 */
void
AtCommit_Notify()
{
	Relation	lRel;
	TupleDesc	tdesc;
	HeapScanDesc sRel;
	HeapTuple	lTuple,
				rTuple;
	Datum		d,
				value[Natts_pg_listener];
	char		repl[Natts_pg_listener],
				nulls[Natts_pg_listener];
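
Point 4 of the header comment and the two `volatile` flags describe a deferred-signal pattern. The handler does the work at once only when the backend is idle; otherwise it records that a signal arrived and the work runs just before the backend next goes idle. The standalone sketch below illustrates that idea under stated assumptions. It is not the PostgreSQL implementation: every name in it (`interrupt_enabled`, `interrupt_occurred`, `process_incoming`, `enable_notify_interrupt`, `disable_notify_interrupt`, `install_notify_handler`) is hypothetical, the "processing" is reduced to a counter, and it uses POSIX `sigaction` rather than whatever the backend actually installs.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

/* Hypothetical stand-ins for the two inbound-notify flags. */
static volatile sig_atomic_t interrupt_enabled = 0;	/* may the handler do the work now? */
static volatile sig_atomic_t interrupt_occurred = 0;	/* did a signal arrive while disabled? */
static volatile sig_atomic_t processed_count = 0;	/* stands in for real notify processing */

/* Stand-in for inbound-notify processing; must never be re-entered. */
static void
process_incoming(void)
{
	assert(!interrupt_enabled);	/* invariant: callers disable first */
	interrupt_occurred = 0;
	processed_count++;
}

/* Signal handler: process immediately if idle, otherwise only set a flag. */
static void
handle_sigusr2(int signo)
{
	(void) signo;
	if (interrupt_enabled)
	{
		interrupt_enabled = 0;	/* guard against re-entry */
		process_incoming();
		interrupt_enabled = 1;
	}
	else
		interrupt_occurred = 1;	/* defer until we next go idle */
}

/* Call just before going idle: allow direct processing and catch up. */
void
enable_notify_interrupt(void)
{
	for (;;)
	{
		interrupt_enabled = 1;
		if (!interrupt_occurred)
			break;		/* nothing pending; stay enabled */
		interrupt_enabled = 0;
		if (interrupt_occurred)
			process_incoming();
	}
}

/* Call when leaving the idle state (e.g. a command has arrived). */
void
disable_notify_interrupt(void)
{
	interrupt_enabled = 0;
}

/* Install the handler with sigaction so it stays installed across signals. */
int
install_notify_handler(void)
{
	struct sigaction sa;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = handle_sigusr2;
	sigemptyset(&sa.sa_mask);
	return sigaction(SIGUSR2, &sa, NULL);
}
```

A caller would install the handler once, call `disable_notify_interrupt()` while a command is being executed, and call `enable_notify_interrupt()` before waiting for the next command. The flag is set to enabled before the pending check so that a signal landing between the two steps is handled by one path or the other rather than lost.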
