bufmgr.c
	/* Make sure we can handle the pin inside the loop */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	for (i = 0; i < NBuffers; i++)
	{
		bufHdr = &BufferDescriptors[i];
		LockBufHdr(bufHdr);
		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
		{
			PinBuffer_Locked(bufHdr);
			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
			FlushBuffer(bufHdr, rel->rd_smgr);
			LWLockRelease(bufHdr->content_lock);
			UnpinBuffer(bufHdr, true, false /* no freelist change */ );
		}
		else
			UnlockBufHdr(bufHdr);
	}
}

/*
 * ReleaseBuffer -- remove the pin on a buffer without
 *		marking it dirty.
 */
void
ReleaseBuffer(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer id: %d", buffer);

	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);

	if (BufferIsLocal(buffer))
	{
		Assert(LocalRefCount[-buffer - 1] > 0);
		bufHdr = &LocalBufferDescriptors[-buffer - 1];
		LocalRefCount[-buffer - 1]--;
		if (LocalRefCount[-buffer - 1] == 0 &&
			bufHdr->usage_count < BM_MAX_USAGE_COUNT)
			bufHdr->usage_count++;
		return;
	}

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);

	if (PrivateRefCount[buffer - 1] > 1)
		PrivateRefCount[buffer - 1]--;
	else
		UnpinBuffer(bufHdr, false, true);
}

/*
 * IncrBufferRefCount
 *		Increment the pin count on a buffer that we have *already* pinned
 *		at least once.
 *
 *		This function cannot be used on a buffer we do not have pinned,
 *		because it doesn't change the shared buffer state.
 */
void
IncrBufferRefCount(Buffer buffer)
{
	Assert(BufferIsPinned(buffer));
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
	ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
	if (BufferIsLocal(buffer))
		LocalRefCount[-buffer - 1]++;
	else
		PrivateRefCount[buffer - 1]++;
}

/*
 * SetBufferCommitInfoNeedsSave
 *
 *	Mark a buffer dirty when we have updated tuple commit-status bits in it.
 *
 *	This is essentially the same as WriteNoReleaseBuffer.  We preserve the
 *	distinction as a way of documenting that the caller has not made a
 *	critical data change --- the status-bit update could be redone by someone
 *	else just as easily.  Therefore, no WAL log record need be generated,
 *	whereas calls to WriteNoReleaseBuffer really ought to be associated with
 *	a WAL-entry-creating action.
 *
 * This routine might get called many times on the same page, if we are making
 * the first scan after commit of an xact that added/deleted many tuples.
 * So, be as quick as we can if the buffer is already dirty.  We do this by
 * not acquiring spinlock if it looks like the status bits are already OK.
 * (Note it is okay if someone else clears BM_JUST_DIRTIED immediately after
 * we look, because the buffer content update is already done and will be
 * reflected in the I/O.)
 */
void
SetBufferCommitInfoNeedsSave(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer id: %d", buffer);

	if (BufferIsLocal(buffer))
	{
		WriteLocalBuffer(buffer, false);
		return;
	}

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);

	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
		(BM_DIRTY | BM_JUST_DIRTIED))
	{
		LockBufHdr(bufHdr);
		Assert(bufHdr->refcount > 0);
		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
		UnlockBufHdr(bufHdr);
	}
}
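/*
 * Illustrative sketch (not part of the original file): the pin discipline
 * the routines above expect.  Every successful ReadBuffer() must eventually
 * be matched by exactly one ReleaseBuffer(); IncrBufferRefCount() adds an
 * extra pin that needs its own matching release.  "rel" and "blkno" are
 * hypothetical placeholders, assuming the two-argument ReadBuffer() of this
 * vintage.
 */
#ifdef NOT_USED
static void
example_pin_discipline(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);

	/* Hand a second reference to some other piece of code... */
	IncrBufferRefCount(buf);

	/* ...which must release it when done... */
	ReleaseBuffer(buf);

	/* ...and the original pin must be released as well. */
	ReleaseBuffer(buf);
}
#endif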
/*
 * Release buffer content locks for shared buffers.
 *
 * Used to clean up after errors.
 *
 * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
 * of releasing buffer content locks per se; the only thing we need to deal
 * with here is clearing any PIN_COUNT request that was in progress.
 */
void
UnlockBuffers(void)
{
	volatile BufferDesc *buf = PinCountWaitBuf;

	if (buf)
	{
		HOLD_INTERRUPTS();		/* don't want to die() partway through... */

		LockBufHdr_NoHoldoff(buf);

		/*
		 * Don't complain if flag bit not set; it could have been reset but we
		 * got a cancel/die interrupt before getting the signal.
		 */
		if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
			buf->wait_backend_pid == MyProcPid)
			buf->flags &= ~BM_PIN_COUNT_WAITER;
		UnlockBufHdr_NoHoldoff(buf);

		ProcCancelWaitForSignal();

		PinCountWaitBuf = NULL;

		RESUME_INTERRUPTS();
	}
}

/*
 * Acquire or release the content_lock for the buffer.
 */
void
LockBuffer(Buffer buffer, int mode)
{
	volatile BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return;

	buf = &(BufferDescriptors[buffer - 1]);

	if (mode == BUFFER_LOCK_UNLOCK)
		LWLockRelease(buf->content_lock);
	else if (mode == BUFFER_LOCK_SHARE)
		LWLockAcquire(buf->content_lock, LW_SHARED);
	else if (mode == BUFFER_LOCK_EXCLUSIVE)
	{
		LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);

		/*
		 * This is not the best place to mark buffer dirty (eg indices do not
		 * always change buffer they lock in excl mode). But please remember
		 * that it's critical to set dirty bit *before* logging changes with
		 * XLogInsert() - see comments in SyncOneBuffer().
		 */
		LockBufHdr_NoHoldoff(buf);
		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
		UnlockBufHdr_NoHoldoff(buf);
	}
	else
		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
}

/*
 * Acquire the content_lock for the buffer, but only if we don't have to wait.
 *
 * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
 */
bool
ConditionalLockBuffer(Buffer buffer)
{
	volatile BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return true;			/* act as though we got it */

	buf = &(BufferDescriptors[buffer - 1]);

	if (LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE))
	{
		/*
		 * This is not the best place to mark buffer dirty (eg indices do not
		 * always change buffer they lock in excl mode). But please remember
		 * that it's critical to set dirty bit *before* logging changes with
		 * XLogInsert() - see comments in SyncOneBuffer().
		 */
		LockBufHdr_NoHoldoff(buf);
		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
		UnlockBufHdr_NoHoldoff(buf);
		return true;
	}
	return false;
}
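/*
 * Illustrative sketch (not part of the original file): a caller that prefers
 * not to block.  ConditionalLockBuffer() takes the exclusive content lock
 * only if it is immediately free; otherwise the caller can fall back to the
 * blocking LockBuffer() or simply skip the page.  "buf" is assumed to be
 * already pinned, per the rules above; note that in this version a
 * successful acquisition has already set BM_DIRTY on the buffer.
 */
#ifdef NOT_USED
static bool
example_try_exclusive(Buffer buf)
{
	if (!ConditionalLockBuffer(buf))
		return false;			/* someone else holds the lock; skip page */

	/* ... modify the page here while holding the exclusive lock ... */

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	return true;
}
#endif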
/*
 * LockBufferForCleanup - lock a buffer in preparation for deleting items
 *
 * Items may be deleted from a disk page only when the caller (a) holds an
 * exclusive lock on the buffer and (b) has observed that no other backend
 * holds a pin on the buffer.  If there is a pin, then the other backend
 * might have a pointer into the buffer (for example, a heapscan reference
 * to an item --- see README for more details).  It's OK if a pin is added
 * after the cleanup starts, however; the newly-arrived backend will be
 * unable to look at the page until we release the exclusive lock.
 *
 * To implement this protocol, a would-be deleter must pin the buffer and
 * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
 * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
 * it has successfully observed pin count = 1.
 */
void
LockBufferForCleanup(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	Assert(BufferIsValid(buffer));
	Assert(PinCountWaitBuf == NULL);

	if (BufferIsLocal(buffer))
	{
		/* There should be exactly one pin */
		if (LocalRefCount[-buffer - 1] != 1)
			elog(ERROR, "incorrect local pin count: %d",
				 LocalRefCount[-buffer - 1]);
		/* Nobody else to wait for */
		return;
	}

	/* There should be exactly one local pin */
	if (PrivateRefCount[buffer - 1] != 1)
		elog(ERROR, "incorrect local pin count: %d",
			 PrivateRefCount[buffer - 1]);

	bufHdr = &BufferDescriptors[buffer - 1];

	for (;;)
	{
		/* Try to acquire lock */
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		LockBufHdr_NoHoldoff(bufHdr);
		Assert(bufHdr->refcount > 0);
		if (bufHdr->refcount == 1)
		{
			/* Successfully acquired exclusive lock with pincount 1 */
			UnlockBufHdr_NoHoldoff(bufHdr);
			return;
		}
		/* Failed, so mark myself as waiting for pincount 1 */
		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
		{
			UnlockBufHdr_NoHoldoff(bufHdr);
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			elog(ERROR, "multiple backends attempting to wait for pincount 1");
		}
		bufHdr->wait_backend_pid = MyProcPid;
		bufHdr->flags |= BM_PIN_COUNT_WAITER;
		PinCountWaitBuf = bufHdr;
		UnlockBufHdr_NoHoldoff(bufHdr);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		/* Wait to be signaled by UnpinBuffer() */
		ProcWaitForSignal();
		PinCountWaitBuf = NULL;
		/* Loop back and try again */
	}
}
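/*
 * Illustrative sketch (not part of the original file): the would-be
 * deleter's protocol described above, as a VACUUM-like caller would use it.
 * The caller pins the buffer first, then waits until it holds the only pin;
 * only then is it safe to remove items from the page.  "rel" and "blkno"
 * are hypothetical placeholders.
 */
#ifdef NOT_USED
static void
example_cleanup_protocol(Relation rel, BlockNumber blkno)
{
	Buffer		buf = ReadBuffer(rel, blkno);

	/* Blocks until we hold the exclusive lock with pin count = 1 */
	LockBufferForCleanup(buf);

	/* ... safe to delete items from BufferGetPage(buf) here ... */

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buf);
}
#endif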
/*
 * Functions for buffer I/O handling
 *
 * Note: We assume that nested buffer I/O never occurs.
 * i.e at most one io_in_progress lock is held per proc.
 *
 * Also note that these are used only for shared buffers, not local ones.
 */

/*
 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
 */
static void
WaitIO(volatile BufferDesc *buf)
{
	/*
	 * Changed to wait until there's no IO - Inoue 01/13/2000
	 *
	 * Note this is *necessary* because an error abort in the process doing
	 * I/O could release the io_in_progress_lock prematurely. See
	 * AbortBufferIO.
	 */
	for (;;)
	{
		BufFlags	sv_flags;

		/*
		 * It may not be necessary to acquire the spinlock to check the flag
		 * here, but since this test is essential for correctness, we'd better
		 * play it safe.
		 */
		LockBufHdr(buf);
		sv_flags = buf->flags;
		UnlockBufHdr(buf);
		if (!(sv_flags & BM_IO_IN_PROGRESS))
			break;
		LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
		LWLockRelease(buf->io_in_progress_lock);
	}
}

/*
 * StartBufferIO: begin I/O on this buffer
 *	(Assumptions)
 *	My process is executing no IO
 *	The buffer is Pinned
 *
 * In some scenarios there are race conditions in which multiple backends
 * could attempt the same I/O operation concurrently.  If someone else
 * has already started I/O on this buffer then we will block on the
 * io_in_progress lock until he's done.
 *
 * Input operations are only attempted on buffers that are not BM_VALID,
 * and output operations only on buffers that are BM_VALID and BM_DIRTY,
 * so we can always tell if the work is already done.
 *
 * Returns TRUE if we successfully marked the buffer as I/O busy,
 * FALSE if someone else already did the work.
 */
static bool
StartBufferIO(volatile BufferDesc *buf, bool forInput)
{
	Assert(!InProgressBuf);

	for (;;)
	{
		/*
		 * Grab the io_in_progress lock so that other processes can wait for
		 * me to finish the I/O.
		 */
		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

		/* NoHoldoff is OK since we now have an LWLock */
		LockBufHdr_NoHoldoff(buf);

		if (!(buf->flags & BM_IO_IN_PROGRESS))
			break;

		/*
		 * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
		 * lock isn't held is if the process doing the I/O is recovering from
		 * an error (see AbortBufferIO).  If that's the case, we must wait for
		 * him to get unwedged.
		 */
		UnlockBufHdr_NoHoldoff(buf);
		LWLockRelease(buf->io_in_progress_lock);
		WaitIO(buf);
	}

	/* Once we get here, there is definitely no I/O active on this buffer */

	if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
	{
		/* someone else already did the I/O */
		UnlockBufHdr_NoHoldoff(buf);
		LWLockRelease(buf->io_in_progress_lock);
		return false;
	}

	buf->flags |= BM_IO_IN_PROGRESS;

	UnlockBufHdr_NoHoldoff(buf);

	InProgressBuf = buf;
	IsForInput = forInput;

	return true;
}

/*
 * TerminateBufferIO: release a buffer we were doing I/O on
 *	(Assumptions)
 *	My process is executing IO for the buffer
 *	BM_IO_IN_PROGRESS bit is set for the buffer
 *	We hold the buffer's io_in_progress lock
 *	The buffer is Pinned
 *
 * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
 * buffer's BM_DIRTY flag.  This is appropriate when terminating a
 * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
 * marking the buffer clean if it was re-dirtied while we were writing.
 *
 * set_flag_bits gets ORed into the buffer's flags.  It must include
 * BM_IO_ERROR in a failure case.  For successful completion it could
 * be 0, or BM_VALID if we just finished reading in the page.
 */
static void
TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
				  int set_flag_bits)
{
	Assert(buf == InProgressBuf);

	/* NoHoldoff is OK since we must have an LWLock */
	LockBufHdr_NoHoldoff(buf);

	Assert(buf->flags & BM_IO_IN_PROGRESS);
	buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
	if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
		buf->flags &= ~BM_DIRTY;
	buf->flags |= set_flag_bits;

	UnlockBufHdr_NoHoldoff(buf);

	InProgressBuf = NULL;

	LWLockRelease(buf->io_in_progress_lock);
}

/*
 * AbortBufferIO: Clean up any active buffer I/O after an error.
 *
 *	All LWLocks we might have held have been released,
 *	but we haven't yet released buffer pins, so the buffer is still pinned.
 *
 *	If I/O was in progress, we always set BM_IO_ERROR, even though it's
 *	possible the error condition wasn't related to the I/O.
 */
void
AbortBufferIO(void)
{
	volatile BufferDesc *buf = InProgressBuf;

	if (buf)
	{
		/*
		 * Since LWLockReleaseAll has already been called, we're not holding
		 * the buffer's io_in_progress_lock. We have to re-acquire it so that
		 * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
		 * buffer will be in a busy spin until we succeed in doing this.
		 */
		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

		/* NoHoldoff is OK since we now have an LWLock */
		LockBufHdr_NoHoldoff(buf);
		Assert(buf->flags & BM_IO_IN_PROGRESS);
		if (IsForInput)
		{
			Assert(!(buf->flags & BM_DIRTY));
			/* We'd better not think buffer is valid yet */
			Assert(!(buf->flags & BM_VALID));
			UnlockBufHdr_NoHoldoff(buf);
		}
		else
		{
			BufFlags	sv_flags;

			sv_flags = buf->flags;
			Assert(sv_flags & BM_DIRTY);
			UnlockBufHdr_NoHoldoff(buf);
			/* Issue notice if this is not the first failure... */
			if (sv_flags & BM_IO_ERROR)
			{
				/* Buffer is pinned, so we can read tag without spinlock */
				ereport(WARNING,
						(errcode(ERRCODE_IO_ERROR),
						 errmsg("could not write block %u of %u/%u/%u",
								buf->tag.blockNum,
								buf->tag.rnode.spcNode,
								buf->tag.rnode.dbNode,
								buf->tag.rnode.relNode),
						 errdetail("Multiple failures --- write error may be permanent.")));
			}
		}
		TerminateBufferIO(buf, false, BM_IO_ERROR);
	}
}
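/*
 * Illustrative sketch (not part of the original file): how the I/O routines
 * above fit together on the write path.  This is an assumption about the
 * shape of FlushBuffer(), whose body is not included in this excerpt; the
 * smgr call is elided.  "buf" is assumed to be pinned and share-locked by
 * the caller.
 */
#ifdef NOT_USED
static void
example_write_protocol(volatile BufferDesc *buf)
{
	/* Claim the buffer for output; bail out if another backend beat us */
	if (!StartBufferIO(buf, false))
		return;

	/* ... write the page, e.g. via smgrwrite(), here ... */

	/* Success: clear BM_DIRTY unless the page was re-dirtied meanwhile */
	TerminateBufferIO(buf, true, 0);
}
#endif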
/*
 * Error context callback for errors occurring during buffer writes.
 */
static void
buffer_write_error_callback(void *arg)
{
	volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;

	/* Buffer is pinned, so we can read the tag without locking the spinlock */
	if (bufHdr != NULL)
		errcontext("writing block %u of relation %u/%u/%u",
				   bufHdr->tag.blockNum,
				   bufHdr->tag.rnode.spcNode,
				   bufHdr->tag.rnode.dbNode,
				   bufHdr->tag.rnode.relNode);
}
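/*
 * Illustrative sketch (not part of the original file): how a write path can
 * arrange for the callback above to fire if an error is thrown mid-write,
 * using the standard error_context_stack push/pop idiom.  "bufHdr" is a
 * hypothetical pinned buffer header.
 */
#ifdef NOT_USED
static void
example_error_context(volatile BufferDesc *bufHdr)
{
	ErrorContextCallback errcontext;

	/* Push: any ereport() during the write will add our context line */
	errcontext.callback = buffer_write_error_callback;
	errcontext.arg = (void *) bufHdr;
	errcontext.previous = error_context_stack;
	error_context_stack = &errcontext;

	/* ... perform the write ... */

	/* Pop the callback again on the way out */
	error_context_stack = errcontext.previous;
}
#endif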