txid.c

来自「postgresql8.3.4源码,开源数据库」· C语言 代码 · 共 592 行

C
592
字号
/*-------------------------------------------------------------------------
 * txid.c
 *
 *	Export internal transaction IDs to user level.
 *
 * Note that only top-level transaction IDs are ever converted to TXID.
 * This is important because TXIDs frequently persist beyond the global
 * xmin horizon, or may even be shipped to other machines, so we cannot
 * rely on being able to correlate subtransaction IDs with their parents
 * via functions such as SubTransGetTopmostTransaction().
 *
 *
 *	Copyright (c) 2003-2008, PostgreSQL Global Development Group
 *	Author: Jan Wieck, Afilias USA INC.
 *	64-bit txids: Marko Kreen, Skype Technologies
 *
 *	$PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.4 2008/01/01 19:45:53 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/transam.h"
#include "access/xact.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "utils/builtins.h"


#ifndef INT64_IS_BUSTED
/* txid will be signed int8 in database, so must limit to 63 bits */
#define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF)
#else
/* we only really have 32 bits to work with :-( */
#define MAX_TXID   UINT64CONST(0x7FFFFFFF)
#endif

/* Use unsigned variant internally */
typedef uint64 txid;

/* sprintf format code for uint64 */
#define TXID_FMT UINT64_FORMAT

/*
 * If defined, use bsearch() function for searching for txids in snapshots
 * that have more than the specified number of values.
 */
#define USE_BSEARCH_IF_NXIP_GREATER 30


/*
 * Snapshot containing 8byte txids.
 */
typedef struct
{
	/*
	 * 4-byte length hdr, should not be touched directly.
	 *
	 * Explicit embedding is ok as we want always correct alignment anyway.
	 */
	int32		__varsz;

	uint32		nxip;			/* number of txids in xip array */
	txid		xmin;
	txid		xmax;
	txid		xip[1];			/* in-progress txids, xmin <= xip[i] < xmax */
} TxidSnapshot;

/* Total byte size of a TxidSnapshot that carries nxip entries in xip[] */
#define TXID_SNAPSHOT_SIZE(nxip) \
	(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))

/*
 * Epoch values from xact.c
 */
typedef struct
{
	TransactionId last_xid;
	uint32		epoch;
} TxidEpoch;


/*
 * Fetch epoch data from xact.c.
 *
 * Fills state->last_xid and state->epoch via GetNextXidAndEpoch().
 */
static void
load_xid_epoch(TxidEpoch *state)
{
	GetNextXidAndEpoch(&state->last_xid, &state->epoch);
}

/*
 * do a TransactionId -> txid conversion for an XID near the given epoch
 *
 * Non-normal (special) XIDs are returned unchanged.  Otherwise the result
 * is (epoch << 32) | xid, where the epoch taken from 'state' is moved one
 * step back or forward when xid lies on the other side of a wrap-around
 * relative to state->last_xid.
 *
 * When INT64_IS_BUSTED is defined the epoch is ignored and the xid is
 * simply masked with MAX_TXID.
 */
static txid
convert_xid(TransactionId xid, const TxidEpoch *state)
{
#ifndef INT64_IS_BUSTED
	uint64		epoch;

	/* return special xid's as-is */
	if (!TransactionIdIsNormal(xid))
		return (txid) xid;

	/* xid can be on either side when near wrap-around */
	epoch = (uint64) state->epoch;
	if (xid > state->last_xid &&
		TransactionIdPrecedes(xid, state->last_xid))
		epoch--;
	else if (xid < state->last_xid &&
			 TransactionIdFollows(xid, state->last_xid))
		epoch++;

	return (epoch << 32) | xid;
#else							/* INT64_IS_BUSTED */
	/* we can't do anything with the epoch, so ignore it */
	return (txid) xid & MAX_TXID;
#endif   /* INT64_IS_BUSTED */
}

/*
 * txid comparator for qsort/bsearch
 *
 * Both arguments point at txid values; returns -1, 0 or 1 for
 * less-than, equal and greater-than respectively.
 */
static int
cmp_txid(const void *aa, const void *bb)
{
	txid		a = *(const txid *) aa;
	txid		b = *(const txid *) bb;

	if (a < b)
		return -1;
	if (a > b)
		return 1;
	return 0;
}

/*
 * sort a snapshot's txids, so we can use bsearch() later.
 *
 * For consistency of on-disk representation, we always sort even if bsearch
 * will not be used.
 */
static void
sort_snapshot(TxidSnapshot *snap)
{
	if (snap->nxip > 1)
		qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
}

/*
 * check txid visibility.
 *
 * A value below xmin is visible, a value at or above xmax is not.  In
 * between, the value is visible unless it appears in the xip array.
 * The bsearch() path relies on xip[] being sorted in ascending order.
 */
static bool
is_visible_txid(txid value, const TxidSnapshot *snap)
{
	if (value < snap->xmin)
		return true;
	else if (value >= snap->xmax)
		return false;
#ifdef USE_BSEARCH_IF_NXIP_GREATER
	else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
	{
		void	   *res;

		res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
		/* if found, transaction is still in progress */
		return (res) ? false : true;
	}
#endif
	else
	{
		uint32		i;

		for (i = 0; i < snap->nxip; i++)
		{
			if (value == snap->xip[i])
				return false;
		}
		return true;
	}
}

/*
 * helper functions to use StringInfo for TxidSnapshot creation.
 */

/*
 * Start a new snapshot buffer: the StringInfo receives a TxidSnapshot
 * header (xmin, xmax, nxip = 0) with no xip entries.  The length word is
 * not set here; buf_finalize() does that.
 */
static StringInfo
buf_init(txid xmin, txid xmax)
{
	TxidSnapshot snap;
	StringInfo	buf;

	snap.xmin = xmin;
	snap.xmax = xmax;
	snap.nxip = 0;

	buf = makeStringInfo();
	appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0));
	return buf;
}

/*
 * Append one txid to the snapshot under construction and bump its nxip.
 */
static void
buf_add_txid(StringInfo buf, txid xid)
{
	TxidSnapshot *snap = (TxidSnapshot *) buf->data;

	/* do this before possible realloc */
	snap->nxip++;

	appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));
}

/*
 * Finish construction: record the total size in the varlena header,
 * release the StringInfo wrapper and hand back its data area as the
 * snapshot.
 */
static TxidSnapshot *
buf_finalize(StringInfo buf)
{
	TxidSnapshot *snap = (TxidSnapshot *) buf->data;

	SET_VARSIZE(snap, buf->len);

	/* buf is not needed anymore */
	buf->data = NULL;
	pfree(buf);

	return snap;
}

/*
 * simple number parser.
 *
 * Reads leading decimal digits from s.  If endp is not NULL, *endp is set
 * to the position where parsing stopped.
 *
 * We return 0 on error, which is invalid value for txid.
 */
static txid
str2txid(const char *s, const char **endp)
{
	txid		val = 0;
	txid		cutoff = MAX_TXID / 10;
	txid		cutlim = MAX_TXID % 10;

	for (; *s; s++)
	{
		unsigned	d;

		if (*s < '0' || *s > '9')
			break;
		d = *s - '0';

		/*
		 * check for overflow
		 */
		if (val > cutoff || (val == cutoff && d > cutlim))
		{
			val = 0;
			break;
		}

		val = val * 10 + d;
	}
	if (endp)
		*endp = s;
	return val;
}

/*
 * parse snapshot from cstring
 *
 * Expected form is "xmin:xmax:xip1,xip2,..." where the xip list may be
 * empty.  Every listed value must satisfy xmin <= value < xmax and the
 * values must be strictly ascending.  Anything else is reported with
 * elog(ERROR).
 */
static TxidSnapshot *
parse_snapshot(const char *str)
{
	txid		xmin;
	txid		xmax;
	txid		last_val = 0,
				val;
	const char *str_start = str;
	const char *endp;
	StringInfo	buf;

	xmin = str2txid(str, &endp);
	if (*endp != ':')
		goto bad_format;
	str = endp + 1;

	xmax = str2txid(str, &endp);
	if (*endp != ':')
		goto bad_format;
	str = endp + 1;

	/* it should look sane */
	if (xmin == 0 || xmax == 0 || xmin > xmax)
		goto bad_format;

	/* allocate buffer */
	buf = buf_init(xmin, xmax);

	/* loop over values */
	while (*str != '\0')
	{
		/* read next value */
		val = str2txid(str, &endp);
		str = endp;

		/* require the input to be in order */
		if (val < xmin || val >= xmax || val <= last_val)
			goto bad_format;

		buf_add_txid(buf, val);
		last_val = val;

		if (*str == ',')
			str++;
		else if (*str != '\0')
			goto bad_format;
	}

	return buf_finalize(buf);

bad_format:
	elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
	return NULL;
}

/*
 * Public functions.
 *
 * txid_current() and txid_current_snapshot() are the only ones that
 * communicate with core xid machinery.  All the others work on data
 * returned by them.
 */

/*
 * txid_current() returns int8
 *
 *		Return the current toplevel transaction ID as TXID
 */
Datum
txid_current(PG_FUNCTION_ARGS)
{
	txid		val;
	TxidEpoch	state;

	load_xid_epoch(&state);

	val = convert_xid(GetTopTransactionId(), &state);

	PG_RETURN_INT64(val);
}

/*
 * txid_current_snapshot() returns txid_snapshot
 *
 *		Return current snapshot in TXID format
 *
 * Note that only top-transaction XIDs are included in the snapshot.
 */
Datum
txid_current_snapshot(PG_FUNCTION_ARGS)
{
	TxidSnapshot *snap;
	uint32		nxip,
				i,
				size;
	TxidEpoch	state;
	Snapshot	cur;

	cur = ActiveSnapshot;
	if (cur == NULL)
		elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");

	load_xid_epoch(&state);

	/* allocate */
	nxip = cur->xcnt;
	size = TXID_SNAPSHOT_SIZE(nxip);
	snap = palloc(size);
	SET_VARSIZE(snap, size);

	/* fill */
	snap->xmin = convert_xid(cur->xmin, &state);
	snap->xmax = convert_xid(cur->xmax, &state);
	snap->nxip = nxip;
	for (i = 0; i < nxip; i++)
		snap->xip[i] = convert_xid(cur->xip[i], &state);

	/* we want them guaranteed to be in ascending order */
	sort_snapshot(snap);

	PG_RETURN_POINTER(snap);
}

/*
 * txid_snapshot_in(cstring) returns txid_snapshot
 *
 *		input function for type txid_snapshot
 */
Datum
txid_snapshot_in(PG_FUNCTION_ARGS)
{
	char	   *str = PG_GETARG_CSTRING(0);
	TxidSnapshot *snap;

	snap = parse_snapshot(str);

	PG_RETURN_POINTER(snap);
}

/*
 * txid_snapshot_out(txid_snapshot) returns cstring
 *
 *		output function for type txid_snapshot
 *
 * Produces "xmin:xmax:" followed by the comma-separated xip values.
 */
Datum
txid_snapshot_out(PG_FUNCTION_ARGS)
{
	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
	StringInfoData str;
	uint32		i;

	initStringInfo(&str);

	appendStringInfo(&str, TXID_FMT ":", snap->xmin);
	appendStringInfo(&str, TXID_FMT ":", snap->xmax);

	for (i = 0; i < snap->nxip; i++)
	{
		if (i > 0)
			appendStringInfoChar(&str, ',');
		appendStringInfo(&str, TXID_FMT, snap->xip[i]);
	}

	PG_RETURN_CSTRING(str.data);
}

/*
 * txid_snapshot_recv(internal) returns txid_snapshot
 *
 *		binary input function for type txid_snapshot
 *
 *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
 *
 * The message is rejected with elog(ERROR) when nxip does not fit the
 * remaining message bytes, when xmin/xmax are zero, out of order or above
 * MAX_TXID, or when the xip values are not strictly ascending within
 * [xmin, xmax).
 */
Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)
{
	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
	TxidSnapshot *snap;
	txid		last = 0;
	int			nxip;
	int			i;
	int			avail;
	txid		xmin,
				xmax;

	/*
	 * load nxip and check for nonsense.
	 *
	 * The rest of the message must hold xmin and xmax (8 bytes each) plus
	 * nxip 8-byte txids.  nxip comes straight from the client, so the
	 * required size is not computed as 8 + 8 + nxip * 8: that product can
	 * overflow int and slip past the comparison.  Dividing the available
	 * byte count instead cannot overflow and accepts exactly the same
	 * well-formed messages.
	 */
	nxip = pq_getmsgint(buf, 4);
	avail = buf->len - buf->cursor;
	if (nxip < 0 || avail < 8 + 8 || nxip > (avail - (8 + 8)) / 8)
		goto bad_format;

	xmin = pq_getmsgint64(buf);
	xmax = pq_getmsgint64(buf);
	if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
		goto bad_format;

	snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
	snap->xmin = xmin;
	snap->xmax = xmax;
	snap->nxip = nxip;
	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));

	for (i = 0; i < nxip; i++)
	{
		txid		cur = pq_getmsgint64(buf);

		/* values must be strictly ascending and inside [xmin, xmax) */
		if (cur <= last || cur < xmin || cur >= xmax)
			goto bad_format;
		snap->xip[i] = cur;
		last = cur;
	}
	PG_RETURN_POINTER(snap);

bad_format:
	elog(ERROR, "invalid snapshot data");
	return (Datum) NULL;
}

/*
 * txid_snapshot_send(txid_snapshot) returns bytea
 *
 *		binary output function for type txid_snapshot
 *
 *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip
 */
Datum
txid_snapshot_send(PG_FUNCTION_ARGS)
{
	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
	StringInfoData buf;
	uint32		i;

	pq_begintypsend(&buf);
	pq_sendint(&buf, snap->nxip, 4);
	pq_sendint64(&buf, snap->xmin);
	pq_sendint64(&buf, snap->xmax);
	for (i = 0; i < snap->nxip; i++)
		pq_sendint64(&buf, snap->xip[i]);
	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}

/*
 * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
 *
 *		is txid visible in snapshot ?
 */
Datum
txid_visible_in_snapshot(PG_FUNCTION_ARGS)
{
	txid		value = PG_GETARG_INT64(0);
	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);

	PG_RETURN_BOOL(is_visible_txid(value, snap));
}

/*
 * txid_snapshot_xmin(txid_snapshot) returns int8
 *
 *		return snapshot's xmin
 */
Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)
{
	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);

	PG_RETURN_INT64(snap->xmin);
}

/*
 * txid_snapshot_xmax(txid_snapshot) returns int8
 *
 *		return snapshot's xmax
 */
Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)
{
	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);

	PG_RETURN_INT64(snap->xmax);
}

/*
 * txid_snapshot_xip(txid_snapshot) returns setof int8
 *
 *		return in-progress TXIDs in snapshot.
 *
 * The snapshot argument is copied into the multi-call memory context on
 * the first call; each later call returns the xip entry indexed by the
 * call counter until nxip entries have been returned.
 */
Datum
txid_snapshot_xip(PG_FUNCTION_ARGS)
{
	FuncCallContext *fctx;
	TxidSnapshot *snap;
	txid		value;

	/* on first call initialize snap_state and get copy of snapshot */
	if (SRF_IS_FIRSTCALL())
	{
		TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);

		fctx = SRF_FIRSTCALL_INIT();

		/* make a copy of user snapshot */
		snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
		memcpy(snap, arg, VARSIZE(arg));

		fctx->user_fctx = snap;
	}

	/* return values one-by-one */
	fctx = SRF_PERCALL_SETUP();
	snap = fctx->user_fctx;
	if (fctx->call_cntr < snap->nxip)
	{
		value = snap->xip[fctx->call_cntr];
		SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
	}
	else
	{
		SRF_RETURN_DONE(fctx);
	}
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?