copy.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,233 行 · 第 1/4 页

C
2,233
字号
		MemoryContextSwitchTo(oldcontext);	}	heap_endscan(scandesc);	if (binary)	{		/* Generate trailer for a binary copy */		CopySendInt16(-1);	}	MemoryContextDelete(mycontext);	pfree(out_functions);	pfree(elements);	pfree(isvarlena);}/* * error context callback for COPY FROM */static voidcopy_in_error_callback(void *arg){	if (copy_binary)	{		/* can't usefully display the data */		if (copy_attname)			errcontext("COPY %s, line %d, column %s",					   copy_relname, copy_lineno, copy_attname);		else			errcontext("COPY %s, line %d", copy_relname, copy_lineno);	}	else	{		if (copy_attname)		{			/* error is relevant to a particular column */			limit_printout_length(&attribute_buf);			errcontext("COPY %s, line %d, column %s: \"%s\"",					   copy_relname, copy_lineno, copy_attname,					   attribute_buf.data);		}		else		{			/* error is relevant to a particular line */			if (!line_buf_converted)			{				/* didn't convert the encoding yet... */				line_buf_converted = true;				if (client_encoding != server_encoding)				{					char	   *cvt;					cvt = (char *) pg_client_to_server((unsigned char *) line_buf.data,													   line_buf.len);					if (cvt != line_buf.data)					{						/* transfer converted data back to line_buf */						line_buf.len = 0;						line_buf.data[0] = '\0';						appendBinaryStringInfo(&line_buf, cvt, strlen(cvt));					}				}			}			limit_printout_length(&line_buf);			errcontext("COPY %s, line %d: \"%s\"",					   copy_relname, copy_lineno,					   line_buf.data);		}	}}/* * Make sure we don't print an unreasonable amount of COPY data in a message. * * It would seem a lot easier to just use the sprintf "precision" limit to * truncate the string.  However, some versions of glibc have a bug/misfeature * that vsnprintf will always fail (return -1) if it is asked to truncate * a string that contains invalid byte sequences for the current encoding. * So, do our own truncation.  We assume we can alter the StringInfo buffer * holding the input data. */static voidlimit_printout_length(StringInfo buf){#define MAX_COPY_DATA_DISPLAY 100	int len;	/* Fast path if definitely okay */	if (buf->len <= MAX_COPY_DATA_DISPLAY)		return;	/* Apply encoding-dependent truncation */	len = pg_mbcliplen(buf->data, buf->len, MAX_COPY_DATA_DISPLAY);	if (buf->len <= len)		return;					/* no need to truncate */	buf->len = len;	buf->data[len] = '\0';	/* Add "..." to show we truncated the input */	appendStringInfoString(buf, "...");}/* * Copy FROM file to relation. */static voidCopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,		 char *delim, char *null_print){	HeapTuple	tuple;	TupleDesc	tupDesc;	Form_pg_attribute *attr;	AttrNumber	num_phys_attrs,				attr_count,				num_defaults;	FmgrInfo   *in_functions;	FmgrInfo	oid_in_function;	Oid		   *elements;	Oid			oid_in_element;	ExprState **constraintexprs;	bool		hasConstraints = false;	int			i;	List	   *cur;	Oid			in_func_oid;	Datum	   *values;	char	   *nulls;	bool		done = false;	bool		isnull;	ResultRelInfo *resultRelInfo;	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */	TupleTable	tupleTable;	TupleTableSlot *slot;	bool		file_has_oids;	int		   *defmap;	ExprState **defexprs;		/* array of default att expressions */	ExprContext *econtext;		/* used for ExecEvalExpr for default atts */	MemoryContext oldcontext = CurrentMemoryContext;	ErrorContextCallback errcontext;	tupDesc = RelationGetDescr(rel);	attr = tupDesc->attrs;	num_phys_attrs = tupDesc->natts;	attr_count = length(attnumlist);	num_defaults = 0;	/*	 * We need a ResultRelInfo so we can use the regular executor's	 * index-entry-making machinery.  (There used to be a huge amount of	 * code here that basically duplicated execUtils.c ...)	 */	resultRelInfo = makeNode(ResultRelInfo);	resultRelInfo->ri_RangeTableIndex = 1;		/* dummy */	resultRelInfo->ri_RelationDesc = rel;	resultRelInfo->ri_TrigDesc = CopyTriggerDesc(rel->trigdesc);	ExecOpenIndices(resultRelInfo);	estate->es_result_relations = resultRelInfo;	estate->es_num_result_relations = 1;	estate->es_result_relation_info = resultRelInfo;	/* Set up a dummy tuple table too */	tupleTable = ExecCreateTupleTable(1);	slot = ExecAllocTableSlot(tupleTable);	ExecSetSlotDescriptor(slot, tupDesc, false);	econtext = GetPerTupleExprContext(estate);	/*	 * Pick up the required catalog information for each attribute in the	 * relation, including the input function, the element type (to pass	 * to the input function), and info about defaults and constraints.	 * (Which input function we use depends on text/binary format choice.)	 * +1's here are to avoid palloc(0) in a zero-column table.	 */	in_functions = (FmgrInfo *) palloc((num_phys_attrs + 1) * sizeof(FmgrInfo));	elements = (Oid *) palloc((num_phys_attrs + 1) * sizeof(Oid));	defmap = (int *) palloc((num_phys_attrs + 1) * sizeof(int));	defexprs = (ExprState **) palloc((num_phys_attrs + 1) * sizeof(ExprState *));	constraintexprs = (ExprState **) palloc0((num_phys_attrs + 1) * sizeof(ExprState *));	for (i = 0; i < num_phys_attrs; i++)	{		/* We don't need info for dropped attributes */		if (attr[i]->attisdropped)			continue;		/* Fetch the input function and typelem info */		if (binary)			getTypeBinaryInputInfo(attr[i]->atttypid,								   &in_func_oid, &elements[i]);		else			getTypeInputInfo(attr[i]->atttypid,							 &in_func_oid, &elements[i]);		fmgr_info(in_func_oid, &in_functions[i]);		/* Get default info if needed */		if (!intMember(i + 1, attnumlist))		{			/* attribute is NOT to be copied from input */			/* use default value if one exists */			Node	   *defexpr = build_column_default(rel, i + 1);			if (defexpr != NULL)			{				defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,														 estate);				defmap[num_defaults] = i;				num_defaults++;			}		}		/* If it's a domain type, set up to check domain constraints */		if (get_typtype(attr[i]->atttypid) == 'd')		{			Param	   *prm;			Node	   *node;			/*			 * Easiest way to do this is to use parse_coerce.c to set up			 * an expression that checks the constraints.  (At present,			 * the expression might contain a length-coercion-function			 * call and/or CoerceToDomain nodes.)  The bottom of the			 * expression is a Param node so that we can fill in the			 * actual datum during the data input loop.			 */			prm = makeNode(Param);			prm->paramkind = PARAM_EXEC;			prm->paramid = 0;			prm->paramtype = getBaseType(attr[i]->atttypid);			node = coerce_to_domain((Node *) prm,									prm->paramtype,									attr[i]->atttypid,									COERCE_IMPLICIT_CAST);			constraintexprs[i] = ExecPrepareExpr((Expr *) node,												 estate);			hasConstraints = true;		}	}	/*	 * Check BEFORE STATEMENT insertion triggers. It's debateable whether	 * we should do this for COPY, since it's not really an "INSERT"	 * statement as such. However, executing these triggers maintains	 * consistency with the EACH ROW triggers that we already fire on	 * COPY.	 */	ExecBSInsertTriggers(estate, resultRelInfo);	if (!binary)	{		file_has_oids = oids;	/* must rely on user to tell us this... */	}	else	{		/* Read and verify binary header */		char		readSig[11];		int32		tmp;		/* Signature */		CopyGetData(readSig, 11);		if (CopyGetEof() || memcmp(readSig, BinarySignature, 11) != 0)			ereport(ERROR,					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),					 errmsg("COPY file signature not recognized")));		/* Flags field */		tmp = CopyGetInt32();		if (CopyGetEof())			ereport(ERROR,					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),					 errmsg("invalid COPY file header (missing flags)")));		file_has_oids = (tmp & (1 << 16)) != 0;		tmp &= ~(1 << 16);		if ((tmp >> 16) != 0)			ereport(ERROR,					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),			 errmsg("unrecognized critical flags in COPY file header")));		/* Header extension length */		tmp = CopyGetInt32();		if (CopyGetEof() || tmp < 0)			ereport(ERROR,					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),				   errmsg("invalid COPY file header (missing length)")));		/* Skip extension header, if present */		while (tmp-- > 0)		{			CopyGetData(readSig, 1);			if (CopyGetEof())				ereport(ERROR,						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),					 errmsg("invalid COPY file header (wrong length)")));		}	}	if (file_has_oids && binary)	{		getTypeBinaryInputInfo(OIDOID,							   &in_func_oid, &oid_in_element);		fmgr_info(in_func_oid, &oid_in_function);	}	values = (Datum *) palloc((num_phys_attrs + 1) * sizeof(Datum));	nulls = (char *) palloc((num_phys_attrs + 1) * sizeof(char));	/* Make room for a PARAM_EXEC value for domain constraint checks */	if (hasConstraints)		econtext->ecxt_param_exec_vals = (ParamExecData *)			palloc0(sizeof(ParamExecData));	/* Initialize static variables */	fe_eof = false;	eol_type = EOL_UNKNOWN;	copy_binary = binary;	copy_relname = RelationGetRelationName(rel);	copy_lineno = 0;	copy_attname = NULL;	/* Set up callback to identify error line number */	errcontext.callback = copy_in_error_callback;	errcontext.arg = NULL;	errcontext.previous = error_context_stack;	error_context_stack = &errcontext;	while (!done)	{		bool		skip_tuple;		Oid			loaded_oid = InvalidOid;		CHECK_FOR_INTERRUPTS();		copy_lineno++;		/* Reset the per-tuple exprcontext */		ResetPerTupleExprContext(estate);		/* Switch into its memory context */		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));		/* Initialize all values for row to NULL */		MemSet(values, 0, num_phys_attrs * sizeof(Datum));		MemSet(nulls, 'n', num_phys_attrs * sizeof(char));		if (!binary)		{			CopyReadResult result = NORMAL_ATTR;			char	   *string;			/* Actually read the line into memory here */			done = CopyReadLine();			/*			 * EOF at start of line means we're done.  If we see EOF			 * after some characters, we act as though it was newline			 * followed by EOF, ie, process the line and then exit loop			 * on next iteration.			 */			if (done && line_buf.len == 0)				break;			if (file_has_oids)			{				string = CopyReadAttribute(delim, null_print,										   &result, &isnull);				if (isnull)					ereport(ERROR,							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),							 errmsg("null OID in COPY data")));				else				{					copy_attname = "oid";					loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,											   CStringGetDatum(string)));					if (loaded_oid == InvalidOid)						ereport(ERROR,								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),								 errmsg("invalid OID in COPY data")));					copy_attname = NULL;				}			}			/*			 * Loop to read the user attributes on the line.			 */			foreach(cur, attnumlist)			{				int			attnum = lfirsti(cur);				int			m = attnum - 1;				/*				 * If prior attr on this line was ended by newline,				 * complain.				 */				if (result != NORMAL_ATTR)					ereport(ERROR,							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),							 errmsg("missing data for column \"%s\"",									NameStr(attr[m]->attname))));				string = CopyReadAttribute(delim, null_print,										   &result, &isnull);				if (isnull)				{					/* we read an SQL NULL, no need to do anything */				}				else				{					copy_attname = NameStr(attr[m]->attname);					values[m] = FunctionCall3(&in_functions[m],											  CStringGetDatum(string),										   ObjectIdGetDatum(elements[m]),									  Int32GetDatum(attr[m]->atttypmod));					nulls[m] = ' ';					copy_attname = NULL;				}			}			/*			 * Complain if there are more fields on the input line.			 *			 * Special case: if we're reading a zero-column table, we won't			 * yet have called CopyReadAttribute() at all; so no error if			 * line is empty.			 */			if (result == NORMAL_ATTR && line_buf.len != 0)				ereport(ERROR,						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),						 errmsg("extra data after last expected column")));		}		else		{			/* binary */			int16		fld_count;			fld_count = CopyGetInt16();			if (CopyGetEof() || fld_count == -1)			{				done = true;				break;			}			if (fld_count != attr_count)				ereport(ERROR,						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),						 errmsg("row field count is %d, expected %d",								(int) fld_count, attr_count)));			if (file_has_oids)			{				copy_attname = "oid";				loaded_oid =					DatumGetObjectId(CopyReadBinaryAttribute(0,														&oid_in_function,														  oid_in_element,															 &isnull));				if (isnull || loaded_oid == InvalidOid)					ereport(ERROR,							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),							 errmsg("invalid OID in COPY data")));				copy_attname = NULL;			}			i = 0;			foreach(cur, attnumlist)			{				int			attnum = lfirsti(cur);				int			m = attnum - 1;				copy_attname = NameStr(attr[m]->attname);				i++;				values[m] = CopyReadBinaryAttribute(i,													&in_functions[m],													elements[m],													&isnull);				nulls[m] = isnull ? 'n' : ' ';				copy_attname = NULL;			}		}		/*		 * Now compute and insert any defaults available for the columns		 * not provided by the input data.	Anything not processed here or		 * above will remain NULL.		 */		for (i = 0; i < num_defaults; i++)		{			values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,											 &isnull, NULL);			if (!isnull)				nulls[defmap[i]] = ' ';		}		/*		 * Next apply any domain constraints		 */		if (hasConstraints)		{			ParamExecData *prmdata = &econtext->ecxt_param_exec_vals[0];			for (i = 0; i < num_phys_attrs; i++)			{				ExprState  *exprstate = constraintexprs[i];				if (exprstate == NULL)					continue;	/* no constraint for this attr */				/* Insert current row's value into the Param value */				prmdata->value = values[i];				prmdata->isnull = (nulls[i] == 'n');				/*				 * Execute the constraint expression.  Allow the				 * expression to replace the value (consider e.g. a				 * timestamp precision restriction).				 */				values[i] = ExecEvalExpr(exprstate, econtext,										 &isnull, NULL);				nulls[i] = isnull ? 'n' : ' ';			}		}		/*		 * And now we can form the input tuple.		 */		tuple = heap_formtuple(tupDesc, values, nulls);		if (oids && file_has_oids)			HeapTupleSetOid(tuple, loaded_oid);		/*		 * Triggers and stuff need to be invoked in query context.		 */		MemoryContextSwitchTo(oldcontext);		skip_tuple = false;		/* BEFORE ROW INSERT Triggers */		if (resultRelInfo->ri_TrigDesc &&			resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)		{			HeapTuple	newtuple;			newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);			if (newtuple == NULL)		/* "do nothing" */				skip_tuple = true;			else if (newtuple != tuple) /* modified by Trigger(s) */			{				heap_freetuple(tuple);				tuple = newtuple;			}		}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?