⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 twophase.c

📁 postgresql8.3.4源码,开源数据库
💻 C
📖 第 1 页 / 共 4 页
字号:
/*-------------------------------------------------------------------------
 *
 * twophase.c
 *		Two-phase commit support functions.
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		$PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.39.2.2 2008/05/19 18:16:46 heikki Exp $
 *
 * NOTES
 *		Each global transaction is associated with a global transaction
 *		identifier (GID). The client assigns a GID to a postgres
 *		transaction with the PREPARE TRANSACTION command.
 *
 *		We keep all active global transactions in a shared memory array.
 *		When the PREPARE TRANSACTION command is issued, the GID is
 *		reserved for the transaction in the array. This is done before
 *		a WAL entry is made, because the reservation checks for duplicate
 *		GIDs and aborts the transaction if there already is a global
 *		transaction in prepared state with the same GID.
 *
 *		A global transaction (gxact) also has a dummy PGPROC that is entered
 *		into the ProcArray array; this is what keeps the XID considered
 *		running by TransactionIdIsInProgress.  It is also convenient as a
 *		PGPROC to hook the gxact's locks to.
 *
 *		In order to survive crashes and shutdowns, all prepared
 *		transactions must be stored in permanent storage. This includes
 *		locking information, pending notifications etc. All that state
 *		information is written to the per-transaction state file in
 *		the pg_twophase directory.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include "access/heapam.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"


/*
 * Directory where Two-phase commit files reside within PGDATA
 */
#define TWOPHASE_DIR "pg_twophase"

/* GUC variable, can't be changed after startup */
int			max_prepared_xacts = 5;

/*
 * This struct describes one global transaction that is in prepared state
 * or attempting to become prepared.
 *
 * The first component of the struct is a dummy PGPROC that is inserted
 * into the global ProcArray so that the transaction appears to still be
 * running and holding locks.  It must be first because we cast pointers
 * to PGPROC and pointers to GlobalTransactionData back and forth.
 *
 * The lifecycle of a global transaction is:
 *
 * 1. After checking that the requested GID is not in use, set up an
 * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
 * with locking_xid = my own XID and valid = false.
 *
 * 2. After successfully completing prepare, set valid = true and enter the
 * contained PGPROC into the global ProcArray.
 *
 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
 * is valid and its locking_xid is no longer active, then store my current
 * XID into locking_xid.  This prevents concurrent attempts to commit or
 * rollback the same prepared xact.
 *
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
 * the freelist.
* * Note that if the preparing transaction fails between steps 1 and 2, the * entry will remain in prepXacts until recycled.  We can detect recyclable * entries by checking for valid = false and locking_xid no longer active. * * typedef struct GlobalTransactionData *GlobalTransaction appears in * twophase.h */#define GIDSIZE 200typedef struct GlobalTransactionData{	PGPROC		proc;			/* dummy proc */	TimestampTz prepared_at;	/* time of preparation */	XLogRecPtr	prepare_lsn;	/* XLOG offset of prepare record */	Oid			owner;			/* ID of user that executed the xact */	TransactionId locking_xid;	/* top-level XID of backend working on xact */	bool		valid;			/* TRUE if fully prepared */	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact */} GlobalTransactionData;/* * Two Phase Commit shared state.  Access to this struct is protected * by TwoPhaseStateLock. */typedef struct TwoPhaseStateData{	/* Head of linked list of free GlobalTransactionData structs */	SHMEM_OFFSET freeGXacts;	/* Number of valid prepXacts entries. */	int			numPrepXacts;	/*	 * There are max_prepared_xacts items in this array, but C wants a	 * fixed-size array.	 
*/	GlobalTransaction prepXacts[1];		/* VARIABLE LENGTH ARRAY */} TwoPhaseStateData;			/* VARIABLE LENGTH STRUCT */static TwoPhaseStateData *TwoPhaseState;static void RecordTransactionCommitPrepared(TransactionId xid,								int nchildren,								TransactionId *children,								int nrels,								RelFileNode *rels);static void RecordTransactionAbortPrepared(TransactionId xid,							   int nchildren,							   TransactionId *children,							   int nrels,							   RelFileNode *rels);static void ProcessRecords(char *bufptr, TransactionId xid,			   const TwoPhaseCallback callbacks[]);/* * Initialization of shared memory */SizeTwoPhaseShmemSize(void){	Size		size;	/* Need the fixed struct, the array of pointers, and the GTD structs */	size = offsetof(TwoPhaseStateData, prepXacts);	size = add_size(size, mul_size(max_prepared_xacts,								   sizeof(GlobalTransaction)));	size = MAXALIGN(size);	size = add_size(size, mul_size(max_prepared_xacts,								   sizeof(GlobalTransactionData)));	return size;}voidTwoPhaseShmemInit(void){	bool		found;	TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",									TwoPhaseShmemSize(),									&found);	if (!IsUnderPostmaster)	{		GlobalTransaction gxacts;		int			i;		Assert(!found);		TwoPhaseState->freeGXacts = INVALID_OFFSET;		TwoPhaseState->numPrepXacts = 0;		/*		 * Initialize the linked list of free GlobalTransactionData structs		 */		gxacts = (GlobalTransaction)			((char *) TwoPhaseState +			 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +					  sizeof(GlobalTransaction) * max_prepared_xacts));		for (i = 0; i < max_prepared_xacts; i++)		{			gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;			TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);		}	}	else		Assert(found);}/* * MarkAsPreparing *		Reserve the GID for the given transaction. * * Internally, this creates a gxact struct and puts it into the active array. 
* NOTE: this is also used when reloading a gxact after a crash; so avoid * assuming that we can use very much backend context. */GlobalTransactionMarkAsPreparing(TransactionId xid, const char *gid,				TimestampTz prepared_at, Oid owner, Oid databaseid){	GlobalTransaction gxact;	int			i;	if (strlen(gid) >= GIDSIZE)		ereport(ERROR,				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),				 errmsg("transaction identifier \"%s\" is too long",						gid)));	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);	/*	 * First, find and recycle any gxacts that failed during prepare. We do	 * this partly to ensure we don't mistakenly say their GIDs are still	 * reserved, and partly so we don't fail on out-of-slots unnecessarily.	 */	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)	{		gxact = TwoPhaseState->prepXacts[i];		if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))		{			/* It's dead Jim ... remove from the active array */			TwoPhaseState->numPrepXacts--;			TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];			/* and put it back in the freelist */			gxact->proc.links.next = TwoPhaseState->freeGXacts;			TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);			/* Back up index count too, so we don't miss scanning one */			i--;		}	}	/* Check for conflicting GID */	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)	{		gxact = TwoPhaseState->prepXacts[i];		if (strcmp(gxact->gid, gid) == 0)		{			ereport(ERROR,					(errcode(ERRCODE_DUPLICATE_OBJECT),					 errmsg("transaction identifier \"%s\" is already in use",							gid)));		}	}	/* Get a free gxact from the freelist */	if (TwoPhaseState->freeGXacts == INVALID_OFFSET)		ereport(ERROR,				(errcode(ERRCODE_OUT_OF_MEMORY),				 errmsg("maximum number of prepared transactions reached"),				 errhint("Increase max_prepared_transactions (currently %d).",						 max_prepared_xacts)));	gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);	TwoPhaseState->freeGXacts = gxact->proc.links.next;	/* Initialize 
it */	MemSet(&gxact->proc, 0, sizeof(PGPROC));	SHMQueueElemInit(&(gxact->proc.links));	gxact->proc.waitStatus = STATUS_OK;	/* We set up the gxact's VXID as InvalidBackendId/XID */	gxact->proc.lxid = (LocalTransactionId) xid;	gxact->proc.xid = xid;	gxact->proc.xmin = InvalidTransactionId;	gxact->proc.pid = 0;	gxact->proc.backendId = InvalidBackendId;	gxact->proc.databaseId = databaseid;	gxact->proc.roleId = owner;	gxact->proc.inCommit = false;	gxact->proc.vacuumFlags = 0;	gxact->proc.lwWaiting = false;	gxact->proc.lwExclusive = false;	gxact->proc.lwWaitLink = NULL;	gxact->proc.waitLock = NULL;	gxact->proc.waitProcLock = NULL;	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)		SHMQueueInit(&(gxact->proc.myProcLocks[i]));	/* subxid data must be filled later by GXactLoadSubxactData */	gxact->proc.subxids.overflowed = false;	gxact->proc.subxids.nxids = 0;	gxact->prepared_at = prepared_at;	/* initialize LSN to 0 (start of WAL) */	gxact->prepare_lsn.xlogid = 0;	gxact->prepare_lsn.xrecoff = 0;	gxact->owner = owner;	gxact->locking_xid = xid;	gxact->valid = false;	strcpy(gxact->gid, gid);	/* And insert it into the active array */	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;	LWLockRelease(TwoPhaseStateLock);	return gxact;}/* * GXactLoadSubxactData * * If the transaction being persisted had any subtransactions, this must * be called before MarkAsPrepared() to load information into the dummy * PGPROC. 
*/static voidGXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,					 TransactionId *children){	/* We need no extra lock since the GXACT isn't valid yet */	if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)	{		gxact->proc.subxids.overflowed = true;		nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;	}	if (nsubxacts > 0)	{		memcpy(gxact->proc.subxids.xids, children,			   nsubxacts * sizeof(TransactionId));		gxact->proc.subxids.nxids = nsubxacts;	}}/* * MarkAsPrepared *		Mark the GXACT as fully valid, and enter it into the global ProcArray. */static voidMarkAsPrepared(GlobalTransaction gxact){	/* Lock here may be overkill, but I'm not convinced of that ... */	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);	Assert(!gxact->valid);	gxact->valid = true;	LWLockRelease(TwoPhaseStateLock);	/*	 * Put it into the global ProcArray so TransactionIdIsInProgress considers	 * the XID as still running.	 */	ProcArrayAdd(&gxact->proc);}/* * LockGXact *		Locate the prepared transaction and mark it busy for COMMIT or PREPARE. */static GlobalTransactionLockGXact(const char *gid, Oid user){	int			i;	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)	{		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];		/* Ignore not-yet-valid GIDs */		if (!gxact->valid)			continue;		if (strcmp(gxact->gid, gid) != 0)			continue;		/* Found it, but has someone else got it locked? 
*/		if (TransactionIdIsValid(gxact->locking_xid))		{			if (TransactionIdIsActive(gxact->locking_xid))				ereport(ERROR,						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),				errmsg("prepared transaction with identifier \"%s\" is busy",					   gid)));			gxact->locking_xid = InvalidTransactionId;		}		if (user != gxact->owner && !superuser_arg(user))			ereport(ERROR,					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),				  errmsg("permission denied to finish prepared transaction"),					 errhint("Must be superuser or the user that prepared the transaction.")));		/*		 * Note: it probably would be possible to allow committing from		 * another database; but at the moment NOTIFY is known not to work and		 * there may be some other issues as well.	Hence disallow until		 * someone gets motivated to make it work.		 */		if (MyDatabaseId != gxact->proc.databaseId)			ereport(ERROR,					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),				  errmsg("prepared transaction belongs to another database"),					 errhint("Connect to the database where the transaction was prepared to finish it.")));		/* OK for me to lock it */		gxact->locking_xid = GetTopTransactionId();		LWLockRelease(TwoPhaseStateLock);		return gxact;	}	LWLockRelease(TwoPhaseStateLock);	ereport(ERROR,			(errcode(ERRCODE_UNDEFINED_OBJECT),		 errmsg("prepared transaction with identifier \"%s\" does not exist",				gid)));	/* NOTREACHED */	return NULL;}/* * RemoveGXact *		Remove the prepared transaction from the shared memory array. 
* * NB: caller should have already removed it from ProcArray */static voidRemoveGXact(GlobalTransaction gxact){	int			i;	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)	{		if (gxact == TwoPhaseState->prepXacts[i])		{			/* remove from the active array */			TwoPhaseState->numPrepXacts--;			TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];			/* and put it back in the freelist */			gxact->proc.links.next = TwoPhaseState->freeGXacts;			TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -