📄 copy.c
字号:
/*-------------------------------------------------------------------------
 *
 * copy.c
 *		Implements the COPY utility command.
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.254.2.4 2006/05/21 20:05:48 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/printtup.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_index.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_coerce.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/syscache.h"


/*
 * ISOCTAL is true when c is one of the characters '0' through '7'.
 * OCTVALUE converts such a character to its numeric value.
 *
 * Note that ISOCTAL evaluates its argument twice, so do not pass an
 * expression with side effects.
 */
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')

/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to/from file */
	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE					/* to/from frontend (3.0 protocol) */
} CopyDest;

/*
 * Represents the end-of-line terminator type of the input
 */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;

/*
 * This struct contains all the state variables used throughout a COPY
 * operation. For simplicity, we use the same struct for all variants
 * of COPY, even though some fields are used in only some cases.
* * A word about encoding considerations: encodings that are only supported on * the client side are those where multibyte characters may have second or * later bytes with the high bit not set. When scanning data in such an * encoding to look for a match to a single-byte (ie ASCII) character, * we must use the full pg_encoding_mblen() machinery to skip over * multibyte characters, else we might find a false match to a trailing * byte. In supported server encodings, there is no possibility of * a false match, and it's faster to make useless comparisons to trailing * bytes than it is to invoke pg_encoding_mblen() to skip over them. * client_only_encoding is TRUE when we have to do it the hard way. */typedef struct CopyStateData{ /* low-level state data */ CopyDest copy_dest; /* type of copy source/destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ StringInfo fe_msgbuf; /* used if copy_dest == COPY_NEW_FE */ bool fe_copy; /* true for all FE copy dests */ bool fe_eof; /* true if detected end of copy data */ EolType eol_type; /* EOL type of input */ int client_encoding; /* remote side's character encoding */ bool need_transcoding; /* client encoding diff from server? */ bool client_only_encoding; /* encoding not valid on server? */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ List *attnumlist; /* integer list of attnums to copy */ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool csv_mode; /* Comma Separated Value format? */ bool header_line; /* CSV header line? */ char *null_print; /* NULL marker string (server encoding!) 
*/ int null_print_len; /* length of same */ char *delim; /* column delimiter (must be 1 byte) */ char *quote; /* CSV quote char (must be 1 byte) */ char *escape; /* CSV escape char (must be 1 byte) */ List *force_quote_atts; /* integer list of attnums to FQ */ List *force_notnull_atts; /* integer list of attnums to FNN */ /* these are just for error messages, see copy_in_error_callback */ const char *cur_relname; /* table name for error messages */ int cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ /* * These variables are used to reduce overhead in textual COPY FROM. * * attribute_buf holds the separated, de-escaped text for each field of * the current line. The CopyReadAttributes functions return arrays of * pointers into this buffer. We avoid palloc/pfree overhead by re-using * the buffer on each cycle. */ StringInfoData attribute_buf; /* * Similarly, line_buf holds the whole input line being processed. The * input cycle is first to read the whole line into line_buf, convert it * to server encoding there, and then extract the individual attribute * fields into attribute_buf. line_buf is preserved unmodified so that we * can display it in error messages if appropriate. */ StringInfoData line_buf; bool line_buf_converted; /* converted to server encoding? */ /* * Finally, raw_buf holds raw data read from the data source (file or * client connection). CopyReadLine parses this data sufficiently to * locate line boundaries, then transfers the data to line_buf and * converts it. Note: we guarantee that there is a \0 at * raw_buf[raw_buf_len]. 
*/#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */} CopyStateData;typedef CopyStateData *CopyState;static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";/* non-export function prototypes */static void DoCopyTo(CopyState cstate);static void CopyTo(CopyState cstate);static void CopyFrom(CopyState cstate);static bool CopyReadLine(CopyState cstate);static bool CopyReadLineText(CopyState cstate);static bool CopyReadLineCSV(CopyState cstate);static int CopyReadAttributesText(CopyState cstate, int maxfields, char **fieldvals);static int CopyReadAttributesCSV(CopyState cstate, int maxfields, char **fieldvals);static Datum CopyReadBinaryAttribute(CopyState cstate, int column_no, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull);static void CopyAttributeOutText(CopyState cstate, char *server_string);static void CopyAttributeOutCSV(CopyState cstate, char *server_string, bool use_quote, bool single_attr);static List *CopyGetAttnums(Relation rel, List *attnamelist);static char *limit_printout_length(const char *str);/* Low-level communications functions */static void SendCopyBegin(CopyState cstate);static void ReceiveCopyBegin(CopyState cstate);static void SendCopyEnd(CopyState cstate);static void CopySendData(CopyState cstate, void *databuf, int datasize);static void CopySendString(CopyState cstate, const char *str);static void CopySendChar(CopyState cstate, char c);static void CopySendEndOfRow(CopyState cstate);static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread);static void CopySendInt32(CopyState cstate, int32 val);static bool CopyGetInt32(CopyState cstate, int32 *val);static void CopySendInt16(CopyState cstate, int16 val);static bool CopyGetInt16(CopyState cstate, int16 *val);/* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. 
*/static voidSendCopyBegin(CopyState cstate){ if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 1 : 0); int i; pq_beginmessage(&buf, 'H'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint(&buf, natts, 2); for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); cstate->copy_dest = COPY_NEW_FE; cstate->fe_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) { /* old way */ if (cstate->binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('H'); /* grottiness needed for old COPY OUT protocol */ pq_startcopyout(); cstate->copy_dest = COPY_OLD_FE; } else { /* very old way */ if (cstate->binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('B'); /* grottiness needed for old COPY OUT protocol */ pq_startcopyout(); cstate->copy_dest = COPY_OLD_FE; }}static voidReceiveCopyBegin(CopyState cstate){ if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ StringInfoData buf; int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 
1 : 0); int i; pq_beginmessage(&buf, 'G'); pq_sendbyte(&buf, format); /* overall format */ pq_sendint(&buf, natts, 2); for (i = 0; i < natts; i++) pq_sendint(&buf, format, 2); /* per-column formats */ pq_endmessage(&buf); cstate->copy_dest = COPY_NEW_FE; cstate->fe_msgbuf = makeStringInfo(); } else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) { /* old way */ if (cstate->binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('G'); cstate->copy_dest = COPY_OLD_FE; } else { /* very old way */ if (cstate->binary) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('D'); cstate->copy_dest = COPY_OLD_FE; } /* We *must* flush here to ensure FE knows it can send. */ pq_flush();}static voidSendCopyEnd(CopyState cstate){ if (cstate->copy_dest == COPY_NEW_FE) { if (cstate->binary) { /* Need to flush out file trailer word */ CopySendEndOfRow(cstate); } else { /* Shouldn't have any unsent data */ Assert(cstate->fe_msgbuf->len == 0); } /* Send Copy Done message */ pq_putemptymessage('c'); } else { /* The FE/BE protocol uses \n as newline for all platforms */ CopySendData(cstate, "\\.\n", 3); pq_endcopyout(false); }}/*---------- * CopySendData sends output data to the destination (file or frontend) * CopySendString does the same for null-terminated strings * CopySendChar does the same for single characters * CopySendEndOfRow does the appropriate thing at end of each data row * * NB: no data conversion is applied by these functions *---------- */static voidCopySendData(CopyState cstate, void *databuf, int datasize){ switch (cstate->copy_dest) { case COPY_FILE: fwrite(databuf, datasize, 1, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to COPY file: %m"))); break; case COPY_OLD_FE: if (pq_putbytes((char *) databuf, datasize)) { /* 
no hope of recovering connection sync, so FATAL */ ereport(FATAL, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("connection lost during COPY to stdout"))); } break; case COPY_NEW_FE: appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize); break; }}static voidCopySendString(CopyState cstate, const char *str){ CopySendData(cstate, (void *) str, strlen(str));}static voidCopySendChar(CopyState cstate, char c){ CopySendData(cstate, &c, 1);}static voidCopySendEndOfRow(CopyState cstate){ switch (cstate->copy_dest) { case COPY_FILE: if (!cstate->binary) { /* Default line termination depends on platform */#ifndef WIN32 CopySendChar(cstate, '\n');#else CopySendString(cstate, "\r\n");#endif } break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary) CopySendChar(cstate, '\n'); break; case COPY_NEW_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary) CopySendChar(cstate, '\n'); /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', cstate->fe_msgbuf->data, cstate->fe_msgbuf->len); /* Reset fe_msgbuf to empty */ cstate->fe_msgbuf->len = 0; cstate->fe_msgbuf->data[0] = '\0'; break; }}/* * CopyGetData reads data from the source (file or frontend) * * We attempt to read at least minread, and at most maxread, bytes from * the source. The actual number of bytes read is returned; if this is * less than minread, EOF was detected. * * Note: when copying from the frontend, we expect a proper EOF mark per * protocol; if the frontend simply drops the connection, we raise error. * It seems unwise to allow the COPY IN to complete normally in that case. * * NB: no data conversion is applied here. 
*/static intCopyGetData(CopyState cstate, void *databuf, int minread, int maxread){ int bytesread = 0; switch (cstate->copy_dest) { case COPY_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from COPY file: %m"))); break; case COPY_OLD_FE: /* * We cannot read more than minread bytes (which in practice is 1) * because old protocol doesn't have any clear way of separating * the COPY stream from following data. This is slow, but not any * slower than the code path was originally, and we don't care * much anymore about the performance of old protocol. */ if (pq_getbytes((char *) databuf, minread)) { /* Only a \. terminator is legal EOF in old protocol */ ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection"))); } bytesread = minread; break; case COPY_NEW_FE: while (maxread > 0 && bytesread < minread && !cstate->fe_eof) { int avail; while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len) { /* Try to receive another message */ int mtype;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -