⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 twophase.c

📁 postgresql8.3.4源码,开源数据库
💻 C
📖 第 1 页 / 共 4 页
字号:
		}	}	FIN_CRC32(statefile_crc);	/*	 * Write a deliberately bogus CRC to the state file; this is just paranoia	 * to catch the case where four more bytes will run us out of disk space.	 */	bogus_crc = ~statefile_crc;	if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))	{		close(fd);		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not write two-phase state file: %m")));	}	/* Back up to prepare for rewriting the CRC */	if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)	{		close(fd);		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not seek in two-phase state file: %m")));	}	/*	 * The state file isn't valid yet, because we haven't written the correct	 * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.	 *	 * Between the time we have written the WAL entry and the time we write	 * out the correct state file CRC, we have an inconsistency: the xact is	 * prepared according to WAL but not according to our on-disk state. We	 * use a critical section to force a PANIC if we are unable to complete	 * the write --- then, WAL replay should repair the inconsistency.	The	 * odds of a PANIC actually occurring should be very tiny given that we	 * were able to write the bogus CRC above.	 *	 * We have to set inCommit here, too; otherwise a checkpoint starting	 * immediately after the WAL record is inserted could complete without	 * fsync'ing our state file.  (This is essentially the same kind of race	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit	 * uses inCommit for; see notes there.)	 *	 * We save the PREPARE record's location in the gxact for later use by	 * CheckPointTwoPhase.	 */	START_CRIT_SECTION();	MyProc->inCommit = true;	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,									records.head);	XLogFlush(gxact->prepare_lsn);	/* If we crash now, we have prepared: WAL replay will fix things */	/* write correct CRC and close file */	if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))	{		close(fd);		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not write two-phase state file: %m")));	}	if (close(fd) != 0)		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not close two-phase state file: %m")));	/*	 * Mark the prepared transaction as valid.	As soon as xact.c marks MyProc	 * as not running our XID (which it will do immediately after this	 * function returns), others can commit/rollback the xact.	 *	 * NB: a side effect of this is to make a dummy ProcArray entry for the	 * prepared XID.  This must happen before we clear the XID from MyProc,	 * else there is a window where the XID is not running according to	 * TransactionIdIsInProgress, and onlookers would be entitled to assume	 * the xact crashed.  Instead we have a window where the same XID appears	 * twice in ProcArray, which is OK.	 */	MarkAsPrepared(gxact);	/*	 * Now we can mark ourselves as out of the commit critical section: a	 * checkpoint starting after this will certainly see the gxact as a	 * candidate for fsyncing.	 */	MyProc->inCommit = false;	END_CRIT_SECTION();	records.tail = records.head = NULL;}/* * Register a 2PC record to be written to state file. */voidRegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,					   const void *data, uint32 len){	TwoPhaseRecordOnDisk record;	record.rmid = rmid;	record.info = info;	record.len = len;	save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));	if (len > 0)		save_state_data(data, len);}/* * Read and validate the state file for xid. * * If it looks OK (has a valid magic number and CRC), return the palloc'd * contents of the file.  Otherwise return NULL. */static char *ReadTwoPhaseFile(TransactionId xid){	char		path[MAXPGPATH];	char	   *buf;	TwoPhaseFileHeader *hdr;	int			fd;	struct stat stat;	uint32		crc_offset;	pg_crc32	calc_crc,				file_crc;	TwoPhaseFilePath(path, xid);	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);	if (fd < 0)	{		ereport(WARNING,				(errcode_for_file_access(),				 errmsg("could not open two-phase state file \"%s\": %m",						path)));		return NULL;	}	/*	 * Check file length.  We can determine a lower bound pretty easily. We	 * set an upper bound to avoid palloc() failure on a corrupt file, though	 * we can't guarantee that we won't get an out of memory error anyway,	 * even on a valid file.	 */	if (fstat(fd, &stat))	{		close(fd);		ereport(WARNING,				(errcode_for_file_access(),				 errmsg("could not stat two-phase state file \"%s\": %m",						path)));		return NULL;	}	if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +						MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +						sizeof(pg_crc32)) ||		stat.st_size > MaxAllocSize)	{		close(fd);		return NULL;	}	crc_offset = stat.st_size - sizeof(pg_crc32);	if (crc_offset != MAXALIGN(crc_offset))	{		close(fd);		return NULL;	}	/*	 * OK, slurp in the file.	 */	buf = (char *) palloc(stat.st_size);	if (read(fd, buf, stat.st_size) != stat.st_size)	{		close(fd);		ereport(WARNING,				(errcode_for_file_access(),				 errmsg("could not read two-phase state file \"%s\": %m",						path)));		pfree(buf);		return NULL;	}	close(fd);	hdr = (TwoPhaseFileHeader *) buf;	if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)	{		pfree(buf);		return NULL;	}	INIT_CRC32(calc_crc);	COMP_CRC32(calc_crc, buf, crc_offset);	FIN_CRC32(calc_crc);	file_crc = *((pg_crc32 *) (buf + crc_offset));	if (!EQ_CRC32(calc_crc, file_crc))	{		pfree(buf);		return NULL;	}	return buf;}/* * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED */voidFinishPreparedTransaction(const char *gid, bool isCommit){	GlobalTransaction gxact;	TransactionId xid;	char	   *buf;	char	   *bufptr;	TwoPhaseFileHeader *hdr;	TransactionId latestXid;	TransactionId *children;	RelFileNode *commitrels;	RelFileNode *abortrels;	int			i;	/*	 * Validate the GID, and lock the GXACT to ensure that two backends do not	 * try to commit the same GID at once.	 */	gxact = LockGXact(gid, GetUserId());	xid = gxact->proc.xid;	/*	 * Read and validate the state file	 */	buf = ReadTwoPhaseFile(xid);	if (buf == NULL)		ereport(ERROR,				(errcode(ERRCODE_DATA_CORRUPTED),				 errmsg("two-phase state file for transaction %u is corrupt",						xid)));	/*	 * Disassemble the header area	 */	hdr = (TwoPhaseFileHeader *) buf;	Assert(TransactionIdEquals(hdr->xid, xid));	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));	children = (TransactionId *) bufptr;	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));	commitrels = (RelFileNode *) bufptr;	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));	abortrels = (RelFileNode *) bufptr;	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));	/* compute latestXid among all children */	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);	/*	 * The order of operations here is critical: make the XLOG entry for	 * commit or abort, then mark the transaction committed or aborted in	 * pg_clog, then remove its PGPROC from the global ProcArray (which means	 * TransactionIdIsInProgress will stop saying the prepared xact is in	 * progress), then run the post-commit or post-abort callbacks. The	 * callbacks will release the locks the transaction held.	 */	if (isCommit)		RecordTransactionCommitPrepared(xid,										hdr->nsubxacts, children,										hdr->ncommitrels, commitrels);	else		RecordTransactionAbortPrepared(xid,									   hdr->nsubxacts, children,									   hdr->nabortrels, abortrels);	ProcArrayRemove(&gxact->proc, latestXid);	/*	 * In case we fail while running the callbacks, mark the gxact invalid so	 * no one else will try to commit/rollback, and so it can be recycled	 * properly later.	It is still locked by our XID so it won't go away yet.	 *	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)	 */	gxact->valid = false;	/*	 * We have to remove any files that were supposed to be dropped. For	 * consistency with the regular xact.c code paths, must do this before	 * releasing locks, so do it before running the callbacks.	 *	 * NB: this code knows that we couldn't be dropping any temp rels ...	 */	if (isCommit)	{		for (i = 0; i < hdr->ncommitrels; i++)			smgrdounlink(smgropen(commitrels[i]), false, false);	}	else	{		for (i = 0; i < hdr->nabortrels; i++)			smgrdounlink(smgropen(abortrels[i]), false, false);	}	/* And now do the callbacks */	if (isCommit)		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);	else		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);	/* Count the prepared xact as committed or aborted */	AtEOXact_PgStat(isCommit);	/*	 * And now we can clean up our mess.	 */	RemoveTwoPhaseFile(xid, true);	RemoveGXact(gxact);	pfree(buf);}/* * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile) * and call the indicated callbacks for each 2PC record. */static voidProcessRecords(char *bufptr, TransactionId xid,			   const TwoPhaseCallback callbacks[]){	for (;;)	{		TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;		Assert(record->rmid <= TWOPHASE_RM_MAX_ID);		if (record->rmid == TWOPHASE_RM_END_ID)			break;		bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));		if (callbacks[record->rmid] != NULL)			callbacks[record->rmid] (xid, record->info,									 (void *) bufptr, record->len);		bufptr += MAXALIGN(record->len);	}}/* * Remove the 2PC file for the specified XID. * * If giveWarning is false, do not complain about file-not-present; * this is an expected case during WAL replay. */voidRemoveTwoPhaseFile(TransactionId xid, bool giveWarning){	char		path[MAXPGPATH];	TwoPhaseFilePath(path, xid);	if (unlink(path))		if (errno != ENOENT || giveWarning)			ereport(WARNING,					(errcode_for_file_access(),				   errmsg("could not remove two-phase state file \"%s\": %m",						  path)));}/* * Recreates a state file. This is used in WAL replay. * * Note: content and len don't include CRC. */voidRecreateTwoPhaseFile(TransactionId xid, void *content, int len){	char		path[MAXPGPATH];	pg_crc32	statefile_crc;	int			fd;	/* Recompute CRC */	INIT_CRC32(statefile_crc);	COMP_CRC32(statefile_crc, content, len);	FIN_CRC32(statefile_crc);	TwoPhaseFilePath(path, xid);	fd = BasicOpenFile(path,					   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,					   S_IRUSR | S_IWUSR);	if (fd < 0)		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not recreate two-phase state file \"%s\": %m",						path)));	/* Write content and CRC */	if (write(fd, content, len) != len)	{		close(fd);		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not write two-phase state file: %m")));	}	if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))	{		close(fd);		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not write two-phase state file: %m")));	}	/*	 * We must fsync the file because the end-of-replay checkpoint will not do	 * so, there being no GXACT in shared memory yet to tell it to.	 */	if (pg_fsync(fd) != 0)	{		close(fd);		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not fsync two-phase state file: %m")));	}	if (close(fd) != 0)		ereport(ERROR,				(errcode_for_file_access(),				 errmsg("could not close two-phase state file: %m")));}/* * CheckPointTwoPhase -- handle 2PC component of checkpointing. * * We must fsync the state file of any GXACT that is valid and has a PREPARE * LSN <= the checkpoint's redo horizon.  (If the gxact isn't valid yet or * has a later LSN, this checkpoint is not responsible for fsyncing it.) * * This is deliberately run as late as possible in the checkpoint sequence, * because GXACTs ordinarily have short lifespans, and so it is quite * possible that GXACTs that were valid at checkpoint start will no longer * exist if we wait a little bit.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -