⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xlog.c

📁 PostgreSQL 8.3.4 源码,开源数据库
💻 C
📖 第 1 页 / 共 5 页
字号:
/*
 * Forward declarations of file-local (static) helpers.  Their definitions
 * are not part of this excerpt, so the notes below describe only what the
 * prototypes and the visible call sites show.
 */

/* NOTE(review): presumably renders tnow as text -- confirm at the definition. */
static char *str_time(pg_time_t tnow);

#ifdef WAL_DEBUG
/*
 * Compiled only when WAL_DEBUG is defined.  XLogInsert calls this from its
 * WAL_DEBUG block, passing the StringInfo it later emits with elog(LOG).
 */
static void xlog_outrec(StringInfo buf, XLogRecord *record);
#endif

/* Takes no arguments and returns nothing; behavior not visible here. */
static void issue_xlog_fsync(void);

/* NOTE(review): the (int code, Datum arg) shape looks like a callback hook -- confirm where it is registered. */
static void pg_start_backup_callback(int code, Datum arg);

/* NOTE(review): both pointer parameters look like output locations; verify, along with the meaning of the bool result. */
static bool read_backup_label(XLogRecPtr *checkPointLoc,
				  XLogRecPtr *minRecoveryLoc);

/* NOTE(review): callback taking an opaque argument; its use is not visible in this excerpt. */
static void rm_redo_error_callback(void *arg);

/*
 * Insert an XLOG record having the specified RMID and info bytes,
 * with the body of the record being the data chunk(s) described by
 * the rdata chain (see xlog.h for notes about rdata).
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 *
 * NB: this routine feels free to scribble on the XLogRecData structs,
 * though not on the data they reference.  This is OK since the XLogRecData
 * structs are always just temporaries in the calling code.
*/XLogRecPtrXLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata){	XLogCtlInsert *Insert = &XLogCtl->Insert;	XLogRecord *record;	XLogContRecord *contrecord;	XLogRecPtr	RecPtr;	XLogRecPtr	WriteRqst;	uint32		freespace;	int			curridx;	XLogRecData *rdt;	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];	XLogRecPtr	dtbuf_lsn[XLR_MAX_BKP_BLOCKS];	XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];	XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];	XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];	pg_crc32	rdata_crc;	uint32		len,				write_len;	unsigned	i;	bool		updrqst;	bool		doPageWrites;	bool		isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);	/* info's high bits are reserved for use by me */	if (info & XLR_INFO_MASK)		elog(PANIC, "invalid xlog info mask %02X", info);	/*	 * In bootstrap mode, we don't actually log anything but XLOG resources;	 * return a phony record pointer.	 */	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)	{		RecPtr.xlogid = 0;		RecPtr.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */		return RecPtr;	}	/*	 * Here we scan the rdata chain, determine which buffers must be backed	 * up, and compute the CRC values for the data.  Note that the record	 * header isn't added into the CRC initially since we don't know the final	 * length or info bits quite yet.  Thus, the CRC will represent the CRC of	 * the whole record in the order "rdata, then backup blocks, then record	 * header".	 *	 * We may have to loop back to here if a race condition is detected below.	 * We could prevent the race by doing all this work while holding the	 * insert lock, but it seems better to avoid doing CRC calculations while	 * holding the lock.  This means we have to be careful about modifying the	 * rdata chain until we know we aren't going to loop back again.  
The only	 * change we allow ourselves to make earlier is to set rdt->data = NULL in	 * chain items we have decided we will have to back up the whole buffer	 * for.  This is OK because we will certainly decide the same thing again	 * for those items if we do it over; doing it here saves an extra pass	 * over the chain later.	 */begin:;	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)	{		dtbuf[i] = InvalidBuffer;		dtbuf_bkp[i] = false;	}	/*	 * Decide if we need to do full-page writes in this XLOG record: true if	 * full_page_writes is on or we have a PITR request for it.  Since we	 * don't yet have the insert lock, forcePageWrites could change under us,	 * but we'll recheck it once we have the lock.	 */	doPageWrites = fullPageWrites || Insert->forcePageWrites;	INIT_CRC32(rdata_crc);	len = 0;	for (rdt = rdata;;)	{		if (rdt->buffer == InvalidBuffer)		{			/* Simple data, just include it */			len += rdt->len;			COMP_CRC32(rdata_crc, rdt->data, rdt->len);		}		else		{			/* Find info for buffer */			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)			{				if (rdt->buffer == dtbuf[i])				{					/* Buffer already referenced by earlier chain item */					if (dtbuf_bkp[i])						rdt->data = NULL;					else if (rdt->data)					{						len += rdt->len;						COMP_CRC32(rdata_crc, rdt->data, rdt->len);					}					break;				}				if (dtbuf[i] == InvalidBuffer)				{					/* OK, put it in this slot */					dtbuf[i] = rdt->buffer;					if (XLogCheckBuffer(rdt, doPageWrites,										&(dtbuf_lsn[i]), &(dtbuf_xlg[i])))					{						dtbuf_bkp[i] = true;						rdt->data = NULL;					}					else if (rdt->data)					{						len += rdt->len;						COMP_CRC32(rdata_crc, rdt->data, rdt->len);					}					break;				}			}			if (i >= XLR_MAX_BKP_BLOCKS)				elog(PANIC, "can backup at most %d blocks per xlog record",					 XLR_MAX_BKP_BLOCKS);		}		/* Break out of loop when rdt points to last chain item */		if (rdt->next == NULL)			break;		rdt = rdt->next;	}	/*	 * Now add the backup block headers and data into the CRC	 */	for (i = 0; i < 
XLR_MAX_BKP_BLOCKS; i++)	{		if (dtbuf_bkp[i])		{			BkpBlock   *bkpb = &(dtbuf_xlg[i]);			char	   *page;			COMP_CRC32(rdata_crc,					   (char *) bkpb,					   sizeof(BkpBlock));			page = (char *) BufferGetBlock(dtbuf[i]);			if (bkpb->hole_length == 0)			{				COMP_CRC32(rdata_crc,						   page,						   BLCKSZ);			}			else			{				/* must skip the hole */				COMP_CRC32(rdata_crc,						   page,						   bkpb->hole_offset);				COMP_CRC32(rdata_crc,						   page + (bkpb->hole_offset + bkpb->hole_length),						   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));			}		}	}	/*	 * NOTE: We disallow len == 0 because it provides a useful bit of extra	 * error checking in ReadRecord.  This means that all callers of	 * XLogInsert must supply at least some not-in-a-buffer data.  However, we	 * make an exception for XLOG SWITCH records because we don't want them to	 * ever cross a segment boundary.	 */	if (len == 0 && !isLogSwitch)		elog(PANIC, "invalid xlog record length %u", len);	START_CRIT_SECTION();	/* Now wait to get insert lock */	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);	/*	 * Check to see if my RedoRecPtr is out of date.  If so, may have to go	 * back and recompute everything.  This can only happen just after a	 * checkpoint, so it's better to be slow in this case and fast otherwise.	 *	 * If we aren't doing full-page writes then RedoRecPtr doesn't actually	 * affect the contents of the XLOG record, so we'll update our local copy	 * but not force a recomputation.	 */	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))	{		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));		RedoRecPtr = Insert->RedoRecPtr;		if (doPageWrites)		{			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)			{				if (dtbuf[i] == InvalidBuffer)					continue;				if (dtbuf_bkp[i] == false &&					XLByteLE(dtbuf_lsn[i], RedoRecPtr))				{					/*					 * Oops, this buffer now needs to be backed up, but we					 * didn't think so above.  Start over.					 
*/					LWLockRelease(WALInsertLock);					END_CRIT_SECTION();					goto begin;				}			}		}	}	/*	 * Also check to see if forcePageWrites was just turned on; if we weren't	 * already doing full-page writes then go back and recompute. (If it was	 * just turned off, we could recompute the record without full pages, but	 * we choose not to bother.)	 */	if (Insert->forcePageWrites && !doPageWrites)	{		/* Oops, must redo it with full-page data */		LWLockRelease(WALInsertLock);		END_CRIT_SECTION();		goto begin;	}	/*	 * Make additional rdata chain entries for the backup blocks, so that we	 * don't need to special-case them in the write loop.  Note that we have	 * now irrevocably changed the input rdata chain.  At the exit of this	 * loop, write_len includes the backup block data.	 *	 * Also set the appropriate info bits to show which buffers were backed	 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct	 * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.	 */	write_len = len;	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)	{		BkpBlock   *bkpb;		char	   *page;		if (!dtbuf_bkp[i])			continue;		info |= XLR_SET_BKP_BLOCK(i);		bkpb = &(dtbuf_xlg[i]);		page = (char *) BufferGetBlock(dtbuf[i]);		rdt->next = &(dtbuf_rdt1[i]);		rdt = rdt->next;		rdt->data = (char *) bkpb;		rdt->len = sizeof(BkpBlock);		write_len += sizeof(BkpBlock);		rdt->next = &(dtbuf_rdt2[i]);		rdt = rdt->next;		if (bkpb->hole_length == 0)		{			rdt->data = page;			rdt->len = BLCKSZ;			write_len += BLCKSZ;			rdt->next = NULL;		}		else		{			/* must skip the hole */			rdt->data = page;			rdt->len = bkpb->hole_offset;			write_len += bkpb->hole_offset;			rdt->next = &(dtbuf_rdt3[i]);			rdt = rdt->next;			rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);			rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);			write_len += rdt->len;			rdt->next = NULL;		}	}	/*	 * If we backed up any full blocks and online backup is not in progress,	 * mark the backup blocks as removable.  
This allows the WAL archiver to	 * know whether it is safe to compress archived WAL data by transforming	 * full-block records into the non-full-block format.	 *	 * Note: we could just set the flag whenever !forcePageWrites, but	 * defining it like this leaves the info bit free for some potential other	 * use in records without any backup blocks.	 */	if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)		info |= XLR_BKP_REMOVABLE;	/*	 * If there isn't enough space on the current XLOG page for a record	 * header, advance to the next page (leaving the unused space as zeroes).	 */	updrqst = false;	freespace = INSERT_FREESPACE(Insert);	if (freespace < SizeOfXLogRecord)	{		updrqst = AdvanceXLInsertBuffer(false);		freespace = INSERT_FREESPACE(Insert);	}	/* Compute record's XLOG location */	curridx = Insert->curridx;	INSERT_RECPTR(RecPtr, Insert, curridx);	/*	 * If the record is an XLOG_SWITCH, and we are exactly at the start of a	 * segment, we need not insert it (and don't want to because we'd like	 * consecutive switch requests to be no-ops).  Instead, make sure	 * everything is written and flushed through the end of the prior segment,	 * and return the prior segment's end address.	 
*/	if (isLogSwitch &&		(RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)	{		/* We can release insert lock immediately */		LWLockRelease(WALInsertLock);		RecPtr.xrecoff -= SizeOfXLogLongPHD;		if (RecPtr.xrecoff == 0)		{			/* crossing a logid boundary */			RecPtr.xlogid -= 1;			RecPtr.xrecoff = XLogFileSize;		}		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);		LogwrtResult = XLogCtl->Write.LogwrtResult;		if (!XLByteLE(RecPtr, LogwrtResult.Flush))		{			XLogwrtRqst FlushRqst;			FlushRqst.Write = RecPtr;			FlushRqst.Flush = RecPtr;			XLogWrite(FlushRqst, false, false);		}		LWLockRelease(WALWriteLock);		END_CRIT_SECTION();		return RecPtr;	}	/* Insert record header */	record = (XLogRecord *) Insert->currpos;	record->xl_prev = Insert->PrevRecord;	record->xl_xid = GetCurrentTransactionIdIfAny();	record->xl_tot_len = SizeOfXLogRecord + write_len;	record->xl_len = len;		/* doesn't include backup blocks */	record->xl_info = info;	record->xl_rmid = rmid;	/* Now we can finish computing the record's CRC */	COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),			   SizeOfXLogRecord - sizeof(pg_crc32));	FIN_CRC32(rdata_crc);	record->xl_crc = rdata_crc;#ifdef WAL_DEBUG	if (XLOG_DEBUG)	{		StringInfoData buf;		initStringInfo(&buf);		appendStringInfo(&buf, "INSERT @ %X/%X: ",						 RecPtr.xlogid, RecPtr.xrecoff);		xlog_outrec(&buf, record);		if (rdata->data != NULL)		{			appendStringInfo(&buf, " - ");			RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);		}		elog(LOG, "%s", buf.data);		pfree(buf.data);	}#endif	/* Record begin of record in appropriate places */	ProcLastRecPtr = RecPtr;	Insert->PrevRecord = RecPtr;	Insert->currpos += SizeOfXLogRecord;	freespace -= SizeOfXLogRecord;	/*	 * Append the data, including backup blocks if any

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -