⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xlog.c

📁 PostgreSQL7.4.6 for Linux
💻 C
📖 第 1 页 / 共 5 页
字号:
/*
 * File-local flag, initially false.
 * NOTE(review): presumably true while WAL redo (recovery replay) is in
 * progress -- confirm against the code that sets it.
 */
static bool InRedo = false;

/*
 * Forward declarations of file-local (static) helper routines, so they can
 * be called before their definitions appear.
 */
static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
static int XLogFileInit(uint32 log, uint32 seg,
			 bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
					   bool find_free, int max_advance,
					   bool use_lock);
static int	XLogFileOpen(uint32 log, uint32 seg, bool econt);
static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
					 int whichChkpt,
					 char *buffer);
static void WriteControlFile(void);
static void ReadControlFile(void);
static char *str_time(time_t tnow);
static void xlog_outrec(char *buf, XLogRecord *record);
static void issue_xlog_fsync(void);

/*
 * Insert an XLOG record having the specified RMID and info bytes,
 * with the body of the record being the data chunk(s) described by
 * the rdata list (see xlog.h for notes about rdata).
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 *
 * NB: this routine feels free to scribble on the XLogRecData structs,
 * though not on the data they reference.  This is OK since the XLogRecData
 * structs are always just temporaries in the calling code.
*/XLogRecPtrXLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata){	XLogCtlInsert *Insert = &XLogCtl->Insert;	XLogRecord *record;	XLogContRecord *contrecord;	XLogRecPtr	RecPtr;	XLogRecPtr	WriteRqst;	uint32		freespace;	uint16		curridx;	XLogRecData *rdt;	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];	XLogRecPtr	dtbuf_lsn[XLR_MAX_BKP_BLOCKS];	XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];	crc64		rdata_crc;	uint32		len,				write_len;	unsigned	i;	XLogwrtRqst LogwrtRqst;	bool		updrqst;	bool		no_tran = (rmid == RM_XLOG_ID) ? true : false;	if (info & XLR_INFO_MASK)	{		if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)			elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));		no_tran = true;		info &= ~XLR_INFO_MASK;	}	/*	 * In bootstrap mode, we don't actually log anything but XLOG	 * resources; return a phony record pointer.	 */	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)	{		RecPtr.xlogid = 0;		RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */		return (RecPtr);	}	/*	 * Here we scan the rdata list, determine which buffers must be backed	 * up, and compute the CRC values for the data.  Note that the record	 * header isn't added into the CRC yet since we don't know the final	 * length or info bits quite yet.	 *	 * We may have to loop back to here if a race condition is detected	 * below. We could prevent the race by doing all this work while	 * holding the insert lock, but it seems better to avoid doing CRC	 * calculations while holding the lock.  This means we have to be	 * careful about modifying the rdata list until we know we aren't	 * going to loop back again.  The only change we allow ourselves to	 * make earlier is to set rdt->data = NULL in list items we have	 * decided we will have to back up the whole buffer for.  
This is OK	 * because we will certainly decide the same thing again for those	 * items if we do it over; doing it here saves an extra pass over the	 * list later.	 */begin:;	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)	{		dtbuf[i] = InvalidBuffer;		dtbuf_bkp[i] = false;	}	INIT_CRC64(rdata_crc);	len = 0;	for (rdt = rdata;;)	{		if (rdt->buffer == InvalidBuffer)		{			/* Simple data, just include it */			len += rdt->len;			COMP_CRC64(rdata_crc, rdt->data, rdt->len);		}		else		{			/* Find info for buffer */			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)			{				if (rdt->buffer == dtbuf[i])				{					/* Buffer already referenced by earlier list item */					if (dtbuf_bkp[i])						rdt->data = NULL;					else if (rdt->data)					{						len += rdt->len;						COMP_CRC64(rdata_crc, rdt->data, rdt->len);					}					break;				}				if (dtbuf[i] == InvalidBuffer)				{					/* OK, put it in this slot */					dtbuf[i] = rdt->buffer;					/*					 * XXX We assume page LSN is first data on page					 */					dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer));					if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))					{						crc64		dtcrc;						dtbuf_bkp[i] = true;						rdt->data = NULL;						INIT_CRC64(dtcrc);						COMP_CRC64(dtcrc,								   BufferGetBlock(dtbuf[i]),								   BLCKSZ);						dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);						dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);						COMP_CRC64(dtcrc,								(char *) &(dtbuf_xlg[i]) + sizeof(crc64),								   sizeof(BkpBlock) - sizeof(crc64));						FIN_CRC64(dtcrc);						dtbuf_xlg[i].crc = dtcrc;					}					else if (rdt->data)					{						len += rdt->len;						COMP_CRC64(rdata_crc, rdt->data, rdt->len);					}					break;				}			}			if (i >= XLR_MAX_BKP_BLOCKS)				elog(PANIC, "can backup at most %d blocks per xlog record",					 XLR_MAX_BKP_BLOCKS);		}		/* Break out of loop when rdt points to last list item */		if (rdt->next == NULL)			break;		rdt = rdt->next;	}	/*	 * NOTE: the test for len == 0 here is somewhat fishy, since in theory	 * all 
of the rmgr data might have been suppressed in favor of backup	 * blocks.	Currently, all callers of XLogInsert provide at least some	 * not-in-a-buffer data and so len == 0 should never happen, but that	 * may not be true forever.  If you need to remove the len == 0 check,	 * also remove the check for xl_len == 0 in ReadRecord, below.	 */	if (len == 0 || len > MAXLOGRECSZ)		elog(PANIC, "invalid xlog record length %u", len);	START_CRIT_SECTION();	/* update LogwrtResult before doing cache fill check */	{		/* use volatile pointer to prevent code rearrangement */		volatile XLogCtlData *xlogctl = XLogCtl;		SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);		LogwrtRqst = xlogctl->LogwrtRqst;		LogwrtResult = xlogctl->LogwrtResult;		SpinLockRelease_NoHoldoff(&xlogctl->info_lck);	}	/*	 * If cache is half filled then try to acquire write lock and do	 * XLogWrite. Ignore any fractional blocks in performing this check.	 */	LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;	if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||		(LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +		 XLogCtl->XLogCacheByte / 2))	{		if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))		{			LogwrtResult = XLogCtl->Write.LogwrtResult;			if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))				XLogWrite(LogwrtRqst);			LWLockRelease(WALWriteLock);		}	}	/* Now wait to get insert lock */	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);	/*	 * Check to see if my RedoRecPtr is out of date.  If so, may have to	 * go back and recompute everything.  This can only happen just after	 * a checkpoint, so it's better to be slow in this case and fast	 * otherwise.	 
*/	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))	{		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));		RedoRecPtr = Insert->RedoRecPtr;		for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)		{			if (dtbuf[i] == InvalidBuffer)				continue;			if (dtbuf_bkp[i] == false &&				XLByteLE(dtbuf_lsn[i], RedoRecPtr))			{				/*				 * Oops, this buffer now needs to be backed up, but we				 * didn't think so above.  Start over.				 */				LWLockRelease(WALInsertLock);				END_CRIT_SECTION();				goto begin;			}		}	}	/*	 * Make additional rdata list entries for the backup blocks, so that	 * we don't need to special-case them in the write loop.  Note that we	 * have now irrevocably changed the input rdata list.  At the exit of	 * this loop, write_len includes the backup block data.	 *	 * Also set the appropriate info bits to show which buffers were backed	 * up.	The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th	 * distinct buffer value (ignoring InvalidBuffer) appearing in the	 * rdata list.	 */	write_len = len;	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)	{		if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))			continue;		info |= XLR_SET_BKP_BLOCK(i);		rdt->next = &(dtbuf_rdt[2 * i]);		dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]);		dtbuf_rdt[2 * i].len = sizeof(BkpBlock);		write_len += sizeof(BkpBlock);		rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);		dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]);		dtbuf_rdt[2 * i + 1].len = BLCKSZ;		write_len += BLCKSZ;		dtbuf_rdt[2 * i + 1].next = NULL;	}	/* Insert record header */	updrqst = false;	freespace = INSERT_FREESPACE(Insert);	if (freespace < SizeOfXLogRecord)	{		updrqst = AdvanceXLInsertBuffer();		freespace = BLCKSZ - SizeOfXLogPHD;	}	curridx = Insert->curridx;	record = (XLogRecord *) Insert->currpos;	record->xl_prev = Insert->PrevRecord;	if (no_tran)	{		record->xl_xact_prev.xlogid = 0;		record->xl_xact_prev.xrecoff = 0;	}	else		record->xl_xact_prev = MyLastRecPtr;	record->xl_xid = GetCurrentTransactionId();	
record->xl_len = len;		/* doesn't include backup blocks */	record->xl_info = info;	record->xl_rmid = rmid;	/* Now we can finish computing the main CRC */	COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),			   SizeOfXLogRecord - sizeof(crc64));	FIN_CRC64(rdata_crc);	record->xl_crc = rdata_crc;	/* Compute record's XLOG location */	INSERT_RECPTR(RecPtr, Insert, curridx);	/* If first XLOG record of transaction, save it in PGPROC array */	if (MyLastRecPtr.xrecoff == 0 && !no_tran)	{		/*		 * We do not acquire SInvalLock here because of possible deadlock.		 * Anyone who wants to inspect other procs' logRec must acquire		 * WALInsertLock, instead.	A better solution would be a per-PROC		 * spinlock, but no time for that before 7.2 --- tgl 12/19/01.		 */		MyProc->logRec = RecPtr;	}	if (XLOG_DEBUG)	{		char		buf[8192];		sprintf(buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff);		xlog_outrec(buf, record);		if (rdata->data != NULL)		{			strcat(buf, " - ");			RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);		}		elog(LOG, "%s", buf);	}	/* Record begin of record in appropriate places */	if (!no_tran)		MyLastRecPtr = RecPtr;	ProcLastRecPtr = RecPtr;	Insert->PrevRecord = RecPtr;	MyXactMadeXLogEntry = true;	Insert->currpos += SizeOfXLogRecord;	freespace -= SizeOfXLogRecord;	/*	 * Append the data, including backup blocks if any	 */	while (write_len)	{		while (rdata->data == NULL)			rdata = rdata->next;		if (freespace > 0)		{			if (rdata->len > freespace)			{				memcpy(Insert->currpos, rdata->data, freespace);				rdata->data += freespace;				rdata->len -= freespace;				write_len -= freespace;			}			else			{				memcpy(Insert->currpos, rdata->data, rdata->len);				freespace -= rdata->len;				write_len -= rdata->len;				Insert->currpos += rdata->len;				rdata = rdata->next;				continue;			}		}		/* Use next buffer */		updrqst = AdvanceXLInsertBuffer();		curridx = Insert->curridx;		/* Insert cont-record header */		Insert->currpage->xlp_info |= 
XLP_FIRST_IS_CONTRECORD;		contrecord = (XLogContRecord *) Insert->currpos;		contrecord->xl_rem_len = write_len;		Insert->currpos += SizeOfXLogContRecord;		freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;	}	/* Ensure next record will be properly aligned */	Insert->currpos = (char *) Insert->currpage +		MAXALIGN(Insert->currpos - (char *) Insert->currpage);	freespace = INSERT_FREESPACE(Insert);	/*	 * The recptr I return is the beginning of the *next* record. This	 * will be stored as LSN for changed data pages...	 */	INSERT_RECPTR(RecPtr, Insert, curridx);	/* Need to update shared LogwrtRqst if some block was filled up */	if (freespace < SizeOfXLogRecord)		updrqst = true;			/* curridx is filled and available for								 * writing out */	else		curridx = PrevBufIdx(curridx);	WriteRqst = XLogCtl->xlblocks[curridx];	LWLockRelease(WALInsertLock);	if (updrqst)	{		/* use volatile pointer to prevent code rearrangement */		volatile XLogCtlData *xlogctl = XLogCtl;		SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);		/* advance global request to include new block(s) */		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))			xlogctl->LogwrtRqst.Write = WriteRqst;		/* update local result copy while I have the chance */		LogwrtResult = xlogctl->LogwrtResult;		SpinLockRelease_NoHoldoff(&xlogctl->info_lck);	}	ProcLastRecEnd = RecPtr;	END_CRIT_SECTION();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -