copy.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,233 行 · 第 1/4 页
C
2,233 行
break; case COPY_OLD_FE: if (pickup) { /* We want to pick it up */ (void) pq_getbyte(); } /* * If we didn't want to pick it up, just leave it where it * sits */ break; case COPY_NEW_FE: if (!pickup) { /* We don't want to pick it up - so put it back in there */ copy_msgbuf->cursor--; } /* If we wanted to pick it up, it's already done */ break; }}/* * These functions do apply some data conversion *//* * CopySendInt32 sends an int32 in network byte order */static voidCopySendInt32(int32 val){ uint32 buf; buf = htonl((uint32) val); CopySendData(&buf, sizeof(buf));}/* * CopyGetInt32 reads an int32 that appears in network byte order */static int32CopyGetInt32(void){ uint32 buf; CopyGetData(&buf, sizeof(buf)); return (int32) ntohl(buf);}/* * CopySendInt16 sends an int16 in network byte order */static voidCopySendInt16(int16 val){ uint16 buf; buf = htons((uint16) val); CopySendData(&buf, sizeof(buf));}/* * CopyGetInt16 reads an int16 that appears in network byte order */static int16CopyGetInt16(void){ uint16 buf; CopyGetData(&buf, sizeof(buf)); return (int16) ntohs(buf);}/* * DoCopy executes the SQL COPY statement. * * Either unload or reload contents of table <relation>, depending on <from>. * (<from> = TRUE means we are inserting into the table.) * * If <pipe> is false, transfer is between the table and the file named * <filename>. Otherwise, transfer is between the table and our regular * input/output stream. The latter could be either stdin/stdout or a * socket, depending on whether we're running under Postmaster control. * * Iff <binary>, unload or reload in the binary format, as opposed to the * more wasteful but more robust and portable text format. * * Iff <oids>, unload or reload the format that includes OID information. * On input, we accept OIDs whether or not the table has an OID column, * but silently drop them if it does not. On output, we report an error * if the user asks for OIDs in a table that has none (not providing an * OID column might seem friendlier, but could seriously confuse programs). * * If in the text format, delimit columns with delimiter <delim> and print * NULL values as <null_print>. * * When loading in the text format from an input stream (as opposed to * a file), recognize a "." on a line by itself as EOF. Also recognize * a stream EOF. When unloading in the text format to an output stream, * write a "." on a line by itself at the end of the data. * * Do not allow a Postgres user without superuser privilege to read from * or write to a file. * * Do not allow the copy if user doesn't have proper permission to access * the table. */voidDoCopy(const CopyStmt *stmt){ RangeVar *relation = stmt->relation; char *filename = stmt->filename; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); List *option; List *attnamelist = stmt->attlist; List *attnumlist; bool binary = false; bool oids = false; char *delim = NULL; char *null_print = NULL; Relation rel; AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); AclResult aclresult; /* Extract options from the statement node tree */ foreach(option, stmt->options) { DefElem *defel = (DefElem *) lfirst(option); if (strcmp(defel->defname, "binary") == 0) { if (binary) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); binary = intVal(defel->arg); } else if (strcmp(defel->defname, "oids") == 0) { if (oids) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); oids = intVal(defel->arg); } else if (strcmp(defel->defname, "delimiter") == 0) { if (delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); delim = strVal(defel->arg); } else if (strcmp(defel->defname, "null") == 0) { if (null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); null_print = strVal(defel->arg); } else elog(ERROR, "option \"%s\" not recognized", defel->defname); } if (binary && delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify DELIMITER in BINARY mode"))); if (binary && null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify NULL in BINARY mode"))); /* Set defaults */ if (!delim) delim = "\t"; if (!null_print) null_print = "\\N"; /* * Open and lock the relation, using the appropriate lock type. */ rel = heap_openrv(relation, (is_from ? RowExclusiveLock : AccessShareLock)); /* check read-only transaction */ if (XactReadOnly && !is_from && !isTempNamespace(RelationGetNamespace(rel))) ereport(ERROR, (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), errmsg("transaction is read-only"))); /* Check permissions. */ aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), required_access); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_CLASS, RelationGetRelationName(rel)); if (!pipe && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to COPY to or from a file"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); /* * Presently, only single-character delimiter strings are supported. */ if (strlen(delim) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must be a single character"))); /* * Don't allow COPY w/ OIDs to or from a table without them */ if (oids && !rel->rd_rel->relhasoids) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("table \"%s\" does not have OIDs", RelationGetRelationName(rel)))); /* * Generate or convert list of attributes to process */ attnumlist = CopyGetAttnums(rel, attnamelist); /* * Set up variables to avoid per-attribute overhead. */ initStringInfo(&attribute_buf); initStringInfo(&line_buf); line_buf_converted = false; client_encoding = pg_get_client_encoding(); server_encoding = GetDatabaseEncoding(); copy_dest = COPY_FILE; /* default */ copy_file = NULL; copy_msgbuf = NULL; fe_eof = false; if (is_from) { /* copy from file to database */ if (rel->rd_rel->relkind != RELKIND_RELATION) { if (rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to view \"%s\"", RelationGetRelationName(rel)))); else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to sequence \"%s\"", RelationGetRelationName(rel)))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to non-table relation \"%s\"", RelationGetRelationName(rel)))); } if (pipe) { if (IsUnderPostmaster) ReceiveCopyBegin(binary, length(attnumlist)); else copy_file = stdin; } else { struct stat st; copy_file = AllocateFile(filename, PG_BINARY_R); if (copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for reading: %m", filename))); fstat(fileno(copy_file), &st); if (S_ISDIR(st.st_mode)) { FreeFile(copy_file); ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", filename))); } } CopyFrom(rel, attnumlist, binary, oids, delim, null_print); } else { /* copy from database to file */ if (rel->rd_rel->relkind != RELKIND_RELATION) { if (rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from view \"%s\"", RelationGetRelationName(rel)))); else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from sequence \"%s\"", RelationGetRelationName(rel)))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from non-table relation \"%s\"", RelationGetRelationName(rel)))); } if (pipe) { if (IsUnderPostmaster) SendCopyBegin(binary, length(attnumlist)); else copy_file = stdout; } else { mode_t oumask; /* Pre-existing umask value */ struct stat st; /* * Prevent write to relative path ... too easy to shoot * oneself in the foot by overwriting a database file ... */ if (!is_absolute_path(filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); oumask = umask((mode_t) 022); copy_file = AllocateFile(filename, PG_BINARY_W); umask(oumask); if (copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for writing: %m", filename))); fstat(fileno(copy_file), &st); if (S_ISDIR(st.st_mode)) { FreeFile(copy_file); ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", filename))); } } CopyTo(rel, attnumlist, binary, oids, delim, null_print); } if (!pipe) FreeFile(copy_file); else if (IsUnderPostmaster && !is_from) SendCopyEnd(binary); pfree(attribute_buf.data); pfree(line_buf.data); /* * Close the relation. If reading, we can release the AccessShareLock * we got; if writing, we should hold the lock until end of * transaction to ensure that updates will be committed before lock is * released. */ heap_close(rel, (is_from ? NoLock : AccessShareLock));}/* * Copy from relation TO file. */static voidCopyTo(Relation rel, List *attnumlist, bool binary, bool oids, char *delim, char *null_print){ HeapTuple tuple; TupleDesc tupDesc; HeapScanDesc scandesc; int num_phys_attrs; int attr_count; Form_pg_attribute *attr; FmgrInfo *out_functions; Oid *elements; bool *isvarlena; char *string; Snapshot mySnapshot; List *cur; MemoryContext oldcontext; MemoryContext mycontext; tupDesc = rel->rd_att; attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; attr_count = length(attnumlist); /* * Get info about the columns we need to process. * * +1's here are to avoid palloc(0) in a zero-column table. */ out_functions = (FmgrInfo *) palloc((num_phys_attrs + 1) * sizeof(FmgrInfo)); elements = (Oid *) palloc((num_phys_attrs + 1) * sizeof(Oid)); isvarlena = (bool *) palloc((num_phys_attrs + 1) * sizeof(bool)); foreach(cur, attnumlist) { int attnum = lfirsti(cur); Oid out_func_oid; if (binary) getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &elements[attnum - 1], &isvarlena[attnum - 1]); else getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &elements[attnum - 1], &isvarlena[attnum - 1]); fmgr_info(out_func_oid, &out_functions[attnum - 1]); } /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks * inside datatype output routines, and should be faster than retail * pfree's anyway. (We don't need a whole econtext as CopyFrom does.) */ mycontext = AllocSetContextCreate(CurrentMemoryContext, "COPY TO", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); if (binary) { /* Generate header for a binary copy */ int32 tmp; /* Signature */ CopySendData((char *) BinarySignature, 11); /* Flags field */ tmp = 0; if (oids) tmp |= (1 << 16); CopySendInt32(tmp); /* No header extension */ tmp = 0; CopySendInt32(tmp); } else { /* * For non-binary copy, we need to convert null_print to client * encoding, because it will be sent directly with CopySendString. */ if (server_encoding != client_encoding) null_print = (char *) pg_server_to_client((unsigned char *) null_print, strlen(null_print)); } mySnapshot = CopyQuerySnapshot(); scandesc = heap_beginscan(rel, mySnapshot, 0, NULL); while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) { bool need_delim = false; CHECK_FOR_INTERRUPTS(); MemoryContextReset(mycontext); oldcontext = MemoryContextSwitchTo(mycontext); if (binary) { /* Binary per-tuple header */ CopySendInt16(attr_count); /* Send OID if wanted --- note attr_count doesn't include it */ if (oids) { Oid oid = HeapTupleGetOid(tuple); /* Hack --- assume Oid is same size as int32 */ CopySendInt32(sizeof(int32)); CopySendInt32(oid); } } else { /* Text format has no per-tuple header, but send OID if wanted */ if (oids) { string = DatumGetCString(DirectFunctionCall1(oidout, ObjectIdGetDatum(HeapTupleGetOid(tuple)))); CopySendString(string); need_delim = true; } } foreach(cur, attnumlist) { int attnum = lfirsti(cur); Datum value; bool isnull; value = heap_getattr(tuple, attnum, tupDesc, &isnull); if (!binary) { if (need_delim) CopySendChar(delim[0]); need_delim = true; } if (isnull) { if (!binary) CopySendString(null_print); /* null indicator */ else CopySendInt32(-1); /* null marker */ } else { if (!binary) { string = DatumGetCString(FunctionCall3(&out_functions[attnum - 1], value, ObjectIdGetDatum(elements[attnum - 1]), Int32GetDatum(attr[attnum - 1]->atttypmod))); CopyAttributeOut(string, delim); } else { bytea *outputbytes; outputbytes = DatumGetByteaP(FunctionCall2(&out_functions[attnum - 1], value, ObjectIdGetDatum(elements[attnum - 1]))); /* We assume the result will not have been toasted */ CopySendInt32(VARSIZE(outputbytes) - VARHDRSZ); CopySendData(VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ); } } } CopySendEndOfRow(binary);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?