📄 copy.c
字号:
readmessage: mtype = pq_getbyte(); if (mtype == EOF) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); if (pq_getmessage(cstate->fe_msgbuf, 0)) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); switch (mtype) { case 'd': /* CopyData */ break; case 'c': /* CopyDone */ /* COPY IN correctly terminated by frontend */ cstate->fe_eof = true; return bytesread; case 'f': /* CopyFail */ ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("COPY from stdin failed: %s", pq_getmsgstring(cstate->fe_msgbuf)))); break; case 'H': /* Flush */ case 'S': /* Sync */ /* * Ignore Flush/Sync for the convenience of client * libraries (such as libpq) that may send those * without noticing that the command they just * sent was COPY. */ goto readmessage; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected message type 0x%02X during COPY from stdin", mtype))); break; } } avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; if (avail > maxread) avail = maxread; pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail); databuf = (void *) ((char *) databuf + avail); maxread -= avail; bytesread += avail; } break; } return bytesread;}/* * These functions do apply some data conversion *//* * CopySendInt32 sends an int32 in network byte order */static voidCopySendInt32(CopyState cstate, int32 val){ uint32 buf; buf = htonl((uint32) val); CopySendData(cstate, &buf, sizeof(buf));}/* * CopyGetInt32 reads an int32 that appears in network byte order * * Returns true if OK, false if EOF */static boolCopyGetInt32(CopyState cstate, int32 *val){ uint32 buf; if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) { *val = 0; /* suppress compiler warning */ return false; } *val = (int32) ntohl(buf); return true;}/* * CopySendInt16 sends an int16 in network byte order */static voidCopySendInt16(CopyState cstate, int16 val){ uint16 buf; buf = htons((uint16) val); CopySendData(cstate, &buf, sizeof(buf));}/* * CopyGetInt16 reads an int16 that appears in network byte order */static boolCopyGetInt16(CopyState cstate, int16 *val){ uint16 buf; if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) { *val = 0; /* suppress compiler warning */ return false; } *val = (int16) ntohs(buf); return true;}/* * CopyLoadRawBuf loads some more data into raw_buf * * Returns TRUE if able to obtain at least one more byte, else FALSE. * * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred * down to the start of the buffer and then we load more data after that. * This case is used only when a frontend multibyte character crosses a * bufferload boundary. */static boolCopyLoadRawBuf(CopyState cstate){ int nbytes; int inbytes; if (cstate->raw_buf_index < cstate->raw_buf_len) { /* Copy down the unprocessed data */ nbytes = cstate->raw_buf_len - cstate->raw_buf_index; memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, nbytes); } else nbytes = 0; /* no data need be saved */ inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes, 1, RAW_BUF_SIZE - nbytes); nbytes += inbytes; cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf_index = 0; cstate->raw_buf_len = nbytes; return (inbytes > 0);}/* * DoCopy executes the SQL COPY statement. * * Either unload or reload contents of table <relation>, depending on <from>. * (<from> = TRUE means we are inserting into the table.) * * If <pipe> is false, transfer is between the table and the file named * <filename>. Otherwise, transfer is between the table and our regular * input/output stream. The latter could be either stdin/stdout or a * socket, depending on whether we're running under Postmaster control. * * Iff <binary>, unload or reload in the binary format, as opposed to the * more wasteful but more robust and portable text format. * * Iff <oids>, unload or reload the format that includes OID information. * On input, we accept OIDs whether or not the table has an OID column, * but silently drop them if it does not. On output, we report an error * if the user asks for OIDs in a table that has none (not providing an * OID column might seem friendlier, but could seriously confuse programs). * * If in the text format, delimit columns with delimiter <delim> and print * NULL values as <null_print>. * * Do not allow a Postgres user without superuser privilege to read from * or write to a file. * * Do not allow the copy if user doesn't have proper permission to access * the table. */voidDoCopy(const CopyStmt *stmt){ CopyState cstate; RangeVar *relation = stmt->relation; char *filename = stmt->filename; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); List *attnamelist = stmt->attlist; List *force_quote = NIL; List *force_notnull = NIL; AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); AclResult aclresult; ListCell *option; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); /* Extract options from the statement node tree */ foreach(option, stmt->options) { DefElem *defel = (DefElem *) lfirst(option); if (strcmp(defel->defname, "binary") == 0) { if (cstate->binary) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->binary = intVal(defel->arg); } else if (strcmp(defel->defname, "oids") == 0) { if (cstate->oids) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->oids = intVal(defel->arg); } else if (strcmp(defel->defname, "delimiter") == 0) { if (cstate->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->delim = strVal(defel->arg); } else if (strcmp(defel->defname, "null") == 0) { if (cstate->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->null_print = strVal(defel->arg); } else if (strcmp(defel->defname, "csv") == 0) { if (cstate->csv_mode) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->csv_mode = intVal(defel->arg); } else if (strcmp(defel->defname, "header") == 0) { if (cstate->header_line) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->header_line = intVal(defel->arg); } else if (strcmp(defel->defname, "quote") == 0) { if (cstate->quote) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->quote = strVal(defel->arg); } else if (strcmp(defel->defname, "escape") == 0) { if (cstate->escape) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); cstate->escape = strVal(defel->arg); } else if (strcmp(defel->defname, "force_quote") == 0) { if (force_quote) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); force_quote = (List *) defel->arg; } else if (strcmp(defel->defname, "force_notnull") == 0) { if (force_notnull) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); force_notnull = (List *) defel->arg; } else elog(ERROR, "option \"%s\" not recognized", defel->defname); } /* Check for incompatible options */ if (cstate->binary && cstate->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify DELIMITER in BINARY mode"))); if (cstate->binary && cstate->csv_mode) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify CSV in BINARY mode"))); if (cstate->binary && cstate->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify NULL in BINARY mode"))); /* Set defaults for omitted options */ if (!cstate->delim) cstate->delim = cstate->csv_mode ? "," : "\t"; if (!cstate->null_print) cstate->null_print = cstate->csv_mode ? "" : "\\N"; cstate->null_print_len = strlen(cstate->null_print); if (cstate->csv_mode) { if (!cstate->quote) cstate->quote = "\""; if (!cstate->escape) cstate->escape = cstate->quote; } /* Only single-character delimiter strings are supported. */ if (strlen(cstate->delim) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must be a single character"))); /* Check header */ if (!cstate->csv_mode && cstate->header_line) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY HEADER available only in CSV mode"))); /* Check quote */ if (!cstate->csv_mode && cstate->quote != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote available only in CSV mode"))); if (cstate->csv_mode && strlen(cstate->quote) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote must be a single character"))); /* Check escape */ if (!cstate->csv_mode && cstate->escape != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape available only in CSV mode"))); if (cstate->csv_mode && strlen(cstate->escape) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape must be a single character"))); /* Check force_quote */ if (!cstate->csv_mode && force_quote != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote available only in CSV mode"))); if (force_quote != NIL && is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote only available using COPY TO"))); /* Check force_notnull */ if (!cstate->csv_mode && force_notnull != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null available only in CSV mode"))); if (force_notnull != NIL && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null only available using COPY FROM"))); /* Don't allow the delimiter to appear in the null string. */ if (strchr(cstate->null_print, cstate->delim[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must not appear in the NULL specification"))); /* Don't allow the CSV quote char to appear in the null string. */ if (cstate->csv_mode && strchr(cstate->null_print, cstate->quote[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); /* Open and lock the relation, using the appropriate lock type. */ cstate->rel = heap_openrv(relation, (is_from ? RowExclusiveLock : AccessShareLock)); /* check read-only transaction */ if (XactReadOnly && is_from && !isTempNamespace(RelationGetNamespace(cstate->rel))) ereport(ERROR, (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), errmsg("transaction is read-only"))); /* Check permissions. */ aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel), GetUserId(), required_access); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_CLASS, RelationGetRelationName(cstate->rel)); if (!pipe && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to COPY to or from a file"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); /* Don't allow COPY w/ OIDs to or from a table without them */ if (cstate->oids && !cstate->rel->rd_rel->relhasoids) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("table \"%s\" does not have OIDs", RelationGetRelationName(cstate->rel)))); /* Generate or convert list of attributes to process */ cstate->attnumlist = CopyGetAttnums(cstate->rel, attnamelist); /* Convert FORCE QUOTE name list to column numbers, check validity */ if (force_quote) { TupleDesc tupDesc = RelationGetDescr(cstate->rel); Form_pg_attribute *attr = tupDesc->attrs; ListCell *cur; cstate->force_quote_atts = CopyGetAttnums(cstate->rel, force_quote); foreach(cur, cstate->force_quote_atts) { int attnum = lfirst_int(cur); if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("FORCE QUOTE column \"%s\" not referenced by COPY", NameStr(attr[attnum - 1]->attname)))); } } /* Convert FORCE NOT NULL name list to column numbers, check validity */ if (force_notnull) { TupleDesc tupDesc = RelationGetDescr(cstate->rel); Form_pg_attribute *attr = tupDesc->attrs; ListCell *cur; cstate->force_notnull_atts = CopyGetAttnums(cstate->rel, force_notnull);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -