📄 xlog.c
字号:
* may stop at any convenient boundary (such as a cache or logfile boundary).
 * This option allows us to avoid uselessly issuing multiple writes when a
 * single one would do.
 *
 * Must be called with WALWriteLock held.
 */

/*
 * Summary of how the arguments are used by the code below:
 *
 * WriteRqst.Write: the write loop runs while LogwrtResult.Write is before
 *		this position.
 * WriteRqst.Flush: the flush step runs only if LogwrtResult.Flush is before
 *		this position (and before LogwrtResult.Write).
 * flexible: if true, the write loop exits as soon as one group of pages has
 *		been written out.
 *
 * Every failure path in this function reports at PANIC level.
 */
static void
XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
    XLogCtlWrite *Write = &XLogCtl->Write;
    bool        ispartialpage;
    bool        finishing_seg;
    bool        use_existent;
    int         curridx;
    int         npages;
    int         startidx;
    uint32      startoffset;

    /* We should always be inside a critical section here */
    Assert(CritSectionCount > 0);

    /*
     * Update local LogwrtResult (caller probably did this already, but...)
     */
    LogwrtResult = Write->LogwrtResult;

    /*
     * Since successive pages in the xlog cache are consecutively allocated,
     * we can usually gather multiple pages together and issue just one
     * write() call.  npages is the number of pages we have determined can be
     * written together; startidx is the cache block index of the first one,
     * and startoffset is the file offset at which it should go. The latter
     * two variables are only valid when npages > 0, but we must initialize
     * all of them to keep the compiler quiet.
     */
    npages = 0;
    startidx = 0;
    startoffset = 0;

    /*
     * Within the loop, curridx is the cache block index of the page to
     * consider writing.  We advance Write->curridx only after successfully
     * writing pages.  (Right now, this refinement is useless since we are
     * going to PANIC if any error occurs anyway; but someday it may come in
     * useful.)
     */
    curridx = Write->curridx;

    while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
    {
        /*
         * Make sure we're not ahead of the insert process.  This could happen
         * if we're passed a bogus WriteRqst.Write that is past the end of the
         * last page that's been initialized by AdvanceXLInsertBuffer.
         */
        if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
            elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
                 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
                 XLogCtl->xlblocks[curridx].xlogid,
                 XLogCtl->xlblocks[curridx].xrecoff);

        /* Advance LogwrtResult.Write to end of current buffer page */
        LogwrtResult.Write = XLogCtl->xlblocks[curridx];

        /* The request ends inside this page if it is before the page end */
        ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);

        if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
        {
            /*
             * Switch to new logfile segment.  We cannot have any pending
             * pages here (since we dump what we have at segment end).
             */
            Assert(npages == 0);
            if (openLogFile >= 0)
            {
                if (close(openLogFile))
                    ereport(PANIC,
                            (errcode_for_file_access(),
                             errmsg("could not close log file %u, segment %u: %m",
                                    openLogId, openLogSeg)));
                openLogFile = -1;
            }
            XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);

            /* create/use new log file */
            use_existent = true;
            openLogFile = XLogFileInit(openLogId, openLogSeg,
                                       &use_existent, true);
            openLogOff = 0;

            /* update pg_control, unless someone else already did */
            LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
            if (ControlFile->logId < openLogId ||
                (ControlFile->logId == openLogId &&
                 ControlFile->logSeg < openLogSeg + 1))
            {
                ControlFile->logId = openLogId;
                ControlFile->logSeg = openLogSeg + 1;
                ControlFile->time = time(NULL);
                UpdateControlFile();

                /*
                 * Signal bgwriter to start a checkpoint if it's been too long
                 * since the last one.  (We look at local copy of RedoRecPtr
                 * which might be a little out of date, but should be close
                 * enough for this purpose.)
                 *
                 * A straight computation of segment number could overflow 32
                 * bits.  Rather than assuming we have working 64-bit
                 * arithmetic, we compare the highest-order bits separately,
                 * and force a checkpoint immediately when they change.
                 */
                if (IsUnderPostmaster)
                {
                    uint32      old_segno,
                                new_segno;
                    uint32      old_highbits,
                                new_highbits;

                    old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
                        (RedoRecPtr.xrecoff / XLogSegSize);
                    old_highbits = RedoRecPtr.xlogid / XLogSegSize;
                    new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
                        openLogSeg;
                    new_highbits = openLogId / XLogSegSize;
                    if (new_highbits != old_highbits ||
                        new_segno >= old_segno + (uint32) CheckPointSegments)
                    {
#ifdef WAL_DEBUG
                        if (XLOG_DEBUG)
                            elog(LOG, "time for a checkpoint, signaling bgwriter");
#endif
                        RequestCheckpoint(false, true);
                    }
                }
            }
            LWLockRelease(ControlFileLock);
        }

        /* Make sure we have the current logfile open */
        if (openLogFile < 0)
        {
            XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
            openLogFile = XLogFileOpen(openLogId, openLogSeg);
            openLogOff = 0;
        }

        /* Add current page to the set of pending pages-to-dump */
        if (npages == 0)
        {
            /* first of group */
            startidx = curridx;

            /*
             * LogwrtResult.Write is the END of this page, so back up one
             * page to get the offset at which the page starts.
             */
            startoffset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
        }
        npages++;

        /*
         * Dump the set if this will be the last loop iteration, or if we are
         * at the last page of the cache area (since the next page won't be
         * contiguous in memory), or if we are at the end of the logfile
         * segment.
         */
        finishing_seg = !ispartialpage &&
            (startoffset + npages * BLCKSZ) >= XLogSegSize;

        if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
            curridx == XLogCtl->XLogCacheBlck ||
            finishing_seg)
        {
            char       *from;
            Size        nbytes;

            /* Need to seek in the file? */
            if (openLogOff != startoffset)
            {
                if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
                    ereport(PANIC,
                            (errcode_for_file_access(),
                             errmsg("could not seek in log file %u, "
                                    "segment %u to offset %u: %m",
                                    openLogId, openLogSeg, startoffset)));
                openLogOff = startoffset;
            }

            /* OK to write the page(s) */
            from = XLogCtl->pages + startidx * (Size) BLCKSZ;
            nbytes = npages * (Size) BLCKSZ;

            /* clear errno so a short write with no error can be detected */
            errno = 0;
            if (write(openLogFile, from, nbytes) != nbytes)
            {
                /* if write didn't set errno, assume no disk space */
                if (errno == 0)
                    errno = ENOSPC;
                ereport(PANIC,
                        (errcode_for_file_access(),
                         errmsg("could not write to log file %u, segment %u "
                                "at offset %u, length %lu: %m",
                                openLogId, openLogSeg,
                                openLogOff, (unsigned long) nbytes)));
            }

            /*
             * Update state for write.  After a partial-page write the shared
             * index stays on the current page; otherwise it moves to the
             * next buffer.
             */
            openLogOff += nbytes;
            Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
            npages = 0;

            /*
             * If we just wrote the whole last page of a logfile segment,
             * fsync the segment immediately.  This avoids having to go back
             * and re-open prior segments when an fsync request comes along
             * later. Doing it here ensures that one and only one backend will
             * perform this fsync.
             *
             * This is also the right place to notify the Archiver that the
             * segment is ready to copy to archival storage.
             */
            if (finishing_seg)
            {
                issue_xlog_fsync();
                LogwrtResult.Flush = LogwrtResult.Write;    /* end of page */

                if (XLogArchivingActive())
                    XLogArchiveNotifySeg(openLogId, openLogSeg);
            }
        }

        if (ispartialpage)
        {
            /* Only asked to write a partial page */
            LogwrtResult.Write = WriteRqst.Write;
            break;
        }
        curridx = NextBufIdx(curridx);

        /* If flexible, break out of loop as soon as we wrote something */
        if (flexible && npages == 0)
            break;
    }

    /* Every exit from the loop must leave no pages pending */
    Assert(npages == 0);
    Assert(curridx == Write->curridx);

    /*
     * If asked to flush, do so
     */
    if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
        XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
    {
        /*
         * Could get here without iterating above loop, in which case we might
         * have no open file or the wrong one.  However, we do not need to
         * fsync more than one file.
         */
        if (sync_method != SYNC_METHOD_OPEN)
        {
            if (openLogFile >= 0 &&
                !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
            {
                if (close(openLogFile))
                    ereport(PANIC,
                            (errcode_for_file_access(),
                             errmsg("could not close log file %u, segment %u: %m",
                                    openLogId, openLogSeg)));
                openLogFile = -1;
            }
            if (openLogFile < 0)
            {
                XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
                openLogFile = XLogFileOpen(openLogId, openLogSeg);
                openLogOff = 0;
            }
            issue_xlog_fsync();
        }
        LogwrtResult.Flush = LogwrtResult.Write;
    }

    /*
     * Update shared-memory status
     *
     * We make sure that the shared 'request' values do not fall behind the
     * 'result' values.  This is not absolutely essential, but it saves some
     * code in a couple of places.
     */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;

        SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
        xlogctl->LogwrtResult = LogwrtResult;
        if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
            xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
        if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
            xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
        SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
    }

    /* Also store the result in the Write struct (read back on entry above) */
    Write->LogwrtResult = LogwrtResult;
}

/*
 * Ensure that all XLOG data through the given position is flushed to disk.
 *
 * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
 * already held, and we try to avoid acquiring it if possible.
*/

/*
 * Summary of what the code below does:
 *
 * record: the XLOG position that must be flushed.
 *
 * Returns immediately if InRedo is set or if the local LogwrtResult already
 * shows 'record' as flushed.  Otherwise the work is done inside a critical
 * section, taking WALWriteLock only when the shared status does not already
 * cover 'record'.  If the flush position still falls short of 'record'
 * afterwards, this is reported at WARNING level when InRecovery is set and
 * at ERROR level otherwise.
 */
void
XLogFlush(XLogRecPtr record)
{
    XLogRecPtr  WriteRqstPtr;
    XLogwrtRqst WriteRqst;

    /* Disabled during REDO */
    if (InRedo)
        return;

    /* Quick exit if already known flushed */
    if (XLByteLE(record, LogwrtResult.Flush))
        return;

#ifdef WAL_DEBUG
    if (XLOG_DEBUG)
        elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
             record.xlogid, record.xrecoff,
             LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
             LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
#endif

    START_CRIT_SECTION();

    /*
     * Since fsync is usually a horribly expensive operation, we try to
     * piggyback as much data as we can on each fsync: if we see any more data
     * entered into the xlog buffer, we'll write and fsync that too, so that
     * the final value of LogwrtResult.Flush is as large as possible. This
     * gives us some chance of avoiding another fsync immediately after.
     */

    /* initialize to given target; may increase below */
    WriteRqstPtr = record;

    /* read LogwrtResult and update local state */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;

        SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
        if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
            WriteRqstPtr = xlogctl->LogwrtRqst.Write;
        LogwrtResult = xlogctl->LogwrtResult;
        SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
    }

    /* done already? */
    if (!XLByteLE(record, LogwrtResult.Flush))
    {
        /* now wait for the write lock */
        LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);

        /* re-read the status and re-check now that we hold the lock */
        LogwrtResult = XLogCtl->Write.LogwrtResult;
        if (!XLByteLE(record, LogwrtResult.Flush))
        {
            /* try to write/flush later additions to XLOG as well */
            if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
            {
                XLogCtlInsert *Insert = &XLogCtl->Insert;
                uint32      freespace = INSERT_FREESPACE(Insert);

                if (freespace < SizeOfXLogRecord)   /* buffer is full */
                    WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
                else
                {
                    /* back up from the page end by the unused space */
                    WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
                    WriteRqstPtr.xrecoff -= freespace;
                }
                LWLockRelease(WALInsertLock);
                WriteRqst.Write = WriteRqstPtr;
                WriteRqst.Flush = WriteRqstPtr;
            }
            else
            {
                /*
                 * Insert lock not available: write up to the position
                 * computed earlier, but require a flush only to 'record'.
                 */
                WriteRqst.Write = WriteRqstPtr;
                WriteRqst.Flush = record;
            }
            XLogWrite(WriteRqst, false);
        }
        LWLockRelease(WALWriteLock);
    }

    END_CRIT_SECTION();

    /*
     * If we still haven't flushed to the request point then we have a
     * problem; most likely, the requested flush point is past end of XLOG.
     * This has been seen to occur when a disk page has a corrupted LSN.
     *
     * Formerly we treated this as a PANIC condition, but that hurts the
     * system's robustness rather than helping it: we do not want to take down
     * the whole system due to corruption on one data page.  In particular, if
     * the bad page is encountered again during recovery then we would be
     * unable to restart the database at all!  (This scenario has actually
     * happened in the field several times with 7.1 releases. Note that we
     * cannot get here while InRedo is true, but if the bad page is brought in
     * and marked dirty during recovery then CreateCheckPoint will try to
     * flush it at the end of recovery.)
     *
     * The current approach is to ERROR under normal conditions, but only
     * WARNING during recovery, so that the system can be brought up even if
     * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
     * be promoted to PANIC since xact.c calls this routine inside a critical
     * section.  However, calls from bufmgr.c are not within critical sections
     * and so we will not force a restart for a bad LSN on a data page.
     */
    if (XLByteLT(LogwrtResult.Flush, record))
        elog(InRecovery ? WARNING : ERROR,
             "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
             record.xlogid, record.xrecoff,
             LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
}

/*
 * Create a new XLOG file segment, or open a pre-existing one.
 *
 * log, seg: identify segment to be created/opened.
 *
 * *use_existent: if TRUE, OK to use a pre-existing file (else, any
 * pre-existing file will be deleted).  On return, TRUE if a pre-existing
 * file was used.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -