pqcomm.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 1,111 行 · 第 1/2 页
C
1,111 行
* shouldn't hurt to catch it for all versions of those platforms.
	 */
	if (port->raddr.addr.ss_family == 0)
		port->raddr.addr.ss_family = AF_UNIX;
#endif

	/* fill in the server (local) address */
	port->laddr.salen = sizeof(port->laddr.addr);
	if (getsockname(port->sock, (struct sockaddr *) & port->laddr.addr,
					&port->laddr.salen) < 0)
	{
		elog(LOG, "getsockname() failed: %m");
		return STATUS_ERROR;
	}

	/* select NODELAY and KEEPALIVE options if it's a TCP connection */
	if (!IS_AF_UNIX(port->laddr.addr.ss_family))
	{
		int			on;

#ifdef TCP_NODELAY
		on = 1;
		if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
					   (char *) &on, sizeof(on)) < 0)
		{
			elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
			return STATUS_ERROR;
		}
#endif
		on = 1;
		if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
					   (char *) &on, sizeof(on)) < 0)
		{
			elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
			return STATUS_ERROR;
		}
	}

	return STATUS_OK;
}

/*
 * StreamClose -- close a client/backend connection
 *
 * NOTE: this is NOT used to terminate a session; it is just used to release
 * the file descriptor in a process that should no longer have the socket
 * open.  (For example, the postmaster calls this after passing ownership
 * of the connection to a child process.)  It is expected that someone else
 * still has the socket open.  So, we only want to close the descriptor,
 * we do NOT want to send anything to the far end.
 *
 * sock is the descriptor to release.  It is handed straight to
 * closesocket() and the result of that call is not examined.
 */
void
StreamClose(int sock)
{
	closesocket(sock);
}

/*
 * TouchSocketFile -- mark socket file as recently accessed
 *
 * This routine should be called every so often to ensure that the socket
 * file has a recent mod date (ordinary operations on sockets usually won't
 * change the mod date).  That saves it from being removed by
 * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
 * never have put the socket file in /tmp...)
 *
 * The file touched is the one named by sock_path; an empty sock_path makes
 * this a no-op.  Nothing is returned and failures are not reported.
 */
void
TouchSocketFile(void)
{
	/* Do nothing if we did not create a socket... */
	if (sock_path[0] != '\0')
	{
		/*
		 * utime() is POSIX standard, utimes() is a common alternative. If
		 * we have neither, there's no way to affect the mod or access
		 * time of the socket :-(
		 *
		 * In either path, we ignore errors; there's no point in complaining.
		 *
		 * The NULL second argument asks for the timestamps to be set to
		 * "now" rather than to caller-supplied values.
		 */
#ifdef HAVE_UTIME
		utime(sock_path, NULL);
#else							/* !HAVE_UTIME */
#ifdef HAVE_UTIMES
		utimes(sock_path, NULL);
#endif   /* HAVE_UTIMES */
#endif   /* HAVE_UTIME */
	}
}


/* --------------------------------
 * Low-level I/O routines begin here.
 *
 * These routines communicate with a frontend client across a connection
 * already established by the preceding routines.
 * --------------------------------
 */


/* --------------------------------
 *		pq_recvbuf - load some bytes into the input buffer
 *
 *		Unread data lives in PqRecvBuffer between the indexes PqRecvPointer
 *		(next byte to hand out) and PqRecvLength (end of valid data).  Any
 *		unread bytes are first moved to the front of the buffer, then one
 *		successful secure_read() appends whatever it delivers after them.
 *
 *		returns 0 if OK, EOF if trouble
 * --------------------------------
 */
static int
pq_recvbuf(void)
{
	/* Reclaim the space occupied by bytes that were already consumed */
	if (PqRecvPointer > 0)
	{
		if (PqRecvLength > PqRecvPointer)
		{
			/* still some unread data, left-justify it in the buffer */
			/* memmove, not memcpy: source and destination may overlap */
			memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
					PqRecvLength - PqRecvPointer);
			PqRecvLength -= PqRecvPointer;
			PqRecvPointer = 0;
		}
		else
			PqRecvLength = PqRecvPointer = 0;	/* everything was consumed */
	}

	/* Can fill buffer from PqRecvLength and upwards */
	for (;;)
	{
		int			r;

		/* Ask for as many bytes as still fit behind the valid data */
		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
						PQ_BUFFER_SIZE - PqRecvLength);

		if (r < 0)
		{
			if (errno == EINTR)
				continue;		/* Ok if interrupted */

			/*
			 * Careful: an ereport() that tries to write to the client
			 * would cause recursion to here, leading to stack overflow
			 * and core dump!  This message must go *only* to the
			 * postmaster log.
			 */
			ereport(COMMERROR,
					(errcode_for_socket_access(),
					 errmsg("could not receive data from client: %m")));
			return EOF;
		}
		if (r == 0)
		{
			/*
			 * EOF detected.  We used to write a log message here, but
			 * it's better to expect the ultimate caller to do that.
			 */
			return EOF;
		}
		/* r contains number of bytes read, so just incr length */
		PqRecvLength += r;
		return 0;
	}
}

/* --------------------------------
 *		pq_getbyte	- get a single byte from connection, or return EOF
 *
 *		The byte is consumed: PqRecvPointer is advanced past it.
 *
 *		NOTE(review): the value returned for bytes >= 0x80 depends on the
 *		element type of PqRecvBuffer, which is declared outside this view.
 *		Confirm it is unsigned char, otherwise such a byte could be
 *		mistaken for EOF by callers.
 * --------------------------------
 */
int
pq_getbyte(void)
{
	/* Loop, not "if": keep refilling until at least one byte is unread */
	while (PqRecvPointer >= PqRecvLength)
	{
		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
			return EOF;			/* Failed to recv data */
	}
	return PqRecvBuffer[PqRecvPointer++];
}

/* --------------------------------
 *		pq_peekbyte		- peek at next byte from connection
 *
 *	 Same as pq_getbyte() except we don't advance the pointer.
 *	 Returns EOF if the buffer is empty and cannot be refilled.
 * --------------------------------
 */
int
pq_peekbyte(void)
{
	while (PqRecvPointer >= PqRecvLength)
	{
		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
			return EOF;			/* Failed to recv data */
	}
	return PqRecvBuffer[PqRecvPointer];
}

/* --------------------------------
 *		pq_getbytes		- get a known number of bytes from connection
 *
 *		s	 destination; must have room for len bytes
 *		len  exact number of bytes to deliver
 *
 *		The request is satisfied in pieces, each piece being whatever is
 *		currently unread in the input buffer.  If EOF is returned, some
 *		leading part of s may already have been written.
 *
 *		returns 0 if OK, EOF if trouble
 * --------------------------------
 */
int
pq_getbytes(char *s, size_t len)
{
	size_t		amount;

	while (len > 0)
	{
		while (PqRecvPointer >= PqRecvLength)
		{
			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
				return EOF;		/* Failed to recv data */
		}
		/* Copy the smaller of "what is buffered" and "what is still wanted" */
		amount = PqRecvLength - PqRecvPointer;
		if (amount > len)
			amount = len;
		memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
		PqRecvPointer += amount;
		s += amount;
		len -= amount;
	}
	return 0;
}

/* --------------------------------
 *		pq_getstring	- get a null terminated string from connection
 *
 *		The return value is placed in an expansible StringInfo, which has
 *		already been initialized by the caller.
 *
 *		This is used only for dealing with old-protocol clients.  The idea
 *		is to produce a StringInfo that looks the same as we would get from
 *		pq_getmessage() with a newer client; we will then process it with
 *		pq_getmsgstring.  Therefore, no character set conversion is done here,
 *		even though this is presumably useful only for text.
* * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_getstring(StringInfo s){ int i; /* Reset string to empty */ s->len = 0; s->data[0] = '\0'; s->cursor = 0; /* Read until we get the terminating '\0' */ for (;;) { while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } for (i = PqRecvPointer; i < PqRecvLength; i++) { if (PqRecvBuffer[i] == '\0') { /* include the '\0' in the copy */ appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer, i - PqRecvPointer + 1); PqRecvPointer = i + 1; /* advance past \0 */ return 0; } } /* If we're here we haven't got the \0 in the buffer yet. */ appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer, PqRecvLength - PqRecvPointer); PqRecvPointer = PqRecvLength; }}/* -------------------------------- * pq_getmessage - get a message with length word from connection * * The return value is placed in an expansible StringInfo, which has * already been initialized by the caller. * Only the message body is placed in the StringInfo; the length word * is removed. Also, s->cursor is initialized to zero for convenience * in scanning the message contents. * * If maxlen is not zero, it is an upper limit on the length of the * message we are willing to accept. We abort the connection (by * returning EOF) if client tries to send more than that. 
* * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_getmessage(StringInfo s, int maxlen){ int32 len; /* Reset message buffer to empty */ s->len = 0; s->data[0] = '\0'; s->cursor = 0; /* Read message length word */ if (pq_getbytes((char *) &len, 4) == EOF) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF within message length word"))); return EOF; } len = ntohl(len); len -= 4; /* discount length itself */ if (len < 0 || (maxlen > 0 && len > maxlen)) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid message length"))); return EOF; } if (len > 0) { /* Allocate space for message */ enlargeStringInfo(s, len); /* And grab the message */ if (pq_getbytes(s->data, len) == EOF) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("incomplete message from client"))); return EOF; } s->len = len; /* Place a trailing null per StringInfo convention */ s->data[len] = '\0'; } return 0;}/* -------------------------------- * pq_putbytes - send bytes to connection (not flushed until pq_flush) * * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_putbytes(const char *s, size_t len){ int res; /* Should only be called by old-style COPY OUT */ Assert(DoingCopyOut); /* No-op if reentrant call */ if (PqCommBusy) return 0; PqCommBusy = true; res = internal_putbytes(s, len); PqCommBusy = false; return res;}static intinternal_putbytes(const char *s, size_t len){ size_t amount; while (len > 0) { /* If buffer is full, then flush it out */ if (PqSendPointer >= PQ_BUFFER_SIZE) if (internal_flush()) return EOF; amount = PQ_BUFFER_SIZE - PqSendPointer; if (amount > len) amount = len; memcpy(PqSendBuffer + PqSendPointer, s, amount); PqSendPointer += amount; s += amount; len -= amount; } return 0;}/* -------------------------------- * pq_flush - flush pending output * * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_flush(void){ int res; /* No-op if 
reentrant call */ if (PqCommBusy) return 0; PqCommBusy = true; res = internal_flush(); PqCommBusy = false; return res;}static intinternal_flush(void){ static int last_reported_send_errno = 0; unsigned char *bufptr = PqSendBuffer; unsigned char *bufend = PqSendBuffer + PqSendPointer; while (bufptr < bufend) { int r; r = secure_write(MyProcPort, bufptr, bufend - bufptr); if (r <= 0) { if (errno == EINTR) continue; /* Ok if we were interrupted */ /* * Careful: an ereport() that tries to write to the client * would cause recursion to here, leading to stack overflow * and core dump! This message must go *only* to the * postmaster log. * * If a client disconnects while we're in the midst of output, we * might write quite a bit of data before we get to a safe * query abort point. So, suppress duplicate log messages. */ if (errno != last_reported_send_errno) { last_reported_send_errno = errno; ereport(COMMERROR, (errcode_for_socket_access(), errmsg("could not send data to client: %m"))); } /* * We drop the buffered data anyway so that processing can * continue, even though we'll probably quit soon. */ PqSendPointer = 0; return EOF; } last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; } PqSendPointer = 0; return 0;}/* -------------------------------- * Message-level I/O routines begin here. * * These routines understand about the old-style COPY OUT protocol. * -------------------------------- *//* -------------------------------- * pq_putmessage - send a normal message (suppressed in COPY OUT mode) * * If msgtype is not '\0', it is a message type code to place before * the message body. If msgtype is '\0', then the message has no type * code (this is only valid in pre-3.0 protocols). * * len is the length of the message body data at *s. In protocol 3.0 * and later, a message length word (equal to len+4 because it counts * itself too) is inserted by this routine. * * All normal messages are suppressed while old-style COPY OUT is in * progress. 
(In practice only a few notice messages might get emitted * then; dropping them is annoying, but at least they will still appear * in the postmaster log.) * * We also suppress messages generated while pqcomm.c is busy. This * avoids any possibility of messages being inserted within other * messages. The only known trouble case arises if SIGQUIT occurs * during a pqcomm.c routine --- quickdie() will try to send a warning * message, and the most reasonable approach seems to be to drop it. * * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_putmessage(char msgtype, const char *s, size_t len){ if (DoingCopyOut || PqCommBusy) return 0; PqCommBusy = true; if (msgtype) if (internal_putbytes(&msgtype, 1)) goto fail; if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { uint32 n32; n32 = htonl((uint32) (len + 4)); if (internal_putbytes((char *) &n32, 4)) goto fail; } if (internal_putbytes(s, len)) goto fail; PqCommBusy = false; return 0;fail: PqCommBusy = false; return EOF;}/* -------------------------------- * pq_startcopyout - inform libpq that an old-style COPY OUT transfer * is beginning * -------------------------------- */voidpq_startcopyout(void){ DoingCopyOut = true;}/* -------------------------------- * pq_endcopyout - end an old-style COPY OUT transfer * * If errorAbort is indicated, we are aborting a COPY OUT due to an error, * and must send a terminator line. Since a partial data line might have * been emitted, send a couple of newlines first (the first one could * get absorbed by a backslash...) Note that old-style COPY OUT does * not allow binary transfers, so a textual terminator is always correct. * -------------------------------- */voidpq_endcopyout(bool errorAbort){ if (!DoingCopyOut) return; if (errorAbort) pq_putbytes("\n\n\\.\n", 5); /* in non-error case, copy.c will have emitted the terminator line */ DoingCopyOut = false;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?