📄 fe-protocol3.c
字号:
libpq_gettext("PQgetline: not doing text COPY OUT\n")); *s = '\0'; return EOF; } while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0) { /* need to load more data */ if (pqWait(TRUE, FALSE, conn) || pqReadData(conn) < 0) { *s = '\0'; return EOF; } } if (status < 0) { /* End of copy detected; gin up old-style terminator */ strcpy(s, "\\."); return 0; } /* Add null terminator, and strip trailing \n if present */ if (s[status - 1] == '\n') { s[status - 1] = '\0'; return 0; } else { s[status] = '\0'; return 1; }}/* * PQgetlineAsync - gets a COPY data row without blocking. * * See fe-exec.c for documentation. */intpqGetlineAsync3(PGconn *conn, char *buffer, int bufsize){ char id; int msgLength; int avail; if (conn->asyncStatus != PGASYNC_COPY_OUT) return -1; /* we are not doing a copy... */ /* * Recognize the next input message. To make life simpler for async * callers, we keep returning 0 until the next message is fully available * even if it is not Copy Data. This should keep PQendcopy from blocking. */ conn->inCursor = conn->inStart; if (pqGetc(&id, conn)) return 0; if (pqGetInt(&msgLength, 4, conn)) return 0; avail = conn->inEnd - conn->inCursor; if (avail < msgLength - 4) return 0; /* * Cannot proceed unless it's a Copy Data message. Anything else means * end of copy mode. */ if (id != 'd') return -1; /* * Move data from libpq's buffer to the caller's. In the case where a * prior call found the caller's buffer too small, we use * conn->copy_already_done to remember how much of the row was already * returned to the caller. */ conn->inCursor += conn->copy_already_done; avail = msgLength - 4 - conn->copy_already_done; if (avail <= bufsize) { /* Able to consume the whole message */ memcpy(buffer, &conn->inBuffer[conn->inCursor], avail); /* Mark message consumed */ conn->inStart = conn->inCursor + avail; /* Reset state for next time */ conn->copy_already_done = 0; return avail; } else { /* We must return a partial message */ memcpy(buffer, &conn->inBuffer[conn->inCursor], bufsize); /* The message is NOT consumed from libpq's buffer */ conn->copy_already_done += bufsize; return bufsize; }}/* * PQendcopy * * See fe-exec.c for documentation. */intpqEndcopy3(PGconn *conn){ PGresult *result; if (conn->asyncStatus != PGASYNC_COPY_IN && conn->asyncStatus != PGASYNC_COPY_OUT) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); return 1; } /* Send the CopyDone message if needed */ if (conn->asyncStatus == PGASYNC_COPY_IN) { if (pqPutMsgStart('c', false, conn) < 0 || pqPutMsgEnd(conn) < 0) return 1; /* * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. */ if (conn->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', false, conn) < 0 || pqPutMsgEnd(conn) < 0) return 1; } } /* * make sure no data is waiting to be sent, abort if we are non-blocking * and the flush fails */ if (pqFlush(conn) && pqIsnonblocking(conn)) return (1); /* Return to active duty */ conn->asyncStatus = PGASYNC_BUSY; resetPQExpBuffer(&conn->errorMessage); /* * Non blocking connections may have to abort at this point. If everyone * played the game there should be no problem, but in error scenarios the * expected messages may not have arrived yet. (We are assuming that the * backend's packetizing will ensure that CommandComplete arrives along * with the CopyDone; are there corner cases where that doesn't happen?) */ if (pqIsnonblocking(conn) && PQisBusy(conn)) return (1); /* Wait for the completion response */ result = PQgetResult(conn); /* Expecting a successful result */ if (result && result->resultStatus == PGRES_COMMAND_OK) { PQclear(result); return 0; } /* * Trouble. For backwards-compatibility reasons, we issue the error * message as if it were a notice (would be nice to get rid of this * silliness, but too many apps probably don't handle errors from * PQendcopy reasonably). Note that the app can still obtain the error * status from the PGconn object. */ if (conn->errorMessage.len > 0) { /* We have to strip the trailing newline ... pain in neck... */ char svLast = conn->errorMessage.data[conn->errorMessage.len - 1]; if (svLast == '\n') conn->errorMessage.data[conn->errorMessage.len - 1] = '\0'; pqInternalNotice(&conn->noticeHooks, "%s", conn->errorMessage.data); conn->errorMessage.data[conn->errorMessage.len - 1] = svLast; } PQclear(result); return 1;}/* * PQfn - Send a function call to the POSTGRES backend. * * See fe-exec.c for documentation. */PGresult *pqFunctionCall3(PGconn *conn, Oid fnid, int *result_buf, int *actual_result_len, int result_is_int, const PQArgBlock *args, int nargs){ bool needInput = false; ExecStatusType status = PGRES_FATAL_ERROR; char id; int msgLength; int avail; int i; /* PQfn already validated connection state */ if (pqPutMsgStart('F', false, conn) < 0 || /* function call msg */ pqPutInt(fnid, 4, conn) < 0 || /* function id */ pqPutInt(1, 2, conn) < 0 || /* # of format codes */ pqPutInt(1, 2, conn) < 0 || /* format code: BINARY */ pqPutInt(nargs, 2, conn) < 0) /* # of args */ { pqHandleSendFailure(conn); return NULL; } for (i = 0; i < nargs; ++i) { /* len.int4 + contents */ if (pqPutInt(args[i].len, 4, conn)) { pqHandleSendFailure(conn); return NULL; } if (args[i].len == -1) continue; /* it's NULL */ if (args[i].isint) { if (pqPutInt(args[i].u.integer, args[i].len, conn)) { pqHandleSendFailure(conn); return NULL; } } else { if (pqPutnchar((char *) args[i].u.ptr, args[i].len, conn)) { pqHandleSendFailure(conn); return NULL; } } } if (pqPutInt(1, 2, conn) < 0) /* result format code: BINARY */ { pqHandleSendFailure(conn); return NULL; } if (pqPutMsgEnd(conn) < 0 || pqFlush(conn)) { pqHandleSendFailure(conn); return NULL; } for (;;) { if (needInput) { /* Wait for some data to arrive (or for the channel to close) */ if (pqWait(TRUE, FALSE, conn) || pqReadData(conn) < 0) break; } /* * Scan the message. If we run out of data, loop around to try again. */ needInput = true; conn->inCursor = conn->inStart; if (pqGetc(&id, conn)) continue; if (pqGetInt(&msgLength, 4, conn)) continue; /* * Try to validate message type/length here. A length less than 4 is * definitely broken. Large lengths should only be believed for a few * message types. */ if (msgLength < 4) { handleSyncLoss(conn, id, msgLength); break; } if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id)) { handleSyncLoss(conn, id, msgLength); break; } /* * Can't process if message body isn't all here yet. */ msgLength -= 4; avail = conn->inEnd - conn->inCursor; if (avail < msgLength) { /* * Before looping, enlarge the input buffer if needed to hold the * whole message. See notes in parseInput. */ if (pqCheckInBufferSpace(conn->inCursor + msgLength, conn)) { /* * XXX add some better recovery code... plan is to skip over * the message using its length, then report an error. For the * moment, just treat this like loss of sync (which indeed it * might be!) */ handleSyncLoss(conn, id, msgLength); break; } continue; } /* * We should see V or E response to the command, but might get N * and/or A notices first. We also need to swallow the final Z before * returning. */ switch (id) { case 'V': /* function result */ if (pqGetInt(actual_result_len, 4, conn)) continue; if (*actual_result_len != -1) { if (result_is_int) { if (pqGetInt(result_buf, *actual_result_len, conn)) continue; } else { if (pqGetnchar((char *) result_buf, *actual_result_len, conn)) continue; } } /* correctly finished function result message */ status = PGRES_COMMAND_OK; break; case 'E': /* error return */ if (pqGetErrorNotice3(conn, true)) continue; status = PGRES_FATAL_ERROR; break; case 'A': /* notify message */ /* handle notify and go back to processing return values */ if (getNotify(conn)) continue; break; case 'N': /* notice */ /* handle notice and go back to processing return values */ if (pqGetErrorNotice3(conn, false)) continue; break; case 'Z': /* backend is ready for new query */ if (getReadyForQuery(conn)) continue; /* consume the message and exit */ conn->inStart += 5 + msgLength; /* if we saved a result object (probably an error), use it */ if (conn->result) return pqPrepareAsyncResult(conn); return PQmakeEmptyPGresult(conn, status); case 'S': /* parameter status */ if (getParameterStatus(conn)) continue; break; default: /* The backend violates the protocol. */ printfPQExpBuffer(&conn->errorMessage, libpq_gettext("protocol error: id=0x%x\n"), id); pqSaveErrorResult(conn); /* trust the specified message length as what to skip */ conn->inStart += 5 + msgLength; return pqPrepareAsyncResult(conn); } /* Completed this message, keep going */ /* trust the specified message length as what to skip */ conn->inStart += 5 + msgLength; needInput = false; } /* * We fall out of the loop only upon failing to read data. * conn->errorMessage has been set by pqWait or pqReadData. We want to * append it to any already-received error message. */ pqSaveErrorResult(conn); return pqPrepareAsyncResult(conn);}/* * Construct startup packet * * Returns a malloc'd packet buffer, or NULL if out of memory */char *pqBuildStartupPacket3(PGconn *conn, int *packetlen, const PQEnvironmentOption *options){ char *startpacket; *packetlen = build_startup_packet(conn, NULL, options); startpacket = (char *) malloc(*packetlen); if (!startpacket) return NULL; *packetlen = build_startup_packet(conn, startpacket, options); return startpacket;}/* * Build a startup packet given a filled-in PGconn structure. * * We need to figure out how much space is needed, then fill it in. * To avoid duplicate logic, this routine is called twice: the first time * (with packet == NULL) just counts the space needed, the second time * (with packet == allocated space) fills it in. Return value is the number * of bytes used. */static intbuild_startup_packet(const PGconn *conn, char *packet, const PQEnvironmentOption *options){ int packet_len = 0; const PQEnvironmentOption *next_eo; /* Protocol version comes first. */ if (packet) { ProtocolVersion pv = htonl(conn->pversion); memcpy(packet + packet_len, &pv, sizeof(ProtocolVersion)); } packet_len += sizeof(ProtocolVersion); /* Add user name, database name, options */ if (conn->pguser && conn->pguser[0]) { if (packet) strcpy(packet + packet_len, "user"); packet_len += strlen("user") + 1; if (packet) strcpy(packet + packet_len, conn->pguser); packet_len += strlen(conn->pguser) + 1; } if (conn->dbName && conn->dbName[0]) { if (packet) strcpy(packet + packet_len, "database"); packet_len += strlen("database") + 1; if (packet) strcpy(packet + packet_len, conn->dbName); packet_len += strlen(conn->dbName) + 1; } if (conn->pgoptions && conn->pgoptions[0]) { if (packet) strcpy(packet + packet_len, "options"); packet_len += strlen("options") + 1; if (packet) strcpy(packet + packet_len, conn->pgoptions); packet_len += strlen(conn->pgoptions) + 1; } /* Add any environment-driven GUC settings needed */ for (next_eo = options; next_eo->envName; next_eo++) { const char *val; if ((val = getenv(next_eo->envName)) != NULL) { if (pg_strcasecmp(val, "default") != 0) { if (packet) strcpy(packet + packet_len, next_eo->pgName); packet_len += strlen(next_eo->pgName) + 1; if (packet) strcpy(packet + packet_len, val); packet_len += strlen(val) + 1; } } } /* Add trailing terminator */ if (packet) packet[packet_len] = '\0'; packet_len++; return packet_len;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -