📄 pgstat.c
字号:
closesocket(pgStatSock); /* * Identify myself via ps */ init_ps_display("stats collector process", "", ""); set_ps_display(""); /* * Arrange to write the initial status file right away */ gettimeofday(&next_statwrite, NULL); need_statwrite = TRUE; /* * Read in an existing statistics stats file or initialize the stats to * zero. */ pgStatRunningInCollector = TRUE; pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL); /* * Create the dead backend hashtable */ memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(int); hash_ctl.entrysize = sizeof(PgStat_StatBeDead); hash_ctl.hash = tag_hash; pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION); /* * Create the known backends table */ pgStatBeTable = (PgStat_StatBeEntry *) palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends); readPipe = pgStatPipe[0]; /* * Process incoming messages and handle all the reporting stuff until * there are no more messages. */ for (;;) { /* * If we need to write the status file again (there have been changes * in the statistics since we wrote it last) calculate the timeout * until we have to do so. */ if (need_statwrite) { struct timeval now; gettimeofday(&now, NULL); /* avoid assuming that tv_sec is signed */ if (now.tv_sec > next_statwrite.tv_sec || (now.tv_sec == next_statwrite.tv_sec && now.tv_usec >= next_statwrite.tv_usec)) { timeout.tv_sec = 0; timeout.tv_usec = 0; } else { timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec; timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec; if (timeout.tv_usec < 0) { timeout.tv_sec--; timeout.tv_usec += 1000000; } } } /* * Setup the descriptor set for select(2) */ FD_ZERO(&rfds); FD_SET(readPipe, &rfds); /* * Now wait for something to do. */ nready = select(readPipe + 1, &rfds, NULL, NULL, (need_statwrite) ? &timeout : NULL); if (nready < 0) { if (errno == EINTR) continue; ereport(ERROR, (errcode_for_socket_access(), errmsg("select() failed in statistics collector: %m"))); } /* * If there are no descriptors ready, our timeout for writing the * stats file happened. */ if (nready == 0) { pgstat_write_statsfile(); need_statwrite = FALSE; continue; } /* * Check if there is a new statistics message to collect. */ if (FD_ISSET(readPipe, &rfds)) { /* * We may need to issue multiple read calls in case the buffer * process didn't write the message in a single write, which is * possible since it dumps its buffer bytewise. In any case, we'd * need two reads since we don't know the message length * initially. */ int nread = 0; int targetlen = sizeof(PgStat_MsgHdr); /* initial */ bool pipeEOF = false; while (nread < targetlen) { len = piperead(readPipe, ((char *) &msg) + nread, targetlen - nread); if (len < 0) { if (errno == EINTR) continue; ereport(ERROR, (errcode_for_socket_access(), errmsg("could not read from statistics collector pipe: %m"))); } if (len == 0) /* EOF on the pipe! */ { pipeEOF = true; break; } nread += len; if (nread == sizeof(PgStat_MsgHdr)) { /* we have the header, compute actual msg length */ targetlen = msg.msg_hdr.m_size; if (targetlen < (int) sizeof(PgStat_MsgHdr) || targetlen > (int) sizeof(msg)) { /* * Bogus message length implies that we got out of * sync with the buffer process somehow. Abort so that * we can restart both processes. */ ereport(ERROR, (errmsg("invalid statistics message length"))); } } } /* * EOF on the pipe implies that the buffer process exited. Fall * out of outer loop. */ if (pipeEOF) break; /* * Distribute the message to the specific function handling it. */ switch (msg.msg_hdr.m_type) { case PGSTAT_MTYPE_DUMMY: break; case PGSTAT_MTYPE_BESTART: pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread); break; case PGSTAT_MTYPE_BETERM: pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread); break; case PGSTAT_MTYPE_TABSTAT: pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread); break; case PGSTAT_MTYPE_TABPURGE: pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread); break; case PGSTAT_MTYPE_ACTIVITY: pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread); break; case PGSTAT_MTYPE_DROPDB: pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread); break; case PGSTAT_MTYPE_RESETCOUNTER: pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg, nread); break; case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread); break; case PGSTAT_MTYPE_VACUUM: pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread); break; case PGSTAT_MTYPE_ANALYZE: pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread); break; default: break; } /* * Globally count messages. */ pgStatNumMessages++; /* * If this is the first message after we wrote the stats file the * last time, setup the timeout that it'd be written. */ if (!need_statwrite) { gettimeofday(&next_statwrite, NULL); next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000); next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000); next_statwrite.tv_usec %= 1000000; need_statwrite = TRUE; } } /* * Note that we do NOT check for postmaster exit inside the loop; only * EOF on the buffer pipe causes us to fall out. This ensures we * don't exit prematurely if there are still a few messages in the * buffer or pipe at postmaster shutdown. */ } /* * Okay, we saw EOF on the buffer pipe, so there are no more messages to * process. If the buffer process quit because of postmaster shutdown, we * want to save the final stats to reuse at next startup. But if the * buffer process failed, it seems best not to (there may even now be a * new collector firing up, and we don't want it to read a * partially-rewritten stats file). */ if (!PostmasterIsAlive(false)) pgstat_write_statsfile();}/* ---------- * pgstat_recvbuffer() - * * This is the body of the separate buffering process. Its only * purpose is to receive messages from the UDP socket as fast as * possible and forward them over a pipe into the collector itself. * If the collector is slow to absorb messages, they are buffered here. * ---------- */static voidpgstat_recvbuffer(void){ fd_set rfds; fd_set wfds; struct timeval timeout; int writePipe = pgStatPipe[1]; int maxfd; int nready; int len; int xfr; int frm; PgStat_Msg input_buffer; char *msgbuffer; int msg_send = 0; /* next send index in buffer */ int msg_recv = 0; /* next receive index */ int msg_have = 0; /* number of bytes stored */ bool overflow = false; /* * Identify myself via ps */ init_ps_display("stats buffer process", "", ""); set_ps_display(""); /* * We want to die if our child collector process does. There are two ways * we might notice that it has died: receive SIGCHLD, or get a write * failure on the pipe leading to the child. We can set SIGPIPE to kill * us here. Our SIGCHLD handler was already set up before we forked (must * do it that way, else it's a race condition). */ pqsignal(SIGPIPE, SIG_DFL); PG_SETMASK(&UnBlockSig); /* * Set the write pipe to nonblock mode, so that we cannot block when the * collector falls behind. */ if (!pg_set_noblock(writePipe)) ereport(ERROR, (errcode_for_socket_access(), errmsg("could not set statistics collector pipe to nonblocking mode: %m"))); /* * Allocate the message buffer */ msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ); /* * Loop forever */ for (;;) { FD_ZERO(&rfds); FD_ZERO(&wfds); maxfd = -1; /* * As long as we have buffer space we add the socket to the read * descriptor set. */ if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg))) { FD_SET(pgStatSock, &rfds); maxfd = pgStatSock; overflow = false; } else { if (!overflow) { ereport(LOG, (errmsg("statistics buffer is full"))); overflow = true; } } /* * If we have messages to write out, we add the pipe to the write * descriptor set. */ if (msg_have > 0) { FD_SET(writePipe, &wfds); if (writePipe > maxfd) maxfd = writePipe; } /* * Wait for some work to do; but not for more than 10 seconds. (This * determines how quickly we will shut down after an ungraceful * postmaster termination; so it needn't be very fast.) */ timeout.tv_sec = 10; timeout.tv_usec = 0; nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout); if (nready < 0) { if (errno == EINTR) continue; ereport(ERROR, (errcode_for_socket_access(), errmsg("select() failed in statistics buffer: %m"))); } /* * If there is a message on the socket, read it and check for * validity. */ if (FD_ISSET(pgStatSock, &rfds)) { len = recv(pgStatSock, (char *) &input_buffer, sizeof(PgStat_Msg), 0); if (len < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("could not read statistics message: %m"))); /* * We ignore messages that are smaller than our common header */ if (len < sizeof(PgStat_MsgHdr)) continue; /* * The received length must match the length in the header */ if (input_buffer.msg_hdr.m_size != len) continue; /* * O.K. - we accept this message. Copy it to the circular * msgbuffer. */ frm = 0; while (len > 0) { xfr = PGSTAT_RECVBUFFERSZ - msg_recv; if (xfr > len) xfr = len; Assert(xfr > 0); memcpy(msgbuffer + msg_recv, ((char *) &input_buffer) + frm, xfr); msg_recv += xfr; if (msg_recv == PGSTAT_RECVBUFFERSZ) msg_recv = 0; msg_have += xfr; frm += xfr; len -= xfr; } } /* * If the collector is ready to receive, write some data into his * pipe. We may or may not be able to write all that we have. * * NOTE: if what we have is less than PIPE_BUF bytes but more than the * space available in the pipe buffer, most kernels will refuse to * write any of it, and will return EAGAIN. This means we will * busy-loop until the situation changes (either because the collector * caught up, or because more data arrives so that we have more than * PIPE_BUF bytes buffered). This is not good, but is there any way * around it? We have no way to tell when the collector has caught * up... */ if (FD_ISSET(writePipe, &wfds)) { xfr = PGSTAT_RECVBUFFERSZ - msg_send; if (xfr > msg_have) xfr = msg_have; Assert(xfr > 0); len = pipewrite(writePipe, msgbuffer + msg_send, xfr); if (len < 0) { if (errno == EINTR || errno == EAGAIN) continue; /* not enough space in pipe */ ereport(ERROR, (errcode_for_socket_access(), errmsg("could not write to statistics collector pipe: %m"))); } /* NB: len < xfr is okay */ msg_send += len; if (msg_send == PGSTAT_RECVBUFFERSZ) msg_send = 0; msg_have -= len; } /* * Make sure we forwarded all messages before we check for postmaster * termination. */ if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds)) continue; /* * If the postmaster has terminated, we die too. (This is no longer * the normal exit path, however.) */ if (!PostmasterIsAlive(true)) exit(0); }}/* SIGQUIT signal handler for buffer process */static voidpgstat_exit(SIGNAL_ARGS){ /* * For now, we just nail the doors shut and get out of town. It might be * cleaner to allow any pending messages to be sent, but that creates a * tradeoff against speed of exit. */ /* * If running in bufferer, kill our collector as well. On some broken * win32 systems, it does not shut down automatically because of issues * with socket inheritance. XXX so why not fix the socket inheritance... */#ifdef WIN32 if (pgStatCollectorPid > 0) kill(pgStatCollectorPid, SIGQUIT);#endif exit(0);}/* SIGCHLD signal handler for buffer process */static voidpgstat_die(SIGNAL_ARGS){ exit(1);}/* ---------- * pgstat_add_backend() - * * Support function to keep our backend list up to date. * ---------- */static intpgstat_add_backend(PgStat_MsgHdr *msg){ PgStat_StatBeEntry *beentry; PgStat_StatBeDead *deadbe; /* * Check that the backend ID is valid */ if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends) { ereport(LOG, (errmsg("invalid server process ID %d", msg->m_backendid))); return -1; } /* * Get the slot for this backendid. */ beentry = &pgStatBeTable[msg->m_backendid - 1];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -