📄 xlog.c
字号:
return (RecPtr);}/* * Advance the Insert state to the next buffer page, writing out the next * buffer if it still contains unwritten data. * * The global LogwrtRqst.Write pointer needs to be advanced to include the * just-filled page. If we can do this for free (without an extra lock), * we do so here. Otherwise the caller must do it. We return TRUE if the * request update still needs to be done, FALSE if we did it internally. * * Must be called with WALInsertLock held. */static boolAdvanceXLInsertBuffer(void){ XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlWrite *Write = &XLogCtl->Write; uint16 nextidx = NextBufIdx(Insert->curridx); bool update_needed = true; XLogRecPtr OldPageRqstPtr; XLogwrtRqst WriteRqst; XLogRecPtr NewPageEndPtr; XLogPageHeader NewPage; /* Use Insert->LogwrtResult copy if it's more fresh */ if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write)) LogwrtResult = Insert->LogwrtResult; /* * Get ending-offset of the buffer page we need to replace (this may * be zero if the buffer hasn't been used yet). Fall through if it's * already written out. */ OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* nope, got work to do... */ XLogRecPtr FinishedPageRqstPtr; FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx]; /* Before waiting, get info_lck and update LogwrtResult */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire_NoHoldoff(&xlogctl->info_lck); if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr)) xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr; LogwrtResult = xlogctl->LogwrtResult; SpinLockRelease_NoHoldoff(&xlogctl->info_lck); } update_needed = false; /* Did the shared-request update */ if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* OK, someone wrote it already */ Insert->LogwrtResult = LogwrtResult; } else { /* Must acquire write lock */ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = Write->LogwrtResult; if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* OK, someone wrote it already */ LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; } else { /* * Have to write buffers while holding insert lock. This * is not good, so only write as much as we absolutely * must. */ WriteRqst.Write = OldPageRqstPtr; WriteRqst.Flush.xlogid = 0; WriteRqst.Flush.xrecoff = 0; XLogWrite(WriteRqst); LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; } } } /* * Now the next buffer slot is free and we can set it up to be the * next output page. */ NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx]; if (NewPageEndPtr.xrecoff >= XLogFileSize) { /* crossing a logid boundary */ NewPageEndPtr.xlogid += 1; NewPageEndPtr.xrecoff = BLCKSZ; } else NewPageEndPtr.xrecoff += BLCKSZ; XLogCtl->xlblocks[nextidx] = NewPageEndPtr; NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ); Insert->curridx = nextidx; Insert->currpage = NewPage; Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD; /* * Be sure to re-zero the buffer so that bytes beyond what we've * written will look like zeroes and not valid XLOG records... */ MemSet((char *) NewPage, 0, BLCKSZ); /* And fill the new page's header */ NewPage->xlp_magic = XLOG_PAGE_MAGIC; /* NewPage->xlp_info = 0; */ /* done by memset */ NewPage->xlp_sui = ThisStartUpID; NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid; NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ; return update_needed;}/* * Write and/or fsync the log at least as far as WriteRqst indicates. * * Must be called with WALWriteLock held. */static voidXLogWrite(XLogwrtRqst WriteRqst){ XLogCtlWrite *Write = &XLogCtl->Write; char *from; bool ispartialpage; bool use_existent; /* * Update local LogwrtResult (caller probably did this already, * but...) */ LogwrtResult = Write->LogwrtResult; while (XLByteLT(LogwrtResult.Write, WriteRqst.Write)) { /* * Make sure we're not ahead of the insert process. This could * happen if we're passed a bogus WriteRqst.Write that is past the * end of the last page that's been initialized by * AdvanceXLInsertBuffer. */ if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx])) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, XLogCtl->xlblocks[Write->curridx].xlogid, XLogCtl->xlblocks[Write->curridx].xrecoff); /* Advance LogwrtResult.Write to end of current buffer page */ LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx]; ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) { /* * Switch to new logfile segment. */ if (openLogFile >= 0) { if (close(openLogFile) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log file %u, segment %u: %m", openLogId, openLogSeg))); openLogFile = -1; } XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); /* create/use new log file */ use_existent = true; openLogFile = XLogFileInit(openLogId, openLogSeg, &use_existent, true); openLogOff = 0; /* update pg_control, unless someone else already did */ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); if (ControlFile->logId < openLogId || (ControlFile->logId == openLogId && ControlFile->logSeg < openLogSeg + 1)) { ControlFile->logId = openLogId; ControlFile->logSeg = openLogSeg + 1; ControlFile->time = time(NULL); UpdateControlFile(); /* * Signal postmaster to start a checkpoint if it's been * too long since the last one. (We look at local copy of * RedoRecPtr which might be a little out of date, but * should be close enough for this purpose.) */ if (IsUnderPostmaster && (openLogId != RedoRecPtr.xlogid || openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) + (uint32) CheckPointSegments)) { if (XLOG_DEBUG) elog(LOG, "time for a checkpoint, signaling postmaster"); SendPostmasterSignal(PMSIGNAL_DO_CHECKPOINT); } } LWLockRelease(ControlFileLock); } if (openLogFile < 0) { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); openLogFile = XLogFileOpen(openLogId, openLogSeg, false); openLogOff = 0; } /* Need to seek in the file? */ if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize) { openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not seek in log file %u, segment %u to offset %u: %m", openLogId, openLogSeg, openLogOff))); } /* OK to write the page */ from = XLogCtl->pages + Write->curridx * BLCKSZ; errno = 0; if (write(openLogFile, from, BLCKSZ) != BLCKSZ) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; ereport(PANIC, (errcode_for_file_access(), errmsg("could not write to log file %u, segment %u at offset %u: %m", openLogId, openLogSeg, openLogOff))); } openLogOff += BLCKSZ; /* * If we just wrote the whole last page of a logfile segment, * fsync the segment immediately. This avoids having to go back * and re-open prior segments when an fsync request comes along * later. Doing it here ensures that one and only one backend will * perform this fsync. */ if (openLogOff >= XLogSegSize && !ispartialpage) { issue_xlog_fsync(); LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */ } if (ispartialpage) { /* Only asked to write a partial page */ LogwrtResult.Write = WriteRqst.Write; break; } Write->curridx = NextBufIdx(Write->curridx); } /* * If asked to flush, do so */ if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) && XLByteLT(LogwrtResult.Flush, LogwrtResult.Write)) { /* * Could get here without iterating above loop, in which case we * might have no open file or the wrong one. However, we do not * need to fsync more than one file. */ if (sync_method != SYNC_METHOD_OPEN) { if (openLogFile >= 0 && !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) { if (close(openLogFile) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log file %u, segment %u: %m", openLogId, openLogSeg))); openLogFile = -1; } if (openLogFile < 0) { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); openLogFile = XLogFileOpen(openLogId, openLogSeg, false); openLogOff = 0; } issue_xlog_fsync(); } LogwrtResult.Flush = LogwrtResult.Write; } /* * Update shared-memory status * * We make sure that the shared 'request' values do not fall behind the * 'result' values. This is not absolutely essential, but it saves * some code in a couple of places. */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire_NoHoldoff(&xlogctl->info_lck); xlogctl->LogwrtResult = LogwrtResult; if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write)) xlogctl->LogwrtRqst.Write = LogwrtResult.Write; if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush)) xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush; SpinLockRelease_NoHoldoff(&xlogctl->info_lck); } Write->LogwrtResult = LogwrtResult;}/* * Ensure that all XLOG data through the given position is flushed to disk. * * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not * already held, and we try to avoid acquiring it if possible. */voidXLogFlush(XLogRecPtr record){ XLogRecPtr WriteRqstPtr; XLogwrtRqst WriteRqst; /* Disabled during REDO */ if (InRedo) return; /* Quick exit if already known flushed */ if (XLByteLE(record, LogwrtResult.Flush)) return; if (XLOG_DEBUG) elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X", record.xlogid, record.xrecoff, LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff); START_CRIT_SECTION(); /* * Since fsync is usually a horribly expensive operation, we try to * piggyback as much data as we can on each fsync: if we see any more * data entered into the xlog buffer, we'll write and fsync that too, * so that the final value of LogwrtResult.Flush is as large as * possible. This gives us some chance of avoiding another fsync * immediately after. */ /* initialize to given target; may increase below */ WriteRqstPtr = record; /* read LogwrtResult and update local state */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire_NoHoldoff(&xlogctl->info_lck); if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write)) WriteRqstPtr = xlogctl->LogwrtRqst.Write; LogwrtResult = xlogctl->LogwrtResult; SpinLockRelease_NoHoldoff(&xlogctl->info_lck); } /* done already? */ if (!XLByteLE(record, LogwrtResult.Flush)) { /* now wait for the write lock */ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = XLogCtl->Write.LogwrtResult; if (!XLByteLE(record, LogwrtResult.Flush)) { /* try to write/flush later additions to XLOG as well */ if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE)) { XLogCtlInsert *Insert = &XLogCtl->Insert; uint32 freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) /* buffer is full */ WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; else { WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; WriteRqstPtr.xrecoff -= freespace; } LWLockRelease(WALInsertLock); WriteRqst.Write = WriteRqstPtr; WriteRqst.Flush = WriteRqstPtr; } else { WriteRqst.Write = WriteRqstPtr; WriteRqst.Flush = record; } XLogWrite(WriteRqst); } LWLockRelease(WALWriteLock); } END_CRIT_SECTION(); /* * If we still haven't flushed to the request point then we have a * problem; most likely, the requested flush point is past end of * XLOG. This has been seen to occur when a disk page has a corrupted * LSN. * * Formerly we treated this as a PANIC condition, but that hurts the * system's robustness rather than helping it: we do not want to take * down the whole system due to corruption on one data page. In * particular, if the bad page is encountered again during recovery * then we would be unable to restart the database at all! (This * scenario has actually happened in the field several times with 7.1 * releases. Note that we cannot get here while InRedo is true, but if * the bad page is brought in and marked dirty during recovery then * CreateCheckpoint will try to flush it at the end of recovery.) * * The current approach is to ERROR under normal conditions, but only * WARNING during recovery, so that the system can be brought up even * if there's a corrupt LSN. Note that for calls from xact.c, the * ERROR will be promoted to PANIC since xact.c calls this routine * inside a critical section. However, calls from bufmgr.c are not * within critical sections and so we will not force a restart for a
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -