📄 xlog.c
字号:
LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; } } } /* * Now the next buffer slot is free and we can set it up to be the next * output page. */ NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx]; if (new_segment) { /* force it to a segment start point */ NewPageEndPtr.xrecoff += XLogSegSize - 1; NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize; } if (NewPageEndPtr.xrecoff >= XLogFileSize) { /* crossing a logid boundary */ NewPageEndPtr.xlogid += 1; NewPageEndPtr.xrecoff = XLOG_BLCKSZ; } else NewPageEndPtr.xrecoff += XLOG_BLCKSZ; XLogCtl->xlblocks[nextidx] = NewPageEndPtr; NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); Insert->curridx = nextidx; Insert->currpage = NewPage; Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD; /* * Be sure to re-zero the buffer so that bytes beyond what we've written * will look like zeroes and not valid XLOG records... */ MemSet((char *) NewPage, 0, XLOG_BLCKSZ); /* * Fill the new page's header */ NewPage ->xlp_magic = XLOG_PAGE_MAGIC; /* NewPage->xlp_info = 0; */ /* done by memset */ NewPage ->xlp_tli = ThisTimeLineID; NewPage ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid; NewPage ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ; /* * If first page of an XLOG segment file, make it a long header. */ if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0) { XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; NewLongPage->xlp_sysid = ControlFile->system_identifier; NewLongPage->xlp_seg_size = XLogSegSize; NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; NewPage ->xlp_info |= XLP_LONG_HEADER; Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD; } return update_needed;}/* * Check whether we've consumed enough xlog space that a checkpoint is needed. * * Caller must have just finished filling the open log file (so that * openLogId/openLogSeg are valid). We measure the distance from RedoRecPtr * to the open log file and see if that exceeds CheckPointSegments. * * Note: it is caller's responsibility that RedoRecPtr is up-to-date. */static boolXLogCheckpointNeeded(void){ /* * A straight computation of segment number could overflow 32 bits. Rather * than assuming we have working 64-bit arithmetic, we compare the * highest-order bits separately, and force a checkpoint immediately when * they change. */ uint32 old_segno, new_segno; uint32 old_highbits, new_highbits; old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile + (RedoRecPtr.xrecoff / XLogSegSize); old_highbits = RedoRecPtr.xlogid / XLogSegSize; new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile + openLogSeg; new_highbits = openLogId / XLogSegSize; if (new_highbits != old_highbits || new_segno >= old_segno + (uint32) (CheckPointSegments - 1)) return true; return false;}/* * Write and/or fsync the log at least as far as WriteRqst indicates. * * If flexible == TRUE, we don't have to write as far as WriteRqst, but * may stop at any convenient boundary (such as a cache or logfile boundary). * This option allows us to avoid uselessly issuing multiple writes when a * single one would do. * * If xlog_switch == TRUE, we are intending an xlog segment switch, so * perform end-of-segment actions after writing the last page, even if * it's not physically the end of its segment. (NB: this will work properly * only if caller specifies WriteRqst == page-end and flexible == false, * and there is some data to write.) * * Must be called with WALWriteLock held. */static voidXLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch){ XLogCtlWrite *Write = &XLogCtl->Write; bool ispartialpage; bool last_iteration; bool finishing_seg; bool use_existent; int curridx; int npages; int startidx; uint32 startoffset; /* We should always be inside a critical section here */ Assert(CritSectionCount > 0); /* * Update local LogwrtResult (caller probably did this already, but...) */ LogwrtResult = Write->LogwrtResult; /* * Since successive pages in the xlog cache are consecutively allocated, * we can usually gather multiple pages together and issue just one * write() call. npages is the number of pages we have determined can be * written together; startidx is the cache block index of the first one, * and startoffset is the file offset at which it should go. The latter * two variables are only valid when npages > 0, but we must initialize * all of them to keep the compiler quiet. */ npages = 0; startidx = 0; startoffset = 0; /* * Within the loop, curridx is the cache block index of the page to * consider writing. We advance Write->curridx only after successfully * writing pages. (Right now, this refinement is useless since we are * going to PANIC if any error occurs anyway; but someday it may come in * useful.) */ curridx = Write->curridx; while (XLByteLT(LogwrtResult.Write, WriteRqst.Write)) { /* * Make sure we're not ahead of the insert process. This could happen * if we're passed a bogus WriteRqst.Write that is past the end of the * last page that's been initialized by AdvanceXLInsertBuffer. */ if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx])) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, XLogCtl->xlblocks[curridx].xlogid, XLogCtl->xlblocks[curridx].xrecoff); /* Advance LogwrtResult.Write to end of current buffer page */ LogwrtResult.Write = XLogCtl->xlblocks[curridx]; ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) { /* * Switch to new logfile segment. We cannot have any pending * pages here (since we dump what we have at segment end). */ Assert(npages == 0); if (openLogFile >= 0) XLogFileClose(); XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); /* create/use new log file */ use_existent = true; openLogFile = XLogFileInit(openLogId, openLogSeg, &use_existent, true); openLogOff = 0; } /* Make sure we have the current logfile open */ if (openLogFile < 0) { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); openLogFile = XLogFileOpen(openLogId, openLogSeg); openLogOff = 0; } /* Add current page to the set of pending pages-to-dump */ if (npages == 0) { /* first of group */ startidx = curridx; startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize; } npages++; /* * Dump the set if this will be the last loop iteration, or if we are * at the last page of the cache area (since the next page won't be * contiguous in memory), or if we are at the end of the logfile * segment. */ last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write); finishing_seg = !ispartialpage && (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize; if (last_iteration || curridx == XLogCtl->XLogCacheBlck || finishing_seg) { char *from; Size nbytes; /* Need to seek in the file? */ if (openLogOff != startoffset) { if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not seek in log file %u, " "segment %u to offset %u: %m", openLogId, openLogSeg, startoffset))); openLogOff = startoffset; } /* OK to write the page(s) */ from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ; nbytes = npages * (Size) XLOG_BLCKSZ; errno = 0; if (write(openLogFile, from, nbytes) != nbytes) { /* if write didn't set errno, assume no disk space */ if (errno == 0) errno = ENOSPC; ereport(PANIC, (errcode_for_file_access(), errmsg("could not write to log file %u, segment %u " "at offset %u, length %lu: %m", openLogId, openLogSeg, openLogOff, (unsigned long) nbytes))); } /* Update state for write */ openLogOff += nbytes; Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx); npages = 0; /* * If we just wrote the whole last page of a logfile segment, * fsync the segment immediately. This avoids having to go back * and re-open prior segments when an fsync request comes along * later. Doing it here ensures that one and only one backend will * perform this fsync. * * We also do this if this is the last page written for an xlog * switch. * * This is also the right place to notify the Archiver that the * segment is ready to copy to archival storage, and to update the * timer for archive_timeout, and to signal for a checkpoint if * too many logfile segments have been used since the last * checkpoint. */ if (finishing_seg || (xlog_switch && last_iteration)) { issue_xlog_fsync(); LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ if (XLogArchivingActive()) XLogArchiveNotifySeg(openLogId, openLogSeg); Write->lastSegSwitchTime = time(NULL); /* * Signal bgwriter to start a checkpoint if we've consumed too * much xlog since the last one. For speed, we first check * using the local copy of RedoRecPtr, which might be out of * date; if it looks like a checkpoint is needed, forcibly * update RedoRecPtr and recheck. */ if (IsUnderPostmaster && XLogCheckpointNeeded()) { (void) GetRedoRecPtr(); if (XLogCheckpointNeeded()) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } } if (ispartialpage) { /* Only asked to write a partial page */ LogwrtResult.Write = WriteRqst.Write; break; } curridx = NextBufIdx(curridx); /* If flexible, break out of loop as soon as we wrote something */ if (flexible && npages == 0) break; } Assert(npages == 0); Assert(curridx == Write->curridx); /* * If asked to flush, do so */ if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) && XLByteLT(LogwrtResult.Flush, LogwrtResult.Write)) { /* * Could get here without iterating above loop, in which case we might * have no open file or the wrong one. However, we do not need to * fsync more than one file. */ if (sync_method != SYNC_METHOD_OPEN) { if (openLogFile >= 0 && !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) XLogFileClose(); if (openLogFile < 0) { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); openLogFile = XLogFileOpen(openLogId, openLogSeg); openLogOff = 0; } issue_xlog_fsync(); } LogwrtResult.Flush = LogwrtResult.Write; } /* * Update shared-memory status * * We make sure that the shared 'request' values do not fall behind the * 'result' values. This is not absolutely essential, but it saves some * code in a couple of places. */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire(&xlogctl->info_lck); xlogctl->LogwrtResult = LogwrtResult; if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write)) xlogctl->LogwrtRqst.Write = LogwrtResult.Write; if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush)) xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush; SpinLockRelease(&xlogctl->info_lck); } Write->LogwrtResult = LogwrtResult;}/* * Record the LSN for an asynchronous transaction commit. * (This should not be called for aborts, nor for synchronous commits.) */voidXLogSetAsyncCommitLSN(XLogRecPtr asyncCommitLSN){ /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire(&xlogctl->info_lck); if (XLByteLT(xlogctl->asyncCommitLSN, asyncCommitLSN)) xlogctl->asyncCommitLSN = asyncCommitLSN; SpinLockRelease(&xlogctl->info_lck);}/* * Ensure that all XLOG data through the given position is flushed to disk. * * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not * already held, and we try to avoid acquiring it if possible. */voidXLogFlush(XLogRecPtr record){ XLogRecPtr WriteRqstPtr; XLogwrtRqst WriteRqst; /* Disabled during REDO */ if (InRedo) return; /* Quick exit if already known flushed */ if (XLByteLE(record, LogwrtResult.Flush)) return;#ifdef WAL_DEBUG if (XLOG_DEBUG) elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X", record.xlogid, record.xrecoff, LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);#endif START_CRIT_SECTION(); /*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -