copy.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,233 行 · 第 1/4 页
C
2,233 行
MemoryContextSwitchTo(oldcontext); } heap_endscan(scandesc); if (binary) { /* Generate trailer for a binary copy */ CopySendInt16(-1); } MemoryContextDelete(mycontext); pfree(out_functions); pfree(elements); pfree(isvarlena);}/* * error context callback for COPY FROM */static voidcopy_in_error_callback(void *arg){ if (copy_binary) { /* can't usefully display the data */ if (copy_attname) errcontext("COPY %s, line %d, column %s", copy_relname, copy_lineno, copy_attname); else errcontext("COPY %s, line %d", copy_relname, copy_lineno); } else { if (copy_attname) { /* error is relevant to a particular column */ limit_printout_length(&attribute_buf); errcontext("COPY %s, line %d, column %s: \"%s\"", copy_relname, copy_lineno, copy_attname, attribute_buf.data); } else { /* error is relevant to a particular line */ if (!line_buf_converted) { /* didn't convert the encoding yet... */ line_buf_converted = true; if (client_encoding != server_encoding) { char *cvt; cvt = (char *) pg_client_to_server((unsigned char *) line_buf.data, line_buf.len); if (cvt != line_buf.data) { /* transfer converted data back to line_buf */ line_buf.len = 0; line_buf.data[0] = '\0'; appendBinaryStringInfo(&line_buf, cvt, strlen(cvt)); } } } limit_printout_length(&line_buf); errcontext("COPY %s, line %d: \"%s\"", copy_relname, copy_lineno, line_buf.data); } }}/* * Make sure we don't print an unreasonable amount of COPY data in a message. * * It would seem a lot easier to just use the sprintf "precision" limit to * truncate the string. However, some versions of glibc have a bug/misfeature * that vsnprintf will always fail (return -1) if it is asked to truncate * a string that contains invalid byte sequences for the current encoding. * So, do our own truncation. We assume we can alter the StringInfo buffer * holding the input data. */static voidlimit_printout_length(StringInfo buf){#define MAX_COPY_DATA_DISPLAY 100 int len; /* Fast path if definitely okay */ if (buf->len <= MAX_COPY_DATA_DISPLAY) return; /* Apply encoding-dependent truncation */ len = pg_mbcliplen(buf->data, buf->len, MAX_COPY_DATA_DISPLAY); if (buf->len <= len) return; /* no need to truncate */ buf->len = len; buf->data[len] = '\0'; /* Add "..." to show we truncated the input */ appendStringInfoString(buf, "...");}/* * Copy FROM file to relation. */static voidCopyFrom(Relation rel, List *attnumlist, bool binary, bool oids, char *delim, char *null_print){ HeapTuple tuple; TupleDesc tupDesc; Form_pg_attribute *attr; AttrNumber num_phys_attrs, attr_count, num_defaults; FmgrInfo *in_functions; FmgrInfo oid_in_function; Oid *elements; Oid oid_in_element; ExprState **constraintexprs; bool hasConstraints = false; int i; List *cur; Oid in_func_oid; Datum *values; char *nulls; bool done = false; bool isnull; ResultRelInfo *resultRelInfo; EState *estate = CreateExecutorState(); /* for ExecConstraints() */ TupleTable tupleTable; TupleTableSlot *slot; bool file_has_oids; int *defmap; ExprState **defexprs; /* array of default att expressions */ ExprContext *econtext; /* used for ExecEvalExpr for default atts */ MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; tupDesc = RelationGetDescr(rel); attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; attr_count = length(attnumlist); num_defaults = 0; /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of * code here that basically duplicated execUtils.c ...) */ resultRelInfo = makeNode(ResultRelInfo); resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ resultRelInfo->ri_RelationDesc = rel; resultRelInfo->ri_TrigDesc = CopyTriggerDesc(rel->trigdesc); ExecOpenIndices(resultRelInfo); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; estate->es_result_relation_info = resultRelInfo; /* Set up a dummy tuple table too */ tupleTable = ExecCreateTupleTable(1); slot = ExecAllocTableSlot(tupleTable); ExecSetSlotDescriptor(slot, tupDesc, false); econtext = GetPerTupleExprContext(estate); /* * Pick up the required catalog information for each attribute in the * relation, including the input function, the element type (to pass * to the input function), and info about defaults and constraints. * (Which input function we use depends on text/binary format choice.) * +1's here are to avoid palloc(0) in a zero-column table. */ in_functions = (FmgrInfo *) palloc((num_phys_attrs + 1) * sizeof(FmgrInfo)); elements = (Oid *) palloc((num_phys_attrs + 1) * sizeof(Oid)); defmap = (int *) palloc((num_phys_attrs + 1) * sizeof(int)); defexprs = (ExprState **) palloc((num_phys_attrs + 1) * sizeof(ExprState *)); constraintexprs = (ExprState **) palloc0((num_phys_attrs + 1) * sizeof(ExprState *)); for (i = 0; i < num_phys_attrs; i++) { /* We don't need info for dropped attributes */ if (attr[i]->attisdropped) continue; /* Fetch the input function and typelem info */ if (binary) getTypeBinaryInputInfo(attr[i]->atttypid, &in_func_oid, &elements[i]); else getTypeInputInfo(attr[i]->atttypid, &in_func_oid, &elements[i]); fmgr_info(in_func_oid, &in_functions[i]); /* Get default info if needed */ if (!intMember(i + 1, attnumlist)) { /* attribute is NOT to be copied from input */ /* use default value if one exists */ Node *defexpr = build_column_default(rel, i + 1); if (defexpr != NULL) { defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr, estate); defmap[num_defaults] = i; num_defaults++; } } /* If it's a domain type, set up to check domain constraints */ if (get_typtype(attr[i]->atttypid) == 'd') { Param *prm; Node *node; /* * Easiest way to do this is to use parse_coerce.c to set up * an expression that checks the constraints. (At present, * the expression might contain a length-coercion-function * call and/or CoerceToDomain nodes.) The bottom of the * expression is a Param node so that we can fill in the * actual datum during the data input loop. */ prm = makeNode(Param); prm->paramkind = PARAM_EXEC; prm->paramid = 0; prm->paramtype = getBaseType(attr[i]->atttypid); node = coerce_to_domain((Node *) prm, prm->paramtype, attr[i]->atttypid, COERCE_IMPLICIT_CAST); constraintexprs[i] = ExecPrepareExpr((Expr *) node, estate); hasConstraints = true; } } /* * Check BEFORE STATEMENT insertion triggers. It's debateable whether * we should do this for COPY, since it's not really an "INSERT" * statement as such. However, executing these triggers maintains * consistency with the EACH ROW triggers that we already fire on * COPY. */ ExecBSInsertTriggers(estate, resultRelInfo); if (!binary) { file_has_oids = oids; /* must rely on user to tell us this... */ } else { /* Read and verify binary header */ char readSig[11]; int32 tmp; /* Signature */ CopyGetData(readSig, 11); if (CopyGetEof() || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("COPY file signature not recognized"))); /* Flags field */ tmp = CopyGetInt32(); if (CopyGetEof()) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); file_has_oids = (tmp & (1 << 16)) != 0; tmp &= ~(1 << 16); if ((tmp >> 16) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unrecognized critical flags in COPY file header"))); /* Header extension length */ tmp = CopyGetInt32(); if (CopyGetEof() || tmp < 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing length)"))); /* Skip extension header, if present */ while (tmp-- > 0) { CopyGetData(readSig, 1); if (CopyGetEof()) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); } } if (file_has_oids && binary) { getTypeBinaryInputInfo(OIDOID, &in_func_oid, &oid_in_element); fmgr_info(in_func_oid, &oid_in_function); } values = (Datum *) palloc((num_phys_attrs + 1) * sizeof(Datum)); nulls = (char *) palloc((num_phys_attrs + 1) * sizeof(char)); /* Make room for a PARAM_EXEC value for domain constraint checks */ if (hasConstraints) econtext->ecxt_param_exec_vals = (ParamExecData *) palloc0(sizeof(ParamExecData)); /* Initialize static variables */ fe_eof = false; eol_type = EOL_UNKNOWN; copy_binary = binary; copy_relname = RelationGetRelationName(rel); copy_lineno = 0; copy_attname = NULL; /* Set up callback to identify error line number */ errcontext.callback = copy_in_error_callback; errcontext.arg = NULL; errcontext.previous = error_context_stack; error_context_stack = &errcontext; while (!done) { bool skip_tuple; Oid loaded_oid = InvalidOid; CHECK_FOR_INTERRUPTS(); copy_lineno++; /* Reset the per-tuple exprcontext */ ResetPerTupleExprContext(estate); /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, 'n', num_phys_attrs * sizeof(char)); if (!binary) { CopyReadResult result = NORMAL_ATTR; char *string; /* Actually read the line into memory here */ done = CopyReadLine(); /* * EOF at start of line means we're done. If we see EOF * after some characters, we act as though it was newline * followed by EOF, ie, process the line and then exit loop * on next iteration. */ if (done && line_buf.len == 0) break; if (file_has_oids) { string = CopyReadAttribute(delim, null_print, &result, &isnull); if (isnull) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("null OID in COPY data"))); else { copy_attname = "oid"; loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(string))); if (loaded_oid == InvalidOid) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); copy_attname = NULL; } } /* * Loop to read the user attributes on the line. */ foreach(cur, attnumlist) { int attnum = lfirsti(cur); int m = attnum - 1; /* * If prior attr on this line was ended by newline, * complain. */ if (result != NORMAL_ATTR) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("missing data for column \"%s\"", NameStr(attr[m]->attname)))); string = CopyReadAttribute(delim, null_print, &result, &isnull); if (isnull) { /* we read an SQL NULL, no need to do anything */ } else { copy_attname = NameStr(attr[m]->attname); values[m] = FunctionCall3(&in_functions[m], CStringGetDatum(string), ObjectIdGetDatum(elements[m]), Int32GetDatum(attr[m]->atttypmod)); nulls[m] = ' '; copy_attname = NULL; } } /* * Complain if there are more fields on the input line. * * Special case: if we're reading a zero-column table, we won't * yet have called CopyReadAttribute() at all; so no error if * line is empty. */ if (result == NORMAL_ATTR && line_buf.len != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("extra data after last expected column"))); } else { /* binary */ int16 fld_count; fld_count = CopyGetInt16(); if (CopyGetEof() || fld_count == -1) { done = true; break; } if (fld_count != attr_count) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("row field count is %d, expected %d", (int) fld_count, attr_count))); if (file_has_oids) { copy_attname = "oid"; loaded_oid = DatumGetObjectId(CopyReadBinaryAttribute(0, &oid_in_function, oid_in_element, &isnull)); if (isnull || loaded_oid == InvalidOid) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); copy_attname = NULL; } i = 0; foreach(cur, attnumlist) { int attnum = lfirsti(cur); int m = attnum - 1; copy_attname = NameStr(attr[m]->attname); i++; values[m] = CopyReadBinaryAttribute(i, &in_functions[m], elements[m], &isnull); nulls[m] = isnull ? 'n' : ' '; copy_attname = NULL; } } /* * Now compute and insert any defaults available for the columns * not provided by the input data. Anything not processed here or * above will remain NULL. */ for (i = 0; i < num_defaults; i++) { values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, &isnull, NULL); if (!isnull) nulls[defmap[i]] = ' '; } /* * Next apply any domain constraints */ if (hasConstraints) { ParamExecData *prmdata = &econtext->ecxt_param_exec_vals[0]; for (i = 0; i < num_phys_attrs; i++) { ExprState *exprstate = constraintexprs[i]; if (exprstate == NULL) continue; /* no constraint for this attr */ /* Insert current row's value into the Param value */ prmdata->value = values[i]; prmdata->isnull = (nulls[i] == 'n'); /* * Execute the constraint expression. Allow the * expression to replace the value (consider e.g. a * timestamp precision restriction). */ values[i] = ExecEvalExpr(exprstate, econtext, &isnull, NULL); nulls[i] = isnull ? 'n' : ' '; } } /* * And now we can form the input tuple. */ tuple = heap_formtuple(tupDesc, values, nulls); if (oids && file_has_oids) HeapTupleSetOid(tuple, loaded_oid); /* * Triggers and stuff need to be invoked in query context. */ MemoryContextSwitchTo(oldcontext); skip_tuple = false; /* BEFORE ROW INSERT Triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0) { HeapTuple newtuple; newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple); if (newtuple == NULL) /* "do nothing" */ skip_tuple = true; else if (newtuple != tuple) /* modified by Trigger(s) */ { heap_freetuple(tuple); tuple = newtuple; } }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?