📄 pqcomm.c
字号:
/*------------------------------------------------------------------------- * * pqcomm.c * Communication functions between the Frontend and the Backend * * These routines handle the low-level details of communication between * frontend and backend. They just shove data across the communication * channel, and are ignorant of the semantics of the data --- or would be, * except for major brain damage in the design of the COPY OUT protocol. * Unfortunately, COPY OUT is designed to commandeer the communication * channel (it just transfers data without wrapping it into messages). * No other messages can be sent while COPY OUT is in progress; and if the * copy is aborted by an elog(ERROR), we need to close out the copy so that * the frontend gets back into sync. Therefore, these routines have to be * aware of COPY OUT state. * * NOTE: generally, it's a bad idea to emit outgoing messages directly with * pq_putbytes(), especially if the message would require multiple calls * to send. Instead, use the routines in pqformat.c to construct the message * in a buffer and then emit it in one call to pq_putmessage. This helps * ensure that the channel will not be clogged by an incomplete message * if execution is aborted by elog(ERROR) partway through the message. * The only non-libpq code that should call pq_putbytes directly is COPY OUT. * * At one time, libpq was shared between frontend and backend, but now * the backend's "backend/libpq" is quite separate from "interfaces/libpq". * All that remains is similarities of names to trap the unwary... * * Copyright (c) 1994, Regents of the University of California * * $Id: pqcomm.c,v 1.73.2.1 1999/09/08 23:00:51 tgl Exp $ * *------------------------------------------------------------------------- *//*------------------------ * INTERFACE ROUTINES * * setup/teardown: * StreamServerPort - Open postmaster's server port * StreamConnection - Create new connection with client * StreamClose - Close a client/backend connection * pq_getport - return the PGPORT setting * pq_init - initialize libpq at backend startup * pq_close - shutdown libpq at backend exit * * low-level I/O: * pq_getbytes - get a known number of bytes from connection * pq_getstring - get a null terminated string from connection * pq_peekbyte - peek at next byte from connection * pq_putbytes - send bytes to connection (not flushed until pq_flush) * pq_flush - flush pending output * * message-level I/O (and COPY OUT cruft): * pq_putmessage - send a normal message (suppressed in COPY OUT mode) * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning * pq_endcopyout - end a COPY OUT transfer * *------------------------ */#include "postgres.h"#include <stdio.h>#if defined(HAVE_STRING_H)#include <string.h>#else#include <strings.h>#endif#include <signal.h>#include <errno.h>#include <fcntl.h>#include <unistd.h> /* for ttyname() */#include <sys/types.h>#include <sys/stat.h>#include <sys/socket.h>#include <netdb.h>#include <netinet/in.h>#include <netinet/tcp.h>#include <arpa/inet.h>#include <sys/file.h>#include "libpq/libpq.h" /* where my declarations go */#include "miscadmin.h"#include "libpq/pqsignal.h"#include "libpq/auth.h"#include "storage/ipc.h"#include "utils/trace.h"#ifndef SOMAXCONN#define SOMAXCONN 5 /* from Linux listen(2) man page */#endif /* SOMAXCONN */extern FILE *debug_port; /* in util.c *//* * Buffers for low-level I/O */#define PQ_BUFFER_SIZE 8192static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];static int PqSendPointer; /* Next index to store a byte in * PqSendBuffer */static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];static int PqRecvPointer; /* Next index to read a byte from * PqRecvBuffer */static int PqRecvLength; /* End of data available in PqRecvBuffer *//* * Message status */static bool DoingCopyOut;/* -------------------------------- * pq_init - initialize libpq at backend startup * -------------------------------- */voidpq_init(void){ PqSendPointer = PqRecvPointer = PqRecvLength = 0; DoingCopyOut = false; if (getenv("LIBPQ_DEBUG")) debug_port = stderr;}/* -------------------------------- * pq_getport - return the PGPORT setting * -------------------------------- */intpq_getport(void){ char *envport = getenv("PGPORT"); if (envport) return atoi(envport); return atoi(DEF_PGPORT);}/* -------------------------------- * pq_close - shutdown libpq at backend exit * -------------------------------- */voidpq_close(void){ close(MyProcPort->sock);}/* * Streams -- wrapper around Unix socket system calls * * * Stream functions are used for vanilla TCP connection protocol. */static char sock_path[MAXPGPATH + 1] = "";/* StreamDoUnlink() * Shutdown routine for backend connection * If a Unix socket is used for communication, explicitly close it. */static voidStreamDoUnlink(){ Assert(sock_path[0]); unlink(sock_path);}/* * StreamServerPort -- open a sock stream "listening" port. * * This initializes the Postmaster's connection-accepting port. * * RETURNS: STATUS_OK or STATUS_ERROR */intStreamServerPort(char *hostName, unsigned short portName, int *fdP){ SockAddr saddr; int fd, err, family; size_t len; int one = 1;#ifdef HAVE_FCNTL_SETLK int lock_fd;#endif family = ((hostName != NULL) ? AF_INET : AF_UNIX); if ((fd = socket(family, SOCK_STREAM, 0)) < 0) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: StreamServerPort: socket() failed: %s\n", strerror(errno)); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); return STATUS_ERROR; }#ifdef ONLY_REUSE_INET_SOCKETS if (family == AF_INET) {#endif if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one))) == -1) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: StreamServerPort: setsockopt(SO_REUSEADDR) failed: %s\n", strerror(errno)); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); return STATUS_ERROR; }#ifdef ONLY_REUSE_INET_SOCKETS }#endif MemSet((char *) &saddr, 0, sizeof(saddr)); saddr.sa.sa_family = family; if (family == AF_UNIX) { len = UNIXSOCK_PATH(saddr.un, portName); strcpy(sock_path, saddr.un.sun_path); /* * If the socket exists but nobody has an advisory lock on it we * can safely delete the file. */#ifdef HAVE_FCNTL_SETLK#ifndef __CYGWIN32__ if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK, 0666)) >= 0)#else if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK | O_BINARY, 0666)) >= 0)#endif { struct flock lck; lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; if (fcntl(lock_fd, F_SETLK, &lck) == 0) { TPRINTF(TRACE_VERBOSE, "flock on %s, deleting", sock_path); unlink(sock_path); } else TPRINTF(TRACE_VERBOSE, "flock failed for %s", sock_path); close(lock_fd); }#endif /* HAVE_FCNTL_SETLK */ } else { saddr.in.sin_addr.s_addr = htonl(INADDR_ANY); saddr.in.sin_port = htons(portName); len = sizeof(struct sockaddr_in); } err = bind(fd, &saddr.sa, len); if (err < 0) { snprintf(PQerrormsg, ERROR_MSG_LENGTH, "FATAL: StreamServerPort: bind() failed: %s\n", strerror(errno)); strcat(PQerrormsg, "\tIs another postmaster already running on that port?\n"); if (family == AF_UNIX) { snprintf(PQerrormsg + strlen(PQerrormsg), ERROR_MSG_LENGTH - strlen(PQerrormsg), "\tIf not, remove socket node (%s) and retry.\n", sock_path); } else strcat(PQerrormsg, "\tIf not, wait a few seconds and retry.\n"); fputs(PQerrormsg, stderr); pqdebug("%s", PQerrormsg); return STATUS_ERROR; } if (family == AF_UNIX) { on_proc_exit(StreamDoUnlink, NULL); /* * Open the socket file and get an advisory lock on it. The * lock_fd is left open to keep the lock. */#ifdef HAVE_FCNTL_SETLK#ifndef __CYGWIN32__ if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK, 0666)) >= 0)#else if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK | O_BINARY, 0666)) >= 0)#endif { struct flock lck; lck.l_whence = SEEK_SET; lck.l_start = lck.l_len = 0; lck.l_type = F_WRLCK; if (fcntl(lock_fd, F_SETLK, &lck) != 0) TPRINTF(TRACE_VERBOSE, "flock error for %s", sock_path); }#endif /* HAVE_FCNTL_SETLK */ } listen(fd, SOMAXCONN); /* * MS: I took this code from Dillon's version. It makes the listening * port non-blocking. That is not necessary (and may tickle kernel * bugs). * * fcntl(fd, F_SETFD, 1); fcntl(fd, F_SETFL, FNDELAY); */ *fdP = fd; if (family == AF_UNIX) chmod(sock_path, 0777); return STATUS_OK;}/* * StreamConnection -- create a new connection with client using * server port. * * ASSUME: that this doesn't need to be non-blocking because * the Postmaster uses select() to tell when the server master * socket is ready for accept(). * * NB: this can NOT call elog() because it is invoked in the postmaster, * not in standard backend context. If we get an error, the best we can do * is log it to stderr. * * RETURNS: STATUS_OK or STATUS_ERROR */intStreamConnection(int server_fd, Port *port){ SOCKET_SIZE_TYPE addrlen; /* accept connection (and fill in the client (remote) address) */ addrlen = sizeof(port->raddr); if ((port->sock = accept(server_fd,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -