📄 async.c
字号:
* * Results: * none * * Side effects: * per above *-------------------------------------------------------------- */voidNotifyInterruptHandler(SIGNAL_ARGS){ int save_errno = errno; /* * Note: this is a SIGNAL HANDLER. You must be very wary what you do * here. Some helpful soul had this routine sprinkled with TPRINTFs, which * would likely lead to corruption of stdio buffers if they were ever * turned on. */ /* Don't joggle the elbow of proc_exit */ if (proc_exit_inprogress) return; if (notifyInterruptEnabled) { bool save_ImmediateInterruptOK = ImmediateInterruptOK; /* * We may be called while ImmediateInterruptOK is true; turn it off * while messing with the NOTIFY state. (We would have to save and * restore it anyway, because PGSemaphore operations inside * ProcessIncomingNotify() might reset it.) */ ImmediateInterruptOK = false; /* * I'm not sure whether some flavors of Unix might allow another * SIGUSR2 occurrence to recursively interrupt this routine. To cope * with the possibility, we do the same sort of dance that * EnableNotifyInterrupt must do --- see that routine for comments. */ notifyInterruptEnabled = 0; /* disable any recursive signal */ notifyInterruptOccurred = 1; /* do at least one iteration */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { /* Here, it is finally safe to do stuff. */ if (Trace_notify) elog(DEBUG1, "NotifyInterruptHandler: perform async notify"); ProcessIncomingNotify(); if (Trace_notify) elog(DEBUG1, "NotifyInterruptHandler: done"); } } /* * Restore ImmediateInterruptOK, and check for interrupts if needed. */ ImmediateInterruptOK = save_ImmediateInterruptOK; if (save_ImmediateInterruptOK) CHECK_FOR_INTERRUPTS(); } else { /* * In this path it is NOT SAFE to do much of anything, except this: */ notifyInterruptOccurred = 1; } errno = save_errno;}/* * -------------------------------------------------------------- * EnableNotifyInterrupt * * This is called by the PostgresMain main loop just before waiting * for a frontend command. If we are truly idle (ie, *not* inside * a transaction block), then process any pending inbound notifies, * and enable the signal handler to process future notifies directly. * * NOTE: the signal handler starts out disabled, and stays so until * PostgresMain calls this the first time. * -------------------------------------------------------------- */voidEnableNotifyInterrupt(void){ if (IsTransactionOrTransactionBlock()) return; /* not really idle */ /* * This code is tricky because we are communicating with a signal handler * that could interrupt us at any point. If we just checked * notifyInterruptOccurred and then set notifyInterruptEnabled, we could * fail to respond promptly to a signal that happens in between those two * steps. (A very small time window, perhaps, but Murphy's Law says you * can hit it...) Instead, we first set the enable flag, then test the * occurred flag. If we see an unserviced interrupt has occurred, we * re-clear the enable flag before going off to do the service work. (That * prevents re-entrant invocation of ProcessIncomingNotify() if another * interrupt occurs.) If an interrupt comes in between the setting and * clearing of notifyInterruptEnabled, then it will have done the service * work and left notifyInterruptOccurred zero, so we have to check again * after clearing enable. The whole thing has to be in a loop in case * another interrupt occurs while we're servicing the first. Once we get * out of the loop, enable is set and we know there is no unserviced * interrupt. * * NB: an overenthusiastic optimizing compiler could easily break this * code. Hopefully, they all understand what "volatile" means these days. */ for (;;) { notifyInterruptEnabled = 1; if (!notifyInterruptOccurred) break; notifyInterruptEnabled = 0; if (notifyInterruptOccurred) { if (Trace_notify) elog(DEBUG1, "EnableNotifyInterrupt: perform async notify"); ProcessIncomingNotify(); if (Trace_notify) elog(DEBUG1, "EnableNotifyInterrupt: done"); } }}/* * -------------------------------------------------------------- * DisableNotifyInterrupt * * This is called by the PostgresMain main loop just after receiving * a frontend command. Signal handler execution of inbound notifies * is disabled until the next EnableNotifyInterrupt call. * * The SIGUSR1 signal handler also needs to call this, so as to * prevent conflicts if one signal interrupts the other. So we * must return the previous state of the flag. * -------------------------------------------------------------- */boolDisableNotifyInterrupt(void){ bool result = (notifyInterruptEnabled != 0); notifyInterruptEnabled = 0; return result;}/* * -------------------------------------------------------------- * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. * This is called either directly from the SIGUSR2 signal handler, * or the next time control reaches the outer idle loop. * Scan pg_listener for arriving notifies, report them to my front end, * and clear the notification field in pg_listener until next time. * * NOTE: since we are outside any transaction, we must create our own. * -------------------------------------------------------------- */static voidProcessIncomingNotify(void){ Relation lRel; TupleDesc tdesc; ScanKeyData key[1]; HeapScanDesc scan; HeapTuple lTuple, rTuple; Datum value[Natts_pg_listener]; char repl[Natts_pg_listener], nulls[Natts_pg_listener]; bool catchup_enabled; /* Must prevent SIGUSR1 interrupt while I am running */ catchup_enabled = DisableCatchupInterrupt(); if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify"); set_ps_display("notify interrupt"); notifyInterruptOccurred = 0; StartTransactionCommand(); lRel = heap_open(ListenerRelationId, ExclusiveLock); tdesc = RelationGetDescr(lRel); /* Scan only entries with my listenerPID */ ScanKeyInit(&key[0], Anum_pg_listener_pid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(MyProcPid)); scan = heap_beginscan(lRel, SnapshotNow, 1, key); /* Prepare data for rewriting 0 into notification field */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; repl[Anum_pg_listener_notify - 1] = 'r'; value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(0); while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); char *relname = NameStr(listener->relname); int32 sourcePID = listener->notification; if (sourcePID != 0) { /* Notify the frontend */ if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify: received %s from %d", relname, (int) sourcePID); NotifyMyFrontEnd(relname, sourcePID); /* * Rewrite the tuple with 0 in notification column. * * simple_heap_update is safe here because no one else would have * tried to UNLISTEN us, so there can be no uncommitted changes. */ rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl); simple_heap_update(lRel, &lTuple->t_self, rTuple);#ifdef NOT_USED /* currently there are no indexes */ CatalogUpdateIndexes(lRel, rTuple);#endif } } heap_endscan(scan); /* * We do NOT release the lock on pg_listener here; we need to hold it * until end of transaction (which is about to happen, anyway) to ensure * that other backends see our tuple updates when they look. Otherwise, a * transaction started after this one might mistakenly think it doesn't * need to send this backend a new NOTIFY. */ heap_close(lRel, NoLock); CommitTransactionCommand(); /* * Must flush the notify messages to ensure frontend gets them promptly. */ pq_flush(); set_ps_display("idle"); if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify: done"); if (catchup_enabled) EnableCatchupInterrupt();}/* * Send NOTIFY message to my front end. */static voidNotifyMyFrontEnd(char *relname, int32 listenerPID){ if (whereToSendOutput == DestRemote) { StringInfoData buf; pq_beginmessage(&buf, 'A'); pq_sendint(&buf, listenerPID, sizeof(int32)); pq_sendstring(&buf, relname); if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* XXX Add parameter string here later */ pq_sendstring(&buf, ""); } pq_endmessage(&buf); /* * NOTE: we do not do pq_flush() here. For a self-notify, it will * happen at the end of the transaction, and for incoming notifies * ProcessIncomingNotify will do it after finding all the notifies. */ } else elog(INFO, "NOTIFY for %s", relname);}/* Does pendingNotifies include the given relname? */static boolAsyncExistsPendingNotify(const char *relname){ ListCell *p; foreach(p, pendingNotifies) { const char *prelname = (const char *) lfirst(p); if (strcmp(prelname, relname) == 0) return true; } return false;}/* Clear the pendingNotifies list. */static voidClearPendingNotifies(void){ /* * We used to have to explicitly deallocate the list members and nodes, * because they were malloc'd. Now, since we know they are palloc'd in * CurTransactionContext, we need not do that --- they'll go away * automatically at transaction exit. We need only reset the list head * pointer. */ pendingNotifies = NIL;}/* * 2PC processing routine for COMMIT PREPARED case. * * (We don't have to do anything for ROLLBACK PREPARED.) */voidnotify_twophase_postcommit(TransactionId xid, uint16 info, void *recdata, uint32 len){ /* * Set up to issue the NOTIFY at the end of my own current transaction. * (XXX this has some issues if my own transaction later rolls back, or if * there is any significant delay before I commit. OK for now because we * disallow COMMIT PREPARED inside a transaction block.) */ Async_Notify((char *) recdata);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -