📄 pgstat.c
字号:
return NULL; /* * Now inside the DB's table hash table lookup the requested one. */ if (dbentry->tables == NULL) return NULL; tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables, (void *) &relid, HASH_FIND, NULL); if (tabentry == NULL) return NULL; return tabentry;}/* ---------- * pgstat_fetch_stat_beentry() - * * Support function for the SQL-callable pgstat* functions. Returns * the actual activity slot of one active backend. The caller is * responsible for a check if the actual user is permitted to see * that info (especially the querystring). * ---------- */PgStat_StatBeEntry *pgstat_fetch_stat_beentry(int beid){ if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId())) { pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId, &pgStatBeTable, &pgStatNumBackends); pgStatDBHashXact = GetCurrentTransactionId(); } if (beid < 1 || beid > pgStatNumBackends) return NULL; return &pgStatBeTable[beid - 1];}/* ---------- * pgstat_fetch_stat_numbackends() - * * Support function for the SQL-callable pgstat* functions. Returns * the maximum current backend id. * ---------- */intpgstat_fetch_stat_numbackends(void){ if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId())) { pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId, &pgStatBeTable, &pgStatNumBackends); pgStatDBHashXact = GetCurrentTransactionId(); } return pgStatNumBackends;}/* ------------------------------------------------------------ * Local support functions follow * ------------------------------------------------------------ *//* ---------- * pgstat_setheader() - * * Set common header fields in a statistics message * ---------- */static voidpgstat_setheader(PgStat_MsgHdr *hdr, int mtype){ hdr->m_type = mtype; hdr->m_backendid = MyBackendId; hdr->m_procpid = MyProcPid; hdr->m_databaseid = MyDatabaseId; hdr->m_userid = GetSessionUserId();}/* ---------- * pgstat_send() - * * Send out one statistics message to the collector * ---------- */static voidpgstat_send(void *msg, int len){ if (pgStatSock < 0) return; ((PgStat_MsgHdr *) msg)->m_size = len; send(pgStatSock, msg, len, 0); /* We deliberately ignore any error from send() */}/* ------------------------------------------------------------ * Local functions implementing the statistics collector itself follow *------------------------------------------------------------ *//* ---------- * pgstat_main() - * * Start up the statistics collector itself. This is the body of the * postmaster child process. * ---------- */static voidpgstat_main(void){ PgStat_Msg msg; fd_set rfds; int readPipe; int pmPipe = pgStatPmPipe[0]; int maxfd; int nready; int len = 0; struct timeval timeout; struct timeval next_statwrite; bool need_statwrite; HASHCTL hash_ctl; /* * Close the writing end of the postmaster pipe, so we'll see it * closing when the postmaster terminates and can terminate as well. */ closesocket(pgStatPmPipe[1]); pgStatPmPipe[1] = -1; /* * Ignore all signals usually bound to some action in the postmaster, * except for SIGCHLD --- see pgstat_recvbuffer. */ pqsignal(SIGHUP, SIG_IGN); pqsignal(SIGINT, SIG_IGN); pqsignal(SIGTERM, SIG_IGN); pqsignal(SIGQUIT, SIG_IGN); pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, SIG_IGN); pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGCHLD, pgstat_die); pqsignal(SIGTTIN, SIG_DFL); pqsignal(SIGTTOU, SIG_DFL); pqsignal(SIGCONT, SIG_DFL); pqsignal(SIGWINCH, SIG_DFL); /* * Start a buffering process to read from the socket, so we have a * little more time to process incoming messages. * * NOTE: the process structure is: postmaster is parent of buffer process * is parent of collector process. This way, the buffer can detect * collector failure via SIGCHLD, whereas otherwise it wouldn't notice * collector failure until it tried to write on the pipe. That would * mean that after the postmaster started a new collector, we'd have * two buffer processes competing to read from the UDP socket --- not * good. */ if (pipe(pgStatPipe) < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not create pipe for statistics buffer: %m"))); exit(1); } switch (fork()) { case -1: ereport(LOG, (errmsg("could not fork statistics collector: %m"))); exit(1); case 0: /* child becomes collector process */ closesocket(pgStatPipe[1]); closesocket(pgStatSock); break; default: /* parent becomes buffer process */ closesocket(pgStatPipe[0]); pgstat_recvbuffer(); exit(0); } /* * In the child we can have default SIGCHLD handling (in case we want * to call system() here...) */ pqsignal(SIGCHLD, SIG_DFL); MyProcPid = getpid(); /* reset MyProcPid */ /* * Identify myself via ps */ init_ps_display("stats collector process", "", ""); set_ps_display(""); /* * Arrange to write the initial status file right away */ gettimeofday(&next_statwrite, NULL); need_statwrite = TRUE; /* * Read in an existing statistics stats file or initialize the stats * to zero. */ pgStatRunningInCollector = TRUE; pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL); /* * Create the dead backend hashtable */ memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(int); hash_ctl.entrysize = sizeof(PgStat_StatBeDead); hash_ctl.hash = tag_hash; pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION); if (pgStatBeDead == NULL) { /* assume the problem is out-of-memory */ ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } /* * Create the known backends table */ pgStatBeTable = (PgStat_StatBeEntry *) malloc( sizeof(PgStat_StatBeEntry) * MaxBackends); if (pgStatBeTable == NULL) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends); readPipe = pgStatPipe[0]; /* * Process incoming messages and handle all the reporting stuff until * there are no more messages. */ for (;;) { /* * If we need to write the status file again (there have been * changes in the statistics since we wrote it last) calculate the * timeout until we have to do so. */ if (need_statwrite) { struct timeval now; gettimeofday(&now, NULL); /* avoid assuming that tv_sec is signed */ if (now.tv_sec > next_statwrite.tv_sec || (now.tv_sec == next_statwrite.tv_sec && now.tv_usec >= next_statwrite.tv_usec)) { timeout.tv_sec = 0; timeout.tv_usec = 0; } else { timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec; timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec; if (timeout.tv_usec < 0) { timeout.tv_sec--; timeout.tv_usec += 1000000; } } } /* * Setup the descriptor set for select(2) */ FD_ZERO(&rfds); FD_SET(readPipe, &rfds); FD_SET(pmPipe, &rfds); if (readPipe > pmPipe) maxfd = readPipe; else maxfd = pmPipe; /* * Now wait for something to do. */ nready = select(maxfd + 1, &rfds, NULL, NULL, (need_statwrite) ? &timeout : NULL); if (nready < 0) { if (errno == EINTR) continue; ereport(LOG, (errcode_for_socket_access(), errmsg("select() failed in statistics collector: %m"))); exit(1); } /* * If there are no descriptors ready, our timeout for writing the * stats file happened. */ if (nready == 0) { pgstat_write_statsfile(); need_statwrite = FALSE; continue; } /* * Check if there is a new statistics message to collect. */ if (FD_ISSET(readPipe, &rfds)) { /* * We may need to issue multiple read calls in case the buffer * process didn't write the message in a single write, which * is possible since it dumps its buffer bytewise. In any * case, we'd need two reads since we don't know the message * length initially. */ int nread = 0; int targetlen = sizeof(PgStat_MsgHdr); /* initial */ while (nread < targetlen) { len = read(readPipe, ((char *) &msg) + nread, targetlen - nread); if (len < 0) { if (errno == EINTR) continue; ereport(LOG, (errcode_for_socket_access(), errmsg("could not read from statistics collector pipe: %m"))); exit(1); } if (len == 0) /* EOF on the pipe! */ break; nread += len; if (nread == sizeof(PgStat_MsgHdr)) { /* we have the header, compute actual msg length */ targetlen = msg.msg_hdr.m_size; if (targetlen < (int) sizeof(PgStat_MsgHdr) || targetlen > (int) sizeof(msg)) { /* * Bogus message length implies that we got out of * sync with the buffer process somehow. Abort so * that we can restart both processes. */ ereport(LOG, (errmsg("invalid statistics message length"))); exit(1); } } } /* * EOF on the pipe implies that the buffer process exited. * Fall out of outer loop. */ if (len == 0) break; /* * Distribute the message to the specific function handling * it. */ switch (msg.msg_hdr.m_type) { case PGSTAT_MTYPE_DUMMY: break; case PGSTAT_MTYPE_BESTART: pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread); break; case PGSTAT_MTYPE_BETERM: pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread); break; case PGSTAT_MTYPE_TABSTAT: pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread); break; case PGSTAT_MTYPE_TABPURGE: pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread); break; case PGSTAT_MTYPE_ACTIVITY: pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread); break; case PGSTAT_MTYPE_DROPDB: pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread); break; case PGSTAT_MTYPE_RESETCOUNTER: pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg, nread); break; default: break; } /* * Globally count messages. */ pgStatNumMessages++; /* * If this is the first message after we wrote the stats file * the last time, setup the timeout that it'd be written. */ if (!need_statwrite) { gettimeofday(&next_statwrite, NULL); next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000); next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000); next_statwrite.tv_usec %= 1000000; need_statwrite = TRUE; } } /* * Note that we do NOT check for postmaster exit inside the loop; * only EOF on the buffer pipe causes us to fall out. This * ensures we don't exit prematurely if there are still a few * messages in the buffer or pipe at postmaster shutdown. */ } /* * Okay, we saw EOF on the buffer pipe, so there are no more messages * to process. If the buffer process quit because of postmaster * shutdown, we want to save the final stats to reuse at next startup. * But if the buffer process failed, it seems best not to (there may * even now be a new collector firing up, and we don't want it to read * a partially- rewritten stats file). We can tell whether the * postmaster is still alive by checking to see if the postmaster pipe * is still open. If it is read-ready (ie, EOF), the postmaster must * have quit. */ if (FD_ISSET(pmPipe, &rfds)) pgstat_write_statsfile();}/* ---------- * pgstat_recvbuffer() - * * This is the body of the separate buffering process. Its only * purpose is to receive messages from the UDP socket as fast as * possible and forward them over a pipe into the collector itself. * If the collector is slow to absorb messages, they are buffered here. * ---------- */static voidpgstat_recvbuffer(void){ fd_set rfds; fd_set wfds; int writePipe = pgStatPipe[1]; int pmPipe = pgStatPmPipe[0]; int maxfd; int nready; int len; int xfr; int frm; PgStat_Msg input_buffer; char *msgbuffer; int msg_send = 0; /* next send index in buffer */ int msg_recv = 0; /* next receive index */ int msg_have = 0; /* number of bytes stored */ bool overflow = false; /* * Identify myself via ps */ init_ps_display("stats buffer process", "", ""); set_ps_display(""); /* * We want to die if our child collector process does. There are two * ways we might notice that it has died: receive SIGCHLD, or get a * write failure on the pipe leading to the child. We can set SIGPIPE * to kill us here. Our SIGCHLD handler was already set up before we * forked (must do it that way, else it's a race condition). */ pqsignal(SIGPIPE, SIG_DFL); PG_SETMASK(&UnBlockSig); /* * Set the write pipe to nonblock mode, so that we cannot block when * the collector falls behind. */ if (FCNTL_NONBLOCK(writePipe) < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not set statistics collector pipe to nonblocking mode: %m"))); exit(1); } /* * Allocate the message buffer */ msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ); if (msgbuffer == NULL) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } /* * Loop forever */ for (;;) { FD_ZERO(&rfds); FD_ZERO(&wfds); maxfd = -1; /* * As long as we have buffer space we add the socket to the read * descriptor set.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -