📄 copy.c
字号:
lineval = limit_printout_length(cstate->line_buf.data); errcontext("COPY %s, line %d: \"%s\"", cstate->cur_relname, cstate->cur_lineno, lineval); pfree(lineval); } else { /* * Here, the line buffer is still in a foreign encoding, and * indeed it's quite likely that the error is precisely a * failure to do encoding conversion (ie, bad data). We dare * not try to convert it, and at present there's no way to * regurgitate it without conversion. So we have to punt and * just report the line number. */ errcontext("COPY %s, line %d", cstate->cur_relname, cstate->cur_lineno); } } }}/* * Make sure we don't print an unreasonable amount of COPY data in a message. * * It would seem a lot easier to just use the sprintf "precision" limit to * truncate the string. However, some versions of glibc have a bug/misfeature * that vsnprintf will always fail (return -1) if it is asked to truncate * a string that contains invalid byte sequences for the current encoding. * So, do our own truncation. We return a pstrdup'd copy of the input. */static char *limit_printout_length(const char *str){#define MAX_COPY_DATA_DISPLAY 100 int slen = strlen(str); int len; char *res; /* Fast path if definitely okay */ if (slen <= MAX_COPY_DATA_DISPLAY) return pstrdup(str); /* Apply encoding-dependent truncation */ len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY); /* * Truncate, and add "..." to show we truncated the input. */ res = (char *) palloc(len + 4); memcpy(res, str, len); strcpy(res + len, "..."); return res;}/* * Copy FROM file to relation. */static voidCopyFrom(CopyState cstate){ HeapTuple tuple; TupleDesc tupDesc; Form_pg_attribute *attr; AttrNumber num_phys_attrs, attr_count, num_defaults; FmgrInfo *in_functions; FmgrInfo oid_in_function; Oid *typioparams; Oid oid_typioparam; ExprState **constraintexprs; bool *force_notnull; bool hasConstraints = false; int attnum; int i; Oid in_func_oid; Datum *values; char *nulls; int nfields; char **field_strings; bool done = false; bool isnull; ResultRelInfo *resultRelInfo; EState *estate = CreateExecutorState(); /* for ExecConstraints() */ TupleTableSlot *slot; bool file_has_oids; int *defmap; ExprState **defexprs; /* array of default att expressions */ ExprContext *econtext; /* used for ExecEvalExpr for default atts */ MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; tupDesc = RelationGetDescr(cstate->rel); attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); num_defaults = 0; /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code * here that basically duplicated execUtils.c ...) */ resultRelInfo = makeNode(ResultRelInfo); resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ resultRelInfo->ri_RelationDesc = cstate->rel; resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc); if (resultRelInfo->ri_TrigDesc) resultRelInfo->ri_TrigFunctions = (FmgrInfo *) palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo)); resultRelInfo->ri_TrigInstrument = NULL; ExecOpenIndices(resultRelInfo); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; estate->es_result_relation_info = resultRelInfo; /* Set up a tuple slot too */ slot = MakeSingleTupleTableSlot(tupDesc); econtext = GetPerTupleExprContext(estate); /* * Pick up the required catalog information for each attribute in the * relation, including the input function, the element type (to pass to * the input function), and info about defaults and constraints. (Which * input function we use depends on text/binary format choice.) */ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); constraintexprs = (ExprState **) palloc0(num_phys_attrs * sizeof(ExprState *)); force_notnull = (bool *) palloc(num_phys_attrs * sizeof(bool)); for (attnum = 1; attnum <= num_phys_attrs; attnum++) { /* We don't need info for dropped attributes */ if (attr[attnum - 1]->attisdropped) continue; /* Fetch the input function and typioparam info */ if (cstate->binary) getTypeBinaryInputInfo(attr[attnum - 1]->atttypid, &in_func_oid, &typioparams[attnum - 1]); else getTypeInputInfo(attr[attnum - 1]->atttypid, &in_func_oid, &typioparams[attnum - 1]); fmgr_info(in_func_oid, &in_functions[attnum - 1]); if (list_member_int(cstate->force_notnull_atts, attnum)) force_notnull[attnum - 1] = true; else force_notnull[attnum - 1] = false; /* Get default info if needed */ if (!list_member_int(cstate->attnumlist, attnum)) { /* attribute is NOT to be copied from input */ /* use default value if one exists */ Node *defexpr = build_column_default(cstate->rel, attnum); if (defexpr != NULL) { defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr, estate); defmap[num_defaults] = attnum - 1; num_defaults++; } } /* If it's a domain type, set up to check domain constraints */ if (get_typtype(attr[attnum - 1]->atttypid) == 'd') { Param *prm; Node *node; /* * Easiest way to do this is to use parse_coerce.c to set up an * expression that checks the constraints. (At present, the * expression might contain a length-coercion-function call and/or * CoerceToDomain nodes.) The bottom of the expression is a Param * node so that we can fill in the actual datum during the data * input loop. */ prm = makeNode(Param); prm->paramkind = PARAM_EXEC; prm->paramid = 0; prm->paramtype = getBaseType(attr[attnum - 1]->atttypid); node = coerce_to_domain((Node *) prm, prm->paramtype, attr[attnum - 1]->atttypid, COERCE_IMPLICIT_CAST, false, false); constraintexprs[attnum - 1] = ExecPrepareExpr((Expr *) node, estate); hasConstraints = true; } } /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); /* * Check BEFORE STATEMENT insertion triggers. It's debateable whether we * should do this for COPY, since it's not really an "INSERT" statement as * such. However, executing these triggers maintains consistency with the * EACH ROW triggers that we already fire on COPY. */ ExecBSInsertTriggers(estate, resultRelInfo); if (!cstate->binary) file_has_oids = cstate->oids; /* must rely on user to tell us... */ else { /* Read and verify binary header */ char readSig[11]; int32 tmp; /* Signature */ if (CopyGetData(cstate, readSig, 11, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("COPY file signature not recognized"))); /* Flags field */ if (!CopyGetInt32(cstate, &tmp)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); file_has_oids = (tmp & (1 << 16)) != 0; tmp &= ~(1 << 16); if ((tmp >> 16) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unrecognized critical flags in COPY file header"))); /* Header extension length */ if (!CopyGetInt32(cstate, &tmp) || tmp < 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing length)"))); /* Skip extension header, if present */ while (tmp-- > 0) { if (CopyGetData(cstate, readSig, 1, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); } } if (file_has_oids && cstate->binary) { getTypeBinaryInputInfo(OIDOID, &in_func_oid, &oid_typioparam); fmgr_info(in_func_oid, &oid_in_function); } values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); nulls = (char *) palloc(num_phys_attrs * sizeof(char)); /* create workspace for CopyReadAttributes results */ nfields = file_has_oids ? (attr_count + 1) : attr_count; field_strings = (char **) palloc(nfields * sizeof(char *)); /* Make room for a PARAM_EXEC value for domain constraint checks */ if (hasConstraints) econtext->ecxt_param_exec_vals = (ParamExecData *) palloc0(sizeof(ParamExecData)); /* Initialize state variables */ cstate->fe_eof = false; cstate->eol_type = EOL_UNKNOWN; cstate->cur_relname = RelationGetRelationName(cstate->rel); cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; /* Set up callback to identify error line number */ errcontext.callback = copy_in_error_callback; errcontext.arg = (void *) cstate; errcontext.previous = error_context_stack; error_context_stack = &errcontext; /* on input just throw the header line away */ if (cstate->header_line) { cstate->cur_lineno++; done = CopyReadLine(cstate); } while (!done) { bool skip_tuple; Oid loaded_oid = InvalidOid; CHECK_FOR_INTERRUPTS(); cstate->cur_lineno++; /* Reset the per-tuple exprcontext */ ResetPerTupleExprContext(estate); /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, 'n', num_phys_attrs * sizeof(char)); if (!cstate->binary) { ListCell *cur; int fldct; int fieldno; char *string; /* Actually read the line into memory here */ done = CopyReadLine(cstate); /* * EOF at start of line means we're done. If we see EOF after * some characters, we act as though it was newline followed by * EOF, ie, process the line and then exit loop on next iteration. */ if (done && cstate->line_buf.len == 0) break; /* Parse the line into de-escaped field values */ if (cstate->csv_mode) fldct = CopyReadAttributesCSV(cstate, nfields, field_strings); else fldct = CopyReadAttributesText(cstate, nfields, field_strings); fieldno = 0; /* Read the OID field if present */ if (file_has_oids) { if (fieldno >= fldct) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("missing data for OID column"))); string = field_strings[fieldno++]; if (string == NULL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("null OID in COPY data"))); else { cstate->cur_attname = "oid"; cstate->cur_attval = string; loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(string))); if (loaded_oid == InvalidOid) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); cstate->cur_attname = NULL; cstate->cur_attval = NULL; } } /* Loop to read the user attributes on the line. */ foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); int m = attnum - 1; if (fieldno >= fldct) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("missing data for column \"%s\"", NameStr(attr[m]->attname)))); string = field_strings[fieldno++]; if (cstate->csv_mode && string == NULL && force_notnull[m]) { /* Go ahead and read the NULL string */ string = cstate->null_print; } /* If we read an SQL NULL, no need to do anything */ if (string != NULL) { cstate->cur_attname = NameStr(attr[m]->attname); cstate->cur_attval = string; values[m] = FunctionCall3(&in_functions[m], CStringGetDatum(string), ObjectIdGetDatum(typioparams[m]), Int32GetDatum(attr[m]->atttypmod)); nulls[m] = ' '; cstate->cur_attname = NULL; cstate->cur_attval = NULL; } } Assert(fieldno == nfields); } else { /* binary */ int16 fld_count; ListCell *cur; if (!CopyGetInt16(cstate, &fld_count) || fld_count == -1) { done = true; break; } if (fld_count != attr_count) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("row field count is %d, expected %d", (int) fld_count, attr_count))); if (file_has_oids) { cstate->cur_attname = "oid"; loaded_oid = DatumGetObjectId(CopyReadBinaryAttribute(cstate, 0, &oid_in_function, oid_typioparam, -1, &isnull)); if (isnull || loaded_oid == InvalidOid) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); cstate->cur_attname = NULL; } i = 0; foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); int m = attnum - 1; cstate->cur_attname = NameStr(attr[m]->attname); i++; values[m] = CopyReadBinaryAttribute(cstate, i, &in_functions[m], typioparams[m], attr[m]->atttypmod, &isnull); nulls[m] = isnull ? 'n' : ' '; cstate->cur_attname = NULL; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -