async.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 948 行 · 第 1/2 页
C
948 行
{ /* * Self-notify: no need to bother with table update. Indeed, * we *must not* clear the notification field in this path, or * we could lose an outside notify, which'd be bad for * applications that ignore self-notify messages. */ if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: notifying self"); NotifyMyFrontEnd(relname, listenerPID); } else { if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: notifying pid %d", listenerPID); /* * If someone has already notified this listener, we don't * bother modifying the table, but we do still send a SIGUSR2 * signal, just in case that backend missed the earlier signal * for some reason. It's OK to send the signal first, because * the other guy can't read pg_listener until we unlock it. */ if (kill(listenerPID, SIGUSR2) < 0) { /* * Get rid of pg_listener entry if it refers to a PID that * no longer exists. Presumably, that backend crashed * without deleting its pg_listener entries. This code * used to only delete the entry if errno==ESRCH, but as * far as I can see we should just do it for any failure * (certainly at least for EPERM too...) */ simple_heap_delete(lRel, &lTuple->t_self); } else if (listener->notification == 0) { ItemPointerData ctid; int result; rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); /* * We cannot use simple_heap_update here because the tuple * could have been modified by an uncommitted transaction; * specifically, since UNLISTEN releases exclusive lock on * the table before commit, the other guy could already have * tried to unlisten. There are no other cases where we * should be able to see an uncommitted update or delete. * Therefore, our response to a HeapTupleBeingUpdated result * is just to ignore it. We do *not* wait for the other * guy to commit --- that would risk deadlock, and we don't * want to block while holding the table lock anyway for * performance reasons. We also ignore HeapTupleUpdated, * which could occur if the other guy commits between our * heap_getnext and heap_update calls. */ result = heap_update(lRel, &lTuple->t_self, rTuple, &ctid, GetCurrentCommandId(), SnapshotAny, false /* no wait for commit */); switch (result) { case HeapTupleSelfUpdated: /* Tuple was already updated in current command? */ elog(ERROR, "tuple already updated by self"); break; case HeapTupleMayBeUpdated: /* done successfully */#ifdef NOT_USED /* currently there are no indexes */ CatalogUpdateIndexes(lRel, rTuple);#endif break; case HeapTupleBeingUpdated: /* ignore uncommitted tuples */ break; case HeapTupleUpdated: /* ignore just-committed tuples */ break; default: elog(ERROR, "unrecognized heap_update status: %u", result); break; } } } } heap_endscan(scan); /* * We do NOT release the lock on pg_listener here; we need to hold it * until end of transaction (which is about to happen, anyway) to * ensure that notified backends see our tuple updates when they look. * Else they might disregard the signal, which would make the * application programmer very unhappy. */ heap_close(lRel, NoLock); ClearPendingNotifies(); if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: done");}/* *-------------------------------------------------------------- * AtAbort_Notify * * This is called at transaction abort. * * Gets rid of pending outbound notifies that we would have executed * if the transaction got committed. * * Results: * XXX * *-------------------------------------------------------------- */voidAtAbort_Notify(void){ ClearPendingNotifies();}/* *-------------------------------------------------------------- * Async_NotifyHandler * * This is the signal handler for SIGUSR2. * * If we are idle (notifyInterruptEnabled is set), we can safely invoke * ProcessIncomingNotify directly. Otherwise, just set a flag * to do it later. * * Results: * none * * Side effects: * per above *-------------------------------------------------------------- */voidAsync_NotifyHandler(SIGNAL_ARGS){ int save_errno = errno; /* * Note: this is a SIGNAL HANDLER. You must be very wary what you do * here. Some helpful soul had this routine sprinkled with TPRINTFs, * which would likely lead to corruption of stdio buffers if they were * ever turned on. */ /* Don't joggle the elbow of proc_exit */ if (proc_exit_inprogress) return; if (notifyInterruptEnabled) { bool save_ImmediateInterruptOK = ImmediateInterruptOK; /* * We may be called while ImmediateInterruptOK is true; turn it * off while messing with the NOTIFY state. (We would have to * save and restore it anyway, because PGSemaphore operations * inside ProcessIncomingNotify() might reset it.) */ ImmediateInterruptOK = false; /* * I'm not sure whether some flavors of Unix might allow another * SIGUSR2 occurrence to recursively interrupt this routine. To * cope with the possibility, we do the same sort of dance that * EnableNotifyInterrupt must do --- see that routine for * comments. */ notifyInterruptEnabled = 0; /* disable any recursive signal */ notifyInterruptOccurred = 1; /* do at least one iteration */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { /* Here, it is finally safe to do stuff. */ if (Trace_notify) elog(DEBUG1, "Async_NotifyHandler: perform async notify"); ProcessIncomingNotify(); if (Trace_notify) elog(DEBUG1, "Async_NotifyHandler: done"); } } /* * Restore ImmediateInterruptOK, and check for interrupts if * needed. */ ImmediateInterruptOK = save_ImmediateInterruptOK; if (save_ImmediateInterruptOK) CHECK_FOR_INTERRUPTS(); } else { /* * In this path it is NOT SAFE to do much of anything, except * this: */ notifyInterruptOccurred = 1; } errno = save_errno;}/* * -------------------------------------------------------------- * EnableNotifyInterrupt * * This is called by the PostgresMain main loop just before waiting * for a frontend command. If we are truly idle (ie, *not* inside * a transaction block), then process any pending inbound notifies, * and enable the signal handler to process future notifies directly. * * NOTE: the signal handler starts out disabled, and stays so until * PostgresMain calls this the first time. * -------------------------------------------------------------- */voidEnableNotifyInterrupt(void){ if (IsTransactionOrTransactionBlock()) return; /* not really idle */ /* * This code is tricky because we are communicating with a signal * handler that could interrupt us at any point. If we just checked * notifyInterruptOccurred and then set notifyInterruptEnabled, we * could fail to respond promptly to a signal that happens in between * those two steps. (A very small time window, perhaps, but Murphy's * Law says you can hit it...) Instead, we first set the enable flag, * then test the occurred flag. If we see an unserviced interrupt has * occurred, we re-clear the enable flag before going off to do the * service work. (That prevents re-entrant invocation of * ProcessIncomingNotify() if another interrupt occurs.) If an * interrupt comes in between the setting and clearing of * notifyInterruptEnabled, then it will have done the service work and * left notifyInterruptOccurred zero, so we have to check again after * clearing enable. The whole thing has to be in a loop in case * another interrupt occurs while we're servicing the first. Once we * get out of the loop, enable is set and we know there is no * unserviced interrupt. * * NB: an overenthusiastic optimizing compiler could easily break this * code. Hopefully, they all understand what "volatile" means these * days. */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { if (Trace_notify) elog(DEBUG1, "EnableNotifyInterrupt: perform async notify"); ProcessIncomingNotify(); if (Trace_notify) elog(DEBUG1, "EnableNotifyInterrupt: done"); } }}/* * -------------------------------------------------------------- * DisableNotifyInterrupt * * This is called by the PostgresMain main loop just after receiving * a frontend command. Signal handler execution of inbound notifies * is disabled until the next EnableNotifyInterrupt call. * -------------------------------------------------------------- */voidDisableNotifyInterrupt(void){ notifyInterruptEnabled = 0;}/* * -------------------------------------------------------------- * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. * This is called either directly from the SIGUSR2 signal handler, * or the next time control reaches the outer idle loop. * Scan pg_listener for arriving notifies, report them to my front end, * and clear the notification field in pg_listener until next time. * * NOTE: since we are outside any transaction, we must create our own. * * Results: * XXX * * -------------------------------------------------------------- */static voidProcessIncomingNotify(void){ Relation lRel; TupleDesc tdesc; ScanKeyData key[1]; HeapScanDesc scan; HeapTuple lTuple, rTuple; Datum value[Natts_pg_listener]; char repl[Natts_pg_listener], nulls[Natts_pg_listener]; if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify"); set_ps_display("async_notify"); notifyInterruptOccurred = 0; StartTransactionCommand(); lRel = heap_openr(ListenerRelationName, ExclusiveLock); tdesc = RelationGetDescr(lRel); /* Scan only entries with my listenerPID */ ScanKeyEntryInitialize(&key[0], 0, Anum_pg_listener_pid, F_INT4EQ, Int32GetDatum(MyProcPid)); scan = heap_beginscan(lRel, SnapshotNow, 1, key); /* Prepare data for rewriting 0 into notification field */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; repl[Anum_pg_listener_notify - 1] = 'r'; value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); char *relname = NameStr(listener->relname); int32 sourcePID = listener->notification; if (sourcePID != 0) { /* Notify the frontend */ if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify: received %s from %d", relname, (int) sourcePID); NotifyMyFrontEnd(relname, sourcePID); /* * Rewrite the tuple with 0 in notification column. * * simple_heap_update is safe here because no one else would * have tried to UNLISTEN us, so there can be no uncommitted * changes. */ rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); simple_heap_update(lRel, &lTuple->t_self, rTuple);#ifdef NOT_USED /* currently there are no indexes */ CatalogUpdateIndexes(lRel, rTuple);#endif } } heap_endscan(scan); /* * We do NOT release the lock on pg_listener here; we need to hold it * until end of transaction (which is about to happen, anyway) to * ensure that other backends see our tuple updates when they look. * Otherwise, a transaction started after this one might mistakenly * think it doesn't need to send this backend a new NOTIFY. */ heap_close(lRel, NoLock); CommitTransactionCommand(); /* * Must flush the notify messages to ensure frontend gets them * promptly. */ pq_flush(); set_ps_display("idle"); if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify: done");}/* * Send NOTIFY message to my front end. */static voidNotifyMyFrontEnd(char *relname, int32 listenerPID){ if (whereToSendOutput == Remote) { StringInfoData buf; pq_beginmessage(&buf, 'A'); pq_sendint(&buf, listenerPID, sizeof(int32)); pq_sendstring(&buf, relname); if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* XXX Add parameter string here later */ pq_sendstring(&buf, ""); } pq_endmessage(&buf); /* * NOTE: we do not do pq_flush() here. For a self-notify, it will * happen at the end of the transaction, and for incoming notifies * ProcessIncomingNotify will do it after finding all the * notifies. */ } else elog(INFO, "NOTIFY for %s", relname);}/* Does pendingNotifies include the given relname? */static boolAsyncExistsPendingNotify(const char *relname){ List *p; foreach(p, pendingNotifies) { /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */ if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0) return true; } return false;}/* Clear the pendingNotifies list. */static voidClearPendingNotifies(void){ /* * We used to have to explicitly deallocate the list members and * nodes, because they were malloc'd. Now, since we know they are * palloc'd in TopTransactionContext, we need not do that --- they'll * go away automatically at transaction exit. We need only reset the * list head pointer. */ pendingNotifies = NIL;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?