📄 pgstat.c
字号:
/* ---------- * pgstat.c * * All the statistics collector stuff hacked up in one big, ugly file. * * TODO: - Separate collector, postmaster and backend stuff * into different files. * * - Add some automatic call for pgstat vacuuming. * * - Add a pgstat config column to pg_database, so this * entire thing can be enabled/disabled on a per db basis. * * Copyright (c) 2001-2005, PostgreSQL Global Development Group * * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.111.2.3 2006/05/19 15:15:38 alvherre Exp $ * ---------- */#include "postgres.h"#include <unistd.h>#include <fcntl.h>#include <sys/param.h>#include <sys/time.h>#include <sys/socket.h>#include <netdb.h>#include <netinet/in.h>#include <arpa/inet.h>#include <signal.h>#include <time.h>#include "pgstat.h"#include "access/heapam.h"#include "access/xact.h"#include "catalog/pg_database.h"#include "libpq/libpq.h"#include "libpq/pqsignal.h"#include "mb/pg_wchar.h"#include "miscadmin.h"#include "postmaster/autovacuum.h"#include "postmaster/fork_process.h"#include "postmaster/postmaster.h"#include "storage/backendid.h"#include "storage/fd.h"#include "storage/ipc.h"#include "storage/pg_shmem.h"#include "storage/pmsignal.h"#include "storage/procarray.h"#include "tcop/tcopprot.h"#include "utils/hsearch.h"#include "utils/memutils.h"#include "utils/ps_status.h"#include "utils/rel.h"#include "utils/syscache.h"/* ---------- * Paths for the statistics files (relative to installation's $PGDATA). * ---------- */#define PGSTAT_STAT_FILENAME "global/pgstat.stat"#define PGSTAT_STAT_TMPFILE "global/pgstat.tmp"/* ---------- * Timer definitions. * ---------- */#define PGSTAT_STAT_INTERVAL 500 /* How often to write the status file; * in milliseconds. */#define PGSTAT_DESTROY_DELAY 10000 /* How long to keep destroyed objects * known, to give delayed UDP packets * time to arrive; in milliseconds. */#define PGSTAT_DESTROY_COUNT (PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)#define PGSTAT_RESTART_INTERVAL 60 /* How often to attempt to restart a * failed statistics collector; in * seconds. *//* ---------- * Amount of space reserved in pgstat_recvbuffer(). * ---------- */#define PGSTAT_RECVBUFFERSZ ((int) (1024 * sizeof(PgStat_Msg)))/* ---------- * The initial size hints for the hash tables used in the collector. * ---------- */#define PGSTAT_DB_HASH_SIZE 16#define PGSTAT_BE_HASH_SIZE 512#define PGSTAT_TAB_HASH_SIZE 512/* ---------- * GUC parameters * ---------- */bool pgstat_collect_startcollector = true;bool pgstat_collect_resetonpmstart = false;bool pgstat_collect_querystring = false;bool pgstat_collect_tuplelevel = false;bool pgstat_collect_blocklevel = false;/* ---------- * Local data * ---------- */NON_EXEC_STATIC int pgStatSock = -1;NON_EXEC_STATIC int pgStatPipe[2] = {-1, -1};static struct sockaddr_storage pgStatAddr;static pid_t pgStatCollectorPid = 0;static time_t last_pgstat_start_time;static long pgStatNumMessages = 0;static bool pgStatRunningInCollector = FALSE;/* * Place where backends store per-table info to be sent to the collector. * We store shared relations separately from non-shared ones, to be able to * send them in separate messages. */typedef struct TabStatArray{ int tsa_alloc; /* num allocated */ int tsa_used; /* num actually used */ PgStat_MsgTabstat **tsa_messages; /* the array itself */} TabStatArray;#define TABSTAT_QUANTUM 4 /* we alloc this many at a time */static TabStatArray RegularTabStat = {0, 0, NULL};static TabStatArray SharedTabStat = {0, 0, NULL};static int pgStatXactCommit = 0;static int pgStatXactRollback = 0;static TransactionId pgStatDBHashXact = InvalidTransactionId;static HTAB *pgStatDBHash = NULL;static HTAB *pgStatBeDead = NULL;static PgStat_StatBeEntry *pgStatBeTable = NULL;static int pgStatNumBackends = 0;/* ---------- * Local function forward declarations * ---------- */#ifdef EXEC_BACKENDtypedef enum STATS_PROCESS_TYPE{ STAT_PROC_BUFFER, STAT_PROC_COLLECTOR} STATS_PROCESS_TYPE;static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);static void pgstat_parseArgs(int argc, char *argv[]);#endifNON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);static void pgstat_recvbuffer(void);static void pgstat_exit(SIGNAL_ARGS);static void pgstat_die(SIGNAL_ARGS);static void pgstat_beshutdown_hook(int code, Datum arg);static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);static int pgstat_add_backend(PgStat_MsgHdr *msg);static void pgstat_sub_backend(int procpid);static void pgstat_drop_database(Oid databaseid);static void pgstat_write_statsfile(void);static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb, PgStat_StatBeEntry **betab, int *numbackends);static void backend_read_statsfile(void);static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);static void pgstat_send(void *msg, int len);static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);/* ------------------------------------------------------------ * Public functions called from postmaster follow * ------------------------------------------------------------ *//* ---------- * pgstat_init() - * * Called from postmaster at startup. Create the resources required * by the statistics collector process. If unable to do so, do not * fail --- better to let the postmaster start with stats collection * disabled. * ---------- */voidpgstat_init(void){ ACCEPT_TYPE_ARG3 alen; struct addrinfo *addrs = NULL, *addr, hints; int ret; fd_set rset; struct timeval tv; char test_byte; int sel_res;#define TESTBYTEVAL ((char) 199) /* * Force start of collector daemon if something to collect */ if (pgstat_collect_querystring || pgstat_collect_tuplelevel || pgstat_collect_blocklevel) pgstat_collect_startcollector = true; /* * If we don't have to start a collector or should reset the collected * statistics on postmaster start, simply remove the stats file. */ if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart) pgstat_reset_all(); /* * Nothing else required if collector will not get started */ if (!pgstat_collect_startcollector) return; /* * Create the UDP socket for sending and receiving statistic messages */ hints.ai_flags = AI_PASSIVE; hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_DGRAM; hints.ai_protocol = 0; hints.ai_addrlen = 0; hints.ai_addr = NULL; hints.ai_canonname = NULL; hints.ai_next = NULL; ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs); if (ret || !addrs) { ereport(LOG, (errmsg("could not resolve \"localhost\": %s", gai_strerror(ret)))); goto startup_failed; } /* * On some platforms, pg_getaddrinfo_all() may return multiple addresses * only one of which will actually work (eg, both IPv6 and IPv4 addresses * when kernel will reject IPv6). Worse, the failure may occur at the * bind() or perhaps even connect() stage. So we must loop through the * results till we find a working combination. We will generate LOG * messages, but no error, for bogus combinations. */ for (addr = addrs; addr; addr = addr->ai_next) {#ifdef HAVE_UNIX_SOCKETS /* Ignore AF_UNIX sockets, if any are returned. */ if (addr->ai_family == AF_UNIX) continue;#endif /* * Create the socket. */ if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not create socket for statistics collector: %m"))); continue; } /* * Bind it to a kernel assigned port on localhost and get the assigned * port via getsockname(). */ if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not bind socket for statistics collector: %m"))); closesocket(pgStatSock); pgStatSock = -1; continue; } alen = sizeof(pgStatAddr); if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not get address of socket for statistics collector: %m"))); closesocket(pgStatSock); pgStatSock = -1; continue; } /* * Connect the socket to its own address. This saves a few cycles by * not having to respecify the target address on every send. This also * provides a kernel-level check that only packets from this same * address will be received. */ if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not connect socket for statistics collector: %m"))); closesocket(pgStatSock); pgStatSock = -1; continue; } /* * Try to send and receive a one-byte test message on the socket. This * is to catch situations where the socket can be created but will not * actually pass data (for instance, because kernel packet filtering * rules prevent it). */ test_byte = TESTBYTEVAL; if (send(pgStatSock, &test_byte, 1, 0) != 1) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not send test message on socket for statistics collector: %m"))); closesocket(pgStatSock); pgStatSock = -1; continue; } /* * There could possibly be a little delay before the message can be * received. We arbitrarily allow up to half a second before deciding * it's broken. */ for (;;) /* need a loop to handle EINTR */ { FD_ZERO(&rset); FD_SET(pgStatSock, &rset); tv.tv_sec = 0; tv.tv_usec = 500000; sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv); if (sel_res >= 0 || errno != EINTR) break; } if (sel_res < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("select() failed in statistics collector: %m"))); closesocket(pgStatSock); pgStatSock = -1; continue; } if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset)) { /* * This is the case we actually think is likely, so take pains to * give a specific message for it. * * errno will not be set meaningfully here, so don't use it. */ ereport(LOG, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("test message did not get through on socket for statistics collector"))); closesocket(pgStatSock); pgStatSock = -1; continue; } test_byte++; /* just make sure variable is changed */ if (recv(pgStatSock, &test_byte, 1, 0) != 1) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not receive test message on socket for statistics collector: %m"))); closesocket(pgStatSock); pgStatSock = -1; continue; } if (test_byte != TESTBYTEVAL) /* strictly paranoia ... */ { ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("incorrect test message transmission on socket for statistics collector"))); closesocket(pgStatSock); pgStatSock = -1; continue; } /* If we get here, we have a working socket */ break; } /* Did we find a working address? */ if (!addr || pgStatSock < 0) goto startup_failed; /* * Set the socket to non-blocking IO. This ensures that if the collector * falls behind (despite the buffering process), statistics messages will * be discarded; backends won't block waiting to send messages to the * collector. */ if (!pg_set_noblock(pgStatSock)) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not set statistics collector socket to nonblocking mode: %m"))); goto startup_failed; } pg_freeaddrinfo_all(hints.ai_family, addrs); return;startup_failed: ereport(LOG, (errmsg("disabling statistics collector for lack of working socket"))); if (addrs) pg_freeaddrinfo_all(hints.ai_family, addrs); if (pgStatSock >= 0) closesocket(pgStatSock); pgStatSock = -1; /* Adjust GUC variables to suppress useless activity */ pgstat_collect_startcollector = false; pgstat_collect_querystring = false; pgstat_collect_tuplelevel = false; pgstat_collect_blocklevel = false;}/* * pgstat_reset_all() - * * Remove the stats file. This is used on server start if the * stats_reset_on_server_start feature is enabled, or if WAL * recovery is needed after a crash. */voidpgstat_reset_all(void){ unlink(PGSTAT_STAT_FILENAME);}#ifdef EXEC_BACKEND/* * pgstat_forkexec() - * * Format up the arglist for, then fork and exec, statistics * (buffer and collector) processes */static pid_tpgstat_forkexec(STATS_PROCESS_TYPE procType){ char *av[10]; int ac = 0, bufc = 0, i; char pgstatBuf[2][32]; av[ac++] = "postgres"; switch (procType) { case STAT_PROC_BUFFER: av[ac++] = "-forkbuf"; break; case STAT_PROC_COLLECTOR: av[ac++] = "-forkcol"; break; default: Assert(false); } av[ac++] = NULL; /* filled in by postmaster_forkexec */ /* postgres_exec_path is not passed by write_backend_variables */ av[ac++] = postgres_exec_path; /* Add to the arg list */ Assert(bufc <= lengthof(pgstatBuf)); for (i = 0; i < bufc; i++) av[ac++] = pgstatBuf[i]; av[ac] = NULL; Assert(ac < lengthof(av)); return postmaster_forkexec(ac, av);}/* * pgstat_parseArgs() - * * Extract data from the arglist for exec'ed statistics * (buffer and collector) processes */static voidpgstat_parseArgs(int argc, char *argv[]){ Assert(argc == 4);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -