📄 twophase.c
字号:
/*------------------------------------------------------------------------- * * twophase.c * Two-phase commit support functions. * * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.39.2.2 2008/05/19 18:16:46 heikki Exp $ * * NOTES * Each global transaction is associated with a global transaction * identifier (GID). The client assigns a GID to a postgres * transaction with the PREPARE TRANSACTION command. * * We keep all active global transactions in a shared memory array. * When the PREPARE TRANSACTION command is issued, the GID is * reserved for the transaction in the array. This is done before * a WAL entry is made, because the reservation checks for duplicate * GIDs and aborts the transaction if there already is a global * transaction in prepared state with the same GID. * * A global transaction (gxact) also has a dummy PGPROC that is entered * into the ProcArray array; this is what keeps the XID considered * running by TransactionIdIsInProgress. It is also convenient as a * PGPROC to hook the gxact's locks to. * * In order to survive crashes and shutdowns, all prepared * transactions must be stored in permanent storage. This includes * locking information, pending notifications etc. All that state * information is written to the per-transaction state file in * the pg_twophase directory. * *------------------------------------------------------------------------- */#include "postgres.h"#include <fcntl.h>#include <sys/stat.h>#include <sys/types.h>#include <time.h>#include <unistd.h>#include "access/heapam.h"#include "access/subtrans.h"#include "access/transam.h"#include "access/twophase.h"#include "access/twophase_rmgr.h"#include "access/xact.h"#include "catalog/pg_type.h"#include "funcapi.h"#include "miscadmin.h"#include "pgstat.h"#include "storage/fd.h"#include "storage/procarray.h"#include "storage/smgr.h"#include "utils/builtins.h"#include "utils/memutils.h"/* * Directory where Two-phase commit files reside within PGDATA */#define TWOPHASE_DIR "pg_twophase"/* GUC variable, can't be changed after startup */int max_prepared_xacts = 5;/* * This struct describes one global transaction that is in prepared state * or attempting to become prepared. * * The first component of the struct is a dummy PGPROC that is inserted * into the global ProcArray so that the transaction appears to still be * running and holding locks. It must be first because we cast pointers * to PGPROC and pointers to GlobalTransactionData back and forth. * * The lifecycle of a global transaction is: * * 1. After checking that the requested GID is not in use, set up an * entry in the TwoPhaseState->prepXacts array with the correct XID and GID, * with locking_xid = my own XID and valid = false. * * 2. After successfully completing prepare, set valid = true and enter the * contained PGPROC into the global ProcArray. * * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry * is valid and its locking_xid is no longer active, then store my current * XID into locking_xid. This prevents concurrent attempts to commit or * rollback the same prepared xact. * * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry * from the ProcArray and the TwoPhaseState->prepXacts array and return it to * the freelist. * * Note that if the preparing transaction fails between steps 1 and 2, the * entry will remain in prepXacts until recycled. We can detect recyclable * entries by checking for valid = false and locking_xid no longer active. * * typedef struct GlobalTransactionData *GlobalTransaction appears in * twophase.h */#define GIDSIZE 200typedef struct GlobalTransactionData{ PGPROC proc; /* dummy proc */ TimestampTz prepared_at; /* time of preparation */ XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */ Oid owner; /* ID of user that executed the xact */ TransactionId locking_xid; /* top-level XID of backend working on xact */ bool valid; /* TRUE if fully prepared */ char gid[GIDSIZE]; /* The GID assigned to the prepared xact */} GlobalTransactionData;/* * Two Phase Commit shared state. Access to this struct is protected * by TwoPhaseStateLock. */typedef struct TwoPhaseStateData{ /* Head of linked list of free GlobalTransactionData structs */ SHMEM_OFFSET freeGXacts; /* Number of valid prepXacts entries. */ int numPrepXacts; /* * There are max_prepared_xacts items in this array, but C wants a * fixed-size array. */ GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */} TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */static TwoPhaseStateData *TwoPhaseState;static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileNode *rels);static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileNode *rels);static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]);/* * Initialization of shared memory */SizeTwoPhaseShmemSize(void){ Size size; /* Need the fixed struct, the array of pointers, and the GTD structs */ size = offsetof(TwoPhaseStateData, prepXacts); size = add_size(size, mul_size(max_prepared_xacts, sizeof(GlobalTransaction))); size = MAXALIGN(size); size = add_size(size, mul_size(max_prepared_xacts, sizeof(GlobalTransactionData))); return size;}voidTwoPhaseShmemInit(void){ bool found; TwoPhaseState = ShmemInitStruct("Prepared Transaction Table", TwoPhaseShmemSize(), &found); if (!IsUnderPostmaster) { GlobalTransaction gxacts; int i; Assert(!found); TwoPhaseState->freeGXacts = INVALID_OFFSET; TwoPhaseState->numPrepXacts = 0; /* * Initialize the linked list of free GlobalTransactionData structs */ gxacts = (GlobalTransaction) ((char *) TwoPhaseState + MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) + sizeof(GlobalTransaction) * max_prepared_xacts)); for (i = 0; i < max_prepared_xacts; i++) { gxacts[i].proc.links.next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]); } } else Assert(found);}/* * MarkAsPreparing * Reserve the GID for the given transaction. * * Internally, this creates a gxact struct and puts it into the active array. * NOTE: this is also used when reloading a gxact after a crash; so avoid * assuming that we can use very much backend context. */GlobalTransactionMarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid){ GlobalTransaction gxact; int i; if (strlen(gid) >= GIDSIZE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("transaction identifier \"%s\" is too long", gid))); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); /* * First, find and recycle any gxacts that failed during prepare. We do * this partly to ensure we don't mistakenly say their GIDs are still * reserved, and partly so we don't fail on out-of-slots unnecessarily. */ for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { gxact = TwoPhaseState->prepXacts[i]; if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid)) { /* It's dead Jim ... remove from the active array */ TwoPhaseState->numPrepXacts--; TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; /* and put it back in the freelist */ gxact->proc.links.next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact); /* Back up index count too, so we don't miss scanning one */ i--; } } /* Check for conflicting GID */ for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { gxact = TwoPhaseState->prepXacts[i]; if (strcmp(gxact->gid, gid) == 0) { ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("transaction identifier \"%s\" is already in use", gid))); } } /* Get a free gxact from the freelist */ if (TwoPhaseState->freeGXacts == INVALID_OFFSET) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("maximum number of prepared transactions reached"), errhint("Increase max_prepared_transactions (currently %d).", max_prepared_xacts))); gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts); TwoPhaseState->freeGXacts = gxact->proc.links.next; /* Initialize it */ MemSet(&gxact->proc, 0, sizeof(PGPROC)); SHMQueueElemInit(&(gxact->proc.links)); gxact->proc.waitStatus = STATUS_OK; /* We set up the gxact's VXID as InvalidBackendId/XID */ gxact->proc.lxid = (LocalTransactionId) xid; gxact->proc.xid = xid; gxact->proc.xmin = InvalidTransactionId; gxact->proc.pid = 0; gxact->proc.backendId = InvalidBackendId; gxact->proc.databaseId = databaseid; gxact->proc.roleId = owner; gxact->proc.inCommit = false; gxact->proc.vacuumFlags = 0; gxact->proc.lwWaiting = false; gxact->proc.lwExclusive = false; gxact->proc.lwWaitLink = NULL; gxact->proc.waitLock = NULL; gxact->proc.waitProcLock = NULL; for (i = 0; i < NUM_LOCK_PARTITIONS; i++) SHMQueueInit(&(gxact->proc.myProcLocks[i])); /* subxid data must be filled later by GXactLoadSubxactData */ gxact->proc.subxids.overflowed = false; gxact->proc.subxids.nxids = 0; gxact->prepared_at = prepared_at; /* initialize LSN to 0 (start of WAL) */ gxact->prepare_lsn.xlogid = 0; gxact->prepare_lsn.xrecoff = 0; gxact->owner = owner; gxact->locking_xid = xid; gxact->valid = false; strcpy(gxact->gid, gid); /* And insert it into the active array */ Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; LWLockRelease(TwoPhaseStateLock); return gxact;}/* * GXactLoadSubxactData * * If the transaction being persisted had any subtransactions, this must * be called before MarkAsPrepared() to load information into the dummy * PGPROC. */static voidGXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, TransactionId *children){ /* We need no extra lock since the GXACT isn't valid yet */ if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS) { gxact->proc.subxids.overflowed = true; nsubxacts = PGPROC_MAX_CACHED_SUBXIDS; } if (nsubxacts > 0) { memcpy(gxact->proc.subxids.xids, children, nsubxacts * sizeof(TransactionId)); gxact->proc.subxids.nxids = nsubxacts; }}/* * MarkAsPrepared * Mark the GXACT as fully valid, and enter it into the global ProcArray. */static voidMarkAsPrepared(GlobalTransaction gxact){ /* Lock here may be overkill, but I'm not convinced of that ... */ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); Assert(!gxact->valid); gxact->valid = true; LWLockRelease(TwoPhaseStateLock); /* * Put it into the global ProcArray so TransactionIdIsInProgress considers * the XID as still running. */ ProcArrayAdd(&gxact->proc);}/* * LockGXact * Locate the prepared transaction and mark it busy for COMMIT or PREPARE. */static GlobalTransactionLockGXact(const char *gid, Oid user){ int i; LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; /* Ignore not-yet-valid GIDs */ if (!gxact->valid) continue; if (strcmp(gxact->gid, gid) != 0) continue; /* Found it, but has someone else got it locked? */ if (TransactionIdIsValid(gxact->locking_xid)) { if (TransactionIdIsActive(gxact->locking_xid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("prepared transaction with identifier \"%s\" is busy", gid))); gxact->locking_xid = InvalidTransactionId; } if (user != gxact->owner && !superuser_arg(user)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied to finish prepared transaction"), errhint("Must be superuser or the user that prepared the transaction."))); /* * Note: it probably would be possible to allow committing from * another database; but at the moment NOTIFY is known not to work and * there may be some other issues as well. Hence disallow until * someone gets motivated to make it work. */ if (MyDatabaseId != gxact->proc.databaseId) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("prepared transaction belongs to another database"), errhint("Connect to the database where the transaction was prepared to finish it."))); /* OK for me to lock it */ gxact->locking_xid = GetTopTransactionId(); LWLockRelease(TwoPhaseStateLock); return gxact; } LWLockRelease(TwoPhaseStateLock); ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("prepared transaction with identifier \"%s\" does not exist", gid))); /* NOTREACHED */ return NULL;}/* * RemoveGXact * Remove the prepared transaction from the shared memory array. * * NB: caller should have already removed it from ProcArray */static voidRemoveGXact(GlobalTransaction gxact){ int i; LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { if (gxact == TwoPhaseState->prepXacts[i]) { /* remove from the active array */ TwoPhaseState->numPrepXacts--; TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; /* and put it back in the freelist */ gxact->proc.links.next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -