📄 async.c
字号:
bool isnull; char *relname; int32 listenerPID; if (!pendingNotifies) return; /* no NOTIFY statements in this * transaction */ /* * NOTIFY is disabled if not normal processing mode. This test used to * be in xact.c, but it seems cleaner to do it here. */ if (!IsNormalProcessingMode()) { ClearPendingNotifies(); return; } TPRINTF(TRACE_NOTIFY, "AtCommit_Notify"); lRel = heap_openr(ListenerRelationName); LockRelation(lRel, AccessExclusiveLock); tdesc = RelationGetDescr(lRel); sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL); /* preset data to update notify column to MyProcPid */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; repl[Anum_pg_listener_notify - 1] = 'r'; value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid); while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) { d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull); relname = (char *) DatumGetPointer(d); if (AsyncExistsPendingNotify(relname)) { d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull); listenerPID = DatumGetInt32(d); if (listenerPID == MyProcPid) { /* * Self-notify: no need to bother with table update. * Indeed, we *must not* clear the notification field in * this path, or we could lose an outside notify, which'd * be bad for applications that ignore self-notify * messages. */ TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self"); NotifyMyFrontEnd(relname, listenerPID); } else { TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d", listenerPID); /* * If someone has already notified this listener, we don't * bother modifying the table, but we do still send a * SIGUSR2 signal, just in case that backend missed the * earlier signal for some reason. It's OK to send the * signal first, because the other guy can't read * pg_listener until we unlock it. */ if (kill(listenerPID, SIGUSR2) < 0) { /* * Get rid of pg_listener entry if it refers to a PID * that no longer exists. Presumably, that backend * crashed without deleting its pg_listener entries. * This code used to only delete the entry if * errno==ESRCH, but as far as I can see we should * just do it for any failure (certainly at least for * EPERM too...) */ heap_delete(lRel, &lTuple->t_self, NULL); } else { d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull); if (DatumGetInt32(d) == 0) { rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); heap_replace(lRel, &lTuple->t_self, rTuple, NULL); } } } } } heap_endscan(sRel); /* * We do not do RelationUnsetLockForWrite(lRel) here, because the * transaction is about to be committed anyway. */ heap_close(lRel); ClearPendingNotifies(); TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");}/* *-------------------------------------------------------------- * AtAbort_Notify * * This is called at transaction abort. * * Gets rid of pending outbound notifies that we would have executed * if the transaction got committed. * * Results: * XXX * *-------------------------------------------------------------- */voidAtAbort_Notify(){ ClearPendingNotifies();}/* *-------------------------------------------------------------- * Async_NotifyHandler * * This is the signal handler for SIGUSR2. * * If we are idle (notifyInterruptEnabled is set), we can safely invoke * ProcessIncomingNotify directly. Otherwise, just set a flag * to do it later. * * Results: * none * * Side effects: * per above *-------------------------------------------------------------- */voidAsync_NotifyHandler(SIGNAL_ARGS){ /* * Note: this is a SIGNAL HANDLER. You must be very wary what you do * here. Some helpful soul had this routine sprinkled with TPRINTFs, * which would likely lead to corruption of stdio buffers if they were * ever turned on. */ if (notifyInterruptEnabled) { /* * I'm not sure whether some flavors of Unix might allow another * SIGUSR2 occurrence to recursively interrupt this routine. To * cope with the possibility, we do the same sort of dance that * EnableNotifyInterrupt must do --- see that routine for * comments. */ notifyInterruptEnabled = 0; /* disable any recursive signal */ notifyInterruptOccurred = 1; /* do at least one iteration */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { /* Here, it is finally safe to do stuff. */ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: perform async notify"); ProcessIncomingNotify(); TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done"); } } } else { /* * In this path it is NOT SAFE to do much of anything, except * this: */ notifyInterruptOccurred = 1; }}/* * -------------------------------------------------------------- * EnableNotifyInterrupt * * This is called by the PostgresMain main loop just before waiting * for a frontend command. If we are truly idle (ie, *not* inside * a transaction block), then process any pending inbound notifies, * and enable the signal handler to process future notifies directly. * * NOTE: the signal handler starts out disabled, and stays so until * PostgresMain calls this the first time. * -------------------------------------------------------------- */voidEnableNotifyInterrupt(void){ if (CurrentTransactionState->blockState != TRANS_DEFAULT) return; /* not really idle */ /* * This code is tricky because we are communicating with a signal * handler that could interrupt us at any point. If we just checked * notifyInterruptOccurred and then set notifyInterruptEnabled, we * could fail to respond promptly to a signal that happens in between * those two steps. (A very small time window, perhaps, but Murphy's * Law says you can hit it...) Instead, we first set the enable flag, * then test the occurred flag. If we see an unserviced interrupt has * occurred, we re-clear the enable flag before going off to do the * service work. (That prevents re-entrant invocation of * ProcessIncomingNotify() if another interrupt occurs.) If an * interrupt comes in between the setting and clearing of * notifyInterruptEnabled, then it will have done the service work and * left notifyInterruptOccurred zero, so we have to check again after * clearing enable. The whole thing has to be in a loop in case * another interrupt occurs while we're servicing the first. Once we * get out of the loop, enable is set and we know there is no * unserviced interrupt. * * NB: an overenthusiastic optimizing compiler could easily break this * code. Hopefully, they all understand what "volatile" means these * days. */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: perform async notify"); ProcessIncomingNotify(); TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done"); } }}/* * -------------------------------------------------------------- * DisableNotifyInterrupt * * This is called by the PostgresMain main loop just after receiving * a frontend command. Signal handler execution of inbound notifies * is disabled until the next EnableNotifyInterrupt call. * -------------------------------------------------------------- */voidDisableNotifyInterrupt(void){ notifyInterruptEnabled = 0;}/* * -------------------------------------------------------------- * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. * This is called either directly from the SIGUSR2 signal handler, * or the next time control reaches the outer idle loop. * Scan pg_listener for arriving notifies, report them to my front end, * and clear the notification field in pg_listener until next time. * * NOTE: since we are outside any transaction, we must create our own. * * Results: * XXX * * -------------------------------------------------------------- */static voidProcessIncomingNotify(void){ Relation lRel; TupleDesc tdesc; ScanKeyData key[1]; HeapScanDesc sRel; HeapTuple lTuple, rTuple; Datum d, value[Natts_pg_listener]; char repl[Natts_pg_listener], nulls[Natts_pg_listener]; bool isnull; char *relname; int32 sourcePID; TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify"); PS_SET_STATUS("async_notify"); notifyInterruptOccurred = 0; StartTransactionCommand(); lRel = heap_openr(ListenerRelationName); LockRelation(lRel, AccessExclusiveLock); tdesc = RelationGetDescr(lRel); /* Scan only entries with my listenerPID */ ScanKeyEntryInitialize(&key[0], 0, Anum_pg_listener_pid, F_INT4EQ, Int32GetDatum(MyProcPid)); sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key); /* Prepare data for rewriting 0 into notification field */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; repl[Anum_pg_listener_notify - 1] = 'r'; value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) { d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull); sourcePID = DatumGetInt32(d); if (sourcePID != 0) { d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull); relname = (char *) DatumGetPointer(d); /* Notify the frontend */ TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d", relname, (int) sourcePID); NotifyMyFrontEnd(relname, sourcePID); /* Rewrite the tuple with 0 in notification column */ rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); heap_replace(lRel, &lTuple->t_self, rTuple, NULL); } } heap_endscan(sRel); /* * We do not do RelationUnsetLockForWrite(lRel) here, because the * transaction is about to be committed anyway. */ heap_close(lRel); CommitTransactionCommand(); /* * Must flush the notify messages to ensure frontend gets them * promptly. */ pq_flush(); PS_SET_STATUS("idle"); TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");}/* Send NOTIFY message to my front end. */static voidNotifyMyFrontEnd(char *relname, int32 listenerPID){ if (whereToSendOutput == Remote) { StringInfoData buf; pq_beginmessage(&buf); pq_sendbyte(&buf, 'A'); pq_sendint(&buf, listenerPID, sizeof(int32)); pq_sendstring(&buf, relname); pq_endmessage(&buf); /* * NOTE: we do not do pq_flush() here. For a self-notify, it will * happen at the end of the transaction, and for incoming notifies * ProcessIncomingNotify will do it after finding all the * notifies. */ } else elog(NOTICE, "NOTIFY for %s", relname);}/* Does pendingNotifies include the given relname? * * NB: not called unless pendingNotifies != NULL. */static intAsyncExistsPendingNotify(char *relname){ Dlelem *p; for (p = DLGetHead(pendingNotifies); p != NULL; p = DLGetSucc(p)) { /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */ if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN)) return 1; } return 0;}/* Clear the pendingNotifies list. */static voidClearPendingNotifies(){ Dlelem *p; if (pendingNotifies) { /* * Since the referenced strings are malloc'd, we have to scan the * list and delete them individually. If we used palloc for the * strings then we could just do DLFreeList to get rid of both the * list nodes and the list base... */ while ((p = DLRemHead(pendingNotifies)) != NULL) { free(DLE_VAL(p)); DLFreeElem(p); } DLFreeList(pendingNotifies); pendingNotifies = NULL; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -