pqcomm.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 1,111 行 · 第 1/2 页
C
1,111 行
/*-------------------------------------------------------------------------
 *
 * pqcomm.c
 *	  Communication functions between the Frontend and the Backend
 *
 * These routines handle the low-level details of communication between
 * frontend and backend.  They just shove data across the communication
 * channel, and are ignorant of the semantics of the data --- or would be,
 * except for major brain damage in the design of the old COPY OUT protocol.
 * Unfortunately, COPY OUT was designed to commandeer the communication
 * channel (it just transfers data without wrapping it into messages).
 * No other messages can be sent while COPY OUT is in progress; and if the
 * copy is aborted by an ereport(ERROR), we need to close out the copy so that
 * the frontend gets back into sync.  Therefore, these routines have to be
 * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
 * set the DoingCopyOut flag.)
 *
 * NOTE: generally, it's a bad idea to emit outgoing messages directly with
 * pq_putbytes(), especially if the message would require multiple calls
 * to send.  Instead, use the routines in pqformat.c to construct the message
 * in a buffer and then emit it in one call to pq_putmessage.  This ensures
 * that the channel will not be clogged by an incomplete message if execution
 * is aborted by ereport(ERROR) partway through the message.  The only
 * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
 *
 * At one time, libpq was shared between frontend and backend, but now
 * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
 * All that remains is similarities of names to trap the unwary...
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *	$Header: /cvsroot/pgsql/src/backend/libpq/pqcomm.c,v 1.166.2.1 2004/09/26 00:26:50 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

/*------------------------
 * INTERFACE ROUTINES
 *
 * setup/teardown:
 *		StreamServerPort	- Open postmaster's server port
 *		StreamConnection	- Create new connection with client
 *		StreamClose			- Close a client/backend connection
 *		TouchSocketFile		- Protect socket file against /tmp cleaners
 *		pq_init			- initialize libpq at backend startup
 *		pq_comm_reset	- reset libpq during error recovery
 *		pq_close		- shutdown libpq at backend exit
 *						  (static; pq_init registers it via on_proc_exit)
 *
 * low-level I/O:
 *		pq_getbytes		- get a known number of bytes from connection
 *		pq_getstring	- get a null terminated string from connection
 *		pq_getmessage	- get a message with length word from connection
 *		pq_getbyte		- get next byte from connection
 *		pq_peekbyte		- peek at next byte from connection
 *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
 *		pq_flush		- flush pending output
 *
 * message-level I/O (and old-style-COPY-OUT cruft):
 *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
 *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
 *		pq_endcopyout	- end a COPY OUT transfer
 *
 *------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <netdb.h>
#include <netinet/in.h>
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#include <arpa/inet.h>
#ifdef HAVE_UTIME_H
#include <utime.h>
#endif

#include "libpq/libpq.h"
#include "miscadmin.h"
#include "storage/ipc.h"


/*
 * Configuration options
 */
/* Mode bits applied to the Unix socket file with chmod() in Setup_AF_UNIX */
int			Unix_socket_permissions;

/*
 * Group for the Unix socket file: either a numeric gid or a group name
 * resolved with getgrnam().  An empty string leaves the group unchanged.
 * Setup_AF_UNIX asserts it is non-NULL.
 */
char	   *Unix_socket_group;


/*
 * Path of the Unix socket file.  Filled in by Lock_AF_UNIX() (via
 * UNIXSOCK_PATH); StreamDoUnlink() asserts it is non-empty before unlinking.
 */
static char sock_path[MAXPGPATH];

/*
 * Buffers for low-level I/O
 */

#define PQ_BUFFER_SIZE 8192

static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
static int	PqSendPointer;		/* Next index to store a byte in
								 * PqSendBuffer */

/* Unread input occupies PqRecvBuffer[PqRecvPointer .. PqRecvLength-1] */
static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
static int	PqRecvPointer;		/* Next index to read a byte from
								 * PqRecvBuffer */
static int	PqRecvLength;		/* End of data available in PqRecvBuffer */

/*
 * Message status
 */
/*
 * PqCommBusy: cleared by pq_init and pq_comm_reset.  Presumably set while a
 * pqcomm.c routine is mid-operation, so that re-entry after an elog() from
 * inside one can be detected -- TODO confirm against the setters.
 */
static bool PqCommBusy;
/* True while an old-style (non-message-based) COPY OUT is in progress */
static bool DoingCopyOut;


/* Internal functions */
static void pq_close(void);
static int	internal_putbytes(const char *s, size_t len);
static int	internal_flush(void);

#ifdef HAVE_UNIX_SOCKETS
static int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
static int	Setup_AF_UNIX(void);
#endif   /* HAVE_UNIX_SOCKETS */


/* --------------------------------
 *		pq_init - initialize libpq at backend startup
 *
 * Empties both I/O buffers, clears the busy and COPY OUT flags, and
 * registers pq_close() to run at process exit.
 * --------------------------------
 */
void
pq_init(void)
{
	PqSendPointer = PqRecvPointer = PqRecvLength = 0;
	PqCommBusy = false;
	DoingCopyOut = false;
	on_proc_exit(pq_close, 0);
}

/* --------------------------------
 *		pq_comm_reset - reset libpq during error recovery
 *
 * This is called from error recovery at the outer idle loop.  It's
 * just to get us out of trouble if we somehow manage to elog() from
 * inside a pqcomm.c routine (which ideally will never happen, but...)
 *
 * Buffered send/receive data is deliberately left in place.
 * --------------------------------
 */
void
pq_comm_reset(void)
{
	/* Do not throw away pending data, but do reset the busy flag */
	PqCommBusy = false;
	/* We can abort any old-style COPY OUT, too */
	pq_endcopyout(true);
}

/* --------------------------------
 *		pq_close - shutdown libpq at backend exit
 *
 * Registered as an on_proc_exit callback by pq_init().
 *
 * Note: in a standalone backend MyProcPort will be null,
 * don't crash during exit...
 * --------------------------------
 */
static void
pq_close(void)
{
	if (MyProcPort != NULL)
	{
		/* Cleanly shut down SSL layer */
		secure_close(MyProcPort);

		/*
		 * Formerly we did an explicit close() here, but it seems better
		 * to leave the socket open until the process dies.  This allows
		 * clients to perform a "synchronous close" if they care --- wait
		 * till the transport layer reports connection closure, and you
		 * can be sure the backend has exited.
		 *
		 * We do set sock to -1 to prevent any further I/O, though.
		 */
		MyProcPort->sock = -1;
	}
}



/*
 * Streams -- wrapper around Unix socket system calls
 *
 *
 *		Stream functions are used for vanilla TCP connection protocol.
 */


/* StreamDoUnlink()
 * on_proc_exit callback registered by Setup_AF_UNIX(): removes the Unix
 * socket file named by sock_path from the filesystem.  It does not close
 * any socket descriptor; the unlink() result is ignored.
 */
#ifdef HAVE_UNIX_SOCKETS
static void
StreamDoUnlink(void)
{
	Assert(sock_path[0]);
	unlink(sock_path);
}
#endif   /* HAVE_UNIX_SOCKETS */

/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * Successfully opened sockets are added to the ListenSocket[] array,
 * at the first position that isn't -1.
 *
 * family:	address family used as the getaddrinfo hint.  AF_UNIX (when
 *			HAVE_UNIX_SOCKETS) makes the socket file path the "service";
 *			any other family uses portNumber as a decimal string.
 * hostName: host to bind; may be NULL (presumably the wildcard address,
 *			given AI_PASSIVE -- confirm against getaddrinfo_all()).
 * portNumber: TCP port; also used to build the Unix socket path.
 * unixSocketName: passed to UNIXSOCK_PATH() via Lock_AF_UNIX(); presumably
 *			the directory that holds the socket file -- confirm.
 * ListenSocket/MaxListen: array of MaxListen descriptors; -1 marks a free slot.
 *
 * Every resolved address is tried.  Per-address failures (socket,
 * setsockopt, bind, listen) are logged at LOG level and that address is
 * skipped.  A Setup_AF_UNIX() failure or a full ListenSocket[] stops the
 * loop early.
 *
 * RETURNS: STATUS_OK if at least one socket was added; otherwise
 * STATUS_ERROR (including Lock_AF_UNIX or address-lookup failure).
 */

int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
				 char *unixSocketName,
				 int ListenSocket[], int MaxListen)
{
	int			fd,
				err;
	int			maxconn;
	int			one = 1;		/* option value for setsockopt() calls */
	int			ret;
	char		portNumberStr[32];
	const char *familyDesc;
	char		familyDescBuf[64];
	char	   *service;
	struct addrinfo *addrs = NULL,
			   *addr;
	struct addrinfo hint;
	int			listen_index = 0;	/* persists across addresses, so each
									 * free-slot search resumes here */
	int			added = 0;

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_family = family;
	hint.ai_flags = AI_PASSIVE;
	hint.ai_socktype = SOCK_STREAM;

#ifdef HAVE_UNIX_SOCKETS
	if (family == AF_UNIX)
	{
		/* Lock_AF_UNIX will also fill in sock_path. */
		if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
			return STATUS_ERROR;
		service = sock_path;
	}
	else
#endif   /* HAVE_UNIX_SOCKETS */
	{
		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
		service = portNumberStr;
	}

	ret = getaddrinfo_all(hostName, service, &hint, &addrs);
	if (ret || !addrs)
	{
		/*
		 * NOTE(review): when getaddrinfo_all() returns 0 but no addresses,
		 * the message below reports gai_strerror(0).
		 */
		if (hostName)
			ereport(LOG,
					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
							hostName, service, gai_strerror(ret))));
		else
			ereport(LOG,
				 (errmsg("could not translate service \"%s\" to address: %s",
						 service, gai_strerror(ret))));
		freeaddrinfo_all(hint.ai_family, addrs);
		return STATUS_ERROR;
	}

	for (addr = addrs; addr; addr = addr->ai_next)
	{
		if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
		{
			/*
			 * Only set up a unix domain socket when they really asked for
			 * it.  The service/port is different in that case.
			 */
			continue;
		}

		/* See if there is still room to add 1 more socket. */
		for (; listen_index < MaxListen; listen_index++)
		{
			if (ListenSocket[listen_index] == -1)
				break;
		}
		if (listen_index >= MaxListen)
		{
			/* Nothing found. */
			break;
		}

		/* set up family name for possible error messages */
		switch (addr->ai_family)
		{
			case AF_INET:
				familyDesc = gettext("IPv4");
				break;
#ifdef HAVE_IPV6
			case AF_INET6:
				familyDesc = gettext("IPv6");
				break;
#endif
#ifdef HAVE_UNIX_SOCKETS
			case AF_UNIX:
				familyDesc = gettext("Unix");
				break;
#endif
			default:
				snprintf(familyDescBuf, sizeof(familyDescBuf),
						 gettext("unrecognized address family %d"),
						 addr->ai_family);
				familyDesc = familyDescBuf;
				break;
		}

		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not create %s socket: %m",
							familyDesc)));
			continue;
		}

		/*
		 * SO_REUSEADDR is applied to non-Unix sockets only.  It presumably
		 * lets a restarted postmaster rebind a recently used TCP port
		 * (compare the "wait a few seconds" bind hint below).
		 */
		if (!IS_AF_UNIX(addr->ai_family))
		{
			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
							(char *) &one, sizeof(one))) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
				closesocket(fd);
				continue;
			}
		}

#ifdef IPV6_V6ONLY
		/* Keep IPv6 sockets IPv6-only where the option exists (see note below) */
		if (addr->ai_family == AF_INET6)
		{
			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
						   (char *) &one, sizeof(one)) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
				closesocket(fd);
				continue;
			}
		}
#endif

		/*
		 * Note: This might fail on some OS's, like Linux older than
		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and
		 * map ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all
		 * ipv4 connections.
		 */
		err = bind(fd, addr->ai_addr, addr->ai_addrlen);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not bind %s socket: %m",
							familyDesc),
					 (IS_AF_UNIX(addr->ai_family)) ?
				  errhint("Is another postmaster already running on port %d?"
						  " If not, remove socket file \"%s\" and retry.",
						  (int) portNumber, sock_path) :
				  errhint("Is another postmaster already running on port %d?"
						  " If not, wait a few seconds and retry.",
						  (int) portNumber)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		/*
		 * Ownership/permissions must be fixed between bind() (which creates
		 * the file) and listen().  Failure here abandons the remaining
		 * addresses ("break", not "continue").
		 */
		if (addr->ai_family == AF_UNIX)
		{
			if (Setup_AF_UNIX() != STATUS_OK)
			{
				closesocket(fd);
				break;
			}
		}
#endif

		/*
		 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is
		 * only intended to provide a clamp on the request on platforms
		 * where an overly large request provokes a kernel error (are
		 * there any?).
		 */
		maxconn = MaxBackends * 2;
		if (maxconn > PG_SOMAXCONN)
			maxconn = PG_SOMAXCONN;

		err = listen(fd, maxconn);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not listen on %s socket: %m",
							familyDesc)));
			closesocket(fd);
			continue;
		}
		ListenSocket[listen_index] = fd;
		added++;
	}

	freeaddrinfo_all(hint.ai_family, addrs);

	if (!added)
		return STATUS_ERROR;

	return STATUS_OK;
}


#ifdef HAVE_UNIX_SOCKETS

/*
 * Lock_AF_UNIX -- configure unix socket file path
 *
 * Builds sock_path, takes the socket interlock file, then removes any
 * stale socket file so that bind() can create a fresh one.
 *
 * Always returns STATUS_OK as written.  CreateSocketLockFile()'s outcome is
 * not examined here; presumably it reports failure itself (e.g. via
 * ereport) -- TODO confirm.
 */
static int
Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
{
	UNIXSOCK_PATH(sock_path, portNumber, unixSocketName);

	/*
	 * Grab an interlock file associated with the socket file.
	 */
	CreateSocketLockFile(sock_path, true);

	/*
	 * Once we have the interlock, we can safely delete any pre-existing
	 * socket file to avoid failure at bind() time.
	 */
	unlink(sock_path);

	return STATUS_OK;
}


/*
 * Setup_AF_UNIX -- configure unix socket permissions
 *
 * Called after bind() has created the socket file at sock_path.  Registers
 * its removal at exit, optionally changes its group (Unix_socket_group),
 * and applies Unix_socket_permissions with chmod().
 *
 * RETURNS: STATUS_OK, or STATUS_ERROR (after logging at LOG level) if the
 * group is unknown or chown()/chmod() fails.  On WIN32 a non-empty
 * Unix_socket_group only produces a WARNING; chmod() is still attempted.
 */
static int
Setup_AF_UNIX(void)
{
	/*
	 * Arrange to unlink the socket file at exit.  This is done first, so
	 * the file is cleaned up even if a step below fails.
	 */
	on_proc_exit(StreamDoUnlink, 0);

	/*
	 * Fix socket ownership/permission if requested.  Note we must do this
	 * before we listen() to avoid a window where unwanted connections
	 * could get accepted.
	 */
	Assert(Unix_socket_group);
	if (Unix_socket_group[0] != '\0')	/* empty string: leave group as is */
	{
#ifdef WIN32
		elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
#else
		char	   *endptr;
		unsigned long int val;
		gid_t		gid;

		/* The whole string must parse as a number to be taken as a gid */
		val = strtoul(Unix_socket_group, &endptr, 10);
		if (*endptr == '\0')
		{						/* numeric group id */
			gid = val;
		}
		else
		{						/* convert group name to id */
			struct group *gr;

			gr = getgrnam(Unix_socket_group);
			if (!gr)
			{
				ereport(LOG,
						(errmsg("group \"%s\" does not exist",
								Unix_socket_group)));
				return STATUS_ERROR;
			}
			gid = gr->gr_gid;
		}
		/* owner argument -1: change only the group */
		if (chown(sock_path, -1, gid) == -1)
		{
			ereport(LOG,
					(errcode_for_file_access(),
					 errmsg("could not set group of file \"%s\": %m",
							sock_path)));
			return STATUS_ERROR;
		}
#endif
	}

	if (chmod(sock_path, Unix_socket_permissions) == -1)
	{
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not set permissions of file \"%s\": %m",
						sock_path)));
		return STATUS_ERROR;
	}
	return STATUS_OK;
}
#endif   /* HAVE_UNIX_SOCKETS */


/*
 * StreamConnection -- create a new connection with client using
 *		server port.
 *
 * ASSUME: that this doesn't need to be non-blocking because
 *		the Postmaster uses select() to tell when the server master
 *		socket is ready for accept().
 *
 * RETURNS: STATUS_OK or STATUS_ERROR
 */
int
StreamConnection(int server_fd, Port *port)
{
	/* accept connection and fill in the client (remote) address */
	port->raddr.salen = sizeof(port->raddr.addr);
	if ((port->sock = accept(server_fd,
							 (struct sockaddr *) & port->raddr.addr,
							 &port->raddr.salen)) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not accept new connection: %m")));
		return STATUS_ERROR;
	}

#ifdef SCO_ACCEPT_BUG

	/*
	 * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?