⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pqcomm.c

📁 PostgreSQL 8.1.4的源码 适用于Linux下的开源数据库系统
💻 C
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------- * * pqcomm.c *	  Communication functions between the Frontend and the Backend * * These routines handle the low-level details of communication between * frontend and backend.  They just shove data across the communication * channel, and are ignorant of the semantics of the data --- or would be, * except for major brain damage in the design of the old COPY OUT protocol. * Unfortunately, COPY OUT was designed to commandeer the communication * channel (it just transfers data without wrapping it into messages). * No other messages can be sent while COPY OUT is in progress; and if the * copy is aborted by an ereport(ERROR), we need to close out the copy so that * the frontend gets back into sync.  Therefore, these routines have to be * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not* * set the DoingCopyOut flag.) * * NOTE: generally, it's a bad idea to emit outgoing messages directly with * pq_putbytes(), especially if the message would require multiple calls * to send.  Instead, use the routines in pqformat.c to construct the message * in a buffer and then emit it in one call to pq_putmessage.  This ensures * that the channel will not be clogged by an incomplete message if execution * is aborted by ereport(ERROR) partway through the message.  The only * non-libpq code that should call pq_putbytes directly is old-style COPY OUT. * * At one time, libpq was shared between frontend and backend, but now * the backend's "backend/libpq" is quite separate from "interfaces/libpq". * All that remains is similarities of names to trap the unwary... 
* * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * *	$PostgreSQL: pgsql/src/backend/libpq/pqcomm.c,v 1.182.2.1 2006/01/24 16:38:50 tgl Exp $ * *------------------------------------------------------------------------- *//*------------------------ * INTERFACE ROUTINES * * setup/teardown: *		StreamServerPort	- Open postmaster's server port *		StreamConnection	- Create new connection with client *		StreamClose			- Close a client/backend connection *		TouchSocketFile		- Protect socket file against /tmp cleaners *		pq_init			- initialize libpq at backend startup *		pq_comm_reset	- reset libpq during error recovery *		pq_close		- shutdown libpq at backend exit * * low-level I/O: *		pq_getbytes		- get a known number of bytes from connection *		pq_getstring	- get a null terminated string from connection *		pq_getmessage	- get a message with length word from connection *		pq_getbyte		- get next byte from connection *		pq_peekbyte		- peek at next byte from connection *		pq_putbytes		- send bytes to connection (not flushed until pq_flush) *		pq_flush		- flush pending output * * message-level I/O (and old-style-COPY-OUT cruft): *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode) *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning *		pq_endcopyout	- end a COPY OUT transfer * *------------------------ */#include "postgres.h"#include <signal.h>#include <errno.h>#include <fcntl.h>#include <grp.h>#include <unistd.h>#include <sys/file.h>#include <sys/socket.h>#include <sys/stat.h>#include <sys/time.h>#include <netdb.h>#include <netinet/in.h>#ifdef HAVE_NETINET_TCP_H#include <netinet/tcp.h>#endif#include <arpa/inet.h>#ifdef HAVE_UTIME_H#include <utime.h>#endif#include "libpq/libpq.h"#include "miscadmin.h"#include "storage/ipc.h"#include "utils/guc.h"/* * Configuration options */int			Unix_socket_permissions;char	   *Unix_socket_group;/* Where the Unix 
socket file is */static char sock_path[MAXPGPATH];/* * Buffers for low-level I/O */#define PQ_BUFFER_SIZE 8192static char PqSendBuffer[PQ_BUFFER_SIZE];static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */static char PqRecvBuffer[PQ_BUFFER_SIZE];static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */static int	PqRecvLength;		/* End of data available in PqRecvBuffer *//* * Message status */static bool PqCommBusy;static bool DoingCopyOut;/* Internal functions */static void pq_close(int code, Datum arg);static int	internal_putbytes(const char *s, size_t len);static int	internal_flush(void);#ifdef HAVE_UNIX_SOCKETSstatic int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);static int	Setup_AF_UNIX(void);#endif   /* HAVE_UNIX_SOCKETS *//* -------------------------------- *		pq_init - initialize libpq at backend startup * -------------------------------- */voidpq_init(void){	PqSendPointer = PqRecvPointer = PqRecvLength = 0;	PqCommBusy = false;	DoingCopyOut = false;	on_proc_exit(pq_close, 0);}/* -------------------------------- *		pq_comm_reset - reset libpq during error recovery * * This is called from error recovery at the outer idle loop.  It's * just to get us out of trouble if we somehow manage to elog() from * inside a pqcomm.c routine (which ideally will never happen, but...) * -------------------------------- */voidpq_comm_reset(void){	/* Do not throw away pending data, but do reset the busy flag */	PqCommBusy = false;	/* We can abort any old-style COPY OUT, too */	pq_endcopyout(true);}/* -------------------------------- *		pq_close - shutdown libpq at backend exit * * Note: in a standalone backend MyProcPort will be null, * don't crash during exit... 
* --------------------------------
 */
static void
pq_close(int code, Datum arg)
{
	if (MyProcPort != NULL)
	{
		/* Cleanly shut down SSL layer */
		secure_close(MyProcPort);

		/*
		 * Formerly we did an explicit close() here, but it seems better to
		 * leave the socket open until the process dies.  This allows clients
		 * to perform a "synchronous close" if they care --- wait till the
		 * transport layer reports connection closure, and you can be sure the
		 * backend has exited.
		 *
		 * We do set sock to -1 to prevent any further I/O, though.
		 */
		MyProcPort->sock = -1;
	}
}



/*
 * Streams -- wrapper around Unix socket system calls
 *
 *
 *		Stream functions are used for vanilla TCP connection protocol.
 */


/* StreamDoUnlink()
 * Shutdown routine: removes the Unix-domain socket file named by sock_path
 * with unlink().  It does not close any descriptor.
 *
 * The (code, arg) signature matches pq_close(), which is registered through
 * on_proc_exit(); both arguments are unused here.
 * NOTE(review): the place where this routine gets registered is not visible
 * in this part of the file -- confirm against Setup_AF_UNIX.
 */
#ifdef HAVE_UNIX_SOCKETS
static void
StreamDoUnlink(int code, Datum arg)
{
	/* sock_path must have been filled in (see Lock_AF_UNIX) */
	Assert(sock_path[0]);
	unlink(sock_path);
}
#endif   /* HAVE_UNIX_SOCKETS */

/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * Successfully opened sockets are added to the ListenSocket[] array,
 * at the first position that isn't -1.
 *
 * Parameters:
 *	family			address family requested; stored in the getaddrinfo hint.
 *					AF_UNIX selects the Unix-domain socket path as "service".
 *	hostName		host name/address handed to pg_getaddrinfo_all(); may be
 *					NULL (only affects the wording of the failure message).
 *	portNumber		port; formatted as the service string for non-Unix
 *					families and used to derive the Unix socket path.
 *	unixSocketName	passed through to Lock_AF_UNIX() for AF_UNIX.
 *	ListenSocket	array of descriptors; free slots are marked by -1.
 *	MaxListen		number of entries in ListenSocket[].
 *
 * Every address returned by the lookup is tried in turn.  A failure of
 * socket(), setsockopt(), bind() or listen() is logged and that address is
 * skipped.  Running out of free ListenSocket[] slots or a Setup_AF_UNIX()
 * failure stops the loop.
 *
 * RETURNS: STATUS_OK if at least one socket was added, else STATUS_ERROR
 */

int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
				 char *unixSocketName,
				 int ListenSocket[], int MaxListen)
{
	int			fd,
				err;
	int			maxconn;
	int			one = 1;
	int			ret;
	char		portNumberStr[32];
	const char *familyDesc;
	char		familyDescBuf[64];
	char	   *service;
	struct addrinfo *addrs = NULL,
			   *addr;
	struct addrinfo hint;
	int			listen_index = 0;
	int			added = 0;

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_family = family;
	hint.ai_flags = AI_PASSIVE;
	hint.ai_socktype = SOCK_STREAM;

#ifdef HAVE_UNIX_SOCKETS
	if (family == AF_UNIX)
	{
		/* Lock_AF_UNIX will also fill in sock_path. */
		if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
			return STATUS_ERROR;
		service = sock_path;
	}
	else
#endif   /* HAVE_UNIX_SOCKETS */
	{
		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
		service = portNumberStr;
	}

	ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
	if (ret || !addrs)
	{
		if (hostName)
			ereport(LOG,
					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
							hostName, service, gai_strerror(ret))));
		else
			ereport(LOG,
				 (errmsg("could not translate service \"%s\" to address: %s",
						 service, gai_strerror(ret))));
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);
		return STATUS_ERROR;
	}

	for (addr = addrs; addr; addr = addr->ai_next)
	{
		if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
		{
			/*
			 * Only set up a unix domain socket when they really asked for it.
			 * The service/port is different in that case.
			 */
			continue;
		}

		/* See if there is still room to add 1 more socket. */
		for (; listen_index < MaxListen; listen_index++)
		{
			if (ListenSocket[listen_index] == -1)
				break;
		}
		if (listen_index >= MaxListen)
		{
			ereport(LOG,
					(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
							MaxListen)));
			break;
		}

		/* set up family name for possible error messages */
		switch (addr->ai_family)
		{
			case AF_INET:
				familyDesc = _("IPv4");
				break;
#ifdef HAVE_IPV6
			case AF_INET6:
				familyDesc = _("IPv6");
				break;
#endif
#ifdef HAVE_UNIX_SOCKETS
			case AF_UNIX:
				familyDesc = _("Unix");
				break;
#endif
			default:
				snprintf(familyDescBuf, sizeof(familyDescBuf),
						 _("unrecognized address family %d"),
						 addr->ai_family);
				familyDesc = familyDescBuf;
				break;
		}

		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not create %s socket: %m",
							familyDesc)));
			continue;
		}

		if (!IS_AF_UNIX(addr->ai_family))
		{
			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
							(char *) &one, sizeof(one))) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
				closesocket(fd);
				continue;
			}
		}

#ifdef IPV6_V6ONLY
		if (addr->ai_family == AF_INET6)
		{
			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
						   (char *) &one, sizeof(one)) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
				closesocket(fd);
				continue;
			}
		}
#endif

		/*
		 * Note: This might fail on some OS's, like Linux older than
		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
		 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
		 * connections.
		 */
		err = bind(fd, addr->ai_addr, addr->ai_addrlen);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not bind %s socket: %m",
							familyDesc),
					 (IS_AF_UNIX(addr->ai_family)) ?
				  errhint("Is another postmaster already running on port %d?"
						  " If not, remove socket file \"%s\" and retry.",
						  (int) portNumber, sock_path) :
				  errhint("Is another postmaster already running on port %d?"
						  " If not, wait a few seconds and retry.",
						  (int) portNumber)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		if (addr->ai_family == AF_UNIX)
		{
			/* unlike the other failures, this one abandons the whole loop */
			if (Setup_AF_UNIX() != STATUS_OK)
			{
				closesocket(fd);
				break;
			}
		}
#endif

		/*
		 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
		 * intended to provide a clamp on the request on platforms where an
		 * overly large request provokes a kernel error (are there any?).
		 */
		maxconn = MaxBackends * 2;
		if (maxconn > PG_SOMAXCONN)
			maxconn = PG_SOMAXCONN;

		err = listen(fd, maxconn);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not listen on %s socket: %m",
							familyDesc)));
			closesocket(fd);
			continue;
		}
		ListenSocket[listen_index] = fd;
		added++;
	}

	pg_freeaddrinfo_all(hint.ai_family, addrs);

	if (!added)
		return STATUS_ERROR;

	return STATUS_OK;
}


#ifdef HAVE_UNIX_SOCKETS

/*
 * Lock_AF_UNIX -- configure unix socket file path
 *
 * Fills in sock_path from portNumber and unixSocketName via UNIXSOCK_PATH,
 * takes the interlock file for that path, then unlinks any existing socket
 * file.
 *
 * As written this always returns STATUS_OK.
 * NOTE(review): how CreateSocketLockFile() reports failure is not visible
 * here -- confirm it does not return on error.
 */
static int
Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
{
	UNIXSOCK_PATH(sock_path, portNumber, unixSocketName);

	/*
	 * Grab an interlock file associated with the socket file.
	 */
	CreateSocketLockFile(sock_path, true);

	/*
	 * Once we have the interlock, we can safely delete any pre-existing
	 * socket file to avoid failure at bind() time.
	 */
	unlink(sock_path);

	return STATUS_OK;
}


/*
 * Setup_AF_UNIX -- configure unix socket permissions

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -