📄 pqcomm.c
字号:
*/static intSetup_AF_UNIX(void){ /* Arrange to unlink the socket file at exit */ on_proc_exit(StreamDoUnlink, 0); /* * Fix socket ownership/permission if requested. Note we must do this * before we listen() to avoid a window where unwanted connections could * get accepted. */ Assert(Unix_socket_group); if (Unix_socket_group[0] != '\0') {#ifdef WIN32 elog(WARNING, "configuration item unix_socket_group is not supported on this platform");#else char *endptr; unsigned long val; gid_t gid; val = strtoul(Unix_socket_group, &endptr, 10); if (*endptr == '\0') { /* numeric group id */ gid = val; } else { /* convert group name to id */ struct group *gr; gr = getgrnam(Unix_socket_group); if (!gr) { ereport(LOG, (errmsg("group \"%s\" does not exist", Unix_socket_group))); return STATUS_ERROR; } gid = gr->gr_gid; } if (chown(sock_path, -1, gid) == -1) { ereport(LOG, (errcode_for_file_access(), errmsg("could not set group of file \"%s\": %m", sock_path))); return STATUS_ERROR; }#endif } if (chmod(sock_path, Unix_socket_permissions) == -1) { ereport(LOG, (errcode_for_file_access(), errmsg("could not set permissions of file \"%s\": %m", sock_path))); return STATUS_ERROR; } return STATUS_OK;}#endif /* HAVE_UNIX_SOCKETS *//* * StreamConnection -- create a new connection with client using * server port. * * ASSUME: that this doesn't need to be non-blocking because * the Postmaster uses select() to tell when the server master * socket is ready for accept(). 
* * RETURNS: STATUS_OK or STATUS_ERROR */intStreamConnection(int server_fd, Port *port){ /* accept connection and fill in the client (remote) address */ port->raddr.salen = sizeof(port->raddr.addr); if ((port->sock = accept(server_fd, (struct sockaddr *) & port->raddr.addr, &port->raddr.salen)) < 0) { ereport(LOG, (errcode_for_socket_access(), errmsg("could not accept new connection: %m"))); return STATUS_ERROR; }#ifdef SCO_ACCEPT_BUG /* * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it * shouldn't hurt to catch it for all versions of those platforms. */ if (port->raddr.addr.ss_family == 0) port->raddr.addr.ss_family = AF_UNIX;#endif /* fill in the server (local) address */ port->laddr.salen = sizeof(port->laddr.addr); if (getsockname(port->sock, (struct sockaddr *) & port->laddr.addr, &port->laddr.salen) < 0) { elog(LOG, "getsockname() failed: %m"); return STATUS_ERROR; } /* select NODELAY and KEEPALIVE options if it's a TCP connection */ if (!IS_AF_UNIX(port->laddr.addr.ss_family)) { int on;#ifdef TCP_NODELAY on = 1; if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on)) < 0) { elog(LOG, "setsockopt(TCP_NODELAY) failed: %m"); return STATUS_ERROR; }#endif on = 1; if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &on, sizeof(on)) < 0) { elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m"); return STATUS_ERROR; } /* * Also apply the current keepalive parameters. If we fail to set a * parameter, don't error out, because these aren't universally * supported. (Note: you might think we need to reset the GUC * variables to 0 in such a case, but it's not necessary because the * show hooks for these variables report the truth anyway.) 
*/ (void) pq_setkeepalivesidle(tcp_keepalives_idle, port); (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port); (void) pq_setkeepalivescount(tcp_keepalives_count, port); } return STATUS_OK;}/* * StreamClose -- close a client/backend connection * * NOTE: this is NOT used to terminate a session; it is just used to release * the file descriptor in a process that should no longer have the socket * open. (For example, the postmaster calls this after passing ownership * of the connection to a child process.) It is expected that someone else * still has the socket open. So, we only want to close the descriptor, * we do NOT want to send anything to the far end. */voidStreamClose(int sock){ closesocket(sock);}/* * TouchSocketFile -- mark socket file as recently accessed * * This routine should be called every so often to ensure that the socket * file has a recent mod date (ordinary operations on sockets usually won't * change the mod date). That saves it from being removed by * overenthusiastic /tmp-directory-cleaner daemons. (Another reason we should * never have put the socket file in /tmp...) */voidTouchSocketFile(void){ /* Do nothing if we did not create a socket... */ if (sock_path[0] != '\0') { /* * utime() is POSIX standard, utimes() is a common alternative. If we * have neither, there's no way to affect the mod or access time of * the socket :-( * * In either path, we ignore errors; there's no point in complaining. */#ifdef HAVE_UTIME utime(sock_path, NULL);#else /* !HAVE_UTIME */#ifdef HAVE_UTIMES utimes(sock_path, NULL);#endif /* HAVE_UTIMES */#endif /* HAVE_UTIME */ }}/* -------------------------------- * Low-level I/O routines begin here. * * These routines communicate with a frontend client across a connection * already established by the preceding routines. 
* -------------------------------- *//* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * * returns 0 if OK, EOF if trouble * -------------------------------- */static intpq_recvbuf(void){ if (PqRecvPointer > 0) { if (PqRecvLength > PqRecvPointer) { /* still some unread data, left-justify it in the buffer */ memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer, PqRecvLength - PqRecvPointer); PqRecvLength -= PqRecvPointer; PqRecvPointer = 0; } else PqRecvLength = PqRecvPointer = 0; } /* Can fill buffer from PqRecvLength and upwards */ for (;;) { int r; r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, PQ_BUFFER_SIZE - PqRecvLength); if (r < 0) { if (errno == EINTR) continue; /* Ok if interrupted */ /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core * dump! This message must go *only* to the postmaster log. */ ereport(COMMERROR, (errcode_for_socket_access(), errmsg("could not receive data from client: %m"))); return EOF; } if (r == 0) { /* * EOF detected. We used to write a log message here, but it's * better to expect the ultimate caller to do that. */ return EOF; } /* r contains number of bytes read, so just incr length */ PqRecvLength += r; return 0; }}/* -------------------------------- * pq_getbyte - get a single byte from connection, or return EOF * -------------------------------- */intpq_getbyte(void){ while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++];}/* -------------------------------- * pq_peekbyte - peek at next byte from connection * * Same as pq_getbyte() except we don't advance the pointer. 
* -------------------------------- */intpq_peekbyte(void){ while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer];}/* -------------------------------- * pq_getbytes - get a known number of bytes from connection * * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_getbytes(char *s, size_t len){ size_t amount; while (len > 0) { while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; if (amount > len) amount = len; memcpy(s, PqRecvBuffer + PqRecvPointer, amount); PqRecvPointer += amount; s += amount; len -= amount; } return 0;}/* -------------------------------- * pq_discardbytes - throw away a known number of bytes * * same as pq_getbytes except we do not copy the data to anyplace. * this is used for resynchronizing after read errors. * * returns 0 if OK, EOF if trouble * -------------------------------- */static intpq_discardbytes(size_t len){ size_t amount; while (len > 0) { while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; if (amount > len) amount = len; PqRecvPointer += amount; len -= amount; } return 0;}/* -------------------------------- * pq_getstring - get a null terminated string from connection * * The return value is placed in an expansible StringInfo, which has * already been initialized by the caller. * * This is used only for dealing with old-protocol clients. The idea * is to produce a StringInfo that looks the same as we would get from * pq_getmessage() with a newer client; we will then process it with * pq_getmsgstring. Therefore, no character set conversion is done here, * even though this is presumably useful only for text. 
*
 *		returns 0 if OK, EOF if trouble
 * --------------------------------
 */
int
pq_getstring(StringInfo s)
{
	/* Start from an empty string. */
	s->len = 0;
	s->data[0] = '\0';
	s->cursor = 0;

	/* Keep consuming buffered data until the terminating '\0' shows up. */
	for (;;)
	{
		int			scan;

		/* make sure there is buffered data to look at */
		while (PqRecvPointer >= PqRecvLength)
			if (pq_recvbuf() != 0)
				return EOF;		/* could not receive data */

		/* look for the terminator among the unread bytes */
		scan = PqRecvPointer;
		while (scan < PqRecvLength && PqRecvBuffer[scan] != '\0')
			scan++;

		if (scan < PqRecvLength)
		{
			/* found it: the copy includes the '\0' itself */
			appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
								   scan - PqRecvPointer + 1);
			PqRecvPointer = scan + 1;	/* advance past \0 */
			return 0;
		}

		/* no terminator yet: take everything buffered and go read more */
		appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
							   PqRecvLength - PqRecvPointer);
		PqRecvPointer = PqRecvLength;
	}
}

/* --------------------------------
 *		pq_getmessage	- get a message with length word from connection
 *
 *		The return value is placed in an expansible StringInfo, which has
 *		already been initialized by the caller.
 *		Only the message body is placed in the StringInfo; the length word
 *		is removed.  Also, s->cursor is initialized to zero for convenience
 *		in scanning the message contents.
 *
 *		If maxlen is not zero, it is an upper limit on the length of the
 *		message we are willing to accept.  We abort the connection (by
 *		returning EOF) if client tries to send more than that.
* * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_getmessage(StringInfo s, int maxlen){ int32 len; /* Reset message buffer to empty */ s->len = 0; s->data[0] = '\0'; s->cursor = 0; /* Read message length word */ if (pq_getbytes((char *) &len, 4) == EOF) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF within message length word"))); return EOF; } len = ntohl(len); if (len < 4 || (maxlen > 0 && len > maxlen)) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid message length"))); return EOF; } len -= 4; /* discount length itself */ if (len > 0) { /* * Allocate space for message. If we run out of room (ridiculously * large message), we will elog(ERROR), but we want to discard the
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -