📄 xlog.c
字号:
*/ while (write_len) { while (rdata->data == NULL) rdata = rdata->next; if (freespace > 0) { if (rdata->len > freespace) { memcpy(Insert->currpos, rdata->data, freespace); rdata->data += freespace; rdata->len -= freespace; write_len -= freespace; } else { memcpy(Insert->currpos, rdata->data, rdata->len); freespace -= rdata->len; write_len -= rdata->len; Insert->currpos += rdata->len; rdata = rdata->next; continue; } } /* Use next buffer */ updrqst = AdvanceXLInsertBuffer(false); curridx = Insert->curridx; /* Insert cont-record header */ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; contrecord = (XLogContRecord *) Insert->currpos; contrecord->xl_rem_len = write_len; Insert->currpos += SizeOfXLogContRecord; freespace = INSERT_FREESPACE(Insert); } /* Ensure next record will be properly aligned */ Insert->currpos = (char *) Insert->currpage + MAXALIGN(Insert->currpos - (char *) Insert->currpage); freespace = INSERT_FREESPACE(Insert); /* * The recptr I return is the beginning of the *next* record. This will be * stored as LSN for changed data pages... */ INSERT_RECPTR(RecPtr, Insert, curridx); /* * If the record is an XLOG_SWITCH, we must now write and flush all the * existing data, and then forcibly advance to the start of the next * segment. It's not good to do this I/O while holding the insert lock, * but there seems too much risk of confusion if we try to release the * lock sooner. Fortunately xlog switch needn't be a high-performance * operation anyway... */ if (isLogSwitch) { XLogCtlWrite *Write = &XLogCtl->Write; XLogwrtRqst FlushRqst; XLogRecPtr OldSegEnd; LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); /* * Flush through the end of the page containing XLOG_SWITCH, and * perform end-of-segment actions (eg, notifying archiver). */ WriteRqst = XLogCtl->xlblocks[curridx]; FlushRqst.Write = WriteRqst; FlushRqst.Flush = WriteRqst; XLogWrite(FlushRqst, false, true); /* Set up the next buffer as first page of next segment */ /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */ (void) AdvanceXLInsertBuffer(true); /* There should be no unwritten data */ curridx = Insert->curridx; Assert(curridx == Write->curridx); /* Compute end address of old segment */ OldSegEnd = XLogCtl->xlblocks[curridx]; OldSegEnd.xrecoff -= XLOG_BLCKSZ; if (OldSegEnd.xrecoff == 0) { /* crossing a logid boundary */ OldSegEnd.xlogid -= 1; OldSegEnd.xrecoff = XLogFileSize; } /* Make it look like we've written and synced all of old segment */ LogwrtResult.Write = OldSegEnd; LogwrtResult.Flush = OldSegEnd; /* * Update shared-memory status --- this code should match XLogWrite */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire(&xlogctl->info_lck); xlogctl->LogwrtResult = LogwrtResult; if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write)) xlogctl->LogwrtRqst.Write = LogwrtResult.Write; if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush)) xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush; SpinLockRelease(&xlogctl->info_lck); } Write->LogwrtResult = LogwrtResult; LWLockRelease(WALWriteLock); updrqst = false; /* done already */ } else { /* normal case, ie not xlog switch */ /* Need to update shared LogwrtRqst if some block was filled up */ if (freespace < SizeOfXLogRecord) { /* curridx is filled and available for writing out */ updrqst = true; } else { /* if updrqst already set, write through end of previous buf */ curridx = PrevBufIdx(curridx); } WriteRqst = XLogCtl->xlblocks[curridx]; } LWLockRelease(WALInsertLock); if (updrqst) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire(&xlogctl->info_lck); /* advance global request to include new block(s) */ if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst)) xlogctl->LogwrtRqst.Write = WriteRqst; /* update local result copy while I have the chance */ LogwrtResult = xlogctl->LogwrtResult; SpinLockRelease(&xlogctl->info_lck); } XactLastRecEnd = RecPtr; END_CRIT_SECTION(); return RecPtr;}/* * Determine whether the buffer referenced by an XLogRecData item has to * be backed up, and if so fill a BkpBlock struct for it. In any case * save the buffer's LSN at *lsn. */static boolXLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb){ PageHeader page; page = (PageHeader) BufferGetBlock(rdata->buffer); /* * XXX We assume page LSN is first data on *every* page that can be passed * to XLogInsert, whether it otherwise has the standard page layout or * not. */ *lsn = page->pd_lsn; if (doPageWrites && XLByteLE(page->pd_lsn, RedoRecPtr)) { /* * The page needs to be backed up, so set up *bkpb */ bkpb->node = BufferGetFileNode(rdata->buffer); bkpb->block = BufferGetBlockNumber(rdata->buffer); if (rdata->buffer_std) { /* Assume we can omit data between pd_lower and pd_upper */ uint16 lower = page->pd_lower; uint16 upper = page->pd_upper; if (lower >= SizeOfPageHeaderData && upper > lower && upper <= BLCKSZ) { bkpb->hole_offset = lower; bkpb->hole_length = upper - lower; } else { /* No "hole" to compress out */ bkpb->hole_offset = 0; bkpb->hole_length = 0; } } else { /* Not a standard page header, don't try to eliminate "hole" */ bkpb->hole_offset = 0; bkpb->hole_length = 0; } return true; /* buffer requires backup */ } return false; /* buffer does not need to be backed up */}/* * XLogArchiveNotify * * Create an archive notification file * * The name of the notification file is the message that will be picked up * by the archiver, e.g. we write 0000000100000001000000C6.ready * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6, * then when complete, rename it to 0000000100000001000000C6.done */static voidXLogArchiveNotify(const char *xlog){ char archiveStatusPath[MAXPGPATH]; FILE *fd; /* insert an otherwise empty file called <XLOG>.ready */ StatusFilePath(archiveStatusPath, xlog, ".ready"); fd = AllocateFile(archiveStatusPath, "w"); if (fd == NULL) { ereport(LOG, (errcode_for_file_access(), errmsg("could not create archive status file \"%s\": %m", archiveStatusPath))); return; } if (FreeFile(fd)) { ereport(LOG, (errcode_for_file_access(), errmsg("could not write archive status file \"%s\": %m", archiveStatusPath))); return; } /* Notify archiver that it's got something to do */ if (IsUnderPostmaster) SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);}/* * Convenience routine to notify using log/seg representation of filename */static voidXLogArchiveNotifySeg(uint32 log, uint32 seg){ char xlog[MAXFNAMELEN]; XLogFileName(xlog, ThisTimeLineID, log, seg); XLogArchiveNotify(xlog);}/* * XLogArchiveCheckDone * * This is called when we are ready to delete or recycle an old XLOG segment * file or backup history file. If it is okay to delete it then return true. * If it is not time to delete it, make sure a .ready file exists, and return * false. * * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists, * then return false; else create <XLOG>.ready and return false. * * The reason we do things this way is so that if the original attempt to * create <XLOG>.ready fails, we'll retry during subsequent checkpoints. */static boolXLogArchiveCheckDone(const char *xlog){ char archiveStatusPath[MAXPGPATH]; struct stat stat_buf; /* Always deletable if archiving is off */ if (!XLogArchivingActive()) return true; /* First check for .done --- this means archiver is done with it */ StatusFilePath(archiveStatusPath, xlog, ".done"); if (stat(archiveStatusPath, &stat_buf) == 0) return true; /* check for .ready --- this means archiver is still busy with it */ StatusFilePath(archiveStatusPath, xlog, ".ready"); if (stat(archiveStatusPath, &stat_buf) == 0) return false; /* Race condition --- maybe archiver just finished, so recheck */ StatusFilePath(archiveStatusPath, xlog, ".done"); if (stat(archiveStatusPath, &stat_buf) == 0) return true; /* Retry creation of the .ready file */ XLogArchiveNotify(xlog); return false;}/* * XLogArchiveCleanup * * Cleanup archive notification file(s) for a particular xlog segment */static voidXLogArchiveCleanup(const char *xlog){ char archiveStatusPath[MAXPGPATH]; /* Remove the .done file */ StatusFilePath(archiveStatusPath, xlog, ".done"); unlink(archiveStatusPath); /* should we complain about failure? */ /* Remove the .ready file if present --- normally it shouldn't be */ StatusFilePath(archiveStatusPath, xlog, ".ready"); unlink(archiveStatusPath); /* should we complain about failure? */}/* * Advance the Insert state to the next buffer page, writing out the next * buffer if it still contains unwritten data. * * If new_segment is TRUE then we set up the next buffer page as the first * page of the next xlog segment file, possibly but not usually the next * consecutive file page. * * The global LogwrtRqst.Write pointer needs to be advanced to include the * just-filled page. If we can do this for free (without an extra lock), * we do so here. Otherwise the caller must do it. We return TRUE if the * request update still needs to be done, FALSE if we did it internally. * * Must be called with WALInsertLock held. */static boolAdvanceXLInsertBuffer(bool new_segment){ XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlWrite *Write = &XLogCtl->Write; int nextidx = NextBufIdx(Insert->curridx); bool update_needed = true; XLogRecPtr OldPageRqstPtr; XLogwrtRqst WriteRqst; XLogRecPtr NewPageEndPtr; XLogPageHeader NewPage; /* Use Insert->LogwrtResult copy if it's more fresh */ if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write)) LogwrtResult = Insert->LogwrtResult; /* * Get ending-offset of the buffer page we need to replace (this may be * zero if the buffer hasn't been used yet). Fall through if it's already * written out. */ OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* nope, got work to do... */ XLogRecPtr FinishedPageRqstPtr; FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx]; /* Before waiting, get info_lck and update LogwrtResult */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire(&xlogctl->info_lck); if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr)) xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr; LogwrtResult = xlogctl->LogwrtResult; SpinLockRelease(&xlogctl->info_lck); } update_needed = false; /* Did the shared-request update */ if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* OK, someone wrote it already */ Insert->LogwrtResult = LogwrtResult; } else { /* Must acquire write lock */ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = Write->LogwrtResult; if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* OK, someone wrote it already */ LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; } else { /* * Have to write buffers while holding insert lock. This is * not good, so only write as much as we absolutely must. */ WriteRqst.Write = OldPageRqstPtr; WriteRqst.Flush.xlogid = 0; WriteRqst.Flush.xrecoff = 0; XLogWrite(WriteRqst, false, false);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -