📄 copy.c
字号:
foreach(cur, cstate->force_notnull_atts) { int attnum = lfirst_int(cur); if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY", NameStr(attr[attnum - 1]->attname)))); } } /* Set up variables to avoid per-attribute overhead. */ initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; /* * Set up encoding conversion info. Even if the client and server * encodings are the same, we must apply pg_client_to_server() to * validate data in multibyte encodings. */ cstate->client_encoding = pg_get_client_encoding(); cstate->need_transcoding = (cstate->client_encoding != GetDatabaseEncoding() || pg_database_encoding_max_length() > 1); cstate->client_only_encoding = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ if (is_from) { /* copy from file to database */ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) { if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to view \"%s\"", RelationGetRelationName(cstate->rel)))); else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to sequence \"%s\"", RelationGetRelationName(cstate->rel)))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to non-table relation \"%s\"", RelationGetRelationName(cstate->rel)))); } if (pipe) { if (whereToSendOutput == DestRemote) ReceiveCopyBegin(cstate); else cstate->copy_file = stdin; } else { struct stat st; cstate->copy_file = AllocateFile(filename, PG_BINARY_R); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for reading: %m", filename))); fstat(fileno(cstate->copy_file), &st); if (S_ISDIR(st.st_mode)) { FreeFile(cstate->copy_file); ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", filename))); } } CopyFrom(cstate); } else { /* copy from database to file */ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) { if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from view \"%s\"", RelationGetRelationName(cstate->rel)))); else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from sequence \"%s\"", RelationGetRelationName(cstate->rel)))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from non-table relation \"%s\"", RelationGetRelationName(cstate->rel)))); } if (pipe) { if (whereToSendOutput == DestRemote) cstate->fe_copy = true; else cstate->copy_file = stdout; } else { mode_t oumask; /* Pre-existing umask value */ struct stat st; /* * Prevent write to relative path ... too easy to shoot oneself in * the foot by overwriting a database file ... */ if (!is_absolute_path(filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); oumask = umask((mode_t) 022); cstate->copy_file = AllocateFile(filename, PG_BINARY_W); umask(oumask); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for writing: %m", filename))); fstat(fileno(cstate->copy_file), &st); if (S_ISDIR(st.st_mode)) { FreeFile(cstate->copy_file); ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", filename))); } } DoCopyTo(cstate); } if (!pipe) { /* we assume only the write case could fail here */ if (FreeFile(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", filename))); } /* * Close the relation. If reading, we can release the AccessShareLock we * got; if writing, we should hold the lock until end of transaction to * ensure that updates will be committed before lock is released. */ heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock)); /* Clean up storage (probably not really necessary) */ pfree(cstate->attribute_buf.data); pfree(cstate->line_buf.data); pfree(cstate->raw_buf); pfree(cstate);}/* * This intermediate routine just exists to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". */static voidDoCopyTo(CopyState cstate){ PG_TRY(); { if (cstate->fe_copy) SendCopyBegin(cstate); CopyTo(cstate); if (cstate->fe_copy) SendCopyEnd(cstate); } PG_CATCH(); { /* * Make sure we turn off old-style COPY OUT mode upon error. It is * okay to do this in all cases, since it does nothing if the mode is * not on. */ pq_endcopyout(true); PG_RE_THROW(); } PG_END_TRY();}/* * Copy from relation TO file. */static voidCopyTo(CopyState cstate){ HeapTuple tuple; TupleDesc tupDesc; HeapScanDesc scandesc; int num_phys_attrs; int attr_count; Form_pg_attribute *attr; FmgrInfo *out_functions; bool *force_quote; char *string; char *null_print_client; ListCell *cur; MemoryContext oldcontext; MemoryContext mycontext; tupDesc = cstate->rel->rd_att; attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); null_print_client = cstate->null_print; /* default */ /* Get info about the columns we need to process. */ out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); force_quote = (bool *) palloc(num_phys_attrs * sizeof(bool)); foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); Oid out_func_oid; bool isvarlena; if (cstate->binary) getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &isvarlena); else getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &isvarlena); fmgr_info(out_func_oid, &out_functions[attnum - 1]); if (list_member_int(cstate->force_quote_atts, attnum)) force_quote[attnum - 1] = true; else force_quote[attnum - 1] = false; } /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside * datatype output routines, and should be faster than retail pfree's * anyway. (We don't need a whole econtext as CopyFrom does.) */ mycontext = AllocSetContextCreate(CurrentMemoryContext, "COPY TO", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); if (cstate->binary) { /* Generate header for a binary copy */ int32 tmp; /* Signature */ CopySendData(cstate, (char *) BinarySignature, 11); /* Flags field */ tmp = 0; if (cstate->oids) tmp |= (1 << 16); CopySendInt32(cstate, tmp); /* No header extension */ tmp = 0; CopySendInt32(cstate, tmp); } else { /* * For non-binary copy, we need to convert null_print to client * encoding, because it will be sent directly with CopySendString. */ if (cstate->need_transcoding) null_print_client = pg_server_to_client(cstate->null_print, cstate->null_print_len); /* if a header has been requested send the line */ if (cstate->header_line) { bool hdr_delim = false; foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); char *colname; if (hdr_delim) CopySendChar(cstate, cstate->delim[0]); hdr_delim = true; colname = NameStr(attr[attnum - 1]->attname); CopyAttributeOutCSV(cstate, colname, false, list_length(cstate->attnumlist) == 1); } CopySendEndOfRow(cstate); } } scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL); while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) { bool need_delim = false; CHECK_FOR_INTERRUPTS(); MemoryContextReset(mycontext); oldcontext = MemoryContextSwitchTo(mycontext); if (cstate->binary) { /* Binary per-tuple header */ CopySendInt16(cstate, attr_count); /* Send OID if wanted --- note attr_count doesn't include it */ if (cstate->oids) { Oid oid = HeapTupleGetOid(tuple); /* Hack --- assume Oid is same size as int32 */ CopySendInt32(cstate, sizeof(int32)); CopySendInt32(cstate, oid); } } else { /* Text format has no per-tuple header, but send OID if wanted */ /* Assume digits don't need any quoting or encoding conversion */ if (cstate->oids) { string = DatumGetCString(DirectFunctionCall1(oidout, ObjectIdGetDatum(HeapTupleGetOid(tuple)))); CopySendString(cstate, string); need_delim = true; } } foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); Datum value; bool isnull; value = heap_getattr(tuple, attnum, tupDesc, &isnull); if (!cstate->binary) { if (need_delim) CopySendChar(cstate, cstate->delim[0]); need_delim = true; } if (isnull) { if (!cstate->binary) CopySendString(cstate, null_print_client); else CopySendInt32(cstate, -1); } else { if (!cstate->binary) { string = DatumGetCString(FunctionCall1(&out_functions[attnum - 1], value)); if (cstate->csv_mode) CopyAttributeOutCSV(cstate, string, force_quote[attnum - 1], list_length(cstate->attnumlist) == 1); else CopyAttributeOutText(cstate, string); } else { bytea *outputbytes; outputbytes = DatumGetByteaP(FunctionCall1(&out_functions[attnum - 1], value)); /* We assume the result will not have been toasted */ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); CopySendData(cstate, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ); } } } CopySendEndOfRow(cstate); MemoryContextSwitchTo(oldcontext); } heap_endscan(scandesc); if (cstate->binary) { /* Generate trailer for a binary copy */ CopySendInt16(cstate, -1); } MemoryContextDelete(mycontext); pfree(out_functions); pfree(force_quote);}/* * error context callback for COPY FROM */static voidcopy_in_error_callback(void *arg){ CopyState cstate = (CopyState) arg; if (cstate->binary) { /* can't usefully display the data */ if (cstate->cur_attname) errcontext("COPY %s, line %d, column %s", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname); else errcontext("COPY %s, line %d", cstate->cur_relname, cstate->cur_lineno); } else { if (cstate->cur_attname && cstate->cur_attval) { /* error is relevant to a particular column */ char *attval; attval = limit_printout_length(cstate->cur_attval); errcontext("COPY %s, line %d, column %s: \"%s\"", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname, attval); pfree(attval); } else { /* error is relevant to a particular line */ if (cstate->line_buf_converted || !cstate->need_transcoding) { char *lineval;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -