⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 twophase.c

📁 postgresql8.3.4源码,开源数据库
💻 C
📖 第 1 页 / 共 4 页
字号:
 * * If a GXACT remains valid across multiple checkpoints, it'll be fsynced * each time.  This is considered unusual enough that we don't bother to * expend any extra code to avoid the redundant fsyncs.  (They should be * reasonably cheap anyway, since they won't cause I/O.) */voidCheckPointTwoPhase(XLogRecPtr redo_horizon){	TransactionId *xids;	int			nxids;	char		path[MAXPGPATH];	int			i;	/*	 * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab	 * it just long enough to make a list of the XIDs that require fsyncing,	 * and then do the I/O afterwards.	 *	 * This approach creates a race condition: someone else could delete a	 * GXACT between the time we release TwoPhaseStateLock and the time we try	 * to open its state file.	We handle this by special-casing ENOENT	 * failures: if we see that, we verify that the GXACT is no longer valid,	 * and if so ignore the failure.	 */	if (max_prepared_xacts <= 0)		return;					/* nothing to do */	xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));	nxids = 0;	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)	{		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];		if (gxact->valid &&			XLByteLE(gxact->prepare_lsn, redo_horizon))			xids[nxids++] = gxact->proc.xid;	}	LWLockRelease(TwoPhaseStateLock);	for (i = 0; i < nxids; i++)	{		TransactionId xid = xids[i];		int			fd;		TwoPhaseFilePath(path, xid);		fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0);		if (fd < 0)		{			if (errno == ENOENT)			{				/* OK if gxact is no longer valid */				if (!TransactionIdIsPrepared(xid))					continue;				/* Restore errno in case it was changed */				errno = ENOENT;			}			ereport(ERROR,					(errcode_for_file_access(),					 errmsg("could not open two-phase state file \"%s\": %m",							path)));		}		if (pg_fsync(fd) != 0)		{			close(fd);			ereport(ERROR,					(errcode_for_file_access(),					 errmsg("could not fsync two-phase state file \"%s\": %m",							path)));		}		if (close(fd) != 0)			ereport(ERROR,					(errcode_for_file_access(),					 errmsg("could not close two-phase state file \"%s\": %m",							path)));	}	pfree(xids);}/* * PrescanPreparedTransactions * * Scan the pg_twophase directory and determine the range of valid XIDs * present.  This is run during database startup, after we have completed * reading WAL.  ShmemVariableCache->nextXid has been set to one more than * the highest XID for which evidence exists in WAL. * * We throw away any prepared xacts with main XID beyond nextXid --- if any * are present, it suggests that the DBA has done a PITR recovery to an * earlier point in time without cleaning out pg_twophase.	We dare not * try to recover such prepared xacts since they likely depend on database * state that doesn't exist now. * * However, we will advance nextXid beyond any subxact XIDs belonging to * valid prepared xacts.  We need to do this since subxact commit doesn't * write a WAL entry, and so there might be no evidence in WAL of those * subxact XIDs. * * Our other responsibility is to determine and return the oldest valid XID * among the prepared xacts (if none, return ShmemVariableCache->nextXid). * This is needed to synchronize pg_subtrans startup properly. */TransactionIdPrescanPreparedTransactions(void){	TransactionId origNextXid = ShmemVariableCache->nextXid;	TransactionId result = origNextXid;	DIR		   *cldir;	struct dirent *clde;	cldir = AllocateDir(TWOPHASE_DIR);	while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)	{		if (strlen(clde->d_name) == 8 &&			strspn(clde->d_name, "0123456789ABCDEF") == 8)		{			TransactionId xid;			char	   *buf;			TwoPhaseFileHeader *hdr;			TransactionId *subxids;			int			i;			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);			/* Reject XID if too new */			if (TransactionIdFollowsOrEquals(xid, origNextXid))			{				ereport(WARNING,						(errmsg("removing future two-phase state file \"%s\"",								clde->d_name)));				RemoveTwoPhaseFile(xid, true);				continue;			}			/*			 * Note: we can't check if already processed because clog			 * subsystem isn't up yet.			 */			/* Read and validate file */			buf = ReadTwoPhaseFile(xid);			if (buf == NULL)			{				ereport(WARNING,					  (errmsg("removing corrupt two-phase state file \"%s\"",							  clde->d_name)));				RemoveTwoPhaseFile(xid, true);				continue;			}			/* Deconstruct header */			hdr = (TwoPhaseFileHeader *) buf;			if (!TransactionIdEquals(hdr->xid, xid))			{				ereport(WARNING,					  (errmsg("removing corrupt two-phase state file \"%s\"",							  clde->d_name)));				RemoveTwoPhaseFile(xid, true);				pfree(buf);				continue;			}			/*			 * OK, we think this file is valid.  Incorporate xid into the			 * running-minimum result.			 */			if (TransactionIdPrecedes(xid, result))				result = xid;			/*			 * Examine subtransaction XIDs ... they should all follow main			 * XID, and they may force us to advance nextXid.			 */			subxids = (TransactionId *)				(buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));			for (i = 0; i < hdr->nsubxacts; i++)			{				TransactionId subxid = subxids[i];				Assert(TransactionIdFollows(subxid, xid));				if (TransactionIdFollowsOrEquals(subxid,												 ShmemVariableCache->nextXid))				{					ShmemVariableCache->nextXid = subxid;					TransactionIdAdvance(ShmemVariableCache->nextXid);				}			}			pfree(buf);		}	}	FreeDir(cldir);	return result;}/* * RecoverPreparedTransactions * * Scan the pg_twophase directory and reload shared-memory state for each * prepared transaction (reacquire locks, etc).  This is run during database * startup. */voidRecoverPreparedTransactions(void){	char		dir[MAXPGPATH];	DIR		   *cldir;	struct dirent *clde;	snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);	cldir = AllocateDir(dir);	while ((clde = ReadDir(cldir, dir)) != NULL)	{		if (strlen(clde->d_name) == 8 &&			strspn(clde->d_name, "0123456789ABCDEF") == 8)		{			TransactionId xid;			char	   *buf;			char	   *bufptr;			TwoPhaseFileHeader *hdr;			TransactionId *subxids;			GlobalTransaction gxact;			int			i;			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);			/* Already processed? */			if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))			{				ereport(WARNING,						(errmsg("removing stale two-phase state file \"%s\"",								clde->d_name)));				RemoveTwoPhaseFile(xid, true);				continue;			}			/* Read and validate file */			buf = ReadTwoPhaseFile(xid);			if (buf == NULL)			{				ereport(WARNING,					  (errmsg("removing corrupt two-phase state file \"%s\"",							  clde->d_name)));				RemoveTwoPhaseFile(xid, true);				continue;			}			ereport(LOG,					(errmsg("recovering prepared transaction %u", xid)));			/* Deconstruct header */			hdr = (TwoPhaseFileHeader *) buf;			Assert(TransactionIdEquals(hdr->xid, xid));			bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));			subxids = (TransactionId *) bufptr;			bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));			bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));			bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));			/*			 * Reconstruct subtrans state for the transaction --- needed			 * because pg_subtrans is not preserved over a restart.  Note that			 * we are linking all the subtransactions directly to the			 * top-level XID; there may originally have been a more complex			 * hierarchy, but there's no need to restore that exactly.			 */			for (i = 0; i < hdr->nsubxacts; i++)				SubTransSetParent(subxids[i], xid);			/*			 * Recreate its GXACT and dummy PGPROC			 *			 * Note: since we don't have the PREPARE record's WAL location at			 * hand, we leave prepare_lsn zeroes.  This means the GXACT will			 * be fsync'd on every future checkpoint.  We assume this			 * situation is infrequent enough that the performance cost is			 * negligible (especially since we know the state file has already			 * been fsynced).			 */			gxact = MarkAsPreparing(xid, hdr->gid,									hdr->prepared_at,									hdr->owner, hdr->database);			GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);			MarkAsPrepared(gxact);			/*			 * Recover other state (notably locks) using resource managers			 */			ProcessRecords(bufptr, xid, twophase_recover_callbacks);			pfree(buf);		}	}	FreeDir(cldir);}/* *	RecordTransactionCommitPrepared * * This is basically the same as RecordTransactionCommit: in particular, * we must set the inCommit flag to avoid a race condition. * * We know the transaction made at least one XLOG entry (its PREPARE), * so it is never possible to optimize out the commit record. */static voidRecordTransactionCommitPrepared(TransactionId xid,								int nchildren,								TransactionId *children,								int nrels,								RelFileNode *rels){	XLogRecData rdata[3];	int			lastrdata = 0;	xl_xact_commit_prepared xlrec;	XLogRecPtr	recptr;	START_CRIT_SECTION();	/* See notes in RecordTransactionCommit */	MyProc->inCommit = true;	/* Emit the XLOG commit record */	xlrec.xid = xid;	xlrec.crec.xact_time = GetCurrentTimestamp();	xlrec.crec.nrels = nrels;	xlrec.crec.nsubxacts = nchildren;	rdata[0].data = (char *) (&xlrec);	rdata[0].len = MinSizeOfXactCommitPrepared;	rdata[0].buffer = InvalidBuffer;	/* dump rels to delete */	if (nrels > 0)	{		rdata[0].next = &(rdata[1]);		rdata[1].data = (char *) rels;		rdata[1].len = nrels * sizeof(RelFileNode);		rdata[1].buffer = InvalidBuffer;		lastrdata = 1;	}	/* dump committed child Xids */	if (nchildren > 0)	{		rdata[lastrdata].next = &(rdata[2]);		rdata[2].data = (char *) children;		rdata[2].len = nchildren * sizeof(TransactionId);		rdata[2].buffer = InvalidBuffer;		lastrdata = 2;	}	rdata[lastrdata].next = NULL;	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);	/*	 * We don't currently try to sleep before flush here ... nor is there any	 * support for async commit of a prepared xact (the very idea is probably	 * a contradiction)	 */	/* Flush XLOG to disk */	XLogFlush(recptr);	/* Mark the transaction committed in pg_clog */	TransactionIdCommit(xid);	/* to avoid race conditions, the parent must commit first */	TransactionIdCommitTree(nchildren, children);	/* Checkpoint can proceed now */	MyProc->inCommit = false;	END_CRIT_SECTION();}/* *	RecordTransactionAbortPrepared * * This is basically the same as RecordTransactionAbort. * * We know the transaction made at least one XLOG entry (its PREPARE), * so it is never possible to optimize out the abort record. */static voidRecordTransactionAbortPrepared(TransactionId xid,							   int nchildren,							   TransactionId *children,							   int nrels,							   RelFileNode *rels){	XLogRecData rdata[3];	int			lastrdata = 0;	xl_xact_abort_prepared xlrec;	XLogRecPtr	recptr;	/*	 * Catch the scenario where we aborted partway through	 * RecordTransactionCommitPrepared ...	 */	if (TransactionIdDidCommit(xid))		elog(PANIC, "cannot abort transaction %u, it was already committed",			 xid);	START_CRIT_SECTION();	/* Emit the XLOG abort record */	xlrec.xid = xid;	xlrec.arec.xact_time = GetCurrentTimestamp();	xlrec.arec.nrels = nrels;	xlrec.arec.nsubxacts = nchildren;	rdata[0].data = (char *) (&xlrec);	rdata[0].len = MinSizeOfXactAbortPrepared;	rdata[0].buffer = InvalidBuffer;	/* dump rels to delete */	if (nrels > 0)	{		rdata[0].next = &(rdata[1]);		rdata[1].data = (char *) rels;		rdata[1].len = nrels * sizeof(RelFileNode);		rdata[1].buffer = InvalidBuffer;		lastrdata = 1;	}	/* dump committed child Xids */	if (nchildren > 0)	{		rdata[lastrdata].next = &(rdata[2]);		rdata[2].data = (char *) children;		rdata[2].len = nchildren * sizeof(TransactionId);		rdata[2].buffer = InvalidBuffer;		lastrdata = 2;	}	rdata[lastrdata].next = NULL;	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);	/* Always flush, since we're about to remove the 2PC state file */	XLogFlush(recptr);	/*	 * Mark the transaction aborted in clog.  This is not absolutely necessary	 * but we may as well do it while we are here.	 */	TransactionIdAbort(xid);	TransactionIdAbortTree(nchildren, children);	END_CRIT_SECTION();}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -