📄 twophase.c
字号:
* * If a GXACT remains valid across multiple checkpoints, it'll be fsynced * each time. This is considered unusual enough that we don't bother to * expend any extra code to avoid the redundant fsyncs. (They should be * reasonably cheap anyway, since they won't cause I/O.) */voidCheckPointTwoPhase(XLogRecPtr redo_horizon){ TransactionId *xids; int nxids; char path[MAXPGPATH]; int i; /* * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab * it just long enough to make a list of the XIDs that require fsyncing, * and then do the I/O afterwards. * * This approach creates a race condition: someone else could delete a * GXACT between the time we release TwoPhaseStateLock and the time we try * to open its state file. We handle this by special-casing ENOENT * failures: if we see that, we verify that the GXACT is no longer valid, * and if so ignore the failure. */ if (max_prepared_xacts <= 0) return; /* nothing to do */ xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId)); nxids = 0; LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; if (gxact->valid && XLByteLE(gxact->prepare_lsn, redo_horizon)) xids[nxids++] = gxact->proc.xid; } LWLockRelease(TwoPhaseStateLock); for (i = 0; i < nxids; i++) { TransactionId xid = xids[i]; int fd; TwoPhaseFilePath(path, xid); fd = BasicOpenFile(path, O_RDWR | PG_BINARY, 0); if (fd < 0) { if (errno == ENOENT) { /* OK if gxact is no longer valid */ if (!TransactionIdIsPrepared(xid)) continue; /* Restore errno in case it was changed */ errno = ENOENT; } ereport(ERROR, (errcode_for_file_access(), errmsg("could not open two-phase state file \"%s\": %m", path))); } if (pg_fsync(fd) != 0) { close(fd); ereport(ERROR, (errcode_for_file_access(), errmsg("could not fsync two-phase state file \"%s\": %m", path))); } if (close(fd) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not close two-phase state file \"%s\": %m", path))); } pfree(xids);}/* * PrescanPreparedTransactions * * Scan the pg_twophase directory and determine the range of valid XIDs * present. This is run during database startup, after we have completed * reading WAL. ShmemVariableCache->nextXid has been set to one more than * the highest XID for which evidence exists in WAL. * * We throw away any prepared xacts with main XID beyond nextXid --- if any * are present, it suggests that the DBA has done a PITR recovery to an * earlier point in time without cleaning out pg_twophase. We dare not * try to recover such prepared xacts since they likely depend on database * state that doesn't exist now. * * However, we will advance nextXid beyond any subxact XIDs belonging to * valid prepared xacts. We need to do this since subxact commit doesn't * write a WAL entry, and so there might be no evidence in WAL of those * subxact XIDs. * * Our other responsibility is to determine and return the oldest valid XID * among the prepared xacts (if none, return ShmemVariableCache->nextXid). * This is needed to synchronize pg_subtrans startup properly. */TransactionIdPrescanPreparedTransactions(void){ TransactionId origNextXid = ShmemVariableCache->nextXid; TransactionId result = origNextXid; DIR *cldir; struct dirent *clde; cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { if (strlen(clde->d_name) == 8 && strspn(clde->d_name, "0123456789ABCDEF") == 8) { TransactionId xid; char *buf; TwoPhaseFileHeader *hdr; TransactionId *subxids; int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); /* Reject XID if too new */ if (TransactionIdFollowsOrEquals(xid, origNextXid)) { ereport(WARNING, (errmsg("removing future two-phase state file \"%s\"", clde->d_name))); RemoveTwoPhaseFile(xid, true); continue; } /* * Note: we can't check if already processed because clog * subsystem isn't up yet. */ /* Read and validate file */ buf = ReadTwoPhaseFile(xid); if (buf == NULL) { ereport(WARNING, (errmsg("removing corrupt two-phase state file \"%s\"", clde->d_name))); RemoveTwoPhaseFile(xid, true); continue; } /* Deconstruct header */ hdr = (TwoPhaseFileHeader *) buf; if (!TransactionIdEquals(hdr->xid, xid)) { ereport(WARNING, (errmsg("removing corrupt two-phase state file \"%s\"", clde->d_name))); RemoveTwoPhaseFile(xid, true); pfree(buf); continue; } /* * OK, we think this file is valid. Incorporate xid into the * running-minimum result. */ if (TransactionIdPrecedes(xid, result)) result = xid; /* * Examine subtransaction XIDs ... they should all follow main * XID, and they may force us to advance nextXid. */ subxids = (TransactionId *) (buf + MAXALIGN(sizeof(TwoPhaseFileHeader))); for (i = 0; i < hdr->nsubxacts; i++) { TransactionId subxid = subxids[i]; Assert(TransactionIdFollows(subxid, xid)); if (TransactionIdFollowsOrEquals(subxid, ShmemVariableCache->nextXid)) { ShmemVariableCache->nextXid = subxid; TransactionIdAdvance(ShmemVariableCache->nextXid); } } pfree(buf); } } FreeDir(cldir); return result;}/* * RecoverPreparedTransactions * * Scan the pg_twophase directory and reload shared-memory state for each * prepared transaction (reacquire locks, etc). This is run during database * startup. */voidRecoverPreparedTransactions(void){ char dir[MAXPGPATH]; DIR *cldir; struct dirent *clde; snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR); cldir = AllocateDir(dir); while ((clde = ReadDir(cldir, dir)) != NULL) { if (strlen(clde->d_name) == 8 && strspn(clde->d_name, "0123456789ABCDEF") == 8) { TransactionId xid; char *buf; char *bufptr; TwoPhaseFileHeader *hdr; TransactionId *subxids; GlobalTransaction gxact; int i; xid = (TransactionId) strtoul(clde->d_name, NULL, 16); /* Already processed? */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) { ereport(WARNING, (errmsg("removing stale two-phase state file \"%s\"", clde->d_name))); RemoveTwoPhaseFile(xid, true); continue; } /* Read and validate file */ buf = ReadTwoPhaseFile(xid); if (buf == NULL) { ereport(WARNING, (errmsg("removing corrupt two-phase state file \"%s\"", clde->d_name))); RemoveTwoPhaseFile(xid, true); continue; } ereport(LOG, (errmsg("recovering prepared transaction %u", xid))); /* Deconstruct header */ hdr = (TwoPhaseFileHeader *) buf; Assert(TransactionIdEquals(hdr->xid, xid)); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); subxids = (TransactionId *) bufptr; bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); /* * Reconstruct subtrans state for the transaction --- needed * because pg_subtrans is not preserved over a restart. Note that * we are linking all the subtransactions directly to the * top-level XID; there may originally have been a more complex * hierarchy, but there's no need to restore that exactly. */ for (i = 0; i < hdr->nsubxacts; i++) SubTransSetParent(subxids[i], xid); /* * Recreate its GXACT and dummy PGPROC * * Note: since we don't have the PREPARE record's WAL location at * hand, we leave prepare_lsn zeroes. This means the GXACT will * be fsync'd on every future checkpoint. We assume this * situation is infrequent enough that the performance cost is * negligible (especially since we know the state file has already * been fsynced). */ gxact = MarkAsPreparing(xid, hdr->gid, hdr->prepared_at, hdr->owner, hdr->database); GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); MarkAsPrepared(gxact); /* * Recover other state (notably locks) using resource managers */ ProcessRecords(bufptr, xid, twophase_recover_callbacks); pfree(buf); } } FreeDir(cldir);}/* * RecordTransactionCommitPrepared * * This is basically the same as RecordTransactionCommit: in particular, * we must set the inCommit flag to avoid a race condition. * * We know the transaction made at least one XLOG entry (its PREPARE), * so it is never possible to optimize out the commit record. */static voidRecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileNode *rels){ XLogRecData rdata[3]; int lastrdata = 0; xl_xact_commit_prepared xlrec; XLogRecPtr recptr; START_CRIT_SECTION(); /* See notes in RecordTransactionCommit */ MyProc->inCommit = true; /* Emit the XLOG commit record */ xlrec.xid = xid; xlrec.crec.xact_time = GetCurrentTimestamp(); xlrec.crec.nrels = nrels; xlrec.crec.nsubxacts = nchildren; rdata[0].data = (char *) (&xlrec); rdata[0].len = MinSizeOfXactCommitPrepared; rdata[0].buffer = InvalidBuffer; /* dump rels to delete */ if (nrels > 0) { rdata[0].next = &(rdata[1]); rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; } /* dump committed child Xids */ if (nchildren > 0) { rdata[lastrdata].next = &(rdata[2]); rdata[2].data = (char *) children; rdata[2].len = nchildren * sizeof(TransactionId); rdata[2].buffer = InvalidBuffer; lastrdata = 2; } rdata[lastrdata].next = NULL; recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata); /* * We don't currently try to sleep before flush here ... nor is there any * support for async commit of a prepared xact (the very idea is probably * a contradiction) */ /* Flush XLOG to disk */ XLogFlush(recptr); /* Mark the transaction committed in pg_clog */ TransactionIdCommit(xid); /* to avoid race conditions, the parent must commit first */ TransactionIdCommitTree(nchildren, children); /* Checkpoint can proceed now */ MyProc->inCommit = false; END_CRIT_SECTION();}/* * RecordTransactionAbortPrepared * * This is basically the same as RecordTransactionAbort. * * We know the transaction made at least one XLOG entry (its PREPARE), * so it is never possible to optimize out the abort record. */static voidRecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, RelFileNode *rels){ XLogRecData rdata[3]; int lastrdata = 0; xl_xact_abort_prepared xlrec; XLogRecPtr recptr; /* * Catch the scenario where we aborted partway through * RecordTransactionCommitPrepared ... */ if (TransactionIdDidCommit(xid)) elog(PANIC, "cannot abort transaction %u, it was already committed", xid); START_CRIT_SECTION(); /* Emit the XLOG abort record */ xlrec.xid = xid; xlrec.arec.xact_time = GetCurrentTimestamp(); xlrec.arec.nrels = nrels; xlrec.arec.nsubxacts = nchildren; rdata[0].data = (char *) (&xlrec); rdata[0].len = MinSizeOfXactAbortPrepared; rdata[0].buffer = InvalidBuffer; /* dump rels to delete */ if (nrels > 0) { rdata[0].next = &(rdata[1]); rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; } /* dump committed child Xids */ if (nchildren > 0) { rdata[lastrdata].next = &(rdata[2]); rdata[2].data = (char *) children; rdata[2].len = nchildren * sizeof(TransactionId); rdata[2].buffer = InvalidBuffer; lastrdata = 2; } rdata[lastrdata].next = NULL; recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); /* * Mark the transaction aborted in clog. This is not absolutely necessary * but we may as well do it while we are here. */ TransactionIdAbort(xid); TransactionIdAbortTree(nchildren, children); END_CRIT_SECTION();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -