📄 cluster.c
字号:
object.objectId = OIDNewHeap; object.objectSubId = 0; /* * The new relation is local to our transaction and we know nothing * depends on it, so DROP_RESTRICT should be OK. */ performDeletion(&object, DROP_RESTRICT); /* performDeletion does CommandCounterIncrement at end */ /* * Recreate each index on the relation. We do not need * CommandCounterIncrement() because rebuild_indexes does it. */ rebuild_indexes(tableOid, indexes);}/* * Create the new table that we will fill with correctly-ordered data. */static Oidmake_new_heap(Oid OIDOldHeap, const char *NewName){ TupleDesc OldHeapDesc, tupdesc; Oid OIDNewHeap; Relation OldHeap; OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock); OldHeapDesc = RelationGetDescr(OldHeap); /* * Need to make a copy of the tuple descriptor, since * heap_create_with_catalog modifies it. */ tupdesc = CreateTupleDescCopyConstr(OldHeapDesc); OIDNewHeap = heap_create_with_catalog(NewName, RelationGetNamespace(OldHeap), tupdesc, OldHeap->rd_rel->relkind, OldHeap->rd_rel->relisshared, ONCOMMIT_NOOP, allowSystemTableMods); /* * Advance command counter so that the newly-created relation's * catalog tuples will be visible to heap_open. */ CommandCounterIncrement(); /* * If necessary, create a TOAST table for the new relation. Note that * AlterTableCreateToastTable ends with CommandCounterIncrement(), so * that the TOAST table will be visible for insertion. */ AlterTableCreateToastTable(OIDNewHeap, true); heap_close(OldHeap, NoLock); return OIDNewHeap;}/* * Do the physical copying of heap data. */static voidcopy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex){ Relation NewHeap, OldHeap, OldIndex; IndexScanDesc scan; HeapTuple tuple; /* * Open the relations I need. Scan through the OldHeap on the OldIndex * and insert each tuple into the NewHeap. */ NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock); OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock); OldIndex = index_open(OIDOldIndex); scan = index_beginscan(OldHeap, OldIndex, SnapshotNow, 0, (ScanKey) NULL); while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL) { /* * We must copy the tuple because heap_insert() will overwrite the * commit-status fields of the tuple it's handed, and the * retrieved tuple will actually be in a disk buffer! Thus, the * source relation would get trashed, which is bad news if we * abort later on. (This was a bug in releases thru 7.0) * * Note that the copied tuple will have the original OID, if any, so * this does preserve OIDs. */ HeapTuple copiedTuple = heap_copytuple(tuple); simple_heap_insert(NewHeap, copiedTuple); heap_freetuple(copiedTuple); CHECK_FOR_INTERRUPTS(); } index_endscan(scan); index_close(OldIndex); heap_close(OldHeap, NoLock); heap_close(NewHeap, NoLock);}/* * Get the necessary info about the indexes of the relation and * return a list of IndexAttrs structures. */static List *get_indexattr_list(Relation OldHeap, Oid OldIndex){ List *indexes = NIL; List *indlist; /* Ask the relcache to produce a list of the indexes of the old rel */ foreach(indlist, RelationGetIndexList(OldHeap)) { Oid indexOID = lfirsto(indlist); Relation oldIndex; IndexAttrs *attrs; oldIndex = index_open(indexOID); attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs)); attrs->indexOID = indexOID; attrs->indexName = pstrdup(NameStr(oldIndex->rd_rel->relname)); attrs->accessMethodOID = oldIndex->rd_rel->relam; attrs->indexInfo = BuildIndexInfo(oldIndex); attrs->classOID = (Oid *) palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); memcpy(attrs->classOID, oldIndex->rd_index->indclass, sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); /* We adjust the isclustered attribute to correct new state */ attrs->isclustered = (indexOID == OldIndex); index_close(oldIndex); /* * Cons the gathered data into the list. We do not care about * ordering, and this is more efficient than append. */ indexes = lcons(attrs, indexes); } return indexes;}/* * Create new indexes and swap the filenodes with old indexes. Then drop * the new index (carrying the old index filenode along). */static voidrebuild_indexes(Oid OIDOldHeap, List *indexes){ List *elem; foreach(elem, indexes) { IndexAttrs *attrs = (IndexAttrs *) lfirst(elem); Oid newIndexOID; char newIndexName[NAMEDATALEN]; ObjectAddress object; Form_pg_index index; HeapTuple tuple; Relation pg_index; /* Create the new index under a temporary name */ snprintf(newIndexName, sizeof(newIndexName), "pg_temp_%u", attrs->indexOID); /* * The new index will have primary and constraint status set to * false, but since we will only use its filenode it doesn't * matter: after the filenode swap the index will keep the * constraint status of the old index. */ newIndexOID = index_create(OIDOldHeap, newIndexName, attrs->indexInfo, attrs->accessMethodOID, attrs->classOID, false, false, allowSystemTableMods); CommandCounterIncrement(); /* Swap the filenodes. */ swap_relfilenodes(attrs->indexOID, newIndexOID); CommandCounterIncrement(); /* * Make sure that indisclustered is correct: it should be set only * for the index we just clustered on. */ pg_index = heap_openr(IndexRelationName, RowExclusiveLock); tuple = SearchSysCacheCopy(INDEXRELID, ObjectIdGetDatum(attrs->indexOID), 0, 0, 0); if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for index %u", attrs->indexOID); index = (Form_pg_index) GETSTRUCT(tuple); if (index->indisclustered != attrs->isclustered) { index->indisclustered = attrs->isclustered; simple_heap_update(pg_index, &tuple->t_self, tuple); CatalogUpdateIndexes(pg_index, tuple); } heap_freetuple(tuple); heap_close(pg_index, RowExclusiveLock); /* Destroy new index with old filenode */ object.classId = RelOid_pg_class; object.objectId = newIndexOID; object.objectSubId = 0; /* * The relation is local to our transaction and we know nothing * depends on it, so DROP_RESTRICT should be OK. */ performDeletion(&object, DROP_RESTRICT); /* performDeletion does CommandCounterIncrement() at its end */ }}/* * Swap the relfilenodes for two given relations. * * Also swap any TOAST links, so that the toast data moves along with * the main-table data. */static voidswap_relfilenodes(Oid r1, Oid r2){ Relation relRelation, rel; HeapTuple reltup1, reltup2; Form_pg_class relform1, relform2; Oid swaptemp; int i; CatalogIndexState indstate; /* We need writable copies of both pg_class tuples. */ relRelation = heap_openr(RelationRelationName, RowExclusiveLock); reltup1 = SearchSysCacheCopy(RELOID, ObjectIdGetDatum(r1), 0, 0, 0); if (!HeapTupleIsValid(reltup1)) elog(ERROR, "cache lookup failed for relation %u", r1); relform1 = (Form_pg_class) GETSTRUCT(reltup1); reltup2 = SearchSysCacheCopy(RELOID, ObjectIdGetDatum(r2), 0, 0, 0); if (!HeapTupleIsValid(reltup2)) elog(ERROR, "cache lookup failed for relation %u", r2); relform2 = (Form_pg_class) GETSTRUCT(reltup2); /* * The buffer manager gets confused if we swap relfilenodes for * relations that are not both local or non-local to this transaction. * Flush the buffers on both relations so the buffer manager can * forget about'em. (XXX this might not be necessary anymore?) */ rel = relation_open(r1, NoLock); i = FlushRelationBuffers(rel, 0); if (i < 0) elog(ERROR, "FlushRelationBuffers returned %d", i); relation_close(rel, NoLock); rel = relation_open(r2, NoLock); i = FlushRelationBuffers(rel, 0); if (i < 0) elog(ERROR, "FlushRelationBuffers returned %d", i); relation_close(rel, NoLock); /* * Actually swap the filenode and TOAST fields in the two tuples */ swaptemp = relform1->relfilenode; relform1->relfilenode = relform2->relfilenode; relform2->relfilenode = swaptemp; swaptemp = relform1->reltoastrelid; relform1->reltoastrelid = relform2->reltoastrelid; relform2->reltoastrelid = swaptemp; /* we should not swap reltoastidxid */ /* swap size statistics too, since new rel has freshly-updated stats */ { int4 swap_pages; float4 swap_tuples; swap_pages = relform1->relpages; relform1->relpages = relform2->relpages; relform2->relpages = swap_pages; swap_tuples = relform1->reltuples; relform1->reltuples = relform2->reltuples; relform2->reltuples = swap_tuples; } /* Update the tuples in pg_class */ simple_heap_update(relRelation, &reltup1->t_self, reltup1); simple_heap_update(relRelation, &reltup2->t_self, reltup2); /* Keep system catalogs current */ indstate = CatalogOpenIndexes(relRelation); CatalogIndexInsert(indstate, reltup1); CatalogIndexInsert(indstate, reltup2); CatalogCloseIndexes(indstate); /* * If we have toast tables associated with the relations being * swapped, change their dependency links to re-associate them with * their new owning relations. Otherwise the wrong one will get * dropped ... * * NOTE: it is possible that only one table has a toast table; this can * happen in CLUSTER if there were dropped columns in the old table. * * NOTE: at present, a TOAST table's only dependency is the one on its * owning table. If more are ever created, we'd need to use something * more selective than deleteDependencyRecordsFor() to get rid of only * the link we want. */ if (relform1->reltoastrelid || relform2->reltoastrelid) { ObjectAddress baseobject, toastobject; long count; /* Delete old dependencies */ if (relform1->reltoastrelid) { count = deleteDependencyRecordsFor(RelOid_pg_class, relform1->reltoastrelid); if (count != 1) elog(ERROR, "expected one dependency record for TOAST table, found %ld", count); } if (relform2->reltoastrelid) { count = deleteDependencyRecordsFor(RelOid_pg_class, relform2->reltoastrelid); if (count != 1) elog(ERROR, "expected one dependency record for TOAST table, found %ld", count); } /* Register new dependencies */ baseobject.classId = RelOid_pg_class; baseobject.objectSubId = 0; toastobject.classId = RelOid_pg_class; toastobject.objectSubId = 0; if (relform1->reltoastrelid) { baseobject.objectId = r1; toastobject.objectId = relform1->reltoastrelid; recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL); } if (relform2->reltoastrelid) { baseobject.objectId = r2; toastobject.objectId = relform2->reltoastrelid; recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL); } } /* * Blow away the old relcache entries now. We need this kluge because * relcache.c indexes relcache entries by rd_node as well as OID. It * will get confused if it is asked to (re)build an entry with a new * rd_node value when there is still another entry laying about with * that same rd_node value. (Fortunately, since one of the entries is * local in our transaction, it's sufficient to clear out our own * relcache this way; the problem cannot arise for other backends when * they see our update on the non-local relation.) */ RelationForgetRelation(r1); RelationForgetRelation(r2); /* Clean up. */ heap_freetuple(reltup1); heap_freetuple(reltup2); heap_close(relRelation, RowExclusiveLock);}/* * Get a list of tables that the current user owns and * have indisclustered set. Return the list in a List * of rvsToCluster * with the tableOid and the indexOid on which the table is already * clustered. */static List *get_tables_to_cluster(MemoryContext cluster_context){ Relation indRelation; HeapScanDesc scan; ScanKeyData entry; HeapTuple indexTuple; Form_pg_index index; MemoryContext old_context; RelToCluster *rvtc; List *rvs = NIL; /* * Get all indexes that have indisclustered set and are owned by * appropriate user. System relations or nailed-in relations cannot * ever have indisclustered set, because CLUSTER will refuse to set it * when called with one of them as argument. */ indRelation = relation_openr(IndexRelationName, AccessShareLock); ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered, F_BOOLEQ, BoolGetDatum(true)); scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry); while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { index = (Form_pg_index) GETSTRUCT(indexTuple); if (!pg_class_ownercheck(index->indrelid, GetUserId())) continue; /* * We have to build the list in a different memory context so it * will survive the cross-transaction processing */ old_context = MemoryContextSwitchTo(cluster_context); rvtc = (RelToCluster *) palloc(sizeof(RelToCluster)); rvtc->tableOid = index->indrelid; rvtc->indexOid = index->indexrelid; rvs = lcons(rvtc, rvs); MemoryContextSwitchTo(old_context); } heap_endscan(scan); relation_close(indRelation, AccessShareLock); return rvs;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -