txid.c
来自「postgresql8.3.4源码,开源数据库」· C语言 代码 · 共 592 行
C
592 行
/*------------------------------------------------------------------------- * txid.c * * Export internal transaction IDs to user level. * * Note that only top-level transaction IDs are ever converted to TXID. * This is important because TXIDs frequently persist beyond the global * xmin horizon, or may even be shipped to other machines, so we cannot * rely on being able to correlate subtransaction IDs with their parents * via functions such as SubTransGetTopmostTransaction(). * * * Copyright (c) 2003-2008, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * 64-bit txids: Marko Kreen, Skype Technologies * * $PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.4 2008/01/01 19:45:53 momjian Exp $ * *------------------------------------------------------------------------- */#include "postgres.h"#include "access/transam.h"#include "access/xact.h"#include "funcapi.h"#include "libpq/pqformat.h"#include "utils/builtins.h"#ifndef INT64_IS_BUSTED/* txid will be signed int8 in database, so must limit to 63 bits */#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)#else/* we only really have 32 bits to work with :-( */#define MAX_TXID UINT64CONST(0x7FFFFFFF)#endif/* Use unsigned variant internally */typedef uint64 txid;/* sprintf format code for uint64 */#define TXID_FMT UINT64_FORMAT/* * If defined, use bsearch() function for searching for txids in snapshots * that have more than the specified number of values. */#define USE_BSEARCH_IF_NXIP_GREATER 30/* * Snapshot containing 8byte txids. */typedef struct{ /* * 4-byte length hdr, should not be touched directly. * * Explicit embedding is ok as we want always correct alignment anyway. */ int32 __varsz; uint32 nxip; /* number of txids in xip array */ txid xmin; txid xmax; txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */} TxidSnapshot;#define TXID_SNAPSHOT_SIZE(nxip) \ (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))/* * Epoch values from xact.c */typedef struct{ TransactionId last_xid; uint32 epoch;} TxidEpoch;/* * Fetch epoch data from xact.c. */static voidload_xid_epoch(TxidEpoch *state){ GetNextXidAndEpoch(&state->last_xid, &state->epoch);}/* * do a TransactionId -> txid conversion for an XID near the given epoch */static txidconvert_xid(TransactionId xid, const TxidEpoch *state){#ifndef INT64_IS_BUSTED uint64 epoch; /* return special xid's as-is */ if (!TransactionIdIsNormal(xid)) return (txid) xid; /* xid can be on either side when near wrap-around */ epoch = (uint64) state->epoch; if (xid > state->last_xid && TransactionIdPrecedes(xid, state->last_xid)) epoch--; else if (xid < state->last_xid && TransactionIdFollows(xid, state->last_xid)) epoch++; return (epoch << 32) | xid;#else /* INT64_IS_BUSTED */ /* we can't do anything with the epoch, so ignore it */ return (txid) xid & MAX_TXID;#endif /* INT64_IS_BUSTED */}/* * txid comparator for qsort/bsearch */static intcmp_txid(const void *aa, const void *bb){ txid a = *(const txid *) aa; txid b = *(const txid *) bb; if (a < b) return -1; if (a > b) return 1; return 0;}/* * sort a snapshot's txids, so we can use bsearch() later. * * For consistency of on-disk representation, we always sort even if bsearch * will not be used. */static voidsort_snapshot(TxidSnapshot *snap){ if (snap->nxip > 1) qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);}/* * check txid visibility. */static boolis_visible_txid(txid value, const TxidSnapshot *snap){ if (value < snap->xmin) return true; else if (value >= snap->xmax) return false;#ifdef USE_BSEARCH_IF_NXIP_GREATER else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER) { void *res; res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid); /* if found, transaction is still in progress */ return (res) ? false : true; }#endif else { uint32 i; for (i = 0; i < snap->nxip; i++) { if (value == snap->xip[i]) return false; } return true; }}/* * helper functions to use StringInfo for TxidSnapshot creation. */static StringInfobuf_init(txid xmin, txid xmax){ TxidSnapshot snap; StringInfo buf; snap.xmin = xmin; snap.xmax = xmax; snap.nxip = 0; buf = makeStringInfo(); appendBinaryStringInfo(buf, (char *) &snap, TXID_SNAPSHOT_SIZE(0)); return buf;}static voidbuf_add_txid(StringInfo buf, txid xid){ TxidSnapshot *snap = (TxidSnapshot *) buf->data; /* do this before possible realloc */ snap->nxip++; appendBinaryStringInfo(buf, (char *) &xid, sizeof(xid));}static TxidSnapshot *buf_finalize(StringInfo buf){ TxidSnapshot *snap = (TxidSnapshot *) buf->data; SET_VARSIZE(snap, buf->len); /* buf is not needed anymore */ buf->data = NULL; pfree(buf); return snap;}/* * simple number parser. * * We return 0 on error, which is invalid value for txid. */static txidstr2txid(const char *s, const char **endp){ txid val = 0; txid cutoff = MAX_TXID / 10; txid cutlim = MAX_TXID % 10; for (; *s; s++) { unsigned d; if (*s < '0' || *s > '9') break; d = *s - '0'; /* * check for overflow */ if (val > cutoff || (val == cutoff && d > cutlim)) { val = 0; break; } val = val * 10 + d; } if (endp) *endp = s; return val;}/* * parse snapshot from cstring */static TxidSnapshot *parse_snapshot(const char *str){ txid xmin; txid xmax; txid last_val = 0, val; const char *str_start = str; const char *endp; StringInfo buf; xmin = str2txid(str, &endp); if (*endp != ':') goto bad_format; str = endp + 1; xmax = str2txid(str, &endp); if (*endp != ':') goto bad_format; str = endp + 1; /* it should look sane */ if (xmin == 0 || xmax == 0 || xmin > xmax) goto bad_format; /* allocate buffer */ buf = buf_init(xmin, xmax); /* loop over values */ while (*str != '\0') { /* read next value */ val = str2txid(str, &endp); str = endp; /* require the input to be in order */ if (val < xmin || val >= xmax || val <= last_val) goto bad_format; buf_add_txid(buf, val); last_val = val; if (*str == ',') str++; else if (*str != '\0') goto bad_format; } return buf_finalize(buf);bad_format: elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start); return NULL;}/* * Public functions. * * txid_current() and txid_current_snapshot() are the only ones that * communicate with core xid machinery. All the others work on data * returned by them. *//* * txid_current() returns int8 * * Return the current toplevel transaction ID as TXID */Datumtxid_current(PG_FUNCTION_ARGS){ txid val; TxidEpoch state; load_xid_epoch(&state); val = convert_xid(GetTopTransactionId(), &state); PG_RETURN_INT64(val);}/* * txid_current_snapshot() returns txid_snapshot * * Return current snapshot in TXID format * * Note that only top-transaction XIDs are included in the snapshot. */Datumtxid_current_snapshot(PG_FUNCTION_ARGS){ TxidSnapshot *snap; uint32 nxip, i, size; TxidEpoch state; Snapshot cur; cur = ActiveSnapshot; if (cur == NULL) elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL"); load_xid_epoch(&state); /* allocate */ nxip = cur->xcnt; size = TXID_SNAPSHOT_SIZE(nxip); snap = palloc(size); SET_VARSIZE(snap, size); /* fill */ snap->xmin = convert_xid(cur->xmin, &state); snap->xmax = convert_xid(cur->xmax, &state); snap->nxip = nxip; for (i = 0; i < nxip; i++) snap->xip[i] = convert_xid(cur->xip[i], &state); /* we want them guaranteed to be in ascending order */ sort_snapshot(snap); PG_RETURN_POINTER(snap);}/* * txid_snapshot_in(cstring) returns txid_snapshot * * input function for type txid_snapshot */Datumtxid_snapshot_in(PG_FUNCTION_ARGS){ char *str = PG_GETARG_CSTRING(0); TxidSnapshot *snap; snap = parse_snapshot(str); PG_RETURN_POINTER(snap);}/* * txid_snapshot_out(txid_snapshot) returns cstring * * output function for type txid_snapshot */Datumtxid_snapshot_out(PG_FUNCTION_ARGS){ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); StringInfoData str; uint32 i; initStringInfo(&str); appendStringInfo(&str, TXID_FMT ":", snap->xmin); appendStringInfo(&str, TXID_FMT ":", snap->xmax); for (i = 0; i < snap->nxip; i++) { if (i > 0) appendStringInfoChar(&str, ','); appendStringInfo(&str, TXID_FMT, snap->xip[i]); } PG_RETURN_CSTRING(str.data);}/* * txid_snapshot_recv(internal) returns txid_snapshot * * binary input function for type txid_snapshot * * format: int4 nxip, int8 xmin, int8 xmax, int8 xip */Datumtxid_snapshot_recv(PG_FUNCTION_ARGS){ StringInfo buf = (StringInfo) PG_GETARG_POINTER(0); TxidSnapshot *snap; txid last = 0; int nxip; int i; int avail; int expect; txid xmin, xmax; /* * load nxip and check for nonsense. * * (nxip > avail) check is against int overflows in 'expect'. */ nxip = pq_getmsgint(buf, 4); avail = buf->len - buf->cursor; expect = 8 + 8 + nxip * 8; if (nxip < 0 || nxip > avail || expect > avail) goto bad_format; xmin = pq_getmsgint64(buf); xmax = pq_getmsgint64(buf); if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID) goto bad_format; snap = palloc(TXID_SNAPSHOT_SIZE(nxip)); snap->xmin = xmin; snap->xmax = xmax; snap->nxip = nxip; SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip)); for (i = 0; i < nxip; i++) { txid cur = pq_getmsgint64(buf); if (cur <= last || cur < xmin || cur >= xmax) goto bad_format; snap->xip[i] = cur; last = cur; } PG_RETURN_POINTER(snap);bad_format: elog(ERROR, "invalid snapshot data"); return (Datum) NULL;}/* * txid_snapshot_send(txid_snapshot) returns bytea * * binary output function for type txid_snapshot * * format: int4 nxip, int8 xmin, int8 xmax, int8 xip */Datumtxid_snapshot_send(PG_FUNCTION_ARGS){ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); StringInfoData buf; uint32 i; pq_begintypsend(&buf); pq_sendint(&buf, snap->nxip, 4); pq_sendint64(&buf, snap->xmin); pq_sendint64(&buf, snap->xmax); for (i = 0; i < snap->nxip; i++) pq_sendint64(&buf, snap->xip[i]); PG_RETURN_BYTEA_P(pq_endtypsend(&buf));}/* * txid_visible_in_snapshot(int8, txid_snapshot) returns bool * * is txid visible in snapshot ? */Datumtxid_visible_in_snapshot(PG_FUNCTION_ARGS){ txid value = PG_GETARG_INT64(0); TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1); PG_RETURN_BOOL(is_visible_txid(value, snap));}/* * txid_snapshot_xmin(txid_snapshot) returns int8 * * return snapshot's xmin */Datumtxid_snapshot_xmin(PG_FUNCTION_ARGS){ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); PG_RETURN_INT64(snap->xmin);}/* * txid_snapshot_xmax(txid_snapshot) returns int8 * * return snapshot's xmax */Datumtxid_snapshot_xmax(PG_FUNCTION_ARGS){ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); PG_RETURN_INT64(snap->xmax);}/* * txid_snapshot_xip(txid_snapshot) returns setof int8 * * return in-progress TXIDs in snapshot. */Datumtxid_snapshot_xip(PG_FUNCTION_ARGS){ FuncCallContext *fctx; TxidSnapshot *snap; txid value; /* on first call initialize snap_state and get copy of snapshot */ if (SRF_IS_FIRSTCALL()) { TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); fctx = SRF_FIRSTCALL_INIT(); /* make a copy of user snapshot */ snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg)); memcpy(snap, arg, VARSIZE(arg)); fctx->user_fctx = snap; } /* return values one-by-one */ fctx = SRF_PERCALL_SETUP(); snap = fctx->user_fctx; if (fctx->call_cntr < snap->nxip) { value = snap->xip[fctx->call_cntr]; SRF_RETURN_NEXT(fctx, Int64GetDatum(value)); } else { SRF_RETURN_DONE(fctx); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?