📄 pqcomm.c
字号:
/*-------------------------------------------------------------------------
 *
 * pqcomm.c
 *	  Communication functions between the Frontend and the Backend
 *
 * These routines handle the low-level details of communication between
 * frontend and backend.  They just shove data across the communication
 * channel, and are ignorant of the semantics of the data --- or would be,
 * except for major brain damage in the design of the old COPY OUT protocol.
 * Unfortunately, COPY OUT was designed to commandeer the communication
 * channel (it just transfers data without wrapping it into messages).
 * No other messages can be sent while COPY OUT is in progress; and if the
 * copy is aborted by an ereport(ERROR), we need to close out the copy so that
 * the frontend gets back into sync.  Therefore, these routines have to be
 * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
 * set the DoingCopyOut flag.)
 *
 * NOTE: generally, it's a bad idea to emit outgoing messages directly with
 * pq_putbytes(), especially if the message would require multiple calls
 * to send.  Instead, use the routines in pqformat.c to construct the message
 * in a buffer and then emit it in one call to pq_putmessage.  This ensures
 * that the channel will not be clogged by an incomplete message if execution
 * is aborted by ereport(ERROR) partway through the message.  The only
 * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
 *
 * At one time, libpq was shared between frontend and backend, but now
 * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
 * All that remains is similarities of names to trap the unwary...
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *	$PostgreSQL: pgsql/src/backend/libpq/pqcomm.c,v 1.182.2.1 2006/01/24 16:38:50 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

/*------------------------
 * INTERFACE ROUTINES
 *
 * setup/teardown:
 *		StreamServerPort	- Open postmaster's server port
 *		StreamConnection	- Create new connection with client
 *		StreamClose			- Close a client/backend connection
 *		TouchSocketFile		- Protect socket file against /tmp cleaners
 *		pq_init			- initialize libpq at backend startup
 *		pq_comm_reset	- reset libpq during error recovery
 *		pq_close		- shutdown libpq at backend exit
 *
 * low-level I/O:
 *		pq_getbytes		- get a known number of bytes from connection
 *		pq_getstring	- get a null terminated string from connection
 *		pq_getmessage	- get a message with length word from connection
 *		pq_getbyte		- get next byte from connection
 *		pq_peekbyte		- peek at next byte from connection
 *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
 *		pq_flush		- flush pending output
 *
 * message-level I/O (and old-style-COPY-OUT cruft):
 *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
 *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
 *		pq_endcopyout	- end a COPY OUT transfer
 *
 *------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <netdb.h>
#include <netinet/in.h>
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#include <arpa/inet.h>
#ifdef HAVE_UTIME_H
#include <utime.h>
#endif

#include "libpq/libpq.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "utils/guc.h"

/*
 * Configuration options
 */
int			Unix_socket_permissions;
char	   *Unix_socket_group;


/* Where the Unix socket file is (filled in by Lock_AF_UNIX) */
static char sock_path[MAXPGPATH];


/*
 * Buffers for low-level I/O
 */

#define PQ_BUFFER_SIZE 8192

static char PqSendBuffer[PQ_BUFFER_SIZE];
static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */

static char PqRecvBuffer[PQ_BUFFER_SIZE];
static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
static int	PqRecvLength;		/* End of data available in PqRecvBuffer */

/*
 * Message status
 */
static bool PqCommBusy;
static bool DoingCopyOut;


/* Internal functions */
static void pq_close(int code, Datum arg);
static int	internal_putbytes(const char *s, size_t len);
static int	internal_flush(void);

#ifdef HAVE_UNIX_SOCKETS
static int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
static int	Setup_AF_UNIX(void);
#endif   /* HAVE_UNIX_SOCKETS */


/* --------------------------------
 *		pq_init - initialize libpq at backend startup
 *
 * Empties the send and receive buffers (by resetting their indexes),
 * clears the busy and old-style COPY OUT flags, and registers pq_close
 * to run at process exit.
 * --------------------------------
 */
void
pq_init(void)
{
	PqSendPointer = PqRecvPointer = PqRecvLength = 0;
	PqCommBusy = false;
	DoingCopyOut = false;
	on_proc_exit(pq_close, 0);
}

/* --------------------------------
 *		pq_comm_reset - reset libpq during error recovery
 *
 * This is called from error recovery at the outer idle loop.  It's
 * just to get us out of trouble if we somehow manage to elog() from
 * inside a pqcomm.c routine (which ideally will never happen, but...)
 * --------------------------------
 */
void
pq_comm_reset(void)
{
	/* Do not throw away pending data, but do reset the busy flag */
	PqCommBusy = false;
	/* We can abort any old-style COPY OUT, too */
	pq_endcopyout(true);
}

/* --------------------------------
 *		pq_close - shutdown libpq at backend exit
 *
 * Registered with on_proc_exit() by pq_init; code and arg are unused.
 *
 * Note: in a standalone backend MyProcPort will be null,
 * don't crash during exit...
 * --------------------------------
 */
static void
pq_close(int code, Datum arg)
{
	if (MyProcPort != NULL)
	{
		/* Cleanly shut down SSL layer */
		secure_close(MyProcPort);

		/*
		 * Formerly we did an explicit close() here, but it seems better to
		 * leave the socket open until the process dies.  This allows clients
		 * to perform a "synchronous close" if they care --- wait till the
		 * transport layer reports connection closure, and you can be sure the
		 * backend has exited.
		 *
		 * We do set sock to -1 to prevent any further I/O, though.
		 */
		MyProcPort->sock = -1;
	}
}



/*
 * Streams -- wrapper around Unix socket system calls
 *
 *
 *		Stream functions are used for vanilla TCP connection protocol.
 */


/* StreamDoUnlink()
 * Exit callback that removes the Unix-domain socket file named by
 * sock_path.  Its (int code, Datum arg) signature matches the
 * on_proc_exit() callback convention used by pq_close; where it is
 * registered is outside this excerpt (presumably Setup_AF_UNIX -- verify).
 */
#ifdef HAVE_UNIX_SOCKETS
static void
StreamDoUnlink(int code, Datum arg)
{
	Assert(sock_path[0]);
	unlink(sock_path);
}
#endif   /* HAVE_UNIX_SOCKETS */

/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * family selects the address family (AF_UNIX asks for a Unix-domain
 * socket at the path built by Lock_AF_UNIX; anything else is passed to
 * getaddrinfo with portNumber as the service).  hostName may be NULL.
 *
 * Successfully opened sockets are added to the ListenSocket[] array,
 * at the first position that isn't -1.  Failures on individual addresses
 * are logged and skipped.
 *
 * RETURNS: STATUS_OK if at least one socket was added, else STATUS_ERROR
 */
int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
				 char *unixSocketName,
				 int ListenSocket[], int MaxListen)
{
	int			fd,
				err;
	int			maxconn;
	int			one = 1;
	int			ret;
	char		portNumberStr[32];
	const char *familyDesc;
	char		familyDescBuf[64];
	char	   *service;
	struct addrinfo *addrs = NULL,
			   *addr;
	struct addrinfo hint;
	int			listen_index = 0;
	int			added = 0;

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_family = family;
	hint.ai_flags = AI_PASSIVE;
	hint.ai_socktype = SOCK_STREAM;

#ifdef HAVE_UNIX_SOCKETS
	if (family == AF_UNIX)
	{
		/* Lock_AF_UNIX will also fill in sock_path. */
		if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
			return STATUS_ERROR;
		service = sock_path;
	}
	else
#endif   /* HAVE_UNIX_SOCKETS */
	{
		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
		service = portNumberStr;
	}

	ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
	if (ret || !addrs)
	{
		if (hostName)
			ereport(LOG,
					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
							hostName, service, gai_strerror(ret))));
		else
			ereport(LOG,
					(errmsg("could not translate service \"%s\" to address: %s",
							service, gai_strerror(ret))));
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);
		return STATUS_ERROR;
	}

	for (addr = addrs; addr; addr = addr->ai_next)
	{
		if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
		{
			/*
			 * Only set up a unix domain socket when they really asked for it.
			 * The service/port is different in that case.
			 */
			continue;
		}

		/* See if there is still room to add 1 more socket. */
		for (; listen_index < MaxListen; listen_index++)
		{
			/* -1 marks a free slot; occupied slots are skipped */
			if (ListenSocket[listen_index] == -1)
				break;
		}
		if (listen_index >= MaxListen)
		{
			ereport(LOG,
					(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
							MaxListen)));
			break;
		}

		/* set up family name for possible error messages */
		switch (addr->ai_family)
		{
			case AF_INET:
				familyDesc = _("IPv4");
				break;
#ifdef HAVE_IPV6
			case AF_INET6:
				familyDesc = _("IPv6");
				break;
#endif
#ifdef HAVE_UNIX_SOCKETS
			case AF_UNIX:
				familyDesc = _("Unix");
				break;
#endif
			default:
				snprintf(familyDescBuf, sizeof(familyDescBuf),
						 _("unrecognized address family %d"),
						 addr->ai_family);
				familyDesc = familyDescBuf;
				break;
		}

		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not create %s socket: %m",
							familyDesc)));
			continue;
		}

		if (!IS_AF_UNIX(addr->ai_family))
		{
			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
							(char *) &one, sizeof(one))) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
				closesocket(fd);
				continue;
			}
		}

#ifdef IPV6_V6ONLY
		if (addr->ai_family == AF_INET6)
		{
			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
						   (char *) &one, sizeof(one)) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
				closesocket(fd);
				continue;
			}
		}
#endif

		/*
		 * Note: This might fail on some OS's, like Linux older than
		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
		 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
		 * connections.
		 */
		err = bind(fd, addr->ai_addr, addr->ai_addrlen);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not bind %s socket: %m",
							familyDesc),
					 (IS_AF_UNIX(addr->ai_family)) ?
				  errhint("Is another postmaster already running on port %d?"
						  " If not, remove socket file \"%s\" and retry.",
						  (int) portNumber, sock_path) :
				  errhint("Is another postmaster already running on port %d?"
						  " If not, wait a few seconds and retry.",
						  (int) portNumber)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		if (addr->ai_family == AF_UNIX)
		{
			/*
			 * Note that this failure stops the whole address loop (break),
			 * unlike the per-address failures above, which just skip to the
			 * next address (continue).
			 */
			if (Setup_AF_UNIX() != STATUS_OK)
			{
				closesocket(fd);
				break;
			}
		}
#endif

		/*
		 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
		 * intended to provide a clamp on the request on platforms where an
		 * overly large request provokes a kernel error (are there any?).
		 */
		maxconn = MaxBackends * 2;
		if (maxconn > PG_SOMAXCONN)
			maxconn = PG_SOMAXCONN;

		err = listen(fd, maxconn);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not listen on %s socket: %m",
							familyDesc)));
			closesocket(fd);
			continue;
		}
		ListenSocket[listen_index] = fd;
		added++;
	}

	pg_freeaddrinfo_all(hint.ai_family, addrs);

	if (!added)
		return STATUS_ERROR;

	return STATUS_OK;
}


#ifdef HAVE_UNIX_SOCKETS

/*
 * Lock_AF_UNIX -- configure unix socket file path
 *
 * Builds the socket path into the file-level sock_path, takes the
 * interlock file for it, and removes any stale socket file.
 * Always returns STATUS_OK.
 */
static int
Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
{
	UNIXSOCK_PATH(sock_path, portNumber, unixSocketName);

	/*
	 * Grab an interlock file associated with the socket file.
	 */
	CreateSocketLockFile(sock_path, true);

	/*
	 * Once we have the interlock, we can safely delete any pre-existing
	 * socket file to avoid failure at bind() time.
	 */
	unlink(sock_path);

	return STATUS_OK;
}


/*
 * Setup_AF_UNIX -- configure unix socket permissions
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -