📄 pgstat.c
字号:
if (pgStatRunningInCollector) { use_mcxt = NULL; mcxt_flags = 0; } else { use_mcxt = TopTransactionContext; mcxt_flags = HASH_CONTEXT; } /* * Create the DB hashtable */ memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(Oid); hash_ctl.entrysize = sizeof(PgStat_StatDBEntry); hash_ctl.hash = tag_hash; hash_ctl.hcxt = use_mcxt; *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION | mcxt_flags); if (*dbhash == NULL) { /* assume the problem is out-of-memory */ if (pgStatRunningInCollector) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } /* in backend, can do normal error */ ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } /* * Initialize the number of known backends to zero, just in case we do * a silent error return below. */ if (numbackends != NULL) *numbackends = 0; if (betab != NULL) *betab = NULL; /* * Try to open the status file. If it doesn't exist, the backends * simply return zero for anything and the collector simply starts * from scratch with empty counters. */ if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL) return; /* * We found an existing collector stats file. Read it and put all the * hashtable entries into place. */ for (;;) { switch (fgetc(fpin)) { /* * 'D' A PgStat_StatDBEntry struct describing a database * follows. Subsequently, zero to many 'T' entries will * follow until a 'd' is encountered. */ case 'D': if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted pgstat.stat file"))); fclose(fpin); return; } /* * Add to the DB hash */ dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash, (void *) &dbbuf.databaseid, HASH_ENTER, &found); if (dbentry == NULL) { if (pgStatRunningInCollector) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } else { fclose(fpin); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } } if (found) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted pgstat.stat file"))); fclose(fpin); return; } memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); dbentry->tables = NULL; dbentry->destroy = 0; dbentry->n_backends = 0; /* * Don't collect tables if not the requested DB */ if (onlydb != InvalidOid && onlydb != dbbuf.databaseid) break; memset(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(Oid); hash_ctl.entrysize = sizeof(PgStat_StatTabEntry); hash_ctl.hash = tag_hash; hash_ctl.hcxt = use_mcxt; dbentry->tables = hash_create("Per-database table", PGSTAT_TAB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_FUNCTION | mcxt_flags); if (dbentry->tables == NULL) { /* assume the problem is out-of-memory */ if (pgStatRunningInCollector) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } /* in backend, can do normal error */ fclose(fpin); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } /* * Arrange that following 'T's add entries to this * databases tables hash table. */ tabhash = dbentry->tables; break; /* * 'd' End of this database. */ case 'd': tabhash = NULL; break; /* * 'T' A PgStat_StatTabEntry follows. */ case 'T': if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted pgstat.stat file"))); fclose(fpin); return; } /* * Skip if table belongs to a not requested database. */ if (tabhash == NULL) break; tabentry = (PgStat_StatTabEntry *) hash_search(tabhash, (void *) &tabbuf.tableid, HASH_ENTER, &found); if (tabentry == NULL) { if (pgStatRunningInCollector) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } /* in backend, can do normal error */ fclose(fpin); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); } if (found) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted pgstat.stat file"))); fclose(fpin); return; } memcpy(tabentry, &tabbuf, sizeof(tabbuf)); break; /* * 'M' The maximum number of backends to expect follows. */ case 'M': if (betab == NULL || numbackends == NULL) { fclose(fpin); return; } if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) != sizeof(maxbackends)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted pgstat.stat file"))); fclose(fpin); return; } if (maxbackends == 0) { fclose(fpin); return; } /* * Allocate space (in TopTransactionContext too) for the * backend table. */ if (use_mcxt == NULL) *betab = (PgStat_StatBeEntry *) malloc( sizeof(PgStat_StatBeEntry) * maxbackends); else *betab = (PgStat_StatBeEntry *) MemoryContextAlloc( use_mcxt, sizeof(PgStat_StatBeEntry) * maxbackends); break; /* * 'B' A PgStat_StatBeEntry follows. */ case 'B': if (betab == NULL || numbackends == NULL) { fclose(fpin); return; } if (*betab == NULL) { fclose(fpin); return; } /* * Read it directly into the table. */ if (fread(&(*betab)[havebackends], 1, sizeof(PgStat_StatBeEntry), fpin) != sizeof(PgStat_StatBeEntry)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted pgstat.stat file"))); fclose(fpin); return; } /* * Count backends per database here. */ dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash, (void *) &((*betab)[havebackends].databaseid), HASH_FIND, NULL); if (dbentry) dbentry->n_backends++; havebackends++; if (numbackends != 0) *numbackends = havebackends; if (havebackends >= maxbackends) { fclose(fpin); return; } break; /* * 'E' The EOF marker of a complete stats file. */ case 'E': fclose(fpin); return; default: ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted pgstat.stat file"))); fclose(fpin); return; } } fclose(fpin);}/* ---------- * pgstat_recv_bestart() - * * Process a backend starup message. * ---------- */static voidpgstat_recv_bestart(PgStat_MsgBestart *msg, int len){ pgstat_add_backend(&msg->m_hdr);}/* ---------- * pgstat_recv_beterm() - * * Process a backend termination message. * ---------- */static voidpgstat_recv_beterm(PgStat_MsgBeterm *msg, int len){ pgstat_sub_backend(msg->m_hdr.m_procpid);}/* ---------- * pgstat_recv_activity() - * * Remember what the backend is doing. * ---------- */static voidpgstat_recv_activity(PgStat_MsgActivity *msg, int len){ PgStat_StatBeEntry *entry; /* * Here we check explicitly for 0 return, since we don't want to * mangle the activity of an active backend by a delayed packed from a * dead one. */ if (pgstat_add_backend(&msg->m_hdr) != 0) return; entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]); strncpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE); entry->activity_start_sec = GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);}/* ---------- * pgstat_recv_tabstat() - * * Count what the backend has done. * ---------- */static voidpgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len){ PgStat_TableEntry *tabmsg = &(msg->m_entry[0]); PgStat_StatDBEntry *dbentry; PgStat_StatTabEntry *tabentry; int i; bool found; /* * Make sure the backend is counted for. */ if (pgstat_add_backend(&msg->m_hdr) < 0) return; /* * Lookup the database in the hashtable. */ dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, (void *) &(msg->m_hdr.m_databaseid), HASH_FIND, NULL); if (!dbentry) return; /* * If the database is marked for destroy, this is a delayed UDP packet * and not worth being counted. */ if (dbentry->destroy > 0) return; dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit); dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback); /* * Process all table entries in the message. */ for (i = 0; i < msg->m_nentries; i++) { tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables, (void *) &(tabmsg[i].t_id), HASH_ENTER, &found); if (tabentry == NULL) { ereport(LOG, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory in statistics collector --- abort"))); exit(1); } if (!found) { /* * If it's a new table entry, initialize counters to the * values we just got. */ tabentry->numscans = tabmsg[i].t_numscans; tabentry->tuples_returned = tabmsg[i].t_tuples_returned; tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched; tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted; tabentry->tuples_updated = tabmsg[i].t_tuples_updated; tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted; tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched; tabentry->blocks_hit = tabmsg[i].t_blocks_hit; tabentry->destroy = 0; } else { /* * Otherwise add the values to the existing entry. */ tabentry->numscans += tabmsg[i].t_numscans; tabentry->tuples_returned += tabmsg[i].t_tuples_returned; tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched; tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted; tabentry->tuples_updated += tabmsg[i].t_tuples_updated; tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted; tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched; tabentry->blocks_hit += tabmsg[i].t_blocks_hit; } /* * And add the block IO to the database entry. */ dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched; dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit; }}/* ---------- * pgstat_recv_tabpurge() - * * Arrange for dead table removal. * ---------- */static voidpgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len){ PgStat_StatDBEntry *dbentry; PgStat_StatTabEntry *tabentry; int i; /* * Make sure the backend is counted for. */ if (pgstat_add_backend(&msg->m_hdr) < 0) return; /* * Lookup the database in the hashtable. */ dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, (void *) &(msg->m_hdr.m_databaseid), HASH_FIND, NULL); if (!dbentry) return; /* * If the database is marked for destroy, this is a delayed UDP packet * and the tables will go away at DB destruction. */ if (dbentry->destroy > 0) return; /* * Process all table entries in the message. */ for (i = 0; i < msg->m_nentries; i++) { tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables, (void *) &(msg->m_tableid[i]), HASH_FIND, NULL); if (tabentry) tabentry->destroy = PGSTAT_DESTROY_COUNT; }}/* ---------- * pgstat_recv_dropdb() - * * Arrange for dead database removal * ---------- */static voidpgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len){ PgStat_StatDBEntry *dbentry; /* * Make sure the backend is counted for. */ if (pgstat_add_backend(&msg->m_hdr) < 0) return; /* * Lookup the database in the hashtable. */ dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash, (void *) &(msg->m_databaseid), HASH_FIND, NULL); if (!dbentry) return; /* * Mark the database for destruction. */ dbentry->destroy = PGSTAT_DESTROY_COUNT;}/* ---------- * pgstat_recv_dropdb() - * * Arrange for dead database removal * ---------- */static voidpgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len){ HASHCTL hash_ctl; PgStat_StatDBEntry *dbentry; /* * Mak
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -