📄 xlog.c
字号:
static char *str_time(pg_time_t tnow);#ifdef WAL_DEBUGstatic void xlog_outrec(StringInfo buf, XLogRecord *record);#endifstatic void issue_xlog_fsync(void);static void pg_start_backup_callback(int code, Datum arg);static bool read_backup_label(XLogRecPtr *checkPointLoc, XLogRecPtr *minRecoveryLoc);static void rm_redo_error_callback(void *arg);/* * Insert an XLOG record having the specified RMID and info bytes, * with the body of the record being the data chunk(s) described by * the rdata chain (see xlog.h for notes about rdata). * * Returns XLOG pointer to end of record (beginning of next record). * This can be used as LSN for data pages affected by the logged action. * (LSN is the XLOG point up to which the XLOG must be flushed to disk * before the data page can be written out. This implements the basic * WAL rule "write the log before the data".) * * NB: this routine feels free to scribble on the XLogRecData structs, * though not on the data they reference. This is OK since the XLogRecData * structs are always just temporaries in the calling code. */XLogRecPtrXLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata){ XLogCtlInsert *Insert = &XLogCtl->Insert; XLogRecord *record; XLogContRecord *contrecord; XLogRecPtr RecPtr; XLogRecPtr WriteRqst; uint32 freespace; int curridx; XLogRecData *rdt; Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; pg_crc32 rdata_crc; uint32 len, write_len; unsigned i; bool updrqst; bool doPageWrites; bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); /* info's high bits are reserved for use by me */ if (info & XLR_INFO_MASK) elog(PANIC, "invalid xlog info mask %02X", info); /* * In bootstrap mode, we don't actually log anything but XLOG resources; * return a phony record pointer. */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { RecPtr.xlogid = 0; RecPtr.xrecoff = SizeOfXLogLongPHD; /* start of 1st chkpt record */ return RecPtr; } /* * Here we scan the rdata chain, determine which buffers must be backed * up, and compute the CRC values for the data. Note that the record * header isn't added into the CRC initially since we don't know the final * length or info bits quite yet. Thus, the CRC will represent the CRC of * the whole record in the order "rdata, then backup blocks, then record * header". * * We may have to loop back to here if a race condition is detected below. * We could prevent the race by doing all this work while holding the * insert lock, but it seems better to avoid doing CRC calculations while * holding the lock. This means we have to be careful about modifying the * rdata chain until we know we aren't going to loop back again. The only * change we allow ourselves to make earlier is to set rdt->data = NULL in * chain items we have decided we will have to back up the whole buffer * for. This is OK because we will certainly decide the same thing again * for those items if we do it over; doing it here saves an extra pass * over the chain later. */begin:; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { dtbuf[i] = InvalidBuffer; dtbuf_bkp[i] = false; } /* * Decide if we need to do full-page writes in this XLOG record: true if * full_page_writes is on or we have a PITR request for it. Since we * don't yet have the insert lock, forcePageWrites could change under us, * but we'll recheck it once we have the lock. */ doPageWrites = fullPageWrites || Insert->forcePageWrites; INIT_CRC32(rdata_crc); len = 0; for (rdt = rdata;;) { if (rdt->buffer == InvalidBuffer) { /* Simple data, just include it */ len += rdt->len; COMP_CRC32(rdata_crc, rdt->data, rdt->len); } else { /* Find info for buffer */ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (rdt->buffer == dtbuf[i]) { /* Buffer already referenced by earlier chain item */ if (dtbuf_bkp[i]) rdt->data = NULL; else if (rdt->data) { len += rdt->len; COMP_CRC32(rdata_crc, rdt->data, rdt->len); } break; } if (dtbuf[i] == InvalidBuffer) { /* OK, put it in this slot */ dtbuf[i] = rdt->buffer; if (XLogCheckBuffer(rdt, doPageWrites, &(dtbuf_lsn[i]), &(dtbuf_xlg[i]))) { dtbuf_bkp[i] = true; rdt->data = NULL; } else if (rdt->data) { len += rdt->len; COMP_CRC32(rdata_crc, rdt->data, rdt->len); } break; } } if (i >= XLR_MAX_BKP_BLOCKS) elog(PANIC, "can backup at most %d blocks per xlog record", XLR_MAX_BKP_BLOCKS); } /* Break out of loop when rdt points to last chain item */ if (rdt->next == NULL) break; rdt = rdt->next; } /* * Now add the backup block headers and data into the CRC */ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (dtbuf_bkp[i]) { BkpBlock *bkpb = &(dtbuf_xlg[i]); char *page; COMP_CRC32(rdata_crc, (char *) bkpb, sizeof(BkpBlock)); page = (char *) BufferGetBlock(dtbuf[i]); if (bkpb->hole_length == 0) { COMP_CRC32(rdata_crc, page, BLCKSZ); } else { /* must skip the hole */ COMP_CRC32(rdata_crc, page, bkpb->hole_offset); COMP_CRC32(rdata_crc, page + (bkpb->hole_offset + bkpb->hole_length), BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); } } } /* * NOTE: We disallow len == 0 because it provides a useful bit of extra * error checking in ReadRecord. This means that all callers of * XLogInsert must supply at least some not-in-a-buffer data. However, we * make an exception for XLOG SWITCH records because we don't want them to * ever cross a segment boundary. */ if (len == 0 && !isLogSwitch) elog(PANIC, "invalid xlog record length %u", len); START_CRIT_SECTION(); /* Now wait to get insert lock */ LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); /* * Check to see if my RedoRecPtr is out of date. If so, may have to go * back and recompute everything. This can only happen just after a * checkpoint, so it's better to be slow in this case and fast otherwise. * * If we aren't doing full-page writes then RedoRecPtr doesn't actually * affect the contents of the XLOG record, so we'll update our local copy * but not force a recomputation. */ if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr)) { Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr)); RedoRecPtr = Insert->RedoRecPtr; if (doPageWrites) { for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { if (dtbuf[i] == InvalidBuffer) continue; if (dtbuf_bkp[i] == false && XLByteLE(dtbuf_lsn[i], RedoRecPtr)) { /* * Oops, this buffer now needs to be backed up, but we * didn't think so above. Start over. */ LWLockRelease(WALInsertLock); END_CRIT_SECTION(); goto begin; } } } } /* * Also check to see if forcePageWrites was just turned on; if we weren't * already doing full-page writes then go back and recompute. (If it was * just turned off, we could recompute the record without full pages, but * we choose not to bother.) */ if (Insert->forcePageWrites && !doPageWrites) { /* Oops, must redo it with full-page data */ LWLockRelease(WALInsertLock); END_CRIT_SECTION(); goto begin; } /* * Make additional rdata chain entries for the backup blocks, so that we * don't need to special-case them in the write loop. Note that we have * now irrevocably changed the input rdata chain. At the exit of this * loop, write_len includes the backup block data. * * Also set the appropriate info bits to show which buffers were backed * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct * buffer value (ignoring InvalidBuffer) appearing in the rdata chain. */ write_len = len; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { BkpBlock *bkpb; char *page; if (!dtbuf_bkp[i]) continue; info |= XLR_SET_BKP_BLOCK(i); bkpb = &(dtbuf_xlg[i]); page = (char *) BufferGetBlock(dtbuf[i]); rdt->next = &(dtbuf_rdt1[i]); rdt = rdt->next; rdt->data = (char *) bkpb; rdt->len = sizeof(BkpBlock); write_len += sizeof(BkpBlock); rdt->next = &(dtbuf_rdt2[i]); rdt = rdt->next; if (bkpb->hole_length == 0) { rdt->data = page; rdt->len = BLCKSZ; write_len += BLCKSZ; rdt->next = NULL; } else { /* must skip the hole */ rdt->data = page; rdt->len = bkpb->hole_offset; write_len += bkpb->hole_offset; rdt->next = &(dtbuf_rdt3[i]); rdt = rdt->next; rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); write_len += rdt->len; rdt->next = NULL; } } /* * If we backed up any full blocks and online backup is not in progress, * mark the backup blocks as removable. This allows the WAL archiver to * know whether it is safe to compress archived WAL data by transforming * full-block records into the non-full-block format. * * Note: we could just set the flag whenever !forcePageWrites, but * defining it like this leaves the info bit free for some potential other * use in records without any backup blocks. */ if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites) info |= XLR_BKP_REMOVABLE; /* * If there isn't enough space on the current XLOG page for a record * header, advance to the next page (leaving the unused space as zeroes). */ updrqst = false; freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { updrqst = AdvanceXLInsertBuffer(false); freespace = INSERT_FREESPACE(Insert); } /* Compute record's XLOG location */ curridx = Insert->curridx; INSERT_RECPTR(RecPtr, Insert, curridx); /* * If the record is an XLOG_SWITCH, and we are exactly at the start of a * segment, we need not insert it (and don't want to because we'd like * consecutive switch requests to be no-ops). Instead, make sure * everything is written and flushed through the end of the prior segment, * and return the prior segment's end address. */ if (isLogSwitch && (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD) { /* We can release insert lock immediately */ LWLockRelease(WALInsertLock); RecPtr.xrecoff -= SizeOfXLogLongPHD; if (RecPtr.xrecoff == 0) { /* crossing a logid boundary */ RecPtr.xlogid -= 1; RecPtr.xrecoff = XLogFileSize; } LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = XLogCtl->Write.LogwrtResult; if (!XLByteLE(RecPtr, LogwrtResult.Flush)) { XLogwrtRqst FlushRqst; FlushRqst.Write = RecPtr; FlushRqst.Flush = RecPtr; XLogWrite(FlushRqst, false, false); } LWLockRelease(WALWriteLock); END_CRIT_SECTION(); return RecPtr; } /* Insert record header */ record = (XLogRecord *) Insert->currpos; record->xl_prev = Insert->PrevRecord; record->xl_xid = GetCurrentTransactionIdIfAny(); record->xl_tot_len = SizeOfXLogRecord + write_len; record->xl_len = len; /* doesn't include backup blocks */ record->xl_info = info; record->xl_rmid = rmid; /* Now we can finish computing the record's CRC */ COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32), SizeOfXLogRecord - sizeof(pg_crc32)); FIN_CRC32(rdata_crc); record->xl_crc = rdata_crc;#ifdef WAL_DEBUG if (XLOG_DEBUG) { StringInfoData buf; initStringInfo(&buf); appendStringInfo(&buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff); xlog_outrec(&buf, record); if (rdata->data != NULL) { appendStringInfo(&buf, " - "); RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data); } elog(LOG, "%s", buf.data); pfree(buf.data); }#endif /* Record begin of record in appropriate places */ ProcLastRecPtr = RecPtr; Insert->PrevRecord = RecPtr; Insert->currpos += SizeOfXLogRecord; freespace -= SizeOfXLogRecord; /* * Append the data, including backup blocks if any
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -