copy.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,233 行 · 第 1/4 页
C
2,233 行
/*------------------------------------------------------------------------- * * copy.c * Implements the COPY utility command. * * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.213.2.1 2004/01/18 02:15:57 tgl Exp $ * *------------------------------------------------------------------------- */#include "postgres.h"#include <errno.h>#include <unistd.h>#include <sys/stat.h>#include <netinet/in.h>#include <arpa/inet.h>#include "access/genam.h"#include "access/heapam.h"#include "access/printtup.h"#include "catalog/catname.h"#include "catalog/index.h"#include "catalog/namespace.h"#include "catalog/pg_index.h"#include "catalog/pg_shadow.h"#include "catalog/pg_type.h"#include "commands/copy.h"#include "commands/trigger.h"#include "executor/executor.h"#include "libpq/libpq.h"#include "libpq/pqformat.h"#include "mb/pg_wchar.h"#include "miscadmin.h"#include "nodes/makefuncs.h"#include "parser/parse_coerce.h"#include "parser/parse_relation.h"#include "rewrite/rewriteHandler.h"#include "tcop/pquery.h"#include "tcop/tcopprot.h"#include "utils/acl.h"#include "utils/builtins.h"#include "utils/relcache.h"#include "utils/lsyscache.h"#include "utils/syscache.h"#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))#define OCTVALUE(c) ((c) - '0')/* * Represents the different source/dest cases we need to worry about at * the bottom level */typedef enum CopyDest{ COPY_FILE, /* to/from file */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ COPY_NEW_FE /* to/from frontend (3.0 protocol) */} CopyDest;/* * State indicator showing what stopped CopyReadAttribute() */typedef enum CopyReadResult{ NORMAL_ATTR, END_OF_LINE} CopyReadResult;/* * Represents the end-of-line terminator type of the input */typedef enum EolType{ EOL_UNKNOWN, EOL_NL, EOL_CR, EOL_CRNL} EolType;static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";/* * Static communication variables ... pretty grotty, but COPY has * never been reentrant... */static CopyDest copy_dest;static FILE *copy_file; /* used if copy_dest == COPY_FILE */static StringInfo copy_msgbuf; /* used if copy_dest == COPY_NEW_FE */static bool fe_eof; /* true if detected end of copy data */static EolType eol_type; /* EOL type of input */static int client_encoding; /* remote side's character encoding */static int server_encoding; /* local encoding *//* these are just for error messages, see copy_in_error_callback */static bool copy_binary; /* is it a binary copy? */static const char *copy_relname; /* table name for error messages */static int copy_lineno; /* line number for error messages */static const char *copy_attname; /* current att for error messages *//* * These static variables are used to avoid incurring overhead for each * attribute processed. attribute_buf is reused on each CopyReadAttribute * call to hold the string being read in. Under normal use it will soon * grow to a suitable size, and then we will avoid palloc/pfree overhead * for subsequent attributes. Note that CopyReadAttribute returns a pointer * to attribute_buf's data buffer! */static StringInfoData attribute_buf;/* * Similarly, line_buf holds the whole input line being processed (its * cursor field points to the next character to be read by CopyReadAttribute). * The input cycle is first to read the whole line into line_buf, convert it * to server encoding, and then extract individual attribute fields into * attribute_buf. (We used to have CopyReadAttribute read the input source * directly, but that caused a lot of encoding issues and unnecessary logic * complexity.) */static StringInfoData line_buf;static bool line_buf_converted;/* non-export function prototypes */static void CopyTo(Relation rel, List *attnumlist, bool binary, bool oids, char *delim, char *null_print);static void CopyFrom(Relation rel, List *attnumlist, bool binary, bool oids, char *delim, char *null_print);static bool CopyReadLine(void);static char *CopyReadAttribute(const char *delim, const char *null_print, CopyReadResult *result, bool *isnull);static Datum CopyReadBinaryAttribute(int column_no, FmgrInfo *flinfo, Oid typelem, bool *isnull);static void CopyAttributeOut(char *string, char *delim);static List *CopyGetAttnums(Relation rel, List *attnamelist);static void limit_printout_length(StringInfo buf);/* Internal communications functions */static void SendCopyBegin(bool binary, int natts);static void ReceiveCopyBegin(bool binary, int natts);static void SendCopyEnd(bool binary);static void CopySendData(void *databuf, int datasize);static void CopySendString(const char *str);static void CopySendChar(char c);static void CopySendEndOfRow(bool binary);static void CopyGetData(void *databuf, int datasize);static int CopyGetChar(void);#define CopyGetEof() (fe_eof)static int CopyPeekChar(void);static void CopyDonePeek(int c, bool pickup);static void CopySendInt32(int32 val);static int32 CopyGetInt32(void);static void CopySendInt16(int16 val);static int16 CopyGetInt16(void);/* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. */static voidSendCopyBegin(bool binary, int natts){ if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ StringInfoData buf; int16 format = (binary ? 1 : 0); int i; pq_beginmessage(&buf, 'H'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint(&buf, natts, 2); for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); copy_dest = COPY_NEW_FE; copy_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) { /* old way */ if (binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('H'); /* grottiness needed for old COPY OUT protocol */ pq_startcopyout(); copy_dest = COPY_OLD_FE; } else { /* very old way */ if (binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('B'); /* grottiness needed for old COPY OUT protocol */ pq_startcopyout(); copy_dest = COPY_OLD_FE; }}static voidReceiveCopyBegin(bool binary, int natts){ if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ StringInfoData buf; int16 format = (binary ? 1 : 0); int i; pq_beginmessage(&buf, 'G'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint(&buf, natts, 2); for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); copy_dest = COPY_NEW_FE; copy_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) { /* old way */ if (binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('G'); copy_dest = COPY_OLD_FE; } else { /* very old way */ if (binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('D'); copy_dest = COPY_OLD_FE; } /* We *must* flush here to ensure FE knows it can send. */ pq_flush();}static voidSendCopyEnd(bool binary){ if (copy_dest == COPY_NEW_FE) { if (binary) { /* Need to flush out file trailer word */ CopySendEndOfRow(true); } else { /* Shouldn't have any unsent data */ Assert(copy_msgbuf->len == 0); } /* Send Copy Done message */ pq_putemptymessage('c'); } else { /* The FE/BE protocol uses \n as newline for all platforms */ CopySendData("\\.\n", 3); pq_endcopyout(false); }}/*---------- * CopySendData sends output data to the destination (file or frontend) * CopySendString does the same for null-terminated strings * CopySendChar does the same for single characters * CopySendEndOfRow does the appropriate thing at end of each data row * * NB: no data conversion is applied by these functions *---------- */static voidCopySendData(void *databuf, int datasize){ switch (copy_dest) { case COPY_FILE: fwrite(databuf, datasize, 1, copy_file); if (ferror(copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to COPY file: %m"))); break; case COPY_OLD_FE: if (pq_putbytes((char *) databuf, datasize)) { /* no hope of recovering connection sync, so FATAL */ ereport(FATAL, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("connection lost during COPY to stdout"))); } break; case COPY_NEW_FE: appendBinaryStringInfo(copy_msgbuf, (char *) databuf, datasize); break; }}static voidCopySendString(const char *str){ CopySendData((void *) str, strlen(str));}static voidCopySendChar(char c){ CopySendData(&c, 1);}static voidCopySendEndOfRow(bool binary){ switch (copy_dest) { case COPY_FILE: if (!binary) { /* Default line termination depends on platform */#ifndef WIN32 CopySendChar('\n');#else CopySendString("\r\n");#endif } break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!binary) CopySendChar('\n'); break; case COPY_NEW_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!binary) CopySendChar('\n'); /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', copy_msgbuf->data, copy_msgbuf->len); /* Reset copy_msgbuf to empty */ copy_msgbuf->len = 0; copy_msgbuf->data[0] = '\0'; break; }}/* * CopyGetData reads data from the source (file or frontend) * CopyGetChar does the same for single characters * * CopyGetEof checks if EOF was detected by previous Get operation. * * Note: when copying from the frontend, we expect a proper EOF mark per * protocol; if the frontend simply drops the connection, we raise error. * It seems unwise to allow the COPY IN to complete normally in that case. * * NB: no data conversion is applied by these functions */static voidCopyGetData(void *databuf, int datasize){ switch (copy_dest) { case COPY_FILE: fread(databuf, datasize, 1, copy_file); if (feof(copy_file)) fe_eof = true; break; case COPY_OLD_FE: if (pq_getbytes((char *) databuf, datasize)) { /* Only a \. terminator is legal EOF in old protocol */ ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); } break; case COPY_NEW_FE: while (datasize > 0 && !fe_eof) { int avail; while (copy_msgbuf->cursor >= copy_msgbuf->len) { /* Try to receive another message */ int mtype; readmessage: mtype = pq_getbyte(); if (mtype == EOF) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); if (pq_getmessage(copy_msgbuf, 0)) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); switch (mtype) { case 'd': /* CopyData */ break; case 'c': /* CopyDone */ /* COPY IN correctly terminated by frontend */ fe_eof = true; return; case 'f': /* CopyFail */ ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("COPY from stdin failed: %s", pq_getmsgstring(copy_msgbuf)))); break; case 'H': /* Flush */ case 'S': /* Sync */ /* * Ignore Flush/Sync for the convenience of * client libraries (such as libpq) that may * send those without noticing that the command * they just sent was COPY. */ goto readmessage; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected message type 0x%02X during COPY from stdin", mtype))); break; } } avail = copy_msgbuf->len - copy_msgbuf->cursor; if (avail > datasize) avail = datasize; pq_copymsgbytes(copy_msgbuf, databuf, avail); databuf = (void *) ((char *) databuf + avail); datasize -= avail; } break; }}static intCopyGetChar(void){ int ch; switch (copy_dest) { case COPY_FILE: ch = getc(copy_file); break; case COPY_OLD_FE: ch = pq_getbyte(); if (ch == EOF) { /* Only a \. terminator is legal EOF in old protocol */ ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); } break; case COPY_NEW_FE: { unsigned char cc; CopyGetData(&cc, 1); if (fe_eof) ch = EOF; else ch = cc; break; } default: ch = EOF; break; } if (ch == EOF) fe_eof = true; return ch;}/* * CopyPeekChar reads a byte in "peekable" mode. * * after each call to CopyPeekChar, a call to CopyDonePeek _must_ * follow, unless EOF was returned. * * CopyDonePeek will either take the peeked char off the stream * (if pickup is true) or leave it on the stream (if pickup is false). */static intCopyPeekChar(void){ int ch; switch (copy_dest) { case COPY_FILE: ch = getc(copy_file); break; case COPY_OLD_FE: ch = pq_peekbyte(); if (ch == EOF) { /* Only a \. terminator is legal EOF in old protocol */ ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); } break; case COPY_NEW_FE: { unsigned char cc; CopyGetData(&cc, 1); if (fe_eof) ch = EOF; else ch = cc; break; } default: ch = EOF; break; } if (ch == EOF) fe_eof = true; return ch;}static voidCopyDonePeek(int c, bool pickup){ if (fe_eof) return; /* can't unget an EOF */ switch (copy_dest) { case COPY_FILE: if (!pickup) { /* We don't want to pick it up - so put it back in there */ ungetc(c, copy_file); } /* If we wanted to pick it up, it's already done */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?