📄 pqcomm.c
字号:
* message body so as not to lose communication sync. */ PG_TRY(); { enlargeStringInfo(s, len); } PG_CATCH(); { if (pq_discardbytes(len) == EOF) ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("incomplete message from client"))); PG_RE_THROW(); } PG_END_TRY(); /* And grab the message */ if (pq_getbytes(s->data, len) == EOF) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("incomplete message from client"))); return EOF; } s->len = len; /* Place a trailing null per StringInfo convention */ s->data[len] = '\0'; } return 0;}/* -------------------------------- * pq_putbytes - send bytes to connection (not flushed until pq_flush) * * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_putbytes(const char *s, size_t len){ int res; /* Should only be called by old-style COPY OUT */ Assert(DoingCopyOut); /* No-op if reentrant call */ if (PqCommBusy) return 0; PqCommBusy = true; res = internal_putbytes(s, len); PqCommBusy = false; return res;}static intinternal_putbytes(const char *s, size_t len){ size_t amount; while (len > 0) { /* If buffer is full, then flush it out */ if (PqSendPointer >= PQ_BUFFER_SIZE) if (internal_flush()) return EOF; amount = PQ_BUFFER_SIZE - PqSendPointer; if (amount > len) amount = len; memcpy(PqSendBuffer + PqSendPointer, s, amount); PqSendPointer += amount; s += amount; len -= amount; } return 0;}/* -------------------------------- * pq_flush - flush pending output * * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_flush(void){ int res; /* No-op if reentrant call */ if (PqCommBusy) return 0; PqCommBusy = true; res = internal_flush(); PqCommBusy = false; return res;}static intinternal_flush(void){ static int last_reported_send_errno = 0; char *bufptr = PqSendBuffer; char *bufend = PqSendBuffer + PqSendPointer; while (bufptr < bufend) { int r; r = secure_write(MyProcPort, bufptr, bufend - bufptr); if (r <= 0) { if (errno == EINTR) continue; /* Ok if we were 
interrupted */ /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core * dump! This message must go *only* to the postmaster log. * * If a client disconnects while we're in the midst of output, we * might write quite a bit of data before we get to a safe query * abort point. So, suppress duplicate log messages. */ if (errno != last_reported_send_errno) { last_reported_send_errno = errno; ereport(COMMERROR, (errcode_for_socket_access(), errmsg("could not send data to client: %m"))); } /* * We drop the buffered data anyway so that processing can * continue, even though we'll probably quit soon. */ PqSendPointer = 0; return EOF; } last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; } PqSendPointer = 0; return 0;}/* -------------------------------- * Message-level I/O routines begin here. * * These routines understand about the old-style COPY OUT protocol. * -------------------------------- *//* -------------------------------- * pq_putmessage - send a normal message (suppressed in COPY OUT mode) * * If msgtype is not '\0', it is a message type code to place before * the message body. If msgtype is '\0', then the message has no type * code (this is only valid in pre-3.0 protocols). * * len is the length of the message body data at *s. In protocol 3.0 * and later, a message length word (equal to len+4 because it counts * itself too) is inserted by this routine. * * All normal messages are suppressed while old-style COPY OUT is in * progress. (In practice only a few notice messages might get emitted * then; dropping them is annoying, but at least they will still appear * in the postmaster log.) * * We also suppress messages generated while pqcomm.c is busy. This * avoids any possibility of messages being inserted within other * messages. 
The only known trouble case arises if SIGQUIT occurs * during a pqcomm.c routine --- quickdie() will try to send a warning * message, and the most reasonable approach seems to be to drop it. * * returns 0 if OK, EOF if trouble * -------------------------------- */intpq_putmessage(char msgtype, const char *s, size_t len){ if (DoingCopyOut || PqCommBusy) return 0; PqCommBusy = true; if (msgtype) if (internal_putbytes(&msgtype, 1)) goto fail; if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { uint32 n32; n32 = htonl((uint32) (len + 4)); if (internal_putbytes((char *) &n32, 4)) goto fail; } if (internal_putbytes(s, len)) goto fail; PqCommBusy = false; return 0;fail: PqCommBusy = false; return EOF;}/* -------------------------------- * pq_startcopyout - inform libpq that an old-style COPY OUT transfer * is beginning * -------------------------------- */voidpq_startcopyout(void){ DoingCopyOut = true;}/* -------------------------------- * pq_endcopyout - end an old-style COPY OUT transfer * * If errorAbort is indicated, we are aborting a COPY OUT due to an error, * and must send a terminator line. Since a partial data line might have * been emitted, send a couple of newlines first (the first one could * get absorbed by a backslash...) Note that old-style COPY OUT does * not allow binary transfers, so a textual terminator is always correct. 
* --------------------------------
 */
void
pq_endcopyout(bool errorAbort)
{
	/* Nothing to do unless a COPY OUT is actually in progress */
	if (!DoingCopyOut)
		return;
	if (errorAbort)
		pq_putbytes("\n\n\\.\n", 5);	/* 5 = strlen of the terminator text */
	/* in non-error case, copy.c will have emitted the terminator line */
	DoingCopyOut = false;
}


/*
 * Support for TCP Keepalive parameters
 */

/*
 * pq_getkeepalivesidle - report the TCP_KEEPIDLE value in effect for port
 *
 * Returns 0 when port is NULL, when IS_AF_UNIX() is true for the port's
 * local address family, or when TCP_KEEPIDLE is not defined on this
 * platform.  Otherwise returns port->keepalives_idle if it is nonzero, else
 * the value obtained from getsockopt(), which is fetched on first use and
 * cached in port->default_keepalives_idle (-1 if that query failed).
 */
int
pq_getkeepalivesidle(Port *port)
{
#ifdef TCP_KEEPIDLE
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return 0;

	if (port->keepalives_idle != 0)
		return port->keepalives_idle;

	/* 0 means "not fetched yet": ask the socket and cache the answer */
	if (port->default_keepalives_idle == 0)
	{
		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);

		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
					   (char *) &port->default_keepalives_idle,
					   &size) < 0)
		{
			elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
			port->default_keepalives_idle = -1; /* don't know */
		}
	}

	return port->default_keepalives_idle;
#else
	return 0;
#endif
}

/*
 * pq_setkeepalivesidle - set TCP_KEEPIDLE on port's socket
 *
 * idle == 0 requests the default value cached in
 * port->default_keepalives_idle.  Nothing is done (STATUS_OK) for a NULL
 * port, when IS_AF_UNIX() is true for the port's local address family, or
 * when idle already equals port->keepalives_idle.
 *
 * Returns STATUS_OK on success.  Returns STATUS_ERROR if setsockopt() fails,
 * if a nonzero value is requested while the default could not be determined,
 * or if a nonzero value is requested on a platform without TCP_KEEPIDLE.
 */
int
pq_setkeepalivesidle(int idle, Port *port)
{
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

#ifdef TCP_KEEPIDLE
	if (idle == port->keepalives_idle)
		return STATUS_OK;

	/* Make sure the default has been fetched before we overwrite it */
	if (port->default_keepalives_idle <= 0)
	{
		if (pq_getkeepalivesidle(port) < 0)
		{
			if (idle == 0)
				return STATUS_OK;		/* default is set but unknown */
			else
				return STATUS_ERROR;
		}
	}

	if (idle == 0)
		idle = port->default_keepalives_idle;

	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
				   (char *) &idle, sizeof(idle)) < 0)
	{
		elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
		return STATUS_ERROR;
	}

	port->keepalives_idle = idle;
#else
	if (idle != 0)
	{
		elog(LOG, "setsockopt(TCP_KEEPIDLE) not supported");
		return STATUS_ERROR;
	}
#endif

	return STATUS_OK;
}

/*
 * pq_getkeepalivesinterval - report the TCP_KEEPINTVL value in effect
 *
 * Returns 0 when port is NULL, when IS_AF_UNIX() is true for the port's
 * local address family, or when TCP_KEEPINTVL is not defined on this
 * platform.  Otherwise returns port->keepalives_interval if it is nonzero,
 * else the value obtained from getsockopt(), which is fetched on first use
 * and cached in port->default_keepalives_interval (-1 if that query failed).
 */
int
pq_getkeepalivesinterval(Port *port)
{
#ifdef TCP_KEEPINTVL
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return 0;

	if (port->keepalives_interval != 0)
		return port->keepalives_interval;

	/* 0 means "not fetched yet": ask the socket and cache the answer */
	if (port->default_keepalives_interval == 0)
	{
		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);

		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
					   (char *) &port->default_keepalives_interval,
					   &size) < 0)
		{
			elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
			port->default_keepalives_interval = -1;		/* don't know */
		}
	}

	return port->default_keepalives_interval;
#else
	return 0;
#endif
}

/*
 * pq_setkeepalivesinterval - set TCP_KEEPINTVL on port's socket
 *
 * interval == 0 requests the default value cached in
 * port->default_keepalives_interval.  Nothing is done (STATUS_OK) for a NULL
 * port, when IS_AF_UNIX() is true for the port's local address family, or
 * when interval already equals port->keepalives_interval.
 *
 * Returns STATUS_OK on success.  Returns STATUS_ERROR if setsockopt() fails,
 * if a nonzero value is requested while the default could not be determined,
 * or if a nonzero value is requested on a platform without TCP_KEEPINTVL.
 */
int
pq_setkeepalivesinterval(int interval, Port *port)
{
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

#ifdef TCP_KEEPINTVL
	if (interval == port->keepalives_interval)
		return STATUS_OK;

	/* Make sure the default has been fetched before we overwrite it */
	if (port->default_keepalives_interval <= 0)
	{
		if (pq_getkeepalivesinterval(port) < 0)
		{
			if (interval == 0)
				return STATUS_OK;		/* default is set but unknown */
			else
				return STATUS_ERROR;
		}
	}

	if (interval == 0)
		interval = port->default_keepalives_interval;

	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
				   (char *) &interval, sizeof(interval)) < 0)
	{
		elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
		return STATUS_ERROR;
	}

	port->keepalives_interval = interval;
#else
	if (interval != 0)
	{
		elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
		return STATUS_ERROR;
	}
#endif

	return STATUS_OK;
}

/*
 * pq_getkeepalivescount - report the TCP_KEEPCNT value in effect for port
 *
 * Returns 0 when port is NULL, when IS_AF_UNIX() is true for the port's
 * local address family, or when TCP_KEEPCNT is not defined on this
 * platform.  Otherwise returns port->keepalives_count if it is nonzero, else
 * the value obtained from getsockopt(), which is fetched on first use and
 * cached in port->default_keepalives_count (-1 if that query failed).
 */
int
pq_getkeepalivescount(Port *port)
{
#ifdef TCP_KEEPCNT
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return 0;

	if (port->keepalives_count != 0)
		return port->keepalives_count;

	/* 0 means "not fetched yet": ask the socket and cache the answer */
	if (port->default_keepalives_count == 0)
	{
		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);

		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
					   (char *) &port->default_keepalives_count,
					   &size) < 0)
		{
			elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
			port->default_keepalives_count = -1;		/* don't know */
		}
	}

	return port->default_keepalives_count;
#else
	return 0;
#endif
}

/*
 * pq_setkeepalivescount - set TCP_KEEPCNT on port's socket
 *
 * count == 0 requests the default value cached in
 * port->default_keepalives_count.  Nothing is done (STATUS_OK) for a NULL
 * port, when IS_AF_UNIX() is true for the port's local address family, or
 * when count already equals port->keepalives_count.
 *
 * Returns STATUS_OK on success.  Returns STATUS_ERROR if setsockopt() fails,
 * if a nonzero value is requested while the default could not be determined,
 * or if a nonzero value is requested on a platform without TCP_KEEPCNT.
 */
int
pq_setkeepalivescount(int count, Port *port)
{
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

#ifdef TCP_KEEPCNT
	if (count == port->keepalives_count)
		return STATUS_OK;

	/* Make sure the default has been fetched before we overwrite it */
	if (port->default_keepalives_count <= 0)
	{
		if (pq_getkeepalivescount(port) < 0)
		{
			if (count == 0)
				return STATUS_OK;		/* default is set but unknown */
			else
				return STATUS_ERROR;
		}
	}

	if (count == 0)
		count = port->default_keepalives_count;

	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
				   (char *) &count, sizeof(count)) < 0)
	{
		elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
		return STATUS_ERROR;
	}

	port->keepalives_count = count;
#else
	if (count != 0)
	{
		elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
		return STATUS_ERROR;
	}
#endif

	return STATUS_OK;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -