📄 xlog.c
字号:
/* True while we are applying WAL records during crash recovery (redo). */
static bool InRedo = false;

/* Forward declarations for file-local routines. */

/* WAL insert-buffer and write machinery */
static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);

/* XLOG segment-file management */
static int XLogFileInit(uint32 log, uint32 seg,
                        bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
                                   bool find_free, int max_advance,
                                   bool use_lock);
static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);

/* WAL reading / validation (used by recovery) */
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
                                        int whichChkpt, char *buffer);

/* pg_control access */
static void WriteControlFile(void);
static void ReadControlFile(void);

/* Miscellaneous helpers */
static char *str_time(time_t tnow);
static void xlog_outrec(char *buf, XLogRecord *record);
static void issue_xlog_fsync(void);

/*
 * Insert an XLOG record having the specified RMID and info bytes,
 * with the body of the record being the data chunk(s) described by
 * the rdata list (see xlog.h for notes about rdata).
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 *
 * NB: this routine feels free to scribble on the XLogRecData structs,
 * though not on the data they reference.  This is OK since the XLogRecData
 * structs are always just temporaries in the calling code.
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
    XLogCtlInsert *Insert = &XLogCtl->Insert;
    XLogRecord *record;
    XLogContRecord *contrecord;
    XLogRecPtr  RecPtr;
    XLogRecPtr  WriteRqst;
    uint32      freespace;
    uint16      curridx;
    XLogRecData *rdt;
    Buffer      dtbuf[XLR_MAX_BKP_BLOCKS];       /* distinct buffers seen in rdata */
    bool        dtbuf_bkp[XLR_MAX_BKP_BLOCKS];   /* does slot i need a backup block? */
    BkpBlock    dtbuf_xlg[XLR_MAX_BKP_BLOCKS];   /* backup-block headers to emit */
    XLogRecPtr  dtbuf_lsn[XLR_MAX_BKP_BLOCKS];   /* page LSN sampled from each buffer */
    XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];  /* extra rdata entries: hdr+page */
    crc64       rdata_crc;
    uint32      len,
                write_len;
    unsigned    i;
    XLogwrtRqst LogwrtRqst;
    bool        updrqst;
    bool        no_tran = (rmid == RM_XLOG_ID) ? true : false;

    /* The only info flag accepted from callers is XLOG_NO_TRAN. */
    if (info & XLR_INFO_MASK)
    {
        if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
            elog(PANIC, "invalid xlog info mask %02X",
                 (info & XLR_INFO_MASK));
        no_tran = true;
        info &= ~XLR_INFO_MASK;
    }

    /*
     * In bootstrap mode, we don't actually log anything but XLOG
     * resources; return a phony record pointer.
     */
    if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
    {
        RecPtr.xlogid = 0;
        RecPtr.xrecoff = SizeOfXLogPHD;     /* start of 1st checkpoint record */
        return (RecPtr);
    }

    /*
     * Here we scan the rdata list, determine which buffers must be backed
     * up, and compute the CRC values for the data.  Note that the record
     * header isn't added into the CRC yet since we don't know the final
     * length or info bits quite yet.
     *
     * We may have to loop back to here if a race condition is detected
     * below.  We could prevent the race by doing all this work while
     * holding the insert lock, but it seems better to avoid doing CRC
     * calculations while holding the lock.  This means we have to be
     * careful about modifying the rdata list until we know we aren't
     * going to loop back again.  The only change we allow ourselves to
     * make earlier is to set rdt->data = NULL in list items we have
     * decided we will have to back up the whole buffer for.  This is OK
     * because we will certainly decide the same thing again for those
     * items if we do it over; doing it here saves an extra pass over the
     * list later.
     */
begin:;
    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
    {
        dtbuf[i] = InvalidBuffer;
        dtbuf_bkp[i] = false;
    }

    INIT_CRC64(rdata_crc);
    len = 0;
    for (rdt = rdata;;)
    {
        if (rdt->buffer == InvalidBuffer)
        {
            /* Simple data, just include it */
            len += rdt->len;
            COMP_CRC64(rdata_crc, rdt->data, rdt->len);
        }
        else
        {
            /* Find info for buffer */
            for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
            {
                if (rdt->buffer == dtbuf[i])
                {
                    /* Buffer already referenced by earlier list item */
                    if (dtbuf_bkp[i])
                        rdt->data = NULL;   /* whole page is backed up; drop the piece */
                    else if (rdt->data)
                    {
                        len += rdt->len;
                        COMP_CRC64(rdata_crc, rdt->data, rdt->len);
                    }
                    break;
                }
                if (dtbuf[i] == InvalidBuffer)
                {
                    /* OK, put it in this slot */
                    dtbuf[i] = rdt->buffer;

                    /*
                     * XXX We assume page LSN is first data on page
                     */
                    dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer));

                    /*
                     * Page not modified since the last checkpoint's redo
                     * point => must back up the full page image.
                     */
                    if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
                    {
                        crc64       dtcrc;

                        dtbuf_bkp[i] = true;
                        rdt->data = NULL;
                        /* CRC covers the page image plus the BkpBlock
                         * header fields that follow its own crc field. */
                        INIT_CRC64(dtcrc);
                        COMP_CRC64(dtcrc,
                                   BufferGetBlock(dtbuf[i]),
                                   BLCKSZ);
                        dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
                        dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
                        COMP_CRC64(dtcrc,
                                   (char *) &(dtbuf_xlg[i]) + sizeof(crc64),
                                   sizeof(BkpBlock) - sizeof(crc64));
                        FIN_CRC64(dtcrc);
                        dtbuf_xlg[i].crc = dtcrc;
                    }
                    else if (rdt->data)
                    {
                        len += rdt->len;
                        COMP_CRC64(rdata_crc, rdt->data, rdt->len);
                    }
                    break;
                }
            }
            if (i >= XLR_MAX_BKP_BLOCKS)
                elog(PANIC, "can backup at most %d blocks per xlog record",
                     XLR_MAX_BKP_BLOCKS);
        }
        /* Break out of loop when rdt points to last list item */
        if (rdt->next == NULL)
            break;
        rdt = rdt->next;
    }

    /*
     * NOTE: the test for len == 0 here is somewhat fishy, since in theory
     * all of the rmgr data might have been suppressed in favor of backup
     * blocks.  Currently, all callers of XLogInsert provide at least some
     * not-in-a-buffer data and so len == 0 should never happen, but that
     * may not be true forever.  If you need to remove the len == 0 check,
     * also remove the check for xl_len == 0 in ReadRecord, below.
     */
    if (len == 0 || len > MAXLOGRECSZ)
        elog(PANIC, "invalid xlog record length %u", len);

    START_CRIT_SECTION();

    /* update LogwrtResult before doing cache fill check */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;

        SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
        LogwrtRqst = xlogctl->LogwrtRqst;
        LogwrtResult = xlogctl->LogwrtResult;
        SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
    }

    /*
     * If cache is half filled then try to acquire write lock and do
     * XLogWrite.  Ignore any fractional blocks in performing this check.
     */
    LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
    if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
        (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
         XLogCtl->XLogCacheByte / 2))
    {
        /* Conditional acquire: never block here, just help out if free. */
        if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
        {
            LogwrtResult = XLogCtl->Write.LogwrtResult;
            if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
                XLogWrite(LogwrtRqst);
            LWLockRelease(WALWriteLock);
        }
    }

    /* Now wait to get insert lock */
    LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);

    /*
     * Check to see if my RedoRecPtr is out of date.  If so, may have to
     * go back and recompute everything.  This can only happen just after
     * a checkpoint, so it's better to be slow in this case and fast
     * otherwise.
     */
    if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
    {
        Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
        RedoRecPtr = Insert->RedoRecPtr;

        for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
        {
            if (dtbuf[i] == InvalidBuffer)
                continue;
            if (dtbuf_bkp[i] == false &&
                XLByteLE(dtbuf_lsn[i], RedoRecPtr))
            {
                /*
                 * Oops, this buffer now needs to be backed up, but we
                 * didn't think so above.  Start over.
                 */
                LWLockRelease(WALInsertLock);
                END_CRIT_SECTION();
                goto begin;
            }
        }
    }

    /*
     * Make additional rdata list entries for the backup blocks, so that
     * we don't need to special-case them in the write loop.  Note that we
     * have now irrevocably changed the input rdata list.  At the exit of
     * this loop, write_len includes the backup block data.
     *
     * Also set the appropriate info bits to show which buffers were backed
     * up.  The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th
     * distinct buffer value (ignoring InvalidBuffer) appearing in the
     * rdata list.
     */
    write_len = len;
    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
    {
        if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
            continue;

        info |= XLR_SET_BKP_BLOCK(i);

        /* Append BkpBlock header entry... */
        rdt->next = &(dtbuf_rdt[2 * i]);
        dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]);
        dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
        write_len += sizeof(BkpBlock);

        /* ...followed by the full page image. */
        rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);
        dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]);
        dtbuf_rdt[2 * i + 1].len = BLCKSZ;
        write_len += BLCKSZ;
        dtbuf_rdt[2 * i + 1].next = NULL;
    }

    /* Insert record header */
    updrqst = false;
    freespace = INSERT_FREESPACE(Insert);
    if (freespace < SizeOfXLogRecord)
    {
        /* Record header won't fit on this page; advance to a fresh one. */
        updrqst = AdvanceXLInsertBuffer();
        freespace = BLCKSZ - SizeOfXLogPHD;
    }

    curridx = Insert->curridx;
    record = (XLogRecord *) Insert->currpos;
    record->xl_prev = Insert->PrevRecord;
    if (no_tran)
    {
        record->xl_xact_prev.xlogid = 0;
        record->xl_xact_prev.xrecoff = 0;
    }
    else
        record->xl_xact_prev = MyLastRecPtr;
    record->xl_xid = GetCurrentTransactionId();
    record->xl_len = len;       /* doesn't include backup blocks */
    record->xl_info = info;
    record->xl_rmid = rmid;

    /* Now we can finish computing the main CRC */
    COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
               SizeOfXLogRecord - sizeof(crc64));
    FIN_CRC64(rdata_crc);
    record->xl_crc = rdata_crc;

    /* Compute record's XLOG location */
    INSERT_RECPTR(RecPtr, Insert, curridx);

    /* If first XLOG record of transaction, save it in PGPROC array */
    if (MyLastRecPtr.xrecoff == 0 && !no_tran)
    {
        /*
         * We do not acquire SInvalLock here because of possible deadlock.
         * Anyone who wants to inspect other procs' logRec must acquire
         * WALInsertLock, instead.  A better solution would be a per-PROC
         * spinlock, but no time for that before 7.2 --- tgl 12/19/01.
         */
        MyProc->logRec = RecPtr;
    }

    if (XLOG_DEBUG)
    {
        char        buf[8192];

        sprintf(buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff);
        xlog_outrec(buf, record);
        if (rdata->data != NULL)
        {
            strcat(buf, " - ");
            RmgrTable[record->xl_rmid].rm_desc(buf,
                                               record->xl_info, rdata->data);
        }
        elog(LOG, "%s", buf);
    }

    /* Record begin of record in appropriate places */
    if (!no_tran)
        MyLastRecPtr = RecPtr;
    ProcLastRecPtr = RecPtr;
    Insert->PrevRecord = RecPtr;
    MyXactMadeXLogEntry = true;

    Insert->currpos += SizeOfXLogRecord;
    freespace -= SizeOfXLogRecord;

    /*
     * Append the data, including backup blocks if any
     */
    while (write_len)
    {
        /* Skip list items whose data was absorbed into backup blocks. */
        while (rdata->data == NULL)
            rdata = rdata->next;

        if (freespace > 0)
        {
            if (rdata->len > freespace)
            {
                /* Item doesn't fit: copy what does, continue it on next page. */
                memcpy(Insert->currpos, rdata->data, freespace);
                rdata->data += freespace;
                rdata->len -= freespace;
                write_len -= freespace;
            }
            else
            {
                memcpy(Insert->currpos, rdata->data, rdata->len);
                freespace -= rdata->len;
                write_len -= rdata->len;
                Insert->currpos += rdata->len;
                rdata = rdata->next;
                continue;
            }
        }

        /* Use next buffer */
        updrqst = AdvanceXLInsertBuffer();
        curridx = Insert->curridx;
        /* Insert cont-record header */
        Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
        contrecord = (XLogContRecord *) Insert->currpos;
        contrecord->xl_rem_len = write_len;
        Insert->currpos += SizeOfXLogContRecord;
        freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
    }

    /* Ensure next record will be properly aligned */
    Insert->currpos = (char *) Insert->currpage +
        MAXALIGN(Insert->currpos - (char *) Insert->currpage);
    freespace = INSERT_FREESPACE(Insert);

    /*
     * The recptr I return is the beginning of the *next* record.  This
     * will be stored as LSN for changed data pages...
     */
    INSERT_RECPTR(RecPtr, Insert, curridx);

    /* Need to update shared LogwrtRqst if some block was filled up */
    if (freespace < SizeOfXLogRecord)
        updrqst = true;         /* curridx is filled and available for
                                 * writing out */
    else
        curridx = PrevBufIdx(curridx);
    WriteRqst = XLogCtl->xlblocks[curridx];

    LWLockRelease(WALInsertLock);

    if (updrqst)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;

        SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
        /* advance global request to include new block(s) */
        if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
            xlogctl->LogwrtRqst.Write = WriteRqst;
        /* update local result copy while I have the chance */
        LogwrtResult = xlogctl->LogwrtResult;
        SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
    }

    ProcLastRecEnd = RecPtr;

    END_CRIT_SECTION();
    /*
     * NOTE(review): the extracted chunk is truncated here — the function's
     * trailing "return (RecPtr);" and closing brace lie beyond the visible
     * text and are intentionally not reconstructed.
     */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -