copy.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,233 行 · 第 1/4 页

C
2,233
字号
			break;		case COPY_OLD_FE:			if (pickup)			{				/* We want to pick it up */				(void) pq_getbyte();			}			/*			 * If we didn't want to pick it up, just leave it where it			 * sits			 */			break;		case COPY_NEW_FE:			if (!pickup)			{				/* We don't want to pick it up - so put it back in there */				copy_msgbuf->cursor--;			}			/* If we wanted to pick it up, it's already done */			break;	}}/* * These functions do apply some data conversion *//* * CopySendInt32 sends an int32 in network byte order */static voidCopySendInt32(int32 val){	uint32		buf;	buf = htonl((uint32) val);	CopySendData(&buf, sizeof(buf));}/* * CopyGetInt32 reads an int32 that appears in network byte order */static int32CopyGetInt32(void){	uint32		buf;	CopyGetData(&buf, sizeof(buf));	return (int32) ntohl(buf);}/* * CopySendInt16 sends an int16 in network byte order */static voidCopySendInt16(int16 val){	uint16		buf;	buf = htons((uint16) val);	CopySendData(&buf, sizeof(buf));}/* * CopyGetInt16 reads an int16 that appears in network byte order */static int16CopyGetInt16(void){	uint16		buf;	CopyGetData(&buf, sizeof(buf));	return (int16) ntohs(buf);}/* *	 DoCopy executes the SQL COPY statement. * * Either unload or reload contents of table <relation>, depending on <from>. * (<from> = TRUE means we are inserting into the table.) * * If <pipe> is false, transfer is between the table and the file named * <filename>.	Otherwise, transfer is between the table and our regular * input/output stream. The latter could be either stdin/stdout or a * socket, depending on whether we're running under Postmaster control. * * Iff <binary>, unload or reload in the binary format, as opposed to the * more wasteful but more robust and portable text format. * * Iff <oids>, unload or reload the format that includes OID information. * On input, we accept OIDs whether or not the table has an OID column, * but silently drop them if it does not.  On output, we report an error * if the user asks for OIDs in a table that has none (not providing an * OID column might seem friendlier, but could seriously confuse programs). * * If in the text format, delimit columns with delimiter <delim> and print * NULL values as <null_print>. * * When loading in the text format from an input stream (as opposed to * a file), recognize a "." on a line by itself as EOF. Also recognize * a stream EOF.  When unloading in the text format to an output stream, * write a "." on a line by itself at the end of the data. * * Do not allow a Postgres user without superuser privilege to read from * or write to a file. * * Do not allow the copy if user doesn't have proper permission to access * the table. */voidDoCopy(const CopyStmt *stmt){	RangeVar   *relation = stmt->relation;	char	   *filename = stmt->filename;	bool		is_from = stmt->is_from;	bool		pipe = (stmt->filename == NULL);	List	   *option;	List	   *attnamelist = stmt->attlist;	List	   *attnumlist;	bool		binary = false;	bool		oids = false;	char	   *delim = NULL;	char	   *null_print = NULL;	Relation	rel;	AclMode		required_access = (is_from ? ACL_INSERT : ACL_SELECT);	AclResult	aclresult;	/* Extract options from the statement node tree */	foreach(option, stmt->options)	{		DefElem    *defel = (DefElem *) lfirst(option);		if (strcmp(defel->defname, "binary") == 0)		{			if (binary)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			binary = intVal(defel->arg);		}		else if (strcmp(defel->defname, "oids") == 0)		{			if (oids)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			oids = intVal(defel->arg);		}		else if (strcmp(defel->defname, "delimiter") == 0)		{			if (delim)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			delim = strVal(defel->arg);		}		else if (strcmp(defel->defname, "null") == 0)		{			if (null_print)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			null_print = strVal(defel->arg);		}		else			elog(ERROR, "option \"%s\" not recognized",				 defel->defname);	}	if (binary && delim)		ereport(ERROR,				(errcode(ERRCODE_SYNTAX_ERROR),				 errmsg("cannot specify DELIMITER in BINARY mode")));	if (binary && null_print)		ereport(ERROR,				(errcode(ERRCODE_SYNTAX_ERROR),				 errmsg("cannot specify NULL in BINARY mode")));	/* Set defaults */	if (!delim)		delim = "\t";	if (!null_print)		null_print = "\\N";	/*	 * Open and lock the relation, using the appropriate lock type.	 */	rel = heap_openrv(relation, (is_from ? RowExclusiveLock : AccessShareLock));	/* check read-only transaction */	if (XactReadOnly && !is_from && !isTempNamespace(RelationGetNamespace(rel)))		ereport(ERROR,				(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),				 errmsg("transaction is read-only")));	/* Check permissions. */	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),								  required_access);	if (aclresult != ACLCHECK_OK)		aclcheck_error(aclresult, ACL_KIND_CLASS,					   RelationGetRelationName(rel));	if (!pipe && !superuser())		ereport(ERROR,				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),				 errmsg("must be superuser to COPY to or from a file"),				 errhint("Anyone can COPY to stdout or from stdin. "					   "psql's \\copy command also works for anyone.")));	/*	 * Presently, only single-character delimiter strings are supported.	 */	if (strlen(delim) != 1)		ereport(ERROR,				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),				 errmsg("COPY delimiter must be a single character")));	/*	 * Don't allow COPY w/ OIDs to or from a table without them	 */	if (oids && !rel->rd_rel->relhasoids)		ereport(ERROR,				(errcode(ERRCODE_UNDEFINED_COLUMN),				 errmsg("table \"%s\" does not have OIDs",						RelationGetRelationName(rel))));	/*	 * Generate or convert list of attributes to process	 */	attnumlist = CopyGetAttnums(rel, attnamelist);	/*	 * Set up variables to avoid per-attribute overhead.	 */	initStringInfo(&attribute_buf);	initStringInfo(&line_buf);	line_buf_converted = false;	client_encoding = pg_get_client_encoding();	server_encoding = GetDatabaseEncoding();	copy_dest = COPY_FILE;		/* default */	copy_file = NULL;	copy_msgbuf = NULL;	fe_eof = false;	if (is_from)	{							/* copy from file to database */		if (rel->rd_rel->relkind != RELKIND_RELATION)		{			if (rel->rd_rel->relkind == RELKIND_VIEW)				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),						 errmsg("cannot copy to view \"%s\"",								RelationGetRelationName(rel))));			else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),						 errmsg("cannot copy to sequence \"%s\"",								RelationGetRelationName(rel))));			else				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),					   errmsg("cannot copy to non-table relation \"%s\"",							  RelationGetRelationName(rel))));		}		if (pipe)		{			if (IsUnderPostmaster)				ReceiveCopyBegin(binary, length(attnumlist));			else				copy_file = stdin;		}		else		{			struct stat st;			copy_file = AllocateFile(filename, PG_BINARY_R);			if (copy_file == NULL)				ereport(ERROR,						(errcode_for_file_access(),					 errmsg("could not open file \"%s\" for reading: %m",							filename)));			fstat(fileno(copy_file), &st);			if (S_ISDIR(st.st_mode))			{				FreeFile(copy_file);				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),						 errmsg("\"%s\" is a directory", filename)));			}		}		CopyFrom(rel, attnumlist, binary, oids, delim, null_print);	}	else	{							/* copy from database to file */		if (rel->rd_rel->relkind != RELKIND_RELATION)		{			if (rel->rd_rel->relkind == RELKIND_VIEW)				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),						 errmsg("cannot copy from view \"%s\"",								RelationGetRelationName(rel))));			else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),						 errmsg("cannot copy from sequence \"%s\"",								RelationGetRelationName(rel))));			else				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),					 errmsg("cannot copy from non-table relation \"%s\"",							RelationGetRelationName(rel))));		}		if (pipe)		{			if (IsUnderPostmaster)				SendCopyBegin(binary, length(attnumlist));			else				copy_file = stdout;		}		else		{			mode_t		oumask; /* Pre-existing umask value */			struct stat st;			/*			 * Prevent write to relative path ... too easy to shoot			 * oneself in the foot by overwriting a database file ...			 */			if (!is_absolute_path(filename))				ereport(ERROR,						(errcode(ERRCODE_INVALID_NAME),				  errmsg("relative path not allowed for COPY to file")));			oumask = umask((mode_t) 022);			copy_file = AllocateFile(filename, PG_BINARY_W);			umask(oumask);			if (copy_file == NULL)				ereport(ERROR,						(errcode_for_file_access(),					 errmsg("could not open file \"%s\" for writing: %m",							filename)));			fstat(fileno(copy_file), &st);			if (S_ISDIR(st.st_mode))			{				FreeFile(copy_file);				ereport(ERROR,						(errcode(ERRCODE_WRONG_OBJECT_TYPE),						 errmsg("\"%s\" is a directory", filename)));			}		}		CopyTo(rel, attnumlist, binary, oids, delim, null_print);	}	if (!pipe)		FreeFile(copy_file);	else if (IsUnderPostmaster && !is_from)		SendCopyEnd(binary);	pfree(attribute_buf.data);	pfree(line_buf.data);	/*	 * Close the relation.	If reading, we can release the AccessShareLock	 * we got; if writing, we should hold the lock until end of	 * transaction to ensure that updates will be committed before lock is	 * released.	 */	heap_close(rel, (is_from ? NoLock : AccessShareLock));}/* * Copy from relation TO file. */static voidCopyTo(Relation rel, List *attnumlist, bool binary, bool oids,	   char *delim, char *null_print){	HeapTuple	tuple;	TupleDesc	tupDesc;	HeapScanDesc scandesc;	int			num_phys_attrs;	int			attr_count;	Form_pg_attribute *attr;	FmgrInfo   *out_functions;	Oid		   *elements;	bool	   *isvarlena;	char	   *string;	Snapshot	mySnapshot;	List	   *cur;	MemoryContext oldcontext;	MemoryContext mycontext;	tupDesc = rel->rd_att;	attr = tupDesc->attrs;	num_phys_attrs = tupDesc->natts;	attr_count = length(attnumlist);	/*	 * Get info about the columns we need to process.	 *	 * +1's here are to avoid palloc(0) in a zero-column table.	 */	out_functions = (FmgrInfo *) palloc((num_phys_attrs + 1) * sizeof(FmgrInfo));	elements = (Oid *) palloc((num_phys_attrs + 1) * sizeof(Oid));	isvarlena = (bool *) palloc((num_phys_attrs + 1) * sizeof(bool));	foreach(cur, attnumlist)	{		int			attnum = lfirsti(cur);		Oid			out_func_oid;		if (binary)			getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,									&out_func_oid, &elements[attnum - 1],									&isvarlena[attnum - 1]);		else			getTypeOutputInfo(attr[attnum - 1]->atttypid,							  &out_func_oid, &elements[attnum - 1],							  &isvarlena[attnum - 1]);		fmgr_info(out_func_oid, &out_functions[attnum - 1]);	}	/*	 * Create a temporary memory context that we can reset once per row to	 * recover palloc'd memory.  This avoids any problems with leaks	 * inside datatype output routines, and should be faster than retail	 * pfree's anyway.  (We don't need a whole econtext as CopyFrom does.)	 */	mycontext = AllocSetContextCreate(CurrentMemoryContext,									  "COPY TO",									  ALLOCSET_DEFAULT_MINSIZE,									  ALLOCSET_DEFAULT_INITSIZE,									  ALLOCSET_DEFAULT_MAXSIZE);	if (binary)	{		/* Generate header for a binary copy */		int32		tmp;		/* Signature */		CopySendData((char *) BinarySignature, 11);		/* Flags field */		tmp = 0;		if (oids)			tmp |= (1 << 16);		CopySendInt32(tmp);		/* No header extension */		tmp = 0;		CopySendInt32(tmp);	}	else	{		/*		 * For non-binary copy, we need to convert null_print to client		 * encoding, because it will be sent directly with CopySendString.		 */		if (server_encoding != client_encoding)			null_print = (char *)				pg_server_to_client((unsigned char *) null_print,									strlen(null_print));	}	mySnapshot = CopyQuerySnapshot();	scandesc = heap_beginscan(rel, mySnapshot, 0, NULL);	while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)	{		bool		need_delim = false;		CHECK_FOR_INTERRUPTS();		MemoryContextReset(mycontext);		oldcontext = MemoryContextSwitchTo(mycontext);		if (binary)		{			/* Binary per-tuple header */			CopySendInt16(attr_count);			/* Send OID if wanted --- note attr_count doesn't include it */			if (oids)			{				Oid			oid = HeapTupleGetOid(tuple);				/* Hack --- assume Oid is same size as int32 */				CopySendInt32(sizeof(int32));				CopySendInt32(oid);			}		}		else		{			/* Text format has no per-tuple header, but send OID if wanted */			if (oids)			{				string = DatumGetCString(DirectFunctionCall1(oidout,							  ObjectIdGetDatum(HeapTupleGetOid(tuple))));				CopySendString(string);				need_delim = true;			}		}		foreach(cur, attnumlist)		{			int			attnum = lfirsti(cur);			Datum		value;			bool		isnull;			value = heap_getattr(tuple, attnum, tupDesc, &isnull);			if (!binary)			{				if (need_delim)					CopySendChar(delim[0]);				need_delim = true;			}			if (isnull)			{				if (!binary)					CopySendString(null_print); /* null indicator */				else					CopySendInt32(-1);	/* null marker */			}			else			{				if (!binary)				{					string = DatumGetCString(FunctionCall3(&out_functions[attnum - 1],														   value,								  ObjectIdGetDatum(elements[attnum - 1]),							Int32GetDatum(attr[attnum - 1]->atttypmod)));					CopyAttributeOut(string, delim);				}				else				{					bytea	   *outputbytes;					outputbytes = DatumGetByteaP(FunctionCall2(&out_functions[attnum - 1],															   value,								ObjectIdGetDatum(elements[attnum - 1])));					/* We assume the result will not have been toasted */					CopySendInt32(VARSIZE(outputbytes) - VARHDRSZ);					CopySendData(VARDATA(outputbytes),								 VARSIZE(outputbytes) - VARHDRSZ);				}			}		}		CopySendEndOfRow(binary);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?