📄 twophase.c
字号:
LWLockRelease(TwoPhaseStateLock); return; } } LWLockRelease(TwoPhaseStateLock); elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);}/* * TransactionIdIsPrepared * True iff transaction associated with the identifier is prepared * for two-phase commit * * Note: only gxacts marked "valid" are considered; but notice we do not * check the locking status. * * This is not currently exported, because it is only needed internally. */static boolTransactionIdIsPrepared(TransactionId xid){ bool result = false; int i; LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; if (gxact->valid && gxact->proc.xid == xid) { result = true; break; } } LWLockRelease(TwoPhaseStateLock); return result;}/* * Returns an array of all prepared transactions for the user-level * function pg_prepared_xact. * * The returned array and all its elements are copies of internal data * structures, to minimize the time we need to hold the TwoPhaseStateLock. * * WARNING -- we return even those transactions that are not fully prepared * yet. The caller should filter them out if he doesn't want them. * * The returned array is palloc'd. */static intGetPreparedTransactionList(GlobalTransaction *gxacts){ GlobalTransaction array; int num; int i; LWLockAcquire(TwoPhaseStateLock, LW_SHARED); if (TwoPhaseState->numPrepXacts == 0) { LWLockRelease(TwoPhaseStateLock); *gxacts = NULL; return 0; } num = TwoPhaseState->numPrepXacts; array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num); *gxacts = array; for (i = 0; i < num; i++) memcpy(array + i, TwoPhaseState->prepXacts[i], sizeof(GlobalTransactionData)); LWLockRelease(TwoPhaseStateLock); return num;}/* Working status for pg_prepared_xact */typedef struct{ GlobalTransaction array; int ngxacts; int currIdx;} Working_State;/* * pg_prepared_xact * Produce a view with one row per prepared transaction. 
* * This function is here so we don't have to export the * GlobalTransactionData struct definition. */Datumpg_prepared_xact(PG_FUNCTION_ARGS){ FuncCallContext *funcctx; Working_State *status; if (SRF_IS_FIRSTCALL()) { TupleDesc tupdesc; MemoryContext oldcontext; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); /* * Switch to memory context appropriate for multiple function calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); /* build tupdesc for result tuples */ /* this had better match pg_prepared_xacts view in system_views.sql */ tupdesc = CreateTemplateTupleDesc(5, false); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction", XIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid", TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared", TIMESTAMPTZOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid", OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid", OIDOID, -1, 0); funcctx->tuple_desc = BlessTupleDesc(tupdesc); /* * Collect all the 2PC status information that we will format and send * out as a result set. */ status = (Working_State *) palloc(sizeof(Working_State)); funcctx->user_fctx = (void *) status; status->ngxacts = GetPreparedTransactionList(&status->array); status->currIdx = 0; MemoryContextSwitchTo(oldcontext); } funcctx = SRF_PERCALL_SETUP(); status = (Working_State *) funcctx->user_fctx; while (status->array != NULL && status->currIdx < status->ngxacts) { GlobalTransaction gxact = &status->array[status->currIdx++]; Datum values[5]; bool nulls[5]; HeapTuple tuple; Datum result; if (!gxact->valid) continue; /* * Form tuple with appropriate data. 
*/ MemSet(values, 0, sizeof(values)); MemSet(nulls, 0, sizeof(nulls)); values[0] = TransactionIdGetDatum(gxact->proc.xid); values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid)); values[2] = TimestampTzGetDatum(gxact->prepared_at); values[3] = ObjectIdGetDatum(gxact->owner); values[4] = ObjectIdGetDatum(gxact->proc.databaseId); tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); result = HeapTupleGetDatum(tuple); SRF_RETURN_NEXT(funcctx, result); } SRF_RETURN_DONE(funcctx);}/* * TwoPhaseGetDummyProc * Get the PGPROC that represents a prepared transaction specified by XID */PGPROC *TwoPhaseGetDummyProc(TransactionId xid){ PGPROC *result = NULL; int i; static TransactionId cached_xid = InvalidTransactionId; static PGPROC *cached_proc = NULL; /* * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called * repeatedly for the same XID. We can save work with a simple cache. */ if (xid == cached_xid) return cached_proc; LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; if (gxact->proc.xid == xid) { result = &gxact->proc; break; } } LWLockRelease(TwoPhaseStateLock); if (result == NULL) /* should not happen */ elog(ERROR, "failed to find dummy PGPROC for xid %u", xid); cached_xid = xid; cached_proc = result; return result;}/************************************************************************//* State file support *//************************************************************************/#define TwoPhaseFilePath(path, xid) \ snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)/* * 2PC state file format: * * 1. TwoPhaseFileHeader * 2. TransactionId[] (subtransactions) * 3. RelFileNode[] (files to be deleted at commit) * 4. RelFileNode[] (files to be deleted at abort) * 5. TwoPhaseRecordOnDisk * 6. ... * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID) * 8. CRC32 * * Each segment except the final CRC32 is MAXALIGN'd. 
*/

/*
 * Header for a 2PC state file
 *
 * StartPrepare appends this struct to the state data byte-for-byte
 * (sizeof(TwoPhaseFileHeader) bytes), so its layout is part of the on-disk
 * format: changing fields changes what existing state files mean.
 */
#define TWOPHASE_MAGIC	0x57F94531	/* format identifier */

typedef struct TwoPhaseFileHeader
{
	uint32		magic;			/* format identifier; always TWOPHASE_MAGIC */
	uint32		total_len;		/* actual file length, including the trailing
								 * CRC32; filled in by EndPrepare */
	TransactionId xid;			/* original transaction XID */
	Oid			database;		/* OID of database it was in */
	TimestampTz prepared_at;	/* time of preparation */
	Oid			owner;			/* user running the transaction */
	int32		nsubxacts;		/* number of following subxact XIDs */
	int32		ncommitrels;	/* number of delete-on-commit rels */
	int32		nabortrels;		/* number of delete-on-abort rels */
	char		gid[GIDSIZE];	/* GID for transaction; copied from gxact->gid
								 * with StrNCpy in StartPrepare */
} TwoPhaseFileHeader;

/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * A record whose rmid is TWOPHASE_RM_END_ID (with no data) terminates the
 * list of records; EndPrepare appends it.
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;

/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file.  We use a chain of XLogRecData blocks
 * so that we will be able to pass the state file contents directly to
 * XLogInsert.
 *
 * The head block's data begins with the TwoPhaseFileHeader; EndPrepare
 * reads the header back from records.head->data to fill in total_len.
 */
static struct xllist
{
	XLogRecData *head;			/* first data block in the chain */
	XLogRecData *tail;			/* last block in chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain, counting the
								 * MAXALIGN padding of every appended piece */
}			records;

/*
 * Append a block of data to records data structure.
 *
 * NB: each block is padded to a MAXALIGN multiple.  This must be
 * accounted for when the file is later read!
 *
 * When the tail block lacks room for the padded data, a new block of at
 * least 512 bytes is chained on.
 *
 * The data is copied, so the caller is free to modify it afterwards.
*/static voidsave_state_data(const void *data, uint32 len){ uint32 padlen = MAXALIGN(len); if (padlen > records.bytes_free) { records.tail->next = palloc0(sizeof(XLogRecData)); records.tail = records.tail->next; records.tail->buffer = InvalidBuffer; records.tail->len = 0; records.tail->next = NULL; records.bytes_free = Max(padlen, 512); records.tail->data = palloc(records.bytes_free); } memcpy(((char *) records.tail->data) + records.tail->len, data, len); records.tail->len += padlen; records.bytes_free -= padlen; records.total_len += padlen;}/* * Start preparing a state file. * * Initializes data structure and inserts the 2PC file header record. */voidStartPrepare(GlobalTransaction gxact){ TransactionId xid = gxact->proc.xid; TwoPhaseFileHeader hdr; TransactionId *children; RelFileNode *commitrels; RelFileNode *abortrels; /* Initialize linked list */ records.head = palloc0(sizeof(XLogRecData)); records.head->buffer = InvalidBuffer; records.head->len = 0; records.head->next = NULL; records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512); records.head->data = palloc(records.bytes_free); records.tail = records.head; records.total_len = 0; /* Create header */ hdr.magic = TWOPHASE_MAGIC; hdr.total_len = 0; /* EndPrepare will fill this in */ hdr.xid = xid; hdr.database = gxact->proc.databaseId; hdr.prepared_at = gxact->prepared_at; hdr.owner = gxact->owner; hdr.nsubxacts = xactGetCommittedChildren(&children); hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL); hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL); StrNCpy(hdr.gid, gxact->gid, GIDSIZE); save_state_data(&hdr, sizeof(TwoPhaseFileHeader)); /* Add the additional info about subxacts and deletable files */ if (hdr.nsubxacts > 0) { save_state_data(children, hdr.nsubxacts * sizeof(TransactionId)); /* While we have the child-xact data, stuff it in the gxact too */ GXactLoadSubxactData(gxact, hdr.nsubxacts, children); } if (hdr.ncommitrels > 0) { save_state_data(commitrels, hdr.ncommitrels 
* sizeof(RelFileNode)); pfree(commitrels); } if (hdr.nabortrels > 0) { save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode)); pfree(abortrels); }}/* * Finish preparing state file. * * Calculates CRC and writes state file to WAL and in pg_twophase directory. */voidEndPrepare(GlobalTransaction gxact){ TransactionId xid = gxact->proc.xid; TwoPhaseFileHeader *hdr; char path[MAXPGPATH]; XLogRecData *record; pg_crc32 statefile_crc; pg_crc32 bogus_crc; int fd; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, NULL, 0); /* Go back and fill in total_len in the file header record */ hdr = (TwoPhaseFileHeader *) records.head->data; Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32); /* * If the file size exceeds MaxAllocSize, we won't be able to read it in * ReadTwoPhaseFile. Check for that now, rather than fail at commit time. */ if (hdr->total_len > MaxAllocSize) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("two-phase state file maximum length exceeded"))); /* * Create the 2PC state file. * * Note: because we use BasicOpenFile(), we are responsible for ensuring * the FD gets closed in any error exit path. Once we get into the * critical section, though, it doesn't matter since any failure causes * PANIC anyway. */ TwoPhaseFilePath(path, xid); fd = BasicOpenFile(path, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not create two-phase state file \"%s\": %m", path))); /* Write data to file, and calculate CRC as we pass over it */ INIT_CRC32(statefile_crc); for (record = records.head; record != NULL; record = record->next) { COMP_CRC32(statefile_crc, record->data, record->len); if ((write(fd, record->data, record->len)) != record->len) { close(fd); ereport(ERROR, (errcode_for_file_access(), errmsg("could not write two-phase state file: %m")));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -