copy.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,233 行 · 第 1/4 页

C
2,233
字号
/*-------------------------------------------------------------------------
 *
 * copy.c
 *		Implements the COPY utility command.
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.213.2.1 2004/01/18 02:15:57 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <errno.h>
#include <unistd.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/printtup.h"
#include "catalog/catname.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_index.h"
#include "catalog/pg_shadow.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_coerce.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/relcache.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"


#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')

/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to/from file */
	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE					/* to/from frontend (3.0 protocol) */
} CopyDest;

/*
 * State indicator showing what stopped CopyReadAttribute()
 */
typedef enum CopyReadResult
{
	NORMAL_ATTR,
	END_OF_LINE
} CopyReadResult;

/*
 *	Represents the end-of-line terminator type of the input
 */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;


static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";


/*
 * Static communication variables ... pretty grotty, but COPY has
 * never been reentrant...
 */
static CopyDest copy_dest;
static FILE *copy_file;			/* used if copy_dest == COPY_FILE */
static StringInfo copy_msgbuf;	/* used if copy_dest == COPY_NEW_FE */
static bool fe_eof;				/* true if detected end of copy data */
static EolType eol_type;		/* EOL type of input */
static int	client_encoding;	/* remote side's character encoding */
static int	server_encoding;	/* local encoding */

/* these are just for error messages, see copy_in_error_callback */
static bool copy_binary;		/* is it a binary copy? */
static const char *copy_relname;	/* table name for error messages */
static int	copy_lineno;		/* line number for error messages */
static const char *copy_attname;	/* current att for error messages */


/*
 * These static variables are used to avoid incurring overhead for each
 * attribute processed.  attribute_buf is reused on each CopyReadAttribute
 * call to hold the string being read in.  Under normal use it will soon
 * grow to a suitable size, and then we will avoid palloc/pfree overhead
 * for subsequent attributes.  Note that CopyReadAttribute returns a pointer
 * to attribute_buf's data buffer!
 */
static StringInfoData attribute_buf;

/*
 * Similarly, line_buf holds the whole input line being processed (its
 * cursor field points to the next character to be read by CopyReadAttribute).
 * The input cycle is first to read the whole line into line_buf, convert it
 * to server encoding, and then extract individual attribute fields into
 * attribute_buf.  (We used to have CopyReadAttribute read the input source
 * directly, but that caused a lot of encoding issues and unnecessary logic
 * complexity.)
 */
static StringInfoData line_buf;
static bool line_buf_converted;

/* non-export function prototypes */
static void CopyTo(Relation rel, List *attnumlist, bool binary, bool oids,
	   char *delim, char *null_print);
static void CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids,
		 char *delim, char *null_print);
static bool CopyReadLine(void);
static char *CopyReadAttribute(const char *delim, const char *null_print,
							   CopyReadResult *result, bool *isnull);
static Datum CopyReadBinaryAttribute(int column_no, FmgrInfo *flinfo,
						Oid typelem, bool *isnull);
static void CopyAttributeOut(char *string, char *delim);
static List *CopyGetAttnums(Relation rel, List *attnamelist);
static void limit_printout_length(StringInfo buf);

/* Internal communications functions */
static void SendCopyBegin(bool binary, int natts);
static void ReceiveCopyBegin(bool binary, int natts);
static void SendCopyEnd(bool binary);
static void CopySendData(void *databuf, int datasize);
static void CopySendString(const char *str);
static void CopySendChar(char c);
static void CopySendEndOfRow(bool binary);
static void CopyCheckFileReadError(void);
static void CopyGetData(void *databuf, int datasize);
static int	CopyGetChar(void);

#define CopyGetEof()  (fe_eof)
static int	CopyPeekChar(void);
static void CopyDonePeek(int c, bool pickup);
static void CopySendInt32(int32 val);
static int32 CopyGetInt32(void);
static void CopySendInt16(int16 val);
static int16 CopyGetInt16(void);

/*
 * Send copy start/stop messages for frontend copies.  These have changed
 * in past protocol redesigns.
 */

/*
 * SendCopyBegin announces the start of a COPY OUT to the frontend.
 *
 * binary: true if the copy uses binary format
 * natts:  number of columns reported in the protocol-3 message
 *
 * Sets copy_dest according to the frontend protocol version; for protocol
 * 3 it also allocates copy_msgbuf.  Older protocols reject binary copies
 * with an ERROR.
 */
static void
SendCopyBegin(bool binary, int natts)
{
	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
	{
		/* new way */
		StringInfoData buf;
		int16		format = (binary ? 1 : 0);
		int			i;

		pq_beginmessage(&buf, 'H');
		pq_sendbyte(&buf, format);		/* overall format */
		pq_sendint(&buf, natts, 2);
		for (i = 0; i < natts; i++)
			pq_sendint(&buf, format, 2);		/* per-column formats */
		pq_endmessage(&buf);
		copy_dest = COPY_NEW_FE;
		copy_msgbuf = makeStringInfo();
	}
	else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
	{
		/* old way */
		if (binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage('H');
		/* grottiness needed for old COPY OUT protocol */
		pq_startcopyout();
		copy_dest = COPY_OLD_FE;
	}
	else
	{
		/* very old way */
		if (binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage('B');
		/* grottiness needed for old COPY OUT protocol */
		pq_startcopyout();
		copy_dest = COPY_OLD_FE;
	}
}

/*
 * ReceiveCopyBegin announces the start of a COPY IN to the frontend.
 *
 * binary: true if the copy uses binary format
 * natts:  number of columns reported in the protocol-3 message
 *
 * Mirrors SendCopyBegin, but with the COPY IN message types and without
 * the pq_startcopyout() call.  The output is always flushed before
 * returning so that the frontend learns it may begin sending data.
 */
static void
ReceiveCopyBegin(bool binary, int natts)
{
	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
	{
		/* new way */
		StringInfoData buf;
		int16		format = (binary ? 1 : 0);
		int			i;

		pq_beginmessage(&buf, 'G');
		pq_sendbyte(&buf, format);		/* overall format */
		pq_sendint(&buf, natts, 2);
		for (i = 0; i < natts; i++)
			pq_sendint(&buf, format, 2);		/* per-column formats */
		pq_endmessage(&buf);
		copy_dest = COPY_NEW_FE;
		copy_msgbuf = makeStringInfo();
	}
	else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
	{
		/* old way */
		if (binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage('G');
		copy_dest = COPY_OLD_FE;
	}
	else
	{
		/* very old way */
		if (binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage('D');
		copy_dest = COPY_OLD_FE;
	}
	/* We *must* flush here to ensure FE knows it can send. */
	pq_flush();
}

/*
 * SendCopyEnd terminates a COPY OUT to the frontend.
 *
 * For COPY_NEW_FE a Copy Done ('c') message is sent, after flushing any
 * pending binary trailer.  For any other destination the "\." end marker
 * line is written and pq_endcopyout() is called.
 */
static void
SendCopyEnd(bool binary)
{
	if (copy_dest == COPY_NEW_FE)
	{
		if (binary)
		{
			/* Need to flush out file trailer word */
			CopySendEndOfRow(true);
		}
		else
		{
			/* Shouldn't have any unsent data */
			Assert(copy_msgbuf->len == 0);
		}
		/* Send Copy Done message */
		pq_putemptymessage('c');
	}
	else
	{
		/* The FE/BE protocol uses \n as newline for all platforms */
		CopySendData("\\.\n", 3);
		pq_endcopyout(false);
	}
}

/*----------
 * CopySendData sends output data to the destination (file or frontend)
 * CopySendString does the same for null-terminated strings
 * CopySendChar does the same for single characters
 * CopySendEndOfRow does the appropriate thing at end of each data row
 *
 * NB: no data conversion is applied by these functions
 *----------
 */
static void
CopySendData(void *databuf, int datasize)
{
	switch (copy_dest)
	{
		case COPY_FILE:
			fwrite(databuf, datasize, 1, copy_file);
			if (ferror(copy_file))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not write to COPY file: %m")));
			break;
		case COPY_OLD_FE:
			if (pq_putbytes((char *) databuf, datasize))
			{
				/* no hope of recovering connection sync, so FATAL */
				ereport(FATAL,
						(errcode(ERRCODE_CONNECTION_FAILURE),
					   errmsg("connection lost during COPY to stdout")));
			}
			break;
		case COPY_NEW_FE:
			/* just accumulate; CopySendEndOfRow ships the whole row */
			appendBinaryStringInfo(copy_msgbuf, (char *) databuf, datasize);
			break;
	}
}

static void
CopySendString(const char *str)
{
	CopySendData((void *) str, strlen(str));
}

static void
CopySendChar(char c)
{
	CopySendData(&c, 1);
}

static void
CopySendEndOfRow(bool binary)
{
	switch (copy_dest)
	{
		case COPY_FILE:
			if (!binary)
			{
				/* Default line termination depends on platform */
#ifndef WIN32
				CopySendChar('\n');
#else
				CopySendString("\r\n");
#endif
			}
			break;
		case COPY_OLD_FE:
			/* The FE/BE protocol uses \n as newline for all platforms */
			if (!binary)
				CopySendChar('\n');
			break;
		case COPY_NEW_FE:
			/* The FE/BE protocol uses \n as newline for all platforms */
			if (!binary)
				CopySendChar('\n');
			/* Dump the accumulated row as one CopyData message */
			(void) pq_putmessage('d', copy_msgbuf->data, copy_msgbuf->len);
			/* Reset copy_msgbuf to empty */
			copy_msgbuf->len = 0;
			copy_msgbuf->data[0] = '\0';
			break;
	}
}

/*
 * CopyCheckFileReadError raises an ERROR if the error indicator is set on
 * copy_file.
 *
 * Called after a read from copy_file so that an I/O failure is reported
 * instead of being mistaken for a normal end of the input data.
 */
static void
CopyCheckFileReadError(void)
{
	if (ferror(copy_file))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from COPY file: %m")));
}

/*
 * CopyGetData reads data from the source (file or frontend)
 * CopyGetChar does the same for single characters
 *
 * CopyGetEof checks if EOF was detected by previous Get operation.
 *
 * Note: when copying from the frontend, we expect a proper EOF mark per
 * protocol; if the frontend simply drops the connection, we raise error.
 * It seems unwise to allow the COPY IN to complete normally in that case.
 *
 * When copying from a file, a read error is reported as an ERROR rather
 * than being treated as end of data.
 *
 * NB: no data conversion is applied by these functions
 */
static void
CopyGetData(void *databuf, int datasize)
{
	switch (copy_dest)
	{
		case COPY_FILE:
			fread(databuf, datasize, 1, copy_file);
			/* distinguish a genuine I/O failure from end of file */
			CopyCheckFileReadError();
			if (feof(copy_file))
				fe_eof = true;
			break;
		case COPY_OLD_FE:
			if (pq_getbytes((char *) databuf, datasize))
			{
				/* Only a \. terminator is legal EOF in old protocol */
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection")));
			}
			break;
		case COPY_NEW_FE:
			/* keep pulling messages until the request is satisfied */
			while (datasize > 0 && !fe_eof)
			{
				int			avail;

				while (copy_msgbuf->cursor >= copy_msgbuf->len)
				{
					/* Try to receive another message */
					int			mtype;

			readmessage:
					mtype = pq_getbyte();
					if (mtype == EOF)
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection")));
					if (pq_getmessage(copy_msgbuf, 0))
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection")));
					switch (mtype)
					{
						case 'd':		/* CopyData */
							break;
						case 'c':		/* CopyDone */
							/* COPY IN correctly terminated by frontend */
							fe_eof = true;
							return;
						case 'f':		/* CopyFail */
							ereport(ERROR,
									(errcode(ERRCODE_QUERY_CANCELED),
									 errmsg("COPY from stdin failed: %s",
										 pq_getmsgstring(copy_msgbuf))));
							break;
						case 'H':		/* Flush */
						case 'S':		/* Sync */
							/*
							 * Ignore Flush/Sync for the convenience of
							 * client libraries (such as libpq) that may
							 * send those without noticing that the command
							 * they just sent was COPY.
							 */
							goto readmessage;
						default:
							ereport(ERROR,
									(errcode(ERRCODE_PROTOCOL_VIOLATION),
									 errmsg("unexpected message type 0x%02X during COPY from stdin",
											mtype)));
							break;
					}
				}
				/* hand over as much of the current message as requested */
				avail = copy_msgbuf->len - copy_msgbuf->cursor;
				if (avail > datasize)
					avail = datasize;
				pq_copymsgbytes(copy_msgbuf, databuf, avail);
				databuf = (void *) ((char *) databuf + avail);
				datasize -= avail;
			}
			break;
	}
}

/*
 * CopyGetChar reads one byte from the copy source.
 *
 * Returns the byte as a non-negative int, or EOF at end of data, in which
 * case fe_eof is also set.  A read error on a COPY file raises an ERROR.
 */
static int
CopyGetChar(void)
{
	int			ch;

	switch (copy_dest)
	{
		case COPY_FILE:
			ch = getc(copy_file);
			/* getc returns EOF for errors too; don't treat those as EOF */
			if (ch == EOF)
				CopyCheckFileReadError();
			break;
		case COPY_OLD_FE:
			ch = pq_getbyte();
			if (ch == EOF)
			{
				/* Only a \. terminator is legal EOF in old protocol */
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection")));
			}
			break;
		case COPY_NEW_FE:
			{
				unsigned char cc;

				CopyGetData(&cc, 1);
				if (fe_eof)
					ch = EOF;
				else
					ch = cc;
				break;
			}
		default:
			ch = EOF;
			break;
	}
	if (ch == EOF)
		fe_eof = true;
	return ch;
}

/*
 * CopyPeekChar reads a byte in "peekable" mode.
 *
 * after each call to CopyPeekChar, a call to CopyDonePeek _must_
 * follow, unless EOF was returned.
 *
 * CopyDonePeek will either take the peeked char off the stream
 * (if pickup is true) or leave it on the stream (if pickup is false).
 *
 * A read error on a COPY file raises an ERROR instead of returning EOF.
 */
static int
CopyPeekChar(void)
{
	int			ch;

	switch (copy_dest)
	{
		case COPY_FILE:
			ch = getc(copy_file);
			/* getc returns EOF for errors too; don't treat those as EOF */
			if (ch == EOF)
				CopyCheckFileReadError();
			break;
		case COPY_OLD_FE:
			ch = pq_peekbyte();
			if (ch == EOF)
			{
				/* Only a \. terminator is legal EOF in old protocol */
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection")));
			}
			break;
		case COPY_NEW_FE:
			{
				unsigned char cc;

				CopyGetData(&cc, 1);
				if (fe_eof)
					ch = EOF;
				else
					ch = cc;
				break;
			}
		default:
			ch = EOF;
			break;
	}
	if (ch == EOF)
		fe_eof = true;
	return ch;
}

static void
CopyDonePeek(int c, bool pickup)
{
	if (fe_eof)
		return;					/* can't unget an EOF */
	switch (copy_dest)
	{
		case COPY_FILE:
			if (!pickup)
			{
				/* We don't want to pick it up - so put it back in there */
				ungetc(c, copy_file);
			}
			/* If we wanted to pick it up, it's already done */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?