heapam.c
/* ----------------
 *  heap_insert - insert a tuple
 *
 *  The assignment of t_min (and thus the others) should be
 *  removed eventually.
 *
 *  Currently places the tuple onto the last page.  If there is no room,
 *  it is placed on new pages.  (Heap relations)
 *  Note that concurrent inserts during a scan will probably have
 *  unexpected results, though this will be fixed eventually.
 *
 *  Fix to work with indexes.
 * ----------------
 */
Oid
heap_insert(Relation relation, HeapTuple tup)
{
    /* ----------------
     *  increment access statistics
     * ----------------
     */
    IncrHeapAccessStat(local_insert);
    IncrHeapAccessStat(global_insert);

    /* ----------------
     *  If the object id of this tuple has already been assigned, trust
     *  the caller.  There are a couple of ways this can happen.  At initial
     *  db creation, the backend program sets oids for tuples.  When we
     *  define an index, we set the oid.  Finally, in the future, we may
     *  allow users to set their own object ids in order to support a
     *  persistent object store (objects need to contain pointers to one
     *  another).
     * ----------------
     */
    if (!OidIsValid(tup->t_data->t_oid))
    {
        tup->t_data->t_oid = newoid();
        LastOidProcessed = tup->t_data->t_oid;
    }
    else
        CheckMaxObjectId(tup->t_data->t_oid);

    TransactionIdStore(GetCurrentTransactionId(), &(tup->t_data->t_xmin));
    tup->t_data->t_cmin = GetCurrentCommandId();
    StoreInvalidTransactionId(&(tup->t_data->t_xmax));
    tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
    tup->t_data->t_infomask |= HEAP_XMAX_INVALID;

    RelationPutHeapTupleAtEnd(relation, tup);

    if (IsSystemRelationName(RelationGetRelationName(relation)->data))
        RelationInvalidateHeapTuple(relation, tup);

    return tup->t_data->t_oid;
}
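/*
 * A hedged caller sketch, not part of heapam.c: it assumes the era's
 * heap_formtuple()/RelationGetDescr()/Int32GetDatum() helpers (none of
 * them defined in this file) and a single-int4-column relation, just to
 * show heap_insert() stamping xmin/cmin and handing back the tuple OID.
 */
static Oid
insert_one_int4(Relation rel, int32 val)
{
    Datum       values[1];
    char        nulls[1] = {' '};   /* ' ' = not null in this era's API */
    HeapTuple   tup;
    Oid         oid;

    values[0] = Int32GetDatum(val);
    tup = heap_formtuple(RelationGetDescr(rel), values, nulls);
    oid = heap_insert(rel, tup);    /* t_oid is assigned here if unset */
    pfree(tup);
    return oid;
}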
/*
 *  heap_delete - delete a tuple
 */
int
heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid)
{
    ItemId          lp;
    HeapTupleData   tp;
    PageHeader      dp;
    Buffer          buffer;
    int             result;

    /* increment access statistics */
    IncrHeapAccessStat(local_delete);
    IncrHeapAccessStat(global_delete);

    Assert(ItemPointerIsValid(tid));

    buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
    if (!BufferIsValid(buffer))
        elog(ERROR, "heap_delete: failed ReadBuffer");

    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    dp = (PageHeader) BufferGetPage(buffer);
    lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
    tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
    tp.t_len = ItemIdGetLength(lp);
    tp.t_self = *tid;

l1:
    result = HeapTupleSatisfiesUpdate(&tp);

    if (result == HeapTupleInvisible)
    {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        ReleaseBuffer(buffer);
        elog(ERROR, "heap_delete: (am)invalid tid");
    }
    else if (result == HeapTupleBeingUpdated)
    {
        TransactionId xwait = tp.t_data->t_xmax;

        /* sleep until concurrent transaction ends */
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        XactLockTableWait(xwait);

        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        if (TransactionIdDidAbort(xwait))
            goto l1;

        /*
         * xwait is committed but if xwait had just marked the tuple for
         * update then some other xaction could update this tuple before
         * we got to this point.
         */
        if (tp.t_data->t_xmax != xwait)
            goto l1;
        if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
        {
            tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
            SetBufferCommitInfoNeedsSave(buffer);
        }
        /* if tuple was marked for update but not updated... */
        if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
            result = HeapTupleMayBeUpdated;
        else
            result = HeapTupleUpdated;
    }
    if (result != HeapTupleMayBeUpdated)
    {
        Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
        if (ctid != NULL)
            *ctid = tp.t_data->t_ctid;
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        ReleaseBuffer(buffer);
        return result;
    }

    /* store transaction information of xact deleting the tuple */
    TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax));
    tp.t_data->t_cmax = GetCurrentCommandId();
    tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                               HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);

    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

    /* invalidate caches */
    RelationInvalidateHeapTuple(relation, &tp);

    WriteBuffer(buffer);

    return HeapTupleMayBeUpdated;
}
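/*
 * Hedged sketch of the result protocol heap_delete() exposes (the helper
 * below is illustrative, not part of this file): HeapTupleMayBeUpdated
 * means the delete succeeded, HeapTupleUpdated means a committed
 * concurrent update won and ctid then names the newer tuple version,
 * so a caller can chase the update chain and retry.
 */
static int
delete_latest(Relation rel, ItemPointerData tid)
{
    ItemPointerData ctid;
    int             result;

    for (;;)
    {
        result = heap_delete(rel, &tid, &ctid);
        if (result != HeapTupleUpdated)
            return result;      /* deleted, or self-updated */
        tid = ctid;             /* chase the newer tuple version */
    }
}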
/*
 *  heap_replace - replace a tuple
 */
int
heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup,
             ItemPointer ctid)
{
    ItemId          lp;
    HeapTupleData   oldtup;
    PageHeader      dp;
    Buffer          buffer;
    int             result;

    /* increment access statistics */
    IncrHeapAccessStat(local_replace);
    IncrHeapAccessStat(global_replace);

    Assert(ItemPointerIsValid(otid));

    buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
    if (!BufferIsValid(buffer))
        elog(ERROR, "amreplace: failed ReadBuffer");
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    dp = (PageHeader) BufferGetPage(buffer);
    lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));

    oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
    oldtup.t_len = ItemIdGetLength(lp);
    oldtup.t_self = *otid;

l2:
    result = HeapTupleSatisfiesUpdate(&oldtup);

    if (result == HeapTupleInvisible)
    {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        ReleaseBuffer(buffer);
        elog(ERROR, "heap_replace: (am)invalid tid");
    }
    else if (result == HeapTupleBeingUpdated)
    {
        TransactionId xwait = oldtup.t_data->t_xmax;

        /* sleep until concurrent transaction ends */
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        XactLockTableWait(xwait);

        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        if (TransactionIdDidAbort(xwait))
            goto l2;

        /*
         * xwait is committed but if xwait had just marked the tuple for
         * update then some other xaction could update this tuple before
         * we got to this point.
         */
        if (oldtup.t_data->t_xmax != xwait)
            goto l2;
        if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
        {
            oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
            SetBufferCommitInfoNeedsSave(buffer);
        }
        /* if tuple was marked for update but not updated... */
        if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
            result = HeapTupleMayBeUpdated;
        else
            result = HeapTupleUpdated;
    }
    if (result != HeapTupleMayBeUpdated)
    {
        Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
        if (ctid != NULL)
            *ctid = oldtup.t_data->t_ctid;
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        ReleaseBuffer(buffer);
        return result;
    }

    /* XXX order problems if not atomic assignment ??? */
    newtup->t_data->t_oid = oldtup.t_data->t_oid;
    TransactionIdStore(GetCurrentTransactionId(), &(newtup->t_data->t_xmin));
    newtup->t_data->t_cmin = GetCurrentCommandId();
    StoreInvalidTransactionId(&(newtup->t_data->t_xmax));
    newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
    newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);

    /* logically delete old item */
    TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax));
    oldtup.t_data->t_cmax = GetCurrentCommandId();
    oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                                   HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);

    /* insert new item */
    if ((unsigned) MAXALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp))
        RelationPutHeapTuple(relation, buffer, newtup);
    else
    {
        /*
         * New item won't fit on same page as old item, have to look for a
         * new place to put it.  Note that we have to unlock current buffer
         * context - not good but RelationPutHeapTupleAtEnd uses extend
         * lock.
         */
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        RelationPutHeapTupleAtEnd(relation, newtup);
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
    }

    /*
     * New item in place, now record address of new tuple in t_ctid of old
     * one.
     */
    oldtup.t_data->t_ctid = newtup->t_self;

    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

    /* invalidate caches */
    RelationInvalidateHeapTuple(relation, &oldtup);

    WriteBuffer(buffer);

    return HeapTupleMayBeUpdated;
}
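/*
 * Hedged usage sketch for heap_replace() (replace_one is hypothetical,
 * not part of this file): the caller passes the old tuple's TID plus a
 * fully formed replacement tuple, and on HeapTupleUpdated can find the
 * concurrently updated version through ctid, mirroring heap_delete.
 */
static int
replace_one(Relation rel, ItemPointer otid, HeapTuple newtup)
{
    ItemPointerData ctid;
    int             result;

    result = heap_replace(rel, otid, newtup, &ctid);
    if (result == HeapTupleUpdated)
        elog(NOTICE, "replace_one: tuple was concurrently updated");
    return result;
}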
/*
 *  heap_mark4update - mark a tuple for update
 */
int
heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer)
{
    ItemPointer tid = &(tuple->t_self);
    ItemId      lp;
    PageHeader  dp;
    int         result;

    /* increment access statistics */
    IncrHeapAccessStat(local_mark4update);
    IncrHeapAccessStat(global_mark4update);

    *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
    if (!BufferIsValid(*buffer))
        elog(ERROR, "heap_mark4update: failed ReadBuffer");

    LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);

    dp = (PageHeader) BufferGetPage(*buffer);
    lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
    tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
    tuple->t_len = ItemIdGetLength(lp);

l3:
    result = HeapTupleSatisfiesUpdate(tuple);

    if (result == HeapTupleInvisible)
    {
        LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
        ReleaseBuffer(*buffer);
        elog(ERROR, "heap_mark4update: (am)invalid tid");
    }
    else if (result == HeapTupleBeingUpdated)
    {
        TransactionId xwait = tuple->t_data->t_xmax;

        /* sleep until concurrent transaction ends */
        LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
        XactLockTableWait(xwait);

        LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
        if (TransactionIdDidAbort(xwait))
            goto l3;

        /*
         * xwait is committed but if xwait had just marked the tuple for
         * update then some other xaction could update this tuple before
         * we got to this point.
         */
        if (tuple->t_data->t_xmax != xwait)
            goto l3;
        if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
        {
            tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
            SetBufferCommitInfoNeedsSave(*buffer);
        }
        /* if tuple was marked for update but not updated... */
        if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
            result = HeapTupleMayBeUpdated;
        else
            result = HeapTupleUpdated;
    }
    if (result != HeapTupleMayBeUpdated)
    {
        Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
        tuple->t_self = tuple->t_data->t_ctid;
        LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
        return result;
    }

    /* store transaction information of xact marking the tuple */
    TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
    tuple->t_data->t_cmax = GetCurrentCommandId();
    tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
    tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;

    LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

    WriteNoReleaseBuffer(*buffer);

    return HeapTupleMayBeUpdated;
}
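/*
 * Hedged sketch (lock_one_tuple is hypothetical, not part of this file):
 * heap_mark4update() is the storage-level half of SELECT ... FOR UPDATE
 * here.  It returns with the buffer still pinned on every return path,
 * so the caller owns the ReleaseBuffer() call.
 */
static int
lock_one_tuple(Relation rel, HeapTuple tuple)
{
    Buffer      buffer;
    int         result;

    result = heap_mark4update(rel, tuple, &buffer);
    if (result == HeapTupleMayBeUpdated)
    {
        /* tuple->t_data points into the pinned buffer here; use it now */
    }
    /* on HeapTupleUpdated, tuple->t_self was advanced to the new version */
    ReleaseBuffer(buffer);      /* caller owns the pin on every path */
    return result;
}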
/* ----------------
 *  heap_markpos - mark scan position
 *
 *  Note:
 *  Only one mark should be maintained per scan at one time.
 *  Check if this can be done generally--say calls to get the
 *  next/previous tuple and NEVER pass struct scandesc to the
 *  user AM's.  Now, the mark is sent to the executor for safekeeping.
 *  Probably can store this info into a GENERAL scan structure.
 *
 *  May be best to change this call to store the marked position
 *  (up to 2?) in the scan structure itself.
 *  Fix to use the proper caching structure.
 * ----------------
 */
void
heap_markpos(HeapScanDesc scan)
{
    /* ----------------
     *  increment access statistics
     * ----------------
     */
    IncrHeapAccessStat(local_markpos);
    IncrHeapAccessStat(global_markpos);

    /* Note: no locking manipulations needed */

    if (scan->rs_ptup.t_data == NULL &&
        BufferIsUnknown(scan->rs_pbuf))
    {                           /* == NONTUP */
        scan->rs_ptup = scan->rs_ctup;
        heapgettup(scan->rs_rd,
                   &(scan->rs_ptup),
                   -1,
                   &scan->rs_pbuf,
                   scan->rs_snapshot,
                   scan->rs_nkeys,
                   scan->rs_key);
    }
    else if (scan->rs_ntup.t_data == NULL &&
             BufferIsUnknown(scan->rs_nbuf))
    {                           /* == NONTUP */
        scan->rs_ntup = scan->rs_ctup;
        heapgettup(scan->rs_rd,
                   &(scan->rs_ntup),
                   1,
                   &scan->rs_nbuf,
                   scan->rs_snapshot,
                   scan->rs_nkeys,
                   scan->rs_key);
    }

    /* ----------------
     *  Should not unpin the buffer pages.  They may still be in use.
     * ----------------
     */
    if (scan->rs_ptup.t_data != NULL)
        scan->rs_mptid = scan->rs_ptup.t_self;
    else
        ItemPointerSetInvalid(&scan->rs_mptid);
    if (scan->rs_ctup.t_data != NULL)
        scan->rs_mctid = scan->rs_ctup.t_self;
    else
        ItemPointerSetInvalid(&scan->rs_mctid);
    if (scan->rs_ntup.t_data != NULL)
        scan->rs_mntid = scan->rs_ntup.t_self;
    else
        ItemPointerSetInvalid(&scan->rs_mntid);
}

/* ----------------
 *  heap_restrpos - restore position to marked location
 *
 *  Note: there are bad side effects here.  If we were past the end
 *  of a relation when heapmarkpos is called, then if the relation is
 *  extended via insert, then the next call to heaprestrpos will
 *  cause the added tuples to be visible when the scan continues.
 *  Problems also arise if the TID's are rearranged!!!
 *
 *  Now pins buffer once for each valid tuple pointer (rs_ptup,
 *  rs_ctup, rs_ntup) referencing it.
 *   - 01/13/94
 *
 *  XXX might be better to do direct access instead of
 *      using the generality of heapgettup().
 *
 *  XXX It is very possible that when a scan is restored, that a tuple
 *  XXX which previously qualified may fail for time range purposes, unless
 *  XXX some form of locking exists (i.e., portals currently can act funny).
 * ----------------
 */
void
heap_restrpos(HeapScanDesc scan)
{
    /* ----------------
     *  increment access statistics
     * ----------------
     */
    IncrHeapAccessStat(local_restrpos);
    IncrHeapAccessStat(global_restrpos);

    /* XXX no amrestrpos checking that ammarkpos called */

    /* Note: no locking manipulations needed */

    unpinscan(scan);

    /* force heapgettup to pin buffer for each loaded tuple */
    scan->rs_pbuf = InvalidBuffer;
    scan->rs_cbuf = InvalidBuffer;
    scan->rs_nbuf = InvalidBuffer;

    if (!ItemPointerIsValid(&scan->rs_mptid))
        scan->rs_ptup.t_data = NULL;
    else
    {
        scan->rs_ptup.t_self = scan->rs_mptid;
        scan->rs_ptup.t_data = (HeapTupleHeader) 0x1;   /* for heapgettup */
        heapgettup(scan->rs_rd,
                   &(scan->rs_ptup),
                   0,
                   &(scan->rs_pbuf),
                   false,
                   0,
                   (ScanKey) NULL);
    }

    if (!ItemPointerIsValid(&scan->rs_mctid))
        scan->rs_ctup.t_data = NULL;
    else
    {
        scan->rs_ctup.t_self = scan->rs_mctid;
        scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;   /* for heapgettup */
        heapgettup(scan->rs_rd,
                   &(scan->rs_ctup),
                   0,
                   &(scan->rs_cbuf),
                   false,
                   0,
                   (ScanKey) NULL);
    }

    if (!ItemPointerIsValid(&scan->rs_mntid))
        scan->rs_ntup.t_data = NULL;
    else
    {
        scan->rs_ntup.t_self = scan->rs_mntid;
        scan->rs_ntup.t_data = (HeapTupleHeader) 0x1;   /* for heapgettup */
        heapgettup(scan->rs_rd,
                   &(scan->rs_ntup),
                   0,
                   &scan->rs_nbuf,
                   false,
                   0,
                   (ScanKey) NULL);
    }
}
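/*
 * Hedged sketch of the mark/restore protocol (rescan_from_mark and the
 * heap_getnext(scan, 0) call are assumptions about the era's scan API,
 * not defined in this file): a merge-join-style caller marks the scan,
 * reads ahead, and rewinds to the mark.
 */
static void
rescan_from_mark(HeapScanDesc scan)
{
    int     n = 0;

    heap_markpos(scan);             /* remember the current position */
    while (n++ < 10 && heap_getnext(scan, 0) != NULL)
        ;                           /* read ahead a few tuples */
    heap_restrpos(scan);            /* rewind the scan to the mark */
}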