📄 xlog.c
字号:
* used to write the XLOG, and so will normally refer to the active segment. */static int openLogFile = -1;static uint32 openLogId = 0;static uint32 openLogSeg = 0;static uint32 openLogOff = 0;/* * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which * will be just past that page. */static int readFile = -1;static uint32 readId = 0;static uint32 readSeg = 0;static uint32 readOff = 0;/* Buffer for currently read page (BLCKSZ bytes) */static char *readBuf = NULL;/* Buffer for current ReadRecord result (expandable) */static char *readRecordBuf = NULL;static uint32 readRecordBufSize = 0;/* State information for XLOG reading */static XLogRecPtr ReadRecPtr; /* start of last record read */static XLogRecPtr EndRecPtr; /* end+1 of last record read */static XLogRecord *nextRecord = NULL;static TimeLineID lastPageTLI = 0;static bool InRedo = false;static void XLogArchiveNotify(const char *xlog);static void XLogArchiveNotifySeg(uint32 log, uint32 seg);static bool XLogArchiveIsDone(const char *xlog);static void XLogArchiveCleanup(const char *xlog);static void readRecoveryCommandFile(void);static void exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg);static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);static bool XLogCheckBuffer(XLogRecData *rdata, XLogRecPtr *lsn, BkpBlock *bkpb);static bool AdvanceXLInsertBuffer(void);static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);static int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock);static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock);static int XLogFileOpen(uint32 log, uint32 seg);static int XLogFileRead(uint32 log, uint32 seg, int emode);static bool RestoreArchivedFile(char *path, const char *xlogfname, const char *recovername, off_t expectedSize);static int PreallocXlogFiles(XLogRecPtr endptr);static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr, int *nsegsremoved, int *nsegsrecycled);static void RemoveOldBackupHistory(void);static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);static List *readTimeLineHistory(TimeLineID targetTLI);static bool existsTimeLineHistory(TimeLineID probeTLI);static TimeLineID findNewestTimeLine(TimeLineID startTLI);static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg);static void WriteControlFile(void);static void ReadControlFile(void);static char *str_time(time_t tnow);static void issue_xlog_fsync(void);#ifdef WAL_DEBUGstatic void xlog_outrec(char *buf, XLogRecord *record);#endifstatic bool read_backup_label(XLogRecPtr *checkPointLoc);static void remove_backup_label(void);/* * Insert an XLOG record having the specified RMID and info bytes, * with the body of the record being the data chunk(s) described by * the rdata chain (see xlog.h for notes about rdata). * * Returns XLOG pointer to end of record (beginning of next record). * This can be used as LSN for data pages affected by the logged action. * (LSN is the XLOG point up to which the XLOG must be flushed to disk * before the data page can be written out. This implements the basic * WAL rule "write the log before the data".) * * NB: this routine feels free to scribble on the XLogRecData structs, * though not on the data they reference. This is OK since the XLogRecData * structs are always just temporaries in the calling code. */XLogRecPtrXLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata){ XLogCtlInsert *Insert = &XLogCtl->Insert; XLogRecord *record; XLogContRecord *contrecord; XLogRecPtr RecPtr; XLogRecPtr WriteRqst; uint32 freespace; int curridx; XLogRecData *rdt; Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; pg_crc32 rdata_crc; uint32 len, write_len; unsigned i; XLogwrtRqst LogwrtRqst; bool updrqst; bool no_tran = (rmid == RM_XLOG_ID) ? true : false; if (info & XLR_INFO_MASK) { if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN) elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK)); no_tran = true; info &= ~XLR_INFO_MASK; } /* * In bootstrap mode, we don't actually log anything but XLOG resources; * return a phony record pointer. */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { RecPtr.xlogid = 0; RecPtr.xrecoff = SizeOfXLogLongPHD; /* start of 1st chkpt record */ return (RecPtr); } /* * Here we scan the rdata chain, determine which buffers must be backed * up, and compute the CRC values for the data. Note that the record * header isn't added into the CRC initially since we don't know the final * length or info bits quite yet. Thus, the CRC will represent the CRC of * the whole record in the order "rdata, then backup blocks, then record * header". * * We may have to loop back to here if a race condition is detected below. * We could prevent the race by doing all this work while holding the * insert lock, but it seems better to avoid doing CRC calculations while * holding the lock. This means we have to be careful about modifying the * rdata chain until we know we aren't going to loop back again. The only * change we allow ourselves to make earlier is to set rdt->data = NULL in * chain items we have decided we will have to back up the whole buffer * for. This is OK because we will certainly decide the same thing again * for those items if we do it over; doing it here saves an extra pass * over the chain later. */begin:; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { dtbuf[i] = InvalidBuffer; dtbuf_bkp[i] = false; } INIT_CRC32(rdata_crc); len = 0; for (rdt = rdata;;) { if (rdt->buffer == InvalidBuffer) { /* Simple data, just include it */ len += rdt->len; COMP_CRC32(rdata_crc, rdt->data, rdt->len); } else { /* Find info for buffer */ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (rdt->buffer == dtbuf[i]) { /* Buffer already referenced by earlier chain item */ if (dtbuf_bkp[i]) rdt->data = NULL; else if (rdt->data) { len += rdt->len; COMP_CRC32(rdata_crc, rdt->data, rdt->len); } break; } if (dtbuf[i] == InvalidBuffer) { /* OK, put it in this slot */ dtbuf[i] = rdt->buffer; if (XLogCheckBuffer(rdt, &(dtbuf_lsn[i]), &(dtbuf_xlg[i]))) { dtbuf_bkp[i] = true; rdt->data = NULL; } else if (rdt->data) { len += rdt->len; COMP_CRC32(rdata_crc, rdt->data, rdt->len); } break; } } if (i >= XLR_MAX_BKP_BLOCKS) elog(PANIC, "can backup at most %d blocks per xlog record", XLR_MAX_BKP_BLOCKS); } /* Break out of loop when rdt points to last chain item */ if (rdt->next == NULL) break; rdt = rdt->next; } /* * Now add the backup block headers and data into the CRC */ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (dtbuf_bkp[i]) { BkpBlock *bkpb = &(dtbuf_xlg[i]); char *page; COMP_CRC32(rdata_crc, (char *) bkpb, sizeof(BkpBlock)); page = (char *) BufferGetBlock(dtbuf[i]); if (bkpb->hole_length == 0) { COMP_CRC32(rdata_crc, page, BLCKSZ); } else { /* must skip the hole */ COMP_CRC32(rdata_crc, page, bkpb->hole_offset); COMP_CRC32(rdata_crc, page + (bkpb->hole_offset + bkpb->hole_length), BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); } } } /* * NOTE: the test for len == 0 here is somewhat fishy, since in theory all * of the rmgr data might have been suppressed in favor of backup blocks. * Currently, all callers of XLogInsert provide at least some * not-in-a-buffer data and so len == 0 should never happen, but that may * not be true forever. If you need to remove the len == 0 check, also * remove the check for xl_len == 0 in ReadRecord, below. */ if (len == 0) elog(PANIC, "invalid xlog record length %u", len); START_CRIT_SECTION(); /* update LogwrtResult before doing cache fill check */ { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; SpinLockAcquire_NoHoldoff(&xlogctl->info_lck); LogwrtRqst = xlogctl->LogwrtRqst; LogwrtResult = xlogctl->LogwrtResult; SpinLockRelease_NoHoldoff(&xlogctl->info_lck); } /* * If cache is half filled then try to acquire write lock and do * XLogWrite. Ignore any fractional blocks in performing this check. */ LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ; if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid || (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff + XLogCtl->XLogCacheByte / 2)) { if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE)) { /* * Since the amount of data we write here is completely optional * anyway, tell XLogWrite it can be "flexible" and stop at a * convenient boundary. This allows writes triggered by this * mechanism to synchronize with the cache boundaries, so that in * a long transaction we'll basically dump alternating halves of * the buffer array. */ LogwrtResult = XLogCtl->Write.LogwrtResult; if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write)) XLogWrite(LogwrtRqst, true); LWLockRelease(WALWriteLock); } } /* Now wait to get insert lock */ LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); /* * Check to see if my RedoRecPtr is out of date. If so, may have to go * back and recompute everything. This can only happen just after a * checkpoint, so it's better to be slow in this case and fast otherwise. */ if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr)) { Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr)); RedoRecPtr = Insert->RedoRecPtr; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (dtbuf[i] == InvalidBuffer) continue; if (dtbuf_bkp[i] == false && XLByteLE(dtbuf_lsn[i], RedoRecPtr)) { /* * Oops, this buffer now needs to be backed up, but we didn't * think so above. Start over. */ LWLockRelease(WALInsertLock); END_CRIT_SECTION(); goto begin; } } } /* * Make additional rdata chain entries for the backup blocks, so that we * don't need to special-case them in the write loop. Note that we have * now irrevocably changed the input rdata chain. At the exit of this * loop, write_len includes the backup block data. * * Also set the appropriate info bits to show which buffers were backed * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct * buffer value (ignoring InvalidBuffer) appearing in the rdata chain. */ write_len = len; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { BkpBlock *bkpb; char *page; if (!dtbuf_bkp[i]) continue; info |= XLR_SET_BKP_BLOCK(i); bkpb = &(dtbuf_xlg[i]); page = (char *) BufferGetBlock(dtbuf[i]); rdt->next = &(dtbuf_rdt1[i]); rdt = rdt->next; rdt->data = (char *) bkpb; rdt->len = sizeof(BkpBlock); write_len += sizeof(BkpBlock); rdt->next = &(dtbuf_rdt2[i]); rdt = rdt->next; if (bkpb->hole_length == 0) { rdt->data = page; rdt->len = BLCKSZ; write_len += BLCKSZ; rdt->next = NULL; } else { /* must skip the hole */ rdt->data = page; rdt->len = bkpb->hole_offset; write_len += bkpb->hole_offset; rdt->next = &(dtbuf_rdt3[i]); rdt = rdt->next; rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); write_len += rdt->len; rdt->next = NULL; } } /* * If there isn't enough space on the current XLOG page for a record * header, advance to the next page (leaving the unused space as zeroes). */ updrqst = false; freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { updrqst = AdvanceXLInsertBuffer(); freespace = INSERT_FREESPACE(Insert); } curridx = Insert->curridx; record = (XLogRecord *) Insert->currpos; /* Insert record header */ record->xl_prev = Insert->PrevRecord; record->xl_xid = GetCurrentTransactionIdIfAny(); record->xl_tot_len = SizeOfXLogRecord + write_len; record->xl_len = len; /* doesn't include backup blocks */ record->xl_info = info; record->xl_rmid = rmid; /* Now we can finish computing the record's CRC */ COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32), SizeOfXLogRecord - sizeof(pg_crc32)); FIN_CRC32(rdata_crc); record->xl_crc = rdata_crc; /* Compute record's XLOG location */ INSERT_RECPTR(RecPtr, Insert, curridx);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -