⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pgstat.c

📁 PostgreSQL 8.1.4的源码 适用于Linux下的开源数据库系统
💻 C
📖 第 1 页 / 共 5 页
字号:
	closesocket(pgStatSock);	/*	 * Identify myself via ps	 */	init_ps_display("stats collector process", "", "");	set_ps_display("");	/*	 * Arrange to write the initial status file right away	 */	gettimeofday(&next_statwrite, NULL);	need_statwrite = TRUE;	/*	 * Read in an existing statistics stats file or initialize the stats to	 * zero.	 */	pgStatRunningInCollector = TRUE;	pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);	/*	 * Create the dead backend hashtable	 */	memset(&hash_ctl, 0, sizeof(hash_ctl));	hash_ctl.keysize = sizeof(int);	hash_ctl.entrysize = sizeof(PgStat_StatBeDead);	hash_ctl.hash = tag_hash;	pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,							   &hash_ctl, HASH_ELEM | HASH_FUNCTION);	/*	 * Create the known backends table	 */	pgStatBeTable = (PgStat_StatBeEntry *)		palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);	readPipe = pgStatPipe[0];	/*	 * Process incoming messages and handle all the reporting stuff until	 * there are no more messages.	 */	for (;;)	{		/*		 * If we need to write the status file again (there have been changes		 * in the statistics since we wrote it last) calculate the timeout		 * until we have to do so.		 */		if (need_statwrite)		{			struct timeval now;			gettimeofday(&now, NULL);			/* avoid assuming that tv_sec is signed */			if (now.tv_sec > next_statwrite.tv_sec ||				(now.tv_sec == next_statwrite.tv_sec &&				 now.tv_usec >= next_statwrite.tv_usec))			{				timeout.tv_sec = 0;				timeout.tv_usec = 0;			}			else			{				timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;				timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;				if (timeout.tv_usec < 0)				{					timeout.tv_sec--;					timeout.tv_usec += 1000000;				}			}		}		/*		 * Setup the descriptor set for select(2)		 */		FD_ZERO(&rfds);		FD_SET(readPipe, &rfds);		/*		 * Now wait for something to do.		 */		nready = select(readPipe + 1, &rfds, NULL, NULL,						(need_statwrite) ? &timeout : NULL);		if (nready < 0)		{			if (errno == EINTR)				continue;			ereport(ERROR,					(errcode_for_socket_access(),					 errmsg("select() failed in statistics collector: %m")));		}		/*		 * If there are no descriptors ready, our timeout for writing the		 * stats file happened.		 */		if (nready == 0)		{			pgstat_write_statsfile();			need_statwrite = FALSE;			continue;		}		/*		 * Check if there is a new statistics message to collect.		 */		if (FD_ISSET(readPipe, &rfds))		{			/*			 * We may need to issue multiple read calls in case the buffer			 * process didn't write the message in a single write, which is			 * possible since it dumps its buffer bytewise. In any case, we'd			 * need two reads since we don't know the message length			 * initially.			 */			int			nread = 0;			int			targetlen = sizeof(PgStat_MsgHdr);		/* initial */			bool		pipeEOF = false;			while (nread < targetlen)			{				len = piperead(readPipe, ((char *) &msg) + nread,							   targetlen - nread);				if (len < 0)				{					if (errno == EINTR)						continue;					ereport(ERROR,							(errcode_for_socket_access(),							 errmsg("could not read from statistics collector pipe: %m")));				}				if (len == 0)	/* EOF on the pipe! */				{					pipeEOF = true;					break;				}				nread += len;				if (nread == sizeof(PgStat_MsgHdr))				{					/* we have the header, compute actual msg length */					targetlen = msg.msg_hdr.m_size;					if (targetlen < (int) sizeof(PgStat_MsgHdr) ||						targetlen > (int) sizeof(msg))					{						/*						 * Bogus message length implies that we got out of						 * sync with the buffer process somehow. Abort so that						 * we can restart both processes.						 */						ereport(ERROR,							  (errmsg("invalid statistics message length")));					}				}			}			/*			 * EOF on the pipe implies that the buffer process exited. Fall			 * out of outer loop.			 */			if (pipeEOF)				break;			/*			 * Distribute the message to the specific function handling it.			 */			switch (msg.msg_hdr.m_type)			{				case PGSTAT_MTYPE_DUMMY:					break;				case PGSTAT_MTYPE_BESTART:					pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);					break;				case PGSTAT_MTYPE_BETERM:					pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);					break;				case PGSTAT_MTYPE_TABSTAT:					pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);					break;				case PGSTAT_MTYPE_TABPURGE:					pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);					break;				case PGSTAT_MTYPE_ACTIVITY:					pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);					break;				case PGSTAT_MTYPE_DROPDB:					pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);					break;				case PGSTAT_MTYPE_RESETCOUNTER:					pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,											 nread);					break;				case PGSTAT_MTYPE_AUTOVAC_START:					pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);					break;				case PGSTAT_MTYPE_VACUUM:					pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);					break;				case PGSTAT_MTYPE_ANALYZE:					pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);					break;				default:					break;			}			/*			 * Globally count messages.			 */			pgStatNumMessages++;			/*			 * If this is the first message after we wrote the stats file the			 * last time, setup the timeout that it'd be written.			 */			if (!need_statwrite)			{				gettimeofday(&next_statwrite, NULL);				next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);				next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);				next_statwrite.tv_usec %= 1000000;				need_statwrite = TRUE;			}		}		/*		 * Note that we do NOT check for postmaster exit inside the loop; only		 * EOF on the buffer pipe causes us to fall out.  This ensures we		 * don't exit prematurely if there are still a few messages in the		 * buffer or pipe at postmaster shutdown.		 */	}	/*	 * Okay, we saw EOF on the buffer pipe, so there are no more messages to	 * process.  If the buffer process quit because of postmaster shutdown, we	 * want to save the final stats to reuse at next startup. But if the	 * buffer process failed, it seems best not to (there may even now be a	 * new collector firing up, and we don't want it to read a	 * partially-rewritten stats file).	 */	if (!PostmasterIsAlive(false))		pgstat_write_statsfile();}/* ---------- * pgstat_recvbuffer() - * *	This is the body of the separate buffering process. Its only *	purpose is to receive messages from the UDP socket as fast as *	possible and forward them over a pipe into the collector itself. *	If the collector is slow to absorb messages, they are buffered here. * ---------- */static voidpgstat_recvbuffer(void){	fd_set		rfds;	fd_set		wfds;	struct timeval timeout;	int			writePipe = pgStatPipe[1];	int			maxfd;	int			nready;	int			len;	int			xfr;	int			frm;	PgStat_Msg	input_buffer;	char	   *msgbuffer;	int			msg_send = 0;	/* next send index in buffer */	int			msg_recv = 0;	/* next receive index */	int			msg_have = 0;	/* number of bytes stored */	bool		overflow = false;	/*	 * Identify myself via ps	 */	init_ps_display("stats buffer process", "", "");	set_ps_display("");	/*	 * We want to die if our child collector process does.	There are two ways	 * we might notice that it has died: receive SIGCHLD, or get a write	 * failure on the pipe leading to the child.  We can set SIGPIPE to kill	 * us here.  Our SIGCHLD handler was already set up before we forked (must	 * do it that way, else it's a race condition).	 */	pqsignal(SIGPIPE, SIG_DFL);	PG_SETMASK(&UnBlockSig);	/*	 * Set the write pipe to nonblock mode, so that we cannot block when the	 * collector falls behind.	 */	if (!pg_set_noblock(writePipe))		ereport(ERROR,				(errcode_for_socket_access(),				 errmsg("could not set statistics collector pipe to nonblocking mode: %m")));	/*	 * Allocate the message buffer	 */	msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);	/*	 * Loop forever	 */	for (;;)	{		FD_ZERO(&rfds);		FD_ZERO(&wfds);		maxfd = -1;		/*		 * As long as we have buffer space we add the socket to the read		 * descriptor set.		 */		if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))		{			FD_SET(pgStatSock, &rfds);			maxfd = pgStatSock;			overflow = false;		}		else		{			if (!overflow)			{				ereport(LOG,						(errmsg("statistics buffer is full")));				overflow = true;			}		}		/*		 * If we have messages to write out, we add the pipe to the write		 * descriptor set.		 */		if (msg_have > 0)		{			FD_SET(writePipe, &wfds);			if (writePipe > maxfd)				maxfd = writePipe;		}		/*		 * Wait for some work to do; but not for more than 10 seconds. (This		 * determines how quickly we will shut down after an ungraceful		 * postmaster termination; so it needn't be very fast.)		 */		timeout.tv_sec = 10;		timeout.tv_usec = 0;		nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);		if (nready < 0)		{			if (errno == EINTR)				continue;			ereport(ERROR,					(errcode_for_socket_access(),					 errmsg("select() failed in statistics buffer: %m")));		}		/*		 * If there is a message on the socket, read it and check for		 * validity.		 */		if (FD_ISSET(pgStatSock, &rfds))		{			len = recv(pgStatSock, (char *) &input_buffer,					   sizeof(PgStat_Msg), 0);			if (len < 0)				ereport(ERROR,						(errcode_for_socket_access(),						 errmsg("could not read statistics message: %m")));			/*			 * We ignore messages that are smaller than our common header			 */			if (len < sizeof(PgStat_MsgHdr))				continue;			/*			 * The received length must match the length in the header			 */			if (input_buffer.msg_hdr.m_size != len)				continue;			/*			 * O.K. - we accept this message.  Copy it to the circular			 * msgbuffer.			 */			frm = 0;			while (len > 0)			{				xfr = PGSTAT_RECVBUFFERSZ - msg_recv;				if (xfr > len)					xfr = len;				Assert(xfr > 0);				memcpy(msgbuffer + msg_recv,					   ((char *) &input_buffer) + frm,					   xfr);				msg_recv += xfr;				if (msg_recv == PGSTAT_RECVBUFFERSZ)					msg_recv = 0;				msg_have += xfr;				frm += xfr;				len -= xfr;			}		}		/*		 * If the collector is ready to receive, write some data into his		 * pipe.  We may or may not be able to write all that we have.		 *		 * NOTE: if what we have is less than PIPE_BUF bytes but more than the		 * space available in the pipe buffer, most kernels will refuse to		 * write any of it, and will return EAGAIN.  This means we will		 * busy-loop until the situation changes (either because the collector		 * caught up, or because more data arrives so that we have more than		 * PIPE_BUF bytes buffered).  This is not good, but is there any way		 * around it?  We have no way to tell when the collector has caught		 * up...		 */		if (FD_ISSET(writePipe, &wfds))		{			xfr = PGSTAT_RECVBUFFERSZ - msg_send;			if (xfr > msg_have)				xfr = msg_have;			Assert(xfr > 0);			len = pipewrite(writePipe, msgbuffer + msg_send, xfr);			if (len < 0)			{				if (errno == EINTR || errno == EAGAIN)					continue;	/* not enough space in pipe */				ereport(ERROR,						(errcode_for_socket_access(),				errmsg("could not write to statistics collector pipe: %m")));			}			/* NB: len < xfr is okay */			msg_send += len;			if (msg_send == PGSTAT_RECVBUFFERSZ)				msg_send = 0;			msg_have -= len;		}		/*		 * Make sure we forwarded all messages before we check for postmaster		 * termination.		 */		if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))			continue;		/*		 * If the postmaster has terminated, we die too.  (This is no longer		 * the normal exit path, however.)		 */		if (!PostmasterIsAlive(true))			exit(0);	}}/* SIGQUIT signal handler for buffer process */static voidpgstat_exit(SIGNAL_ARGS){	/*	 * For now, we just nail the doors shut and get out of town.  It might be	 * cleaner to allow any pending messages to be sent, but that creates a	 * tradeoff against speed of exit.	 */	/*	 * If running in bufferer, kill our collector as well. On some broken	 * win32 systems, it does not shut down automatically because of issues	 * with socket inheritance.  XXX so why not fix the socket inheritance...	 */#ifdef WIN32	if (pgStatCollectorPid > 0)		kill(pgStatCollectorPid, SIGQUIT);#endif	exit(0);}/* SIGCHLD signal handler for buffer process */static voidpgstat_die(SIGNAL_ARGS){	exit(1);}/* ---------- * pgstat_add_backend() - * *	Support function to keep our backend list up to date. * ---------- */static intpgstat_add_backend(PgStat_MsgHdr *msg){	PgStat_StatBeEntry *beentry;	PgStat_StatBeDead *deadbe;	/*	 * Check that the backend ID is valid	 */	if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)	{		ereport(LOG,				(errmsg("invalid server process ID %d", msg->m_backendid)));		return -1;	}	/*	 * Get the slot for this backendid.	 */	beentry = &pgStatBeTable[msg->m_backendid - 1];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -