⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 twophase.c

📁 postgresql8.3.4源码,开源数据库
💻 C
📖 第 1 页 / 共 4 页
字号:
			LWLockRelease(TwoPhaseStateLock);			return;		}	}	LWLockRelease(TwoPhaseStateLock);	elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);}/* * TransactionIdIsPrepared *		True iff transaction associated with the identifier is prepared *		for two-phase commit * * Note: only gxacts marked "valid" are considered; but notice we do not * check the locking status. * * This is not currently exported, because it is only needed internally. */static boolTransactionIdIsPrepared(TransactionId xid){	bool		result = false;	int			i;	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)	{		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];		if (gxact->valid && gxact->proc.xid == xid)		{			result = true;			break;		}	}	LWLockRelease(TwoPhaseStateLock);	return result;}/* * Returns an array of all prepared transactions for the user-level * function pg_prepared_xact. * * The returned array and all its elements are copies of internal data * structures, to minimize the time we need to hold the TwoPhaseStateLock. * * WARNING -- we return even those transactions that are not fully prepared * yet.  The caller should filter them out if he doesn't want them. * * The returned array is palloc'd. */static intGetPreparedTransactionList(GlobalTransaction *gxacts){	GlobalTransaction array;	int			num;	int			i;	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);	if (TwoPhaseState->numPrepXacts == 0)	{		LWLockRelease(TwoPhaseStateLock);		*gxacts = NULL;		return 0;	}	num = TwoPhaseState->numPrepXacts;	array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);	*gxacts = array;	for (i = 0; i < num; i++)		memcpy(array + i, TwoPhaseState->prepXacts[i],			   sizeof(GlobalTransactionData));	LWLockRelease(TwoPhaseStateLock);	return num;}/* Working status for pg_prepared_xact */typedef struct{	GlobalTransaction array;	int			ngxacts;	int			currIdx;} Working_State;/* * pg_prepared_xact *		Produce a view with one row per prepared transaction. 
* * This function is here so we don't have to export the * GlobalTransactionData struct definition. */Datumpg_prepared_xact(PG_FUNCTION_ARGS){	FuncCallContext *funcctx;	Working_State *status;	if (SRF_IS_FIRSTCALL())	{		TupleDesc	tupdesc;		MemoryContext oldcontext;		/* create a function context for cross-call persistence */		funcctx = SRF_FIRSTCALL_INIT();		/*		 * Switch to memory context appropriate for multiple function calls		 */		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);		/* build tupdesc for result tuples */		/* this had better match pg_prepared_xacts view in system_views.sql */		tupdesc = CreateTemplateTupleDesc(5, false);		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",						   XIDOID, -1, 0);		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",						   TEXTOID, -1, 0);		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",						   TIMESTAMPTZOID, -1, 0);		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",						   OIDOID, -1, 0);		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",						   OIDOID, -1, 0);		funcctx->tuple_desc = BlessTupleDesc(tupdesc);		/*		 * Collect all the 2PC status information that we will format and send		 * out as a result set.		 */		status = (Working_State *) palloc(sizeof(Working_State));		funcctx->user_fctx = (void *) status;		status->ngxacts = GetPreparedTransactionList(&status->array);		status->currIdx = 0;		MemoryContextSwitchTo(oldcontext);	}	funcctx = SRF_PERCALL_SETUP();	status = (Working_State *) funcctx->user_fctx;	while (status->array != NULL && status->currIdx < status->ngxacts)	{		GlobalTransaction gxact = &status->array[status->currIdx++];		Datum		values[5];		bool		nulls[5];		HeapTuple	tuple;		Datum		result;		if (!gxact->valid)			continue;		/*		 * Form tuple with appropriate data.		 
*/		MemSet(values, 0, sizeof(values));		MemSet(nulls, 0, sizeof(nulls));		values[0] = TransactionIdGetDatum(gxact->proc.xid);		values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));		values[2] = TimestampTzGetDatum(gxact->prepared_at);		values[3] = ObjectIdGetDatum(gxact->owner);		values[4] = ObjectIdGetDatum(gxact->proc.databaseId);		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);		result = HeapTupleGetDatum(tuple);		SRF_RETURN_NEXT(funcctx, result);	}	SRF_RETURN_DONE(funcctx);}/* * TwoPhaseGetDummyProc *		Get the PGPROC that represents a prepared transaction specified by XID */PGPROC *TwoPhaseGetDummyProc(TransactionId xid){	PGPROC	   *result = NULL;	int			i;	static TransactionId cached_xid = InvalidTransactionId;	static PGPROC *cached_proc = NULL;	/*	 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called	 * repeatedly for the same XID.  We can save work with a simple cache.	 */	if (xid == cached_xid)		return cached_proc;	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)	{		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];		if (gxact->proc.xid == xid)		{			result = &gxact->proc;			break;		}	}	LWLockRelease(TwoPhaseStateLock);	if (result == NULL)			/* should not happen */		elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);	cached_xid = xid;	cached_proc = result;	return result;}/************************************************************************//* State file support													*//************************************************************************/#define TwoPhaseFilePath(path, xid) \	snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)/* * 2PC state file format: * *	1. TwoPhaseFileHeader *	2. TransactionId[] (subtransactions) *	3. RelFileNode[] (files to be deleted at commit) *	4. RelFileNode[] (files to be deleted at abort) *	5. TwoPhaseRecordOnDisk *	6. ... *	7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID) *	8. 
CRC32 * * Each segment except the final CRC32 is MAXALIGN'd. *//* * Header for a 2PC state file */#define TWOPHASE_MAGIC	0x57F94531		/* format identifier */typedef struct TwoPhaseFileHeader{	uint32		magic;			/* format identifier */	uint32		total_len;		/* actual file length */	TransactionId xid;			/* original transaction XID */	Oid			database;		/* OID of database it was in */	TimestampTz prepared_at;	/* time of preparation */	Oid			owner;			/* user running the transaction */	int32		nsubxacts;		/* number of following subxact XIDs */	int32		ncommitrels;	/* number of delete-on-commit rels */	int32		nabortrels;		/* number of delete-on-abort rels */	char		gid[GIDSIZE];	/* GID for transaction */} TwoPhaseFileHeader;/* * Header for each record in a state file * * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header. * The rmgr data will be stored starting on a MAXALIGN boundary. */typedef struct TwoPhaseRecordOnDisk{	uint32		len;			/* length of rmgr data */	TwoPhaseRmgrId rmid;		/* resource manager for this record */	uint16		info;			/* flag bits for use by rmgr */} TwoPhaseRecordOnDisk;/* * During prepare, the state file is assembled in memory before writing it * to WAL and the actual state file.  We use a chain of XLogRecData blocks * so that we will be able to pass the state file contents directly to * XLogInsert. */static struct xllist{	XLogRecData *head;			/* first data block in the chain */	XLogRecData *tail;			/* last block in chain */	uint32		bytes_free;		/* free bytes left in tail block */	uint32		total_len;		/* total data bytes in chain */}	records;/* * Append a block of data to records data structure. * * NB: each block is padded to a MAXALIGN multiple.  This must be * accounted for when the file is later read! * * The data is copied, so the caller is free to modify it afterwards. 
*/
static void
save_state_data(const void *data, uint32 len)
{
	/* space consumed in the chain, rounded up by MAXALIGN */
	uint32		padlen = MAXALIGN(len);

	/* Not enough room in the tail block: chain on a fresh one */
	if (padlen > records.bytes_free)
	{
		records.tail->next = palloc0(sizeof(XLogRecData));
		records.tail = records.tail->next;
		records.tail->buffer = InvalidBuffer;
		records.tail->len = 0;
		records.tail->next = NULL;

		/* new block holds at least 512 bytes, or padlen if that is larger */
		records.bytes_free = Max(padlen, 512);
		records.tail->data = palloc(records.bytes_free);
	}

	/*
	 * Only len bytes are copied; the remaining (padlen - len) bytes of
	 * padding are not written here, but they are counted in all the length
	 * bookkeeping below.
	 */
	memcpy(((char *) records.tail->data) + records.tail->len, data, len);
	records.tail->len += padlen;
	records.bytes_free -= padlen;
	records.total_len += padlen;
}

/*
 * Start preparing a state file.
 *
 * Initializes data structure and inserts the 2PC file header record.
 *
 * gxact: the prepared-transaction entry supplying the xid, database, owner,
 * preparation time and GID that go into the header.
 */
void
StartPrepare(GlobalTransaction gxact)
{
	TransactionId xid = gxact->proc.xid;
	TwoPhaseFileHeader hdr;
	TransactionId *children;
	RelFileNode *commitrels;
	RelFileNode *abortrels;

	/* Initialize linked list */
	records.head = palloc0(sizeof(XLogRecData));
	records.head->buffer = InvalidBuffer;
	records.head->len = 0;
	records.head->next = NULL;

	/* first block is sized so that the file header is sure to fit */
	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
	records.head->data = palloc(records.bytes_free);

	records.tail = records.head;

	records.total_len = 0;

	/* Create header */
	hdr.magic = TWOPHASE_MAGIC;
	hdr.total_len = 0;			/* EndPrepare will fill this in */
	hdr.xid = xid;
	hdr.database = gxact->proc.databaseId;
	hdr.prepared_at = gxact->prepared_at;
	hdr.owner = gxact->owner;
	hdr.nsubxacts = xactGetCommittedChildren(&children);
	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
	StrNCpy(hdr.gid, gxact->gid, GIDSIZE);

	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));

	/* Add the additional info about subxacts and deletable files */
	if (hdr.nsubxacts > 0)
	{
		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
		/* While we have the child-xact data, stuff it in the gxact too */
		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
	}
	if (hdr.ncommitrels > 0)
	{
		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
		pfree(commitrels);
	}
	if (hdr.nabortrels > 0)
	{
		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
		pfree(abortrels);
	}
}

/*
 * Finish preparing state file.
 *
 * Calculates CRC and writes state file to WAL and in pg_twophase directory.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TransactionId xid = gxact->proc.xid;
	TwoPhaseFileHeader *hdr;
	char		path[MAXPGPATH];
	XLogRecData *record;
	pg_crc32	statefile_crc;
	pg_crc32	bogus_crc;
	int			fd;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/* Go back and fill in total_len in the file header record */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	hdr->total_len = records.total_len + sizeof(pg_crc32);

	/*
	 * If the file size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Create the 2PC state file.
	 *
	 * Note: because we use BasicOpenFile(), we are responsible for ensuring
	 * the FD gets closed in any error exit path.  Once we get into the
	 * critical section, though, it doesn't matter since any failure causes
	 * PANIC anyway.
	 */
	TwoPhaseFilePath(path, xid);

	fd = BasicOpenFile(path,
					   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
					   S_IRUSR | S_IWUSR);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create two-phase state file \"%s\": %m",
						path)));

	/* Write data to file, and calculate CRC as we pass over it */
	INIT_CRC32(statefile_crc);

	for (record = records.head; record != NULL; record = record->next)
	{
		COMP_CRC32(statefile_crc, record->data, record->len);
		if ((write(fd, record->data, record->len)) != record->len)
		{
			/*
			 * NOTE(review): close() runs before ereport() expands %m, and a
			 * short write need not set errno at all -- confirm that the
			 * errno reported here is really the one from write().
			 */
			close(fd);
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not write two-phase state file: %m")));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -