⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cluster.c

📁 PostgreSQL7.4.6 for Linux
💻 C
📖 第 1 页 / 共 2 页
字号:
/*-------------------------------------------------------------------------
 *
 * cluster.c
 *	  CLUSTER a table on an index.
 *
 * There is hardly anything left of Paul Brown's original implementation...
 *
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.116.2.1 2004/08/31 23:16:36 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "commands/cluster.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/relcache.h"


/*
 * We need one of these structs for each index in the relation to be
 * clustered.  It's basically the data needed by index_create() so
 * we can rebuild the indexes on the new heap.
 */
typedef struct
{
	Oid			indexOID;
	char	   *indexName;
	IndexInfo  *indexInfo;
	Oid			accessMethodOID;
	Oid		   *classOID;
	bool		isclustered;
} IndexAttrs;

/*
 * This struct is used to pass around the information on tables to be
 * clustered. We need this so we can make a list of them when invoked without
 * a specific table/index pair.
*/typedef struct{	Oid			tableOid;	Oid			indexOid;} RelToCluster;static void cluster_rel(RelToCluster *rv, bool recheck);static Oid	make_new_heap(Oid OIDOldHeap, const char *NewName);static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);static void rebuild_indexes(Oid OIDOldHeap, List *indexes);static void swap_relfilenodes(Oid r1, Oid r2);static List *get_tables_to_cluster(MemoryContext cluster_context);/*--------------------------------------------------------------------------- * This cluster code allows for clustering multiple tables at once. Because * of this, we cannot just run everything on a single transaction, or we * would be forced to acquire exclusive locks on all the tables being * clustered, simultaneously --- very likely leading to deadlock. * * To solve this we follow a similar strategy to VACUUM code, * clustering each relation in a separate transaction. For this to work, * we need to: *	- provide a separate memory context so that we can pass information in *	  a way that survives across transactions *	- start a new transaction every time a new relation is clustered *	- check for validity of the information on to-be-clustered relations, *	  as someone might have deleted a relation behind our back, or *	  clustered one on a different index *	- end the transaction * * The single-relation case does not have any such overhead. * * We also allow a relation being specified without index.	In that case, * the indisclustered bit will be looked up, and an ERROR will be thrown * if there is no index with the bit set. *--------------------------------------------------------------------------- */voidcluster(ClusterStmt *stmt){	if (stmt->relation != NULL)	{		/* This is the single-relation case. 
*/		Oid			tableOid,					indexOid = InvalidOid;		Relation	rel;		RelToCluster rvtc;		/* Find and lock the table */		rel = heap_openrv(stmt->relation, AccessExclusiveLock);		tableOid = RelationGetRelid(rel);		/* Check permissions */		if (!pg_class_ownercheck(tableOid, GetUserId()))			aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,						   RelationGetRelationName(rel));		if (stmt->indexname == NULL)		{			List	   *index;			/* We need to find the index that has indisclustered set. */			foreach(index, RelationGetIndexList(rel))			{				HeapTuple	idxtuple;				Form_pg_index indexForm;				indexOid = lfirsto(index);				idxtuple = SearchSysCache(INDEXRELID,										  ObjectIdGetDatum(indexOid),										  0, 0, 0);				if (!HeapTupleIsValid(idxtuple))					elog(ERROR, "cache lookup failed for index %u", indexOid);				indexForm = (Form_pg_index) GETSTRUCT(idxtuple);				if (indexForm->indisclustered)				{					ReleaseSysCache(idxtuple);					break;				}				ReleaseSysCache(idxtuple);				indexOid = InvalidOid;			}			if (!OidIsValid(indexOid))				ereport(ERROR,						(errcode(ERRCODE_UNDEFINED_OBJECT),						 errmsg("there is no previously clustered index for table \"%s\"",								stmt->relation->relname)));		}		else		{			/*			 * The index is expected to be in the same namespace as the			 * relation.			 */			indexOid = get_relname_relid(stmt->indexname,										 rel->rd_rel->relnamespace);			if (!OidIsValid(indexOid))				ereport(ERROR,						(errcode(ERRCODE_UNDEFINED_OBJECT),				   errmsg("index \"%s\" for table \"%s\" does not exist",						  stmt->indexname, stmt->relation->relname)));		}		/* All other checks are done in cluster_rel() */		rvtc.tableOid = tableOid;		rvtc.indexOid = indexOid;		/* close relation, keep lock till commit */		heap_close(rel, NoLock);		/* Do the job */		cluster_rel(&rvtc, false);	}	else	{		/*		 * This is the "multi relation" case. We need to cluster all		 * tables that have some index with indisclustered set.		 
*/		MemoryContext cluster_context;		List	   *rv,				   *rvs;		/*		 * We cannot run this form of CLUSTER inside a user transaction		 * block; we'd be holding locks way too long.		 */		PreventTransactionChain((void *) stmt, "CLUSTER");		/*		 * Create special memory context for cross-transaction storage.		 *		 * Since it is a child of PortalContext, it will go away even in case		 * of error.		 */		cluster_context = AllocSetContextCreate(PortalContext,												"Cluster",												ALLOCSET_DEFAULT_MINSIZE,												ALLOCSET_DEFAULT_INITSIZE,												ALLOCSET_DEFAULT_MAXSIZE);		/*		 * Build the list of relations to cluster.	Note that this lives		 * in cluster_context.		 */		rvs = get_tables_to_cluster(cluster_context);		/* Commit to get out of starting transaction */		CommitTransactionCommand();		/* Ok, now that we've got them all, cluster them one by one */		foreach(rv, rvs)		{			RelToCluster *rvtc = (RelToCluster *) lfirst(rv);			/* Start a new transaction for each relation. */			StartTransactionCommand();			SetQuerySnapshot(); /* might be needed for functions in								 * indexes */			cluster_rel(rvtc, true);			CommitTransactionCommand();		}		/* Start a new transaction for the cleanup work. */		StartTransactionCommand();		/* Clean up working storage */		MemoryContextDelete(cluster_context);	}}/* * cluster_rel * * This clusters the table by creating a new, clustered table and * swapping the relfilenodes of the new table and the old table, so * the OID of the original table is preserved.	Thus we do not lose * GRANT, inheritance nor references to this table (this was a bug * in releases thru 7.3). * * Also create new indexes and swap the filenodes with the old indexes the * same way we do for the relation.  Since we are effectively bulk-loading * the new table, it's better to create the indexes afterwards than to fill * them incrementally while we load the table. 
*/static voidcluster_rel(RelToCluster *rvtc, bool recheck){	Relation	OldHeap,				OldIndex;	/* Check for user-requested abort. */	CHECK_FOR_INTERRUPTS();	/*	 * Since we may open a new transaction for each relation, we have to	 * check that the relation still is what we think it is.	 *	 * If this is a single-transaction CLUSTER, we can skip these tests. We	 * *must* skip the one on indisclustered since it would reject an	 * attempt to cluster a not-previously-clustered index.	 */	if (recheck)	{		HeapTuple	tuple;		Form_pg_index indexForm;		/*		 * Check if the relation and index still exist before opening them		 */		if (!SearchSysCacheExists(RELOID,								  ObjectIdGetDatum(rvtc->tableOid),								  0, 0, 0) ||			!SearchSysCacheExists(RELOID,								  ObjectIdGetDatum(rvtc->indexOid),								  0, 0, 0))			return;		/* Check that the user still owns the relation */		if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))			return;		/*		 * Check that the index is still the one with indisclustered set.		 */		tuple = SearchSysCache(INDEXRELID,							   ObjectIdGetDatum(rvtc->indexOid),							   0, 0, 0);		if (!HeapTupleIsValid(tuple))			return;				/* could have gone away... */		indexForm = (Form_pg_index) GETSTRUCT(tuple);		if (!indexForm->indisclustered)		{			ReleaseSysCache(tuple);			return;		}		ReleaseSysCache(tuple);	}	/*	 * We grab exclusive access to the target rel and index for the	 * duration of the transaction.  (This is redundant for the single-	 * transaction case, since cluster() already did it.)	 
*/	OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);	OldIndex = index_open(rvtc->indexOid);	LockRelation(OldIndex, AccessExclusiveLock);	/*	 * Check that index is in fact an index on the given relation	 */	if (OldIndex->rd_index == NULL ||		OldIndex->rd_index->indrelid != rvtc->tableOid)		ereport(ERROR,				(errcode(ERRCODE_WRONG_OBJECT_TYPE),				 errmsg("\"%s\" is not an index for table \"%s\"",						RelationGetRelationName(OldIndex),						RelationGetRelationName(OldHeap))));	/*	 * Disallow clustering on incomplete indexes (those that might not	 * index every row of the relation).  We could relax this by making a	 * separate seqscan pass over the table to copy the missing rows, but	 * that seems expensive and tedious.	 */	if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred))		ereport(ERROR,				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),				 errmsg("cannot cluster on partial index")));	if (!OldIndex->rd_am->amindexnulls)	{		AttrNumber	colno;		/*		 * If the AM doesn't index nulls, then it's a partial index unless		 * we can prove all the rows are non-null.	Note we only need look		 * at the first column; multicolumn-capable AMs are *required* to		 * index nulls in columns after the first.		 */		colno = OldIndex->rd_index->indkey[0];		if (colno > 0)		{			/* ordinary user attribute */			if (!OldHeap->rd_att->attrs[colno - 1]->attnotnull)				ereport(ERROR,						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),						 errmsg("cannot cluster when index access method does not handle null values"),						 errhint("You may be able to work around this by marking column \"%s\" NOT NULL.",				  NameStr(OldHeap->rd_att->attrs[colno - 1]->attname))));		}		else if (colno < 0)		{			/* system column --- okay, always non-null */		}		else		{			/* index expression, lose... 
*/			ereport(ERROR,					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),					 errmsg("cannot cluster on expressional index when index access method does not handle null values")));		}	}	/*	 * Disallow clustering system relations.  This will definitely NOT	 * work for shared relations (we have no way to update pg_class rows	 * in other databases), nor for nailed-in-cache relations (the	 * relfilenode values for those are hardwired, see relcache.c).  It	 * might work for other system relations, but I ain't gonna risk it.	 */	if (IsSystemRelation(OldHeap))		ereport(ERROR,				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),				 errmsg("\"%s\" is a system catalog",						RelationGetRelationName(OldHeap))));	/*	 * Don't allow cluster on temp tables of other backends ... their	 * local buffer manager is not going to cope.	 */	if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))		ereport(ERROR,				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),			   errmsg("cannot cluster temporary tables of other sessions")));	/* Drop relcache refcnt on OldIndex, but keep lock */	index_close(OldIndex);	/* rebuild_relation does all the dirty work */	rebuild_relation(OldHeap, rvtc->indexOid);	/* NB: rebuild_relation does heap_close() on OldHeap */}/* * rebuild_relation: rebuild an existing relation * * This is shared code between CLUSTER and TRUNCATE.  In the TRUNCATE * case, the new relation is built and left empty.	In the CLUSTER case, * it is filled with data read from the old relation in the order specified * by the index. * * OldHeap: table to rebuild --- must be opened and exclusive-locked! * indexOid: index to cluster by, or InvalidOid in TRUNCATE case * * NB: this routine closes OldHeap at the right time; caller should not. */voidrebuild_relation(Relation OldHeap, Oid indexOid){	Oid			tableOid = RelationGetRelid(OldHeap);	List	   *indexes;	Oid			OIDNewHeap;	char		NewHeapName[NAMEDATALEN];	ObjectAddress object;	/* Save the information about all indexes on the relation. 
*/	indexes = get_indexattr_list(OldHeap, indexOid);	/* Close relcache entry, but keep lock until transaction commit */	heap_close(OldHeap, NoLock);	/*	 * Create the new heap, using a temporary name in the same namespace	 * as the existing table.  NOTE: there is some risk of collision with	 * user relnames.  Working around this seems more trouble than it's	 * worth; in particular, we can't create the new heap in a different	 * namespace from the old, or we will have problems with the TEMP	 * status of temp tables.	 */	snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", tableOid);	OIDNewHeap = make_new_heap(tableOid, NewHeapName);	/*	 * We don't need CommandCounterIncrement() because make_new_heap did	 * it.	 */	/*	 * Copy the heap data into the new table in the desired order.	 */	if (OidIsValid(indexOid))		copy_heap_data(OIDNewHeap, tableOid, indexOid);	/* To make the new heap's data visible (probably not needed?). */	CommandCounterIncrement();	/* Swap the relfilenodes of the old and new heaps. */	swap_relfilenodes(tableOid, OIDNewHeap);	CommandCounterIncrement();	/* Destroy new heap with old filenode */	object.classId = RelOid_pg_class;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -