📄 async.c
字号:
/*------------------------------------------------------------------------- * * async.c * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN * * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.127.2.1 2005/11/22 18:23:06 momjian Exp $ * *------------------------------------------------------------------------- *//*------------------------------------------------------------------------- * New Async Notification Model: * 1. Multiple backends on same machine. Multiple backends listening on * one relation. (Note: "listening on a relation" is not really the * right way to think about it, since the notify names need not have * anything to do with the names of relations actually in the database. * But this terminology is all over the code and docs, and I don't feel * like trying to replace it.) * * 2. There is a tuple in relation "pg_listener" for each active LISTEN, * ie, each relname/listenerPID pair. The "notification" field of the * tuple is zero when no NOTIFY is pending for that listener, or the PID * of the originating backend when a cross-backend NOTIFY is pending. * (We skip writing to pg_listener when doing a self-NOTIFY, so the * notification field should never be equal to the listenerPID field.) * * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target * relname to a list of outstanding NOTIFY requests. Actual processing * happens if and only if we reach transaction commit. At that time (in * routine AtCommit_Notify) we scan pg_listener for matching relnames. * If the listenerPID in a matching tuple is ours, we just send a notify * message to our own front end. If it is not ours, and "notification" * is not already nonzero, we set notification to our own PID and send a * SIGUSR2 signal to the receiving process (indicated by listenerPID). * BTW: if the signal operation fails, we presume that the listener backend * crashed without removing this tuple, and remove the tuple for it. * * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound- * notify processing immediately if this backend is idle (ie, it is * waiting for a frontend command and is not within a transaction block). * Otherwise the handler may only set a flag, which will cause the * processing to occur just before we next go idle. * * 5. Inbound-notify processing consists of scanning pg_listener for tuples * matching our own listenerPID and having nonzero notification fields. * For each such tuple, we send a message to our frontend and clear the * notification field. BTW: this routine has to start/commit its own * transaction, since by assumption it is only called from outside any * transaction. * * Although we grab ExclusiveLock on pg_listener for any operation, * the lock is never held very long, so it shouldn't cause too much of * a performance problem. (Previously we used AccessExclusiveLock, but * there's no real reason to forbid concurrent reads.) * * An application that listens on the same relname it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * by comparing be_pid in the NOTIFY message to the application's own backend's * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the * frontend during startup.) The above design guarantees that notifies from * other backends will never be missed by ignoring self-notifies. Note, * however, that we do *not* guarantee that a separate frontend message will * be sent for every outside NOTIFY. Since there is only room for one * originating PID in pg_listener, outside notifies occurring at about the * same time may be collapsed into a single message bearing the PID of the * first outside backend to perform the NOTIFY. *------------------------------------------------------------------------- */#include "postgres.h"#include <unistd.h>#include <signal.h>#include <netinet/in.h>#include "access/heapam.h"#include "access/twophase_rmgr.h"#include "catalog/pg_listener.h"#include "commands/async.h"#include "libpq/libpq.h"#include "libpq/pqformat.h"#include "miscadmin.h"#include "storage/ipc.h"#include "storage/sinval.h"#include "tcop/tcopprot.h"#include "utils/fmgroids.h"#include "utils/memutils.h"#include "utils/ps_status.h"#include "utils/syscache.h"/* * State for outbound notifies consists of a list of all relnames NOTIFYed * in the current transaction. We do not actually perform a NOTIFY until * and unless the transaction commits. pendingNotifies is NIL if no * NOTIFYs have been done in the current transaction. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but * successful subtransactions attach their lists to their parent's list. * Failed subtransactions simply discard their lists. */static List *pendingNotifies = NIL;static List *upperPendingNotifies = NIL; /* list of upper-xact lists *//* * State for inbound notifies consists of two flags: one saying whether * the signal handler is currently allowed to call ProcessIncomingNotify * directly, and one saying whether the signal has occurred but the handler * was not allowed to call ProcessIncomingNotify at the time. * * NB: the "volatile" on these declarations is critical! If your compiler * does not grok "volatile", you'd be best advised to compile this file * with all optimization turned off. */static volatile int notifyInterruptEnabled = 0;static volatile int notifyInterruptOccurred = 0;/* True if we've registered an on_shmem_exit cleanup */static bool unlistenExitRegistered = false;bool Trace_notify = false;static void Async_UnlistenAll(void);static void Async_UnlistenOnExit(int code, Datum arg);static void ProcessIncomingNotify(void);static void NotifyMyFrontEnd(char *relname, int32 listenerPID);static bool AsyncExistsPendingNotify(const char *relname);static void ClearPendingNotifies(void);/* *-------------------------------------------------------------- * Async_Notify * * This is executed by the SQL notify command. * * Adds the relation to the list of pending notifies. * Actual notification happens during transaction commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * *-------------------------------------------------------------- */voidAsync_Notify(const char *relname){ if (Trace_notify) elog(DEBUG1, "Async_Notify(%s)", relname); /* no point in making duplicate entries in the list ... */ if (!AsyncExistsPendingNotify(relname)) { /* * The name list needs to live until end of transaction, so store it * in the transaction context. */ MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(CurTransactionContext); pendingNotifies = lcons(pstrdup(relname), pendingNotifies); MemoryContextSwitchTo(oldcontext); }}/* *-------------------------------------------------------------- * Async_Listen * * This is executed by the SQL listen command. * * Register the current backend as listening on the specified * relation. * * Side effects: * pg_listener is updated. * *-------------------------------------------------------------- */voidAsync_Listen(const char *relname){ Relation lRel; HeapScanDesc scan; HeapTuple tuple; Datum values[Natts_pg_listener]; char nulls[Natts_pg_listener]; int i; bool alreadyListener = false; if (Trace_notify) elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); lRel = heap_open(ListenerRelationId, ExclusiveLock); /* Detect whether we are already listening on this relname */ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); if (listener->listenerpid == MyProcPid && strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) { alreadyListener = true; /* No need to scan the rest of the table */ break; } } heap_endscan(scan); if (alreadyListener) { heap_close(lRel, ExclusiveLock); return; } /* * OK to insert a new tuple */ for (i = 0; i < Natts_pg_listener; i++) { nulls[i] = ' '; values[i] = PointerGetDatum(NULL); } i = 0; values[i++] = (Datum) relname; values[i++] = (Datum) MyProcPid; values[i++] = (Datum) 0; /* no notifies pending */ tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls); simple_heap_insert(lRel, tuple);#ifdef NOT_USED /* currently there are no indexes */ CatalogUpdateIndexes(lRel, tuple);#endif heap_freetuple(tuple); heap_close(lRel, ExclusiveLock); /* * now that we are listening, make sure we will unlisten before dying. */ if (!unlistenExitRegistered) { on_shmem_exit(Async_UnlistenOnExit, 0); unlistenExitRegistered = true; }}/* *-------------------------------------------------------------- * Async_Unlisten * * This is executed by the SQL unlisten command. * * Remove the current backend from the list of listening backends * for the specified relation. * * Side effects: * pg_listener is updated. * *-------------------------------------------------------------- */voidAsync_Unlisten(const char *relname){ Relation lRel; HeapScanDesc scan; HeapTuple tuple; /* Handle specially the `unlisten "*"' command */ if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0)) { Async_UnlistenAll(); return; } if (Trace_notify) elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); lRel = heap_open(ListenerRelationId, ExclusiveLock); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); if (listener->listenerpid == MyProcPid && strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) { /* Found the matching tuple, delete it */ simple_heap_delete(lRel, &tuple->t_self); /* * We assume there can be only one match, so no need to scan the * rest of the table */ break; } } heap_endscan(scan); heap_close(lRel, ExclusiveLock); /* * We do not complain about unlistening something not being listened; * should we? */}/* *-------------------------------------------------------------- * Async_UnlistenAll * * Unlisten all relations for this backend. * * This is invoked by UNLISTEN "*" command, and also at backend exit. * * Results: * XXX * * Side effects: * pg_listener is updated. * *-------------------------------------------------------------- */static voidAsync_UnlistenAll(void){ Relation lRel; TupleDesc tdesc; HeapScanDesc scan; HeapTuple lTuple; ScanKeyData key[1]; if (Trace_notify) elog(DEBUG1, "Async_UnlistenAll"); lRel = heap_open(ListenerRelationId, ExclusiveLock); tdesc = RelationGetDescr(lRel); /* Find and delete all entries with my listenerPID */ ScanKeyInit(&key[0], Anum_pg_listener_pid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(MyProcPid));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -