📄 xlog.c
字号:
#ifdef WAL_DEBUG if (XLOG_DEBUG) { char buf[8192]; sprintf(buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff); xlog_outrec(buf, record); if (rdata->data != NULL) { strcat(buf, " - "); RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data); } elog(LOG, "%s", buf); }#endif /* Record begin of record in appropriate places */ if (!no_tran) MyLastRecPtr = RecPtr; ProcLastRecPtr = RecPtr; Insert->PrevRecord = RecPtr; MyXactMadeXLogEntry = true; Insert->currpos += SizeOfXLogRecord; freespace -= SizeOfXLogRecord; /* * Append the data, including backup blocks if any */ while (write_len) { while (rdata->data == NULL) rdata = rdata->next; if (freespace > 0) { if (rdata->len > freespace) { memcpy(Insert->currpos, rdata->data, freespace); rdata->data += freespace; rdata->len -= freespace; write_len -= freespace; } else { memcpy(Insert->currpos, rdata->data, rdata->len); freespace -= rdata->len; write_len -= rdata->len; Insert->currpos += rdata->len; rdata = rdata->next; continue; } } /* Use next buffer */ updrqst = AdvanceXLInsertBuffer(); curridx = Insert->curridx; /* Insert cont-record header */ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; contrecord = (XLogContRecord *) Insert->currpos; contrecord->xl_rem_len = write_len; Insert->currpos += SizeOfXLogContRecord; freespace = INSERT_FREESPACE(Insert); } /* Ensure next record will be properly aligned */ Insert->currpos = (char *) Insert->currpage + MAXALIGN(Insert->currpos - (char *) Insert->currpage); freespace = INSERT_FREESPACE(Insert); /* * The recptr I return is the beginning of the *next* record. This will be * stored as LSN for changed data pages... */ INSERT_RECPTR(RecPtr, Insert, curridx); /* Need to update shared LogwrtRqst if some block was filled up */ if (freespace < SizeOfXLogRecord) updrqst = true; /* curridx is filled and available for writing * out */ else curridx = PrevBufIdx(curridx); WriteRqst = XLogCtl->xlblocks[curridx]; LWLockRelease(WALInsertLock); if (updrqst) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire_NoHoldoff(&xlogctl->info_lck); /* advance global request to include new block(s) */ if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst)) xlogctl->LogwrtRqst.Write = WriteRqst; /* update local result copy while I have the chance */ LogwrtResult = xlogctl->LogwrtResult; SpinLockRelease_NoHoldoff(&xlogctl->info_lck); } ProcLastRecEnd = RecPtr; END_CRIT_SECTION(); return (RecPtr);}/* * Determine whether the buffer referenced by an XLogRecData item has to * be backed up, and if so fill a BkpBlock struct for it. In any case * save the buffer's LSN at *lsn. */static boolXLogCheckBuffer(XLogRecData *rdata, XLogRecPtr *lsn, BkpBlock *bkpb){ PageHeader page; page = (PageHeader) BufferGetBlock(rdata->buffer); /* * XXX We assume page LSN is first data on *every* page that can be passed * to XLogInsert, whether it otherwise has the standard page layout or * not. */ *lsn = page->pd_lsn; if (XLByteLE(page->pd_lsn, RedoRecPtr)) { /* * The page needs to be backed up, so set up *bkpb */ bkpb->node = BufferGetFileNode(rdata->buffer); bkpb->block = BufferGetBlockNumber(rdata->buffer); if (rdata->buffer_std) { /* Assume we can omit data between pd_lower and pd_upper */ uint16 lower = page->pd_lower; uint16 upper = page->pd_upper; if (lower >= SizeOfPageHeaderData && upper > lower && upper <= BLCKSZ) { bkpb->hole_offset = lower; bkpb->hole_length = upper - lower; } else { /* No "hole" to compress out */ bkpb->hole_offset = 0; bkpb->hole_length = 0; } } else { /* Not a standard page header, don't try to eliminate "hole" */ bkpb->hole_offset = 0; bkpb->hole_length = 0; } return true; /* buffer requires backup */ } return false; /* buffer does not need to be backed up */}/* * XLogArchiveNotify * * Create an archive notification file * * The name of the notification file is the message that will be picked up * by the archiver, e.g. we write 0000000100000001000000C6.ready * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6, * then when complete, rename it to 0000000100000001000000C6.done */static voidXLogArchiveNotify(const char *xlog){ char archiveStatusPath[MAXPGPATH]; FILE *fd; /* insert an otherwise empty file called <XLOG>.ready */ StatusFilePath(archiveStatusPath, xlog, ".ready"); fd = AllocateFile(archiveStatusPath, "w"); if (fd == NULL) { ereport(LOG, (errcode_for_file_access(), errmsg("could not create archive status file \"%s\": %m", archiveStatusPath))); return; } if (FreeFile(fd)) { ereport(LOG, (errcode_for_file_access(), errmsg("could not write archive status file \"%s\": %m", archiveStatusPath))); return; } /* Notify archiver that it's got something to do */ if (IsUnderPostmaster) SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);}/* * Convenience routine to notify using log/seg representation of filename */static voidXLogArchiveNotifySeg(uint32 log, uint32 seg){ char xlog[MAXFNAMELEN]; XLogFileName(xlog, ThisTimeLineID, log, seg); XLogArchiveNotify(xlog);}/* * XLogArchiveIsDone * * Checks for a ".done" archive notification file. This is called when we * are ready to delete or recycle an old XLOG segment file. If it is okay * to delete it then return true. * * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists, * then return false; else create <XLOG>.ready and return false. The * last case covers the possibility that the original attempt to create * <XLOG>.ready failed. */static boolXLogArchiveIsDone(const char *xlog){ char archiveStatusPath[MAXPGPATH]; struct stat stat_buf; /* First check for .done --- this is the expected case */ StatusFilePath(archiveStatusPath, xlog, ".done"); if (stat(archiveStatusPath, &stat_buf) == 0) return true; /* check for .ready --- this means archiver is still busy with it */ StatusFilePath(archiveStatusPath, xlog, ".ready"); if (stat(archiveStatusPath, &stat_buf) == 0) return false; /* Race condition --- maybe archiver just finished, so recheck */ StatusFilePath(archiveStatusPath, xlog, ".done"); if (stat(archiveStatusPath, &stat_buf) == 0) return true; /* Retry creation of the .ready file */ XLogArchiveNotify(xlog); return false;}/* * XLogArchiveCleanup * * Cleanup archive notification file(s) for a particular xlog segment */static voidXLogArchiveCleanup(const char *xlog){ char archiveStatusPath[MAXPGPATH]; /* Remove the .done file */ StatusFilePath(archiveStatusPath, xlog, ".done"); unlink(archiveStatusPath); /* should we complain about failure? */ /* Remove the .ready file if present --- normally it shouldn't be */ StatusFilePath(archiveStatusPath, xlog, ".ready"); unlink(archiveStatusPath); /* should we complain about failure? */}/* * Advance the Insert state to the next buffer page, writing out the next * buffer if it still contains unwritten data. * * The global LogwrtRqst.Write pointer needs to be advanced to include the * just-filled page. If we can do this for free (without an extra lock), * we do so here. Otherwise the caller must do it. We return TRUE if the * request update still needs to be done, FALSE if we did it internally. * * Must be called with WALInsertLock held. */static boolAdvanceXLInsertBuffer(void){ XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlWrite *Write = &XLogCtl->Write; int nextidx = NextBufIdx(Insert->curridx); bool update_needed = true; XLogRecPtr OldPageRqstPtr; XLogwrtRqst WriteRqst; XLogRecPtr NewPageEndPtr; XLogPageHeader NewPage; /* Use Insert->LogwrtResult copy if it's more fresh */ if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write)) LogwrtResult = Insert->LogwrtResult; /* * Get ending-offset of the buffer page we need to replace (this may be * zero if the buffer hasn't been used yet). Fall through if it's already * written out. */ OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* nope, got work to do... */ XLogRecPtr FinishedPageRqstPtr; FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx]; /* Before waiting, get info_lck and update LogwrtResult */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire_NoHoldoff(&xlogctl->info_lck); if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr)) xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr; LogwrtResult = xlogctl->LogwrtResult; SpinLockRelease_NoHoldoff(&xlogctl->info_lck); } update_needed = false; /* Did the shared-request update */ if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* OK, someone wrote it already */ Insert->LogwrtResult = LogwrtResult; } else { /* Must acquire write lock */ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = Write->LogwrtResult; if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* OK, someone wrote it already */ LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; } else { /* * Have to write buffers while holding insert lock. This is * not good, so only write as much as we absolutely must. */ WriteRqst.Write = OldPageRqstPtr; WriteRqst.Flush.xlogid = 0; WriteRqst.Flush.xrecoff = 0; XLogWrite(WriteRqst, false); LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; } } } /* * Now the next buffer slot is free and we can set it up to be the next * output page. */ NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx]; if (NewPageEndPtr.xrecoff >= XLogFileSize) { /* crossing a logid boundary */ NewPageEndPtr.xlogid += 1; NewPageEndPtr.xrecoff = BLCKSZ; } else NewPageEndPtr.xrecoff += BLCKSZ; XLogCtl->xlblocks[nextidx] = NewPageEndPtr; NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) BLCKSZ); Insert->curridx = nextidx; Insert->currpage = NewPage; Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD; /* * Be sure to re-zero the buffer so that bytes beyond what we've written * will look like zeroes and not valid XLOG records... */ MemSet((char *) NewPage, 0, BLCKSZ); /* * Fill the new page's header */ NewPage ->xlp_magic = XLOG_PAGE_MAGIC; /* NewPage->xlp_info = 0; */ /* done by memset */ NewPage ->xlp_tli = ThisTimeLineID; NewPage ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid; NewPage ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ; /* * If first page of an XLOG segment file, make it a long header. */ if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0) { XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; NewLongPage->xlp_sysid = ControlFile->system_identifier; NewLongPage->xlp_seg_size = XLogSegSize; NewPage ->xlp_info |= XLP_LONG_HEADER; Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD; } return update_needed;}/* * Write and/or fsync the log at least as far as WriteRqst indicates. * * If flexible == TRUE, we don't have to write as far as WriteRqst, but
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -