📄 bufmgr.c
字号:
if (!BufTableDelete(buf)) { LWLockRelease(BufMgrLock); elog(FATAL, "buffer wasn't in the buffer hash table"); } INIT_BUFFERTAG(&(buf->tag), reln, blockNum); if (!BufTableInsert(buf)) { LWLockRelease(BufMgrLock); elog(FATAL, "buffer in buffer hash table twice"); } /* * Buffer contents are currently invalid. Have to mark IO IN PROGRESS * so no one fiddles with them until the read completes. If this * routine has been called simply to allocate a buffer, no io will be * attempted, so the flag isnt set. */ if (!inProgress) StartBufferIO(buf, true); else ContinueBufferIO(buf, true);#ifdef BMTRACE _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);#endif /* BMTRACE */ LWLockRelease(BufMgrLock); return buf;}/* * write_buffer -- common functionality for * WriteBuffer and WriteNoReleaseBuffer */static voidwrite_buffer(Buffer buffer, bool release){ BufferDesc *bufHdr; if (BufferIsLocal(buffer)) { WriteLocalBuffer(buffer, release); return; } if (BAD_BUFFER_ID(buffer)) elog(ERROR, "bad buffer id: %d", buffer); bufHdr = &BufferDescriptors[buffer - 1]; LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); Assert(bufHdr->refcount > 0); bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); if (release) UnpinBuffer(bufHdr); LWLockRelease(BufMgrLock);}/* * WriteBuffer * * Marks buffer contents as dirty (actual write happens later). * * Assume that buffer is pinned. Assume that reln is * valid. * * Side Effects: * Pin count is decremented. */#undef WriteBuffervoidWriteBuffer(Buffer buffer){ write_buffer(buffer, true);}/* * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer * when the operation is complete. */voidWriteNoReleaseBuffer(Buffer buffer){ write_buffer(buffer, false);}#undef ReleaseAndReadBuffer/* * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer() * to save a lock release/acquire. 
* * Also, if the passed buffer is valid and already contains the desired block * number, we simply return it without ever acquiring the lock at all. * Since the passed buffer must be pinned, it's OK to examine its block * number without getting the lock first. * * Note: it is OK to pass buffer = InvalidBuffer, indicating that no old * buffer actually needs to be released. This case is the same as ReadBuffer, * but can save some tests in the caller. * * Also note: while it will work to call this routine with blockNum == P_NEW, * it's best to avoid doing so, since that would result in calling * smgrnblocks() while holding the bufmgr lock, hence some loss of * concurrency. */BufferReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum){ BufferDesc *bufHdr; if (BufferIsValid(buffer)) { if (BufferIsLocal(buffer)) { Assert(LocalRefCount[-buffer - 1] > 0); bufHdr = &LocalBufferDescriptors[-buffer - 1]; if (bufHdr->tag.blockNum == blockNum && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node)) return buffer; LocalRefCount[-buffer - 1]--; } else { Assert(PrivateRefCount[buffer - 1] > 0); bufHdr = &BufferDescriptors[buffer - 1]; if (bufHdr->tag.blockNum == blockNum && RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node)) return buffer; if (PrivateRefCount[buffer - 1] > 1) PrivateRefCount[buffer - 1]--; else { LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); UnpinBuffer(bufHdr); return ReadBufferInternal(relation, blockNum, true); } } } return ReadBufferInternal(relation, blockNum, false);}/* * BufferSync -- Write all dirty buffers in the pool. * * This is called at checkpoint time and writes out all dirty shared buffers. 
*/
void
BufferSync(void)
{
	int			i;
	BufferDesc *bufHdr;
	ErrorContextCallback errcontext;

	/* Setup error traceback support for ereport() */
	errcontext.callback = buffer_write_error_callback;
	errcontext.arg = NULL;
	errcontext.previous = error_context_stack;
	error_context_stack = &errcontext;

	for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
	{
		Buffer		buffer;
		int			status;
		RelFileNode rnode;
		XLogRecPtr	recptr;
		Relation	reln;

		errcontext.arg = bufHdr;

		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

		if (!(bufHdr->flags & BM_VALID))
		{
			LWLockRelease(BufMgrLock);
			continue;
		}

		/*
		 * We can check bufHdr->cntxDirty here *without* holding any lock
		 * on buffer context as long as we set this flag in access methods
		 * *before* logging changes with XLogInsert(): if someone sets
		 * cntxDirty just after our check we don't worry, because our
		 * checkpoint.redo points before the log record for the upcoming
		 * changes and so we are not required to write such a dirty buffer.
		 */
		if (!(bufHdr->flags & BM_DIRTY) && !(bufHdr->cntxDirty))
		{
			LWLockRelease(BufMgrLock);
			continue;
		}

		/*
		 * IO synchronization. Note that we do it with unpinned buffer to
		 * avoid conflicts with FlushRelationBuffers.
		 */
		if (bufHdr->flags & BM_IO_IN_PROGRESS)
		{
			WaitIO(bufHdr);
			/* WaitIO dropped and retook BufMgrLock, so re-check the state */
			if (!(bufHdr->flags & BM_VALID) ||
				(!(bufHdr->flags & BM_DIRTY) && !(bufHdr->cntxDirty)))
			{
				LWLockRelease(BufMgrLock);
				continue;
			}
		}

		/*
		 * Here: no one doing IO for this buffer and it's dirty. Pin
		 * buffer now and set IO state for it *before* acquiring shlock to
		 * avoid conflicts with FlushRelationBuffers.
		 */
		PinBuffer(bufHdr);
		StartBufferIO(bufHdr, false);	/* output IO start */

		buffer = BufferDescriptorGetBuffer(bufHdr);
		rnode = bufHdr->tag.rnode;

		LWLockRelease(BufMgrLock);

		/*
		 * Try to find relation for buffer
		 */
		reln = RelationNodeCacheGetRelation(rnode);

		/*
		 * Protect buffer content against concurrent update
		 */
		LockBuffer(buffer, BUFFER_LOCK_SHARE);

		/*
		 * Force XLOG flush for buffer's LSN
		 */
		recptr = BufferGetLSN(bufHdr);
		XLogFlush(recptr);

		/*
		 * Now it's safe to write buffer to disk. Note that no one else
		 * should be able to write it while we were busy with locking
		 * and log flushing, because we set the IO flag.
		 */
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
		Assert(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty);
		bufHdr->flags &= ~BM_JUST_DIRTIED;
		LWLockRelease(BufMgrLock);

		if (reln == (Relation) NULL)
		{
			status = smgrblindwrt(DEFAULT_SMGR,
								  bufHdr->tag.rnode,
								  bufHdr->tag.blockNum,
								  (char *) MAKE_PTR(bufHdr->data));
		}
		else
		{
			status = smgrwrite(DEFAULT_SMGR, reln,
							   bufHdr->tag.blockNum,
							   (char *) MAKE_PTR(bufHdr->data));
		}

		if (status == SM_FAIL)	/* disk failure ?! */
			ereport(PANIC,
					(errcode(ERRCODE_IO_ERROR),
					 errmsg("could not write block %u of %u/%u",
							bufHdr->tag.blockNum,
							bufHdr->tag.rnode.tblNode,
							bufHdr->tag.rnode.relNode)));

		/*
		 * Note that it's safe to change cntxDirty here because we
		 * protect it from upper writers by share lock and from other
		 * bufmgr routines by BM_IO_IN_PROGRESS
		 */
		bufHdr->cntxDirty = false;

		/*
		 * Release the per-buffer readlock, reacquire BufMgrLock.
		 */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		BufferFlushCount++;

		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);

		bufHdr->flags &= ~BM_IO_IN_PROGRESS;	/* mark IO finished */
		TerminateBufferIO(bufHdr);		/* Sync IO finished */

		/*
		 * If this buffer was marked by someone as DIRTY while we were
		 * flushing it out we must not clear DIRTY flag - vadim 01/17/97
		 */
		if (!(bufHdr->flags & BM_JUST_DIRTIED))
			bufHdr->flags &= ~BM_DIRTY;
		UnpinBuffer(bufHdr);
		LWLockRelease(BufMgrLock);

		/* drop refcnt obtained by RelationNodeCacheGetRelation */
		if (reln != (Relation) NULL)
			RelationDecrementReferenceCount(reln);
	}

	/* Pop the error context stack */
	error_context_stack = errcontext.previous;
}

/*
 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
 *
 * Should be entered with buffer manager lock held; releases it before
 * waiting and re-acquires it afterwards.
 */
static void
WaitIO(BufferDesc *buf)
{
	/*
	 * Changed to wait until there's no IO - Inoue 01/13/2000
	 *
	 * Note this is *necessary* because an error abort in the process doing
	 * I/O could release the io_in_progress_lock prematurely. See
	 * AbortBufferIO.
	 */
	while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
	{
		LWLockRelease(BufMgrLock);
		/* acquire and immediately release the per-buffer IO lock */
		LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
		LWLockRelease(buf->io_in_progress_lock);
		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
	}
}


long		NDirectFileRead;	/* some I/O's are direct file access.
								 * bypass bufmgr */
long		NDirectFileWrite;	/* e.g., I/O in psort and hashjoin. */


/*
 * Return a palloc'd string containing buffer usage statistics.
*/char *ShowBufferUsage(void){ StringInfoData str; float hitrate; float localhitrate; initStringInfo(&str); if (ReadBufferCount == 0) hitrate = 0.0; else hitrate = (float) BufferHitCount *100.0 / ReadBufferCount; if (ReadLocalBufferCount == 0) localhitrate = 0.0; else localhitrate = (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount; appendStringInfo(&str, "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n", ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate); appendStringInfo(&str, "!\tLocal blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n", ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate); appendStringInfo(&str, "!\tDirect blocks: %10ld read, %10ld written\n", NDirectFileRead, NDirectFileWrite); return str.data;}voidResetBufferUsage(void){ BufferHitCount = 0; ReadBufferCount = 0; BufferFlushCount = 0; LocalBufferHitCount = 0; ReadLocalBufferCount = 0; LocalBufferFlushCount = 0; NDirectFileRead = 0; NDirectFileWrite = 0;}/* * AtEOXact_Buffers - clean up at end of transaction. * * During abort, we need to release any buffer pins we're holding * (this cleans up in case ereport interrupted a routine that pins a * buffer). During commit, we shouldn't need to do that, but check * anyway to see if anyone leaked a buffer reference count. 
*/voidAtEOXact_Buffers(bool isCommit){ int i; for (i = 0; i < NBuffers; i++) { if (PrivateRefCount[i] != 0) { BufferDesc *buf = &(BufferDescriptors[i]); if (isCommit) elog(WARNING, "buffer refcount leak: [%03d] (freeNext=%d, freePrev=%d, " "rel=%u/%u, blockNum=%u, flags=0x%x, refcount=%d %ld)", i, buf->freeNext, buf->freePrev, buf->tag.rnode.tblNode, buf->tag.rnode.relNode, buf->tag.blockNum, buf->flags, buf->refcount, PrivateRefCount[i]); PrivateRefCount[i] = 1; /* make sure we release shared pin */ LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); UnpinBuffer(buf); LWLockRelease(BufMgrLock); Assert(PrivateRefCount[i] == 0); } } AtEOXact_LocalBuffers(isCommit);}/* * FlushBufferPool * * Flush all dirty blocks in buffer pool to disk at the checkpoint time. * Local relations do not participate in checkpoints, so they don't need to be * flushed. */voidFlushBufferPool(void){ BufferSync(); smgrsync();}/* * Do whatever is needed to prepare for commit at the bufmgr and smgr levels */voidBufmgrCommit(void){ /* Nothing to do in bufmgr anymore... */ smgrcommit();}/* * BufferGetBlockNumber * Returns the block number associated with a buffer. * * Note: * Assumes that the buffer is valid and pinned, else the * value may be obsolete immediately... */BlockNumberBufferGetBlockNumber(Buffer buffer){ Assert(BufferIsPinned(buffer)); if (BufferIsLocal(buffer)) return LocalBufferDescriptors[-buffer - 1].tag.blockNum; else return BufferDescriptors[buffer - 1].tag.blockNum;}/* * BufferReplace * * Write out the buffer corresponding to 'bufHdr' * * BufMgrLock must be held at entry, and the buffer must be pinned. */static intBufferReplace(BufferDesc *bufHdr){ Relation reln; XLogRecPtr recptr; int status; ErrorContextCallback errcontext; /* To check if block content changed while flushing. 
- vadim 01/17/97 */ bufHdr->flags &= ~BM_JUST_DIRTIED; LWLockRelease(BufMgrLock); /* Setup error traceback support for ereport() */ errcontext.callback = buffer_write_error_callback; errcontext.arg = bufHdr; errcontext.previous = error_context_stack; error_context_stack = &errcontext; /* * No need to lock buffer context - no one should be able to end * ReadBuffer */ recptr = BufferGetLSN(bufHdr); XLogFlush(recptr); reln = RelationNodeCacheGetRelation(bufHdr->tag.rnode); if (reln != (Relation) NULL) { status = smgrwrite(DEFAULT_SMGR, reln, bufHdr->tag.blockNum, (char *) MAKE_PTR(bufHdr->data)); } else { status = smgrblindwrt(DEFAULT_SMGR, bufHdr->tag.rnode, bufHdr->tag.blockNum, (char *) MAKE_PTR(bufHdr->data)); } /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */ if (reln != (Relation) NULL) RelationDecrementReferenceCount(reln); /* Pop the error context stack */ error_context_stack = errcontext.previous; LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); if (status == SM_FAIL) return FALSE; BufferFlushCount++; return TRUE;}/* * RelationGetNumberOfBlocks * Determines the current number of pages in the relation. * Side effect: relation->rd_nblocks is updated. */BlockNumberRelationGetNumberOfBlocks(Relation relation){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -