📄 pgstat.c
字号:
*/ if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg))) { FD_SET(pgStatSock, &rfds); maxfd = pgStatSock; overflow = false; } else { if (!overflow) { ereport(LOG, (errmsg("statistics buffer is full"))); overflow = true; } } /* * If we have messages to write out, we add the pipe to the write * descriptor set. Otherwise, we check if the postmaster might * have terminated. */ if (msg_have > 0) { FD_SET(writePipe, &wfds); if (writePipe > maxfd) maxfd = writePipe; } else { FD_SET(pmPipe, &rfds); if (pmPipe > maxfd) maxfd = pmPipe; } /* * Wait for some work to do. */ nready = select(maxfd + 1, &rfds, &wfds, NULL, NULL); if (nready < 0) { if (errno == EINTR) continue; ereport(LOG, (errcode_for_socket_access(), errmsg("select() failed in statistics buffer: %m"))); exit(1); } /* * If there is a message on the socket, read it and check for * validity. */ if (FD_ISSET(pgStatSock, &rfds)) { len = recv(pgStatSock, (char *) &input_buffer, sizeof(PgStat_Msg), 0); if (len < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not read statistics message: %m"))); exit(1); } /* * We ignore messages that are smaller than our common header */ if (len < sizeof(PgStat_MsgHdr)) continue; /* * The received length must match the length in the header */ if (input_buffer.msg_hdr.m_size != len) continue; /* * O.K. - we accept this message. Copy it to the circular * msgbuffer. */ frm = 0; while (len > 0) { xfr = PGSTAT_RECVBUFFERSZ - msg_recv; if (xfr > len) xfr = len; Assert(xfr > 0); memcpy(msgbuffer + msg_recv, ((char *) &input_buffer) + frm, xfr); msg_recv += xfr; if (msg_recv == PGSTAT_RECVBUFFERSZ) msg_recv = 0; msg_have += xfr; frm += xfr; len -= xfr; } } /* * If the collector is ready to receive, write some data into his * pipe. We may or may not be able to write all that we have. * * NOTE: if what we have is less than PIPE_BUF bytes but more than * the space available in the pipe buffer, most kernels will * refuse to write any of it, and will return EAGAIN. This means * we will busy-loop until the situation changes (either because * the collector caught up, or because more data arrives so that * we have more than PIPE_BUF bytes buffered). This is not good, * but is there any way around it? We have no way to tell when * the collector has caught up... */ if (FD_ISSET(writePipe, &wfds)) { xfr = PGSTAT_RECVBUFFERSZ - msg_send; if (xfr > msg_have) xfr = msg_have; Assert(xfr > 0); len = write(writePipe, msgbuffer + msg_send, xfr); if (len < 0) { if (errno == EINTR || errno == EAGAIN) continue; /* not enough space in pipe */ ereport(LOG, (errcode_for_socket_access(), errmsg("could not write to statistics collector pipe: %m"))); exit(1); } /* NB: len < xfr is okay */ msg_send += len; if (msg_send == PGSTAT_RECVBUFFERSZ) msg_send = 0; msg_have -= len; } /* * Make sure we forwarded all messages before we check for * Postmaster termination. */ if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds)) continue; /* * If the pipe from the postmaster is ready for reading, the * kernel must have closed it on exit() (the postmaster never * really writes to it). So we've done our job. */ if (FD_ISSET(pmPipe, &rfds)) exit(0); }}static voidpgstat_die(SIGNAL_ARGS){ exit(1);}/* ---------- * pgstat_add_backend() - * * Support function to keep our backend list up to date. * ---------- */static intpgstat_add_backend(PgStat_MsgHdr *msg){ PgStat_StatDBEntry *dbentry; PgStat_StatBeEntry *beentry; PgStat_StatBeDead *deadbe; bool found; /* * Check that the backend ID is valid */ if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends) { ereport(LOG, (errmsg("invalid server process ID %d", msg->m_backendid))); return -1; } /* * Get the slot for this backendid. */ beentry = &pgStatBeTable[msg->m_backendid - 1]; if (beentry->databaseid != InvalidOid) { /* * If the slot contains the PID of this backend, everything is * fine and we got nothing to do. */ if (beentry->procpid == msg->m_procpid) return 0; } /* * Lookup if this backend is known to be dead. This can be caused due * to messages arriving in the wrong order - i.e. Postmaster's BETERM * message might have arrived before we received all the backends * stats messages, or even a new backend with the same backendid was * faster in sending his BESTART. * * If the backend is known to be dead, we ignore this add. */ deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead, (void *) &(msg->m_procpid), HASH_FIND, NULL); if (deadbe) return 1; /* * Backend isn't known to be dead. If it's slot is currently used, we * have to kick out the old backend. */ if (beentry->databaseid != InvalidOid) pgstat_sub_backend(beentry->procpid); /* * Put this new backend into the slot. */ beentry->databaseid = msg->m_databaseid; beentry->procpid = msg->m_procpid; beentry->userid = msg->m_userid; beentry->activity_start_sec = 0; beentry->activity_start_usec = 0; MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE); /* * Lookup or create the database entry for this backends DB. */ dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, (void *) &(msg->m_databaseid), HASH_ENTER, &found); if (dbentry == NULL) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } /* * If not found, initialize the new one. */ if (!found) { HASHCTL hash_ctl; dbentry->tables = NULL; dbentry->n_xact_commit = 0; dbentry->n_xact_rollback = 0; dbentry->n_blocks_fetched = 0; dbentry->n_blocks_hit = 0; dbentry->n_connects = 0; dbentry->destroy = 0; memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(Oid); hash_ctl.entrysize = sizeof(PgStat_StatTabEntry); hash_ctl.hash = tag_hash; dbentry->tables = hash_create("Per-database table", PGSTAT_TAB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION); if (dbentry->tables == NULL) { /* assume the problem is out-of-memory */ ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } } /* * Count number of connects to the database */ dbentry->n_connects++; return 0;}/* ---------- * pgstat_sub_backend() - * * Remove a backend from the actual backends list. * ---------- */static voidpgstat_sub_backend(int procpid){ int i; PgStat_StatBeDead *deadbe; bool found; /* * Search in the known-backends table for the slot containing this * PID. */ for (i = 0; i < MaxBackends; i++) { if (pgStatBeTable[i].databaseid != InvalidOid && pgStatBeTable[i].procpid == procpid) { /* * That's him. Add an entry to the known to be dead backends. * Due to possible misorder in the arrival of UDP packets it's * possible that even if we know the backend is dead, there * could still be messages queued that arrive later. Those * messages must not cause our number of backends statistics * to get screwed up, so we remember for a couple of seconds * that this PID is dead and ignore them (only the counting of * backends, not the table access stats they sent). */ deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead, (void *) &procpid, HASH_ENTER, &found); if (deadbe == NULL) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } if (!found) { deadbe->backendid = i + 1; deadbe->destroy = PGSTAT_DESTROY_COUNT; } /* * Declare the backend slot empty. */ pgStatBeTable[i].databaseid = InvalidOid; return; } } /* * No big problem if not found. This can happen if UDP messages arrive * out of order here. */}/* ---------- * pgstat_write_statsfile() - * * Tell the news. * ---------- */static voidpgstat_write_statsfile(void){ HASH_SEQ_STATUS hstat; HASH_SEQ_STATUS tstat; PgStat_StatDBEntry *dbentry; PgStat_StatTabEntry *tabentry; PgStat_StatBeDead *deadbe; FILE *fpout; int i; /* * Open the statistics temp file to write out the current values. */ fpout = fopen(pgStat_tmpfname, PG_BINARY_W); if (fpout == NULL) { ereport(LOG, (errcode_for_file_access(), errmsg("could not open temporary statistics file \"%s\": %m", pgStat_tmpfname))); return; } /* * Walk through the database table. */ hash_seq_init(&hstat, pgStatDBHash); while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL) { /* * If this database is marked destroyed, count down and do so if * it reaches 0. */ if (dbentry->destroy > 0) { if (--(dbentry->destroy) == 0) { if (dbentry->tables != NULL) hash_destroy(dbentry->tables); if (hash_search(pgStatDBHash, (void *) &(dbentry->databaseid), HASH_REMOVE, NULL) == NULL) { ereport(LOG, (errmsg("database hash table corrupted " "during cleanup --- abort"))); exit(1); } } /* * Don't include statistics for it. */ continue; } /* * Write out the DB line including the number of life backends. */ fputc('D', fpout); fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout); /* * Walk through the databases access stats per table. */ hash_seq_init(&tstat, dbentry->tables); while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL) { /* * If table entry marked for destruction, same as above for * the database entry. */ if (tabentry->destroy > 0) { if (--(tabentry->destroy) == 0) { if (hash_search(dbentry->tables, (void *) &(tabentry->tableid), HASH_REMOVE, NULL) == NULL) { ereport(LOG, (errmsg("tables hash table for " "database %u corrupted during " "cleanup --- abort", dbentry->databaseid))); exit(1); } } continue; } /* * At least we think this is still a life table. Print it's * access stats. */ fputc('T', fpout); fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout); } /* * Mark the end of this DB */ fputc('d', fpout); } /* * Write out the known running backends to the stats file. */ i = MaxBackends; fputc('M', fpout); fwrite(&i, sizeof(i), 1, fpout); for (i = 0; i < MaxBackends; i++) { if (pgStatBeTable[i].databaseid != InvalidOid) { fputc('B', fpout); fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout); } } /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. */ fputc('E', fpout); if (fclose(fpout) < 0) { ereport(LOG, (errcode_for_file_access(), errmsg("could not close temporary statistics file \"%s\": %m", pgStat_tmpfname))); } else { if (rename(pgStat_tmpfname, pgStat_fname) < 0) { ereport(LOG, (errcode_for_file_access(), errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m", pgStat_tmpfname, pgStat_fname))); } } /* * Clear out the dead backends table */ hash_seq_init(&hstat, pgStatBeDead); while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL) { /* * Count down the destroy delay and remove entries where it * reaches 0. */ if (--(deadbe->destroy) <= 0) { if (hash_search(pgStatBeDead, (void *) &(deadbe->procpid), HASH_REMOVE, NULL) == NULL) { ereport(LOG, (errmsg("dead-server-process hash table corrupted " "during cleanup --- abort"))); exit(1); } } }}/* ---------- * pgstat_read_statsfile() - * * Reads in an existing statistics collector and initializes the * databases hash table (who's entries point to the tables hash tables) * and the current backend table. * ---------- */static voidpgstat_read_statsfile(HTAB **dbhash, Oid onlydb, PgStat_StatBeEntry **betab, int *numbackends){ PgStat_StatDBEntry *dbentry; PgStat_StatDBEntry dbbuf; PgStat_StatTabEntry *tabentry; PgStat_StatTabEntry tabbuf; HASHCTL hash_ctl; HTAB *tabhash = NULL; FILE *fpin; int maxbackends = 0; int havebackends = 0; bool found; MemoryContext use_mcxt; int mcxt_flags; /* * If running in the collector we use the DynaHashCxt memory context. * If running in a backend, we use the TopTransactionContext instead, * so the caller must only know the last XactId when this call * happened to know if his tables are still valid or already gone! */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -