⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dbcommands.c

📁 PostgreSQL 8.1.4的源码 适用于Linux下的开源数据库系统
💻 C
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------- * * dbcommands.c *		Database management commands (create/drop database). * * Note: database creation/destruction commands take ExclusiveLock on * pg_database to ensure that no two proceed in parallel.  We must use * at least this level of locking to ensure that no two backends try to * write the flat-file copy of pg_database at once.  We avoid using * AccessExclusiveLock since there's no need to lock out ordinary readers * of pg_database. * * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION *	  $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.173.2.1 2005/11/22 18:23:07 momjian Exp $ * *------------------------------------------------------------------------- */#include "postgres.h"#include <fcntl.h>#include <unistd.h>#include <sys/stat.h>#include "access/genam.h"#include "access/heapam.h"#include "catalog/catalog.h"#include "catalog/dependency.h"#include "catalog/indexing.h"#include "catalog/pg_authid.h"#include "catalog/pg_database.h"#include "catalog/pg_tablespace.h"#include "commands/comment.h"#include "commands/dbcommands.h"#include "commands/tablespace.h"#include "mb/pg_wchar.h"#include "miscadmin.h"#include "postmaster/bgwriter.h"#include "storage/fd.h"#include "storage/freespace.h"#include "storage/procarray.h"#include "utils/acl.h"#include "utils/array.h"#include "utils/builtins.h"#include "utils/flatfiles.h"#include "utils/fmgroids.h"#include "utils/guc.h"#include "utils/lsyscache.h"#include "utils/syscache.h"/* non-export function prototypes */static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,			Oid *dbLastSysOidP,			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,			Oid *dbTablespace);static bool have_createdb_privilege(void);static void remove_dbtablespaces(Oid db_id);/* * CREATE DATABASE */voidcreatedb(const CreatedbStmt *stmt){	HeapScanDesc scan;	Relation	rel;	Oid			src_dboid;	Oid			src_owner;	int			src_encoding;	bool		src_istemplate;	bool		src_allowconn;	Oid			src_lastsysoid;	TransactionId src_vacuumxid;	TransactionId src_frozenxid;	Oid			src_deftablespace;	volatile Oid dst_deftablespace;	volatile Relation pg_database_rel;	HeapTuple	tuple;	TupleDesc	pg_database_dsc;	Datum		new_record[Natts_pg_database];	char		new_record_nulls[Natts_pg_database];	Oid			dboid;	volatile Oid datdba;	ListCell   *option;	DefElem    *dtablespacename = NULL;	DefElem    *downer = NULL;	DefElem    *dtemplate = NULL;	DefElem    *dencoding = NULL;	DefElem    *dconnlimit = NULL;	char	   *dbname = stmt->dbname;	char	   *dbowner = NULL;	const char *dbtemplate = NULL;	volatile int encoding = -1;	volatile int dbconnlimit = -1;	/* don't call this in a transaction block */	PreventTransactionChain((void *) stmt, "CREATE DATABASE");	/* Extract options from the statement node tree */	foreach(option, stmt->options)	{		DefElem    *defel = (DefElem *) lfirst(option);		if (strcmp(defel->defname, "tablespace") == 0)		{			if (dtablespacename)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			dtablespacename = defel;		}		else if (strcmp(defel->defname, "owner") == 0)		{			if (downer)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			downer = defel;		}		else if (strcmp(defel->defname, "template") == 0)		{			if (dtemplate)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			dtemplate = defel;		}		else if (strcmp(defel->defname, "encoding") == 0)		{			if (dencoding)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			dencoding = defel;		}		else if (strcmp(defel->defname, "connectionlimit") == 0)		{			if (dconnlimit)				ereport(ERROR,						(errcode(ERRCODE_SYNTAX_ERROR),						 errmsg("conflicting or redundant options")));			dconnlimit = defel;		}		else if (strcmp(defel->defname, "location") == 0)		{			ereport(WARNING,					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),					 errmsg("LOCATION is not supported anymore"),					 errhint("Consider using tablespaces instead.")));		}		else			elog(ERROR, "option \"%s\" not recognized",				 defel->defname);	}	if (downer && downer->arg)		dbowner = strVal(downer->arg);	if (dtemplate && dtemplate->arg)		dbtemplate = strVal(dtemplate->arg);	if (dencoding && dencoding->arg)	{		const char *encoding_name;		if (IsA(dencoding->arg, Integer))		{			encoding = intVal(dencoding->arg);			encoding_name = pg_encoding_to_char(encoding);			if (strcmp(encoding_name, "") == 0 ||				pg_valid_server_encoding(encoding_name) < 0)				ereport(ERROR,						(errcode(ERRCODE_UNDEFINED_OBJECT),						 errmsg("%d is not a valid encoding code",								encoding)));		}		else if (IsA(dencoding->arg, String))		{			encoding_name = strVal(dencoding->arg);			if (pg_valid_server_encoding(encoding_name) < 0)				ereport(ERROR,						(errcode(ERRCODE_UNDEFINED_OBJECT),						 errmsg("%s is not a valid encoding name",								encoding_name)));			encoding = pg_char_to_encoding(encoding_name);		}		else			elog(ERROR, "unrecognized node type: %d",				 nodeTag(dencoding->arg));	}	if (dconnlimit && dconnlimit->arg)		dbconnlimit = intVal(dconnlimit->arg);	/* obtain OID of proposed owner */	if (dbowner)		datdba = get_roleid_checked(dbowner);	else		datdba = GetUserId();	/*	 * To create a database, must have createdb privilege and must be able to	 * become the target role (this does not imply that the target role itself	 * must have createdb privilege).  The latter provision guards against	 * "giveaway" attacks.	Note that a superuser will always have both of	 * these privileges a fortiori.	 */	if (!have_createdb_privilege())		ereport(ERROR,				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),				 errmsg("permission denied to create database")));	check_is_member_of_role(GetUserId(), datdba);	/*	 * Check for db name conflict.	There is a race condition here, since	 * another backend could create the same DB name before we commit.	 * However, holding an exclusive lock on pg_database for the whole time we	 * are copying the source database doesn't seem like a good idea, so	 * accept possibility of race to create.  We will check again after we	 * grab the exclusive lock.	 */	if (get_db_info(dbname, NULL, NULL, NULL,					NULL, NULL, NULL, NULL, NULL, NULL))		ereport(ERROR,				(errcode(ERRCODE_DUPLICATE_DATABASE),				 errmsg("database \"%s\" already exists", dbname)));	/*	 * Lookup database (template) to be cloned.	 */	if (!dbtemplate)		dbtemplate = "template1";		/* Default template database name */	if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,					 &src_istemplate, &src_allowconn, &src_lastsysoid,					 &src_vacuumxid, &src_frozenxid, &src_deftablespace))		ereport(ERROR,				(errcode(ERRCODE_UNDEFINED_DATABASE),			 errmsg("template database \"%s\" does not exist", dbtemplate)));	/*	 * Permission check: to copy a DB that's not marked datistemplate, you	 * must be superuser or the owner thereof.	 */	if (!src_istemplate)	{		if (!pg_database_ownercheck(src_dboid, GetUserId()))			ereport(ERROR,					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),					 errmsg("permission denied to copy database \"%s\"",							dbtemplate)));	}	/*	 * The source DB can't have any active backends, except this one	 * (exception is to allow CREATE DB while connected to template1).	 * Otherwise we might copy inconsistent data.  This check is not	 * bulletproof, since someone might connect while we are copying...	 */	if (DatabaseHasActiveBackends(src_dboid, true))		ereport(ERROR,				(errcode(ERRCODE_OBJECT_IN_USE),			errmsg("source database \"%s\" is being accessed by other users",				   dbtemplate)));	/* If encoding is defaulted, use source's encoding */	if (encoding < 0)		encoding = src_encoding;	/* Some encodings are client only */	if (!PG_VALID_BE_ENCODING(encoding))		ereport(ERROR,				(errcode(ERRCODE_WRONG_OBJECT_TYPE),				 errmsg("invalid server encoding %d", encoding)));	/* Resolve default tablespace for new database */	if (dtablespacename && dtablespacename->arg)	{		char	   *tablespacename;		AclResult	aclresult;		tablespacename = strVal(dtablespacename->arg);		dst_deftablespace = get_tablespace_oid(tablespacename);		if (!OidIsValid(dst_deftablespace))			ereport(ERROR,					(errcode(ERRCODE_UNDEFINED_OBJECT),					 errmsg("tablespace \"%s\" does not exist",							tablespacename)));		/* check permissions */		aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),										   ACL_CREATE);		if (aclresult != ACLCHECK_OK)			aclcheck_error(aclresult, ACL_KIND_TABLESPACE,						   tablespacename);		/*		 * If we are trying to change the default tablespace of the template,		 * we require that the template not have any files in the new default		 * tablespace.	This is necessary because otherwise the copied		 * database would contain pg_class rows that refer to its default		 * tablespace both explicitly (by OID) and implicitly (as zero), which		 * would cause problems.  For example another CREATE DATABASE using		 * the copied database as template, and trying to change its default		 * tablespace again, would yield outright incorrect results (it would		 * improperly move tables to the new default tablespace that should		 * stay in the same tablespace).		 */		if (dst_deftablespace != src_deftablespace)		{			char	   *srcpath;			struct stat st;			srcpath = GetDatabasePath(src_dboid, dst_deftablespace);			if (stat(srcpath, &st) == 0 &&				S_ISDIR(st.st_mode) &&				!directory_is_empty(srcpath))				ereport(ERROR,						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),						 errmsg("cannot assign new default tablespace \"%s\"",								tablespacename),						 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",								   dbtemplate)));			pfree(srcpath);		}	}	else	{		/* Use template database's default tablespace */		dst_deftablespace = src_deftablespace;		/* Note there is no additional permission check in this path */	}	/*	 * Normally we mark the new database with the same datvacuumxid and	 * datfrozenxid as the source.	However, if the source is not allowing	 * connections then we assume it is fully frozen, and we can set the	 * current transaction ID as the xid limits.  This avoids immediately	 * starting to generate warnings after cloning template0.	 */	if (!src_allowconn)		src_vacuumxid = src_frozenxid = GetCurrentTransactionId();	/*	 * Preassign OID for pg_database tuple, so that we can compute db path. We	 * have to open pg_database to do this, but we don't want to take	 * ExclusiveLock yet, so just do it and close again.	 */	pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock);	dboid = GetNewOid(pg_database_rel);	heap_close(pg_database_rel, AccessShareLock);	pg_database_rel = NULL;	/*	 * Force dirty buffers out to disk, to ensure source database is	 * up-to-date for the copy.  (We really only need to flush buffers for the	 * source database, but bufmgr.c provides no API for that.)	 */	BufferSync();	/*	 * Once we start copying subdirectories, we need to be able to clean 'em	 * up if we fail.  Establish a TRY block to make sure this happens. (This	 * is not a 100% solution, because of the possibility of failure during	 * transaction commit after we leave this routine, but it should handle	 * most scenarios.)	 */	PG_TRY();	{		/*		 * Iterate through all tablespaces of the template database, and copy		 * each one to the new database.		 */		rel = heap_open(TableSpaceRelationId, AccessShareLock);		scan = heap_beginscan(rel, SnapshotNow, 0, NULL);		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)		{			Oid			srctablespace = HeapTupleGetOid(tuple);			Oid			dsttablespace;			char	   *srcpath;			char	   *dstpath;			struct stat st;			/* No need to copy global tablespace */			if (srctablespace == GLOBALTABLESPACE_OID)				continue;			srcpath = GetDatabasePath(src_dboid, srctablespace);			if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||				directory_is_empty(srcpath))			{				/* Assume we can ignore it */				pfree(srcpath);				continue;			}			if (srctablespace == src_deftablespace)				dsttablespace = dst_deftablespace;			else				dsttablespace = srctablespace;			dstpath = GetDatabasePath(dboid, dsttablespace);			/*			 * Copy this subdirectory to the new location			 *			 * We don't need to copy subdirectories			 */			copydir(srcpath, dstpath, false);			/* Record the filesystem change in XLOG */			{				xl_dbase_create_rec xlrec;				XLogRecData rdata[1];				xlrec.db_id = dboid;				xlrec.tablespace_id = dsttablespace;				xlrec.src_db_id = src_dboid;				xlrec.src_tablespace_id = srctablespace;				rdata[0].data = (char *) &xlrec;				rdata[0].len = sizeof(xl_dbase_create_rec);				rdata[0].buffer = InvalidBuffer;				rdata[0].next = NULL;				(void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);			}		}		heap_endscan(scan);		heap_close(rel, AccessShareLock);		/*		 * Now OK to grab exclusive lock on pg_database.		 */		pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);		/* Check to see if someone else created same DB name meanwhile. */		if (get_db_info(dbname, NULL, NULL, NULL,						NULL, NULL, NULL, NULL, NULL, NULL))			ereport(ERROR,					(errcode(ERRCODE_DUPLICATE_DATABASE),					 errmsg("database \"%s\" already exists", dbname)));		/*		 * Insert a new tuple into pg_database		 */		pg_database_dsc = RelationGetDescr(pg_database_rel);		/* Form tuple */		MemSet(new_record, 0, sizeof(new_record));		MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));		new_record[Anum_pg_database_datname - 1] =			DirectFunctionCall1(namein, CStringGetDatum(dbname));		new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);		new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);		new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);		new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);		new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);		new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -