sequence.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 1,123 行 · 第 1/2 页

C
1,123
字号
/*-------------------------------------------------------------------------
 *
 * sequence.c
 *	  PostgreSQL sequences support code.
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.103.2.2 2004/04/06 16:39:40 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/sequence.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/builtins.h"

/*
 * We don't want to write a WAL record for every value fetched from a
 * sequence, so we pre-log a few fetches in advance.  After a crash we
 * can lose as many values as we had pre-logged.
 */
#define SEQ_LOG_VALS	32

/*
 * The "special area" of a sequence's buffer page looks like this.
 * DefineSequence stores SEQ_MAGIC in it when it initializes the page.
 */
#define SEQ_MAGIC	  0x1717

typedef struct sequence_magic
{
	uint32		magic;
} sequence_magic;

/*
 * We store a SeqTable item for every sequence we have touched in the current
 * session.  This is needed to hold onto nextval/currval state.  (We can't
 * rely on the relcache, since it's only, well, a cache, and may decide to
 * discard entries.)
 *
 * XXX We use linear search to find pre-existing SeqTable entries.  This is
 * good when only a small number of sequences are touched in a session, but
 * would suck with many different sequences.  Perhaps use a hashtable someday.
 */
typedef struct SeqTableData
{
	struct SeqTableData *next;	/* link to next SeqTable object */
	Oid			relid;			/* pg_class OID of this sequence */
	TransactionId xid;			/* xact in which we last did a seq op */
	int64		last;			/* value last returned by nextval */
	int64		cached;			/* last value already cached for nextval */
	/* if last != cached, we have not used up all the cached values */
	int64		increment;		/* copy of sequence's increment field */
} SeqTableData;

typedef SeqTableData *SeqTable;

static SeqTable seqtab = NULL;	/* Head of list of SeqTable items */

/* Helpers private to this file; their definitions appear later in the file. */
static void init_sequence(RangeVar *relation,
			  SeqTable *p_elm, Relation *p_rel);
static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf);
static void init_params(List *options, Form_pg_sequence new, bool isInit);
static void do_setval(RangeVar *sequence, int64 next, bool iscalled);

/*
 * DefineSequence
 *				Creates a new sequence relation
 *
 * seq: the CREATE SEQUENCE statement node; seq->options supplies the option
 *		list handed to init_params, and seq->sequence names the relation.
 *
 * Steps, in order:
 *	1. init_params() checks the options and fills the local "new" struct.
 *	2. A CreateStmt is built with one column per SEQ_COL_* attribute, and
 *	   the matching initial values are collected in value[] / null[].
 *	3. DefineRelation() creates the relation with RELKIND_SEQUENCE.
 *	4. Block 0 is allocated and initialized with a sequence_magic special
 *	   area, and the single sequence tuple is inserted into it.
 *	5. The tuple's xmin is forced to FrozenTransactionId and, unless the
 *	   relation is temporary, an XLOG_SEQ_LOG record is emitted.
 */
void
DefineSequence(CreateSeqStmt *seq)
{
	FormData_pg_sequence new;
	CreateStmt *stmt = makeNode(CreateStmt);
	Oid			seqoid;
	Relation	rel;
	Buffer		buf;
	PageHeader	page;
	sequence_magic *sm;
	HeapTuple	tuple;
	TupleDesc	tupDesc;
	Datum		value[SEQ_COL_LASTCOL];
	char		null[SEQ_COL_LASTCOL];
	int			i;
	NameData	name;

	/* Check and set all option values */
	init_params(seq->options, &new, true);

	/*
	 * Create relation (and fill *null & *value).
	 *
	 * Column numbers run from SEQ_COL_FIRSTCOL to SEQ_COL_LASTCOL; the
	 * value[] and null[] arrays are indexed by (column number - 1).
	 */
	stmt->tableElts = NIL;
	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
	{
		ColumnDef  *coldef;
		TypeName   *typnam;

		/* Every column is a plain, non-array, NOT NULL column. */
		typnam = makeNode(TypeName);
		typnam->setof = FALSE;
		typnam->arrayBounds = NIL;
		typnam->typmod = -1;
		coldef = makeNode(ColumnDef);
		coldef->typename = typnam;
		coldef->inhcount = 0;
		coldef->is_local = true;
		coldef->is_not_null = true;
		coldef->raw_default = NULL;
		coldef->cooked_default = NULL;
		coldef->constraints = NIL;
		coldef->support = NULL;

		/*
		 * Same flag for every column in the nulls array later passed to
		 * heap_formtuple (presumably its "not null" marker -- confirm).
		 */
		null[i - 1] = ' ';

		/* Pick the column's type and name, and its initial value. */
		switch (i)
		{
			case SEQ_COL_NAME:
				typnam->typeid = NAMEOID;
				coldef->colname = "sequence_name";
				namestrcpy(&name, seq->sequence->relname);
				value[i - 1] = NameGetDatum(&name);
				break;
			case SEQ_COL_LASTVAL:
				typnam->typeid = INT8OID;
				coldef->colname = "last_value";
				value[i - 1] = Int64GetDatumFast(new.last_value);
				break;
			case SEQ_COL_INCBY:
				typnam->typeid = INT8OID;
				coldef->colname = "increment_by";
				value[i - 1] = Int64GetDatumFast(new.increment_by);
				break;
			case SEQ_COL_MAXVALUE:
				typnam->typeid = INT8OID;
				coldef->colname = "max_value";
				value[i - 1] = Int64GetDatumFast(new.max_value);
				break;
			case SEQ_COL_MINVALUE:
				typnam->typeid = INT8OID;
				coldef->colname = "min_value";
				value[i - 1] = Int64GetDatumFast(new.min_value);
				break;
			case SEQ_COL_CACHE:
				typnam->typeid = INT8OID;
				coldef->colname = "cache_value";
				value[i - 1] = Int64GetDatumFast(new.cache_value);
				break;
			case SEQ_COL_LOG:
				/* log_cnt starts out as 1 in the stored tuple */
				typnam->typeid = INT8OID;
				coldef->colname = "log_cnt";
				value[i - 1] = Int64GetDatum((int64) 1);
				break;
			case SEQ_COL_CYCLE:
				typnam->typeid = BOOLOID;
				coldef->colname = "is_cycled";
				value[i - 1] = BoolGetDatum(new.is_cycled);
				break;
			case SEQ_COL_CALLED:
				/* a freshly created sequence has not been called yet */
				typnam->typeid = BOOLOID;
				coldef->colname = "is_called";
				value[i - 1] = BoolGetDatum(false);
				break;
		}
		stmt->tableElts = lappend(stmt->tableElts, coldef);
	}

	stmt->relation = seq->sequence;
	stmt->inhRelations = NIL;
	stmt->constraints = NIL;
	stmt->hasoids = false;
	stmt->oncommit = ONCOMMIT_NOOP;

	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE);

	rel = heap_open(seqoid, AccessExclusiveLock);
	tupDesc = RelationGetDescr(rel);

	/* Initialize first page of relation with special magic number */

	buf = ReadBuffer(rel, P_NEW);

	if (!BufferIsValid(buf))
		elog(ERROR, "ReadBuffer failed");

	/* the new page must be the relation's first block */
	Assert(BufferGetBlockNumber(buf) == 0);

	page = (PageHeader) BufferGetPage(buf);

	/* reserve a special area just big enough for the magic number */
	PageInit((Page) page, BufferGetPageSize(buf), sizeof(sequence_magic));
	sm = (sequence_magic *) PageGetSpecialPointer(page);
	sm->magic = SEQ_MAGIC;

	/* hack: ensure heap_insert will insert on the just-created page */
	rel->rd_targblock = 0;

	/* Now form & insert sequence tuple */
	tuple = heap_formtuple(tupDesc, value, null);
	simple_heap_insert(rel, tuple);

	/* the sequence tuple must be the first item on the page */
	Assert(ItemPointerGetOffsetNumber(&(tuple->t_self)) == FirstOffsetNumber);

	/*
	 * Two special hacks here:
	 *
	 * 1. Since VACUUM does not process sequences, we have to force the tuple
	 * to have xmin = FrozenTransactionId now.  Otherwise it would become
	 * invisible to SELECTs after 2G transactions.  It is okay to do this
	 * because if the current transaction aborts, no other xact will ever
	 * examine the sequence tuple anyway.
	 *
	 * 2. Even though heap_insert emitted a WAL log record, we have to emit
	 * an XLOG_SEQ_LOG record too, since (a) the heap_insert record will
	 * not have the right xmin, and (b) REDO of the heap_insert record
	 * would re-init page and sequence magic number would be lost.  This
	 * means two log records instead of one :-(
	 */
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	START_CRIT_SECTION();

	{
		/*
		 * Note that the "tuple" structure is still just a local tuple
		 * record created by heap_formtuple; its t_data pointer doesn't
		 * point at the disk buffer.  To scribble on the disk buffer we
		 * need to fetch the item pointer.  But do the same to the local
		 * tuple, since that will be the source for the WAL log record,
		 * below.
		 */
		ItemId		itemId;
		Item		item;

		itemId = PageGetItemId((Page) page, FirstOffsetNumber);
		item = PageGetItem((Page) page, itemId);

		/* freeze the copy in the disk buffer ... */
		HeapTupleHeaderSetXmin((HeapTupleHeader) item, FrozenTransactionId);
		((HeapTupleHeader) item)->t_infomask |= HEAP_XMIN_COMMITTED;

		/* ... and the local copy that feeds the WAL record */
		HeapTupleHeaderSetXmin(tuple->t_data, FrozenTransactionId);
		tuple->t_data->t_infomask |= HEAP_XMIN_COMMITTED;
	}

	/* XLOG stuff: skipped entirely for temporary relations */
	if (!rel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];
		Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);

		/* We do not log first nextval call, so "advance" sequence here */
		/* Note we are scribbling on local tuple, not the disk buffer */
		newseq->is_called = true;
		newseq->log_cnt = 0;

		/* record part 1: the xl_seq_rec header identifying the relation */
		xlrec.node = rel->rd_node;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].next = &(rdata[1]);

		/* record part 2: the (locally modified) tuple image */
		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char *) tuple->t_data;
		rdata[1].len = tuple->t_len;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);

		/* stamp the page with the position of the record just written */
		PageSetLSN(page, recptr);
		PageSetSUI(page, ThisStartUpID);
	}

	END_CRIT_SECTION();

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	WriteBuffer(buf);
	/* close the relation, passing NoLock for the lock mode */
	heap_close(rel, NoLock);
}

/*
 * AlterSequence
 *
 * Modify the definition of a sequence relation
 *
 * stmt: the ALTER SEQUENCE statement node; stmt->sequence names the
 *		 sequence and stmt->options is the option list given to init_params.
 *
 * Only the sequence's owner may do this; otherwise aclcheck_error is
 * called with ACLCHECK_NOT_OWNER.  The current on-disk values are copied
 * into a workspace, updated by init_params (isInit = false), copied back
 * over the on-disk tuple, and the change is WAL-logged unless the
 * relation is temporary.  The session-local cached values for this
 * sequence are discarded.
 */
void
AlterSequence(AlterSeqStmt *stmt)
{
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	Page		page;
	Form_pg_sequence seq;
	FormData_pg_sequence new;

	/* open and AccessShareLock sequence */
	init_sequence(stmt->sequence, &elm, &seqrel);

	/* allow ALTER to sequence owner only */
	if (!pg_class_ownercheck(elm->relid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   stmt->sequence->relname);

	/* lock the page's buffer and read the tuple (seq points at it) */
	seq = read_info(elm, seqrel, &buf);
	page = BufferGetPage(buf);

	/* Copy old values of options into workspace */
	memcpy(&new, seq, sizeof(FormData_pg_sequence));

	/* Check and set new values */
	init_params(stmt->options, &new, false);

	/* Now okay to update the on-disk tuple */
	memcpy(seq, &new, sizeof(FormData_pg_sequence));

	/* Clear local cache so that we don't think we have cached numbers */
	elm->last = new.last_value;			/* last returned number */
	elm->cached = new.last_value;		/* last cached number (forget
										 * cached values) */

	START_CRIT_SECTION();

	/* XLOG stuff: skipped entirely for temporary relations */
	if (!seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/* record part 1: the xl_seq_rec header identifying the relation */
		xlrec.node = seqrel->rd_node;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].next = &(rdata[1]);

		/*
		 * record part 2: the page bytes from pd_upper up to pd_special
		 * (presumably exactly the sequence tuple -- confirm)
		 */
		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader) page)->pd_special -
			((PageHeader) page)->pd_upper;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);

		/* stamp the page with the position of the record just written */
		PageSetLSN(page, recptr);
		PageSetSUI(page, ThisStartUpID);
	}

	END_CRIT_SECTION();

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

	WriteBuffer(buf);

	relation_close(seqrel, NoLock);
}

/*
 * nextval
 *
 * Function taking PG_FUNCTION_ARGS; argument 0 is the sequence name as
 * text, possibly qualified (it is parsed with textToQualifiedNameList).
 * The caller must pass the ACL_UPDATE check on the sequence, otherwise
 * an ERRCODE_INSUFFICIENT_PRIVILEGE error is raised.
 *
 * If the session-local SeqTable entry still holds unused cached values
 * (last != cached), the next one is returned without touching the
 * sequence's page.  Otherwise the on-disk tuple is read, up to
 * cache_value values are fetched (plus SEQ_LOG_VALS extra whenever a
 * WAL record has to be written), and the tuple is updated.
 */
Datum
nextval(PG_FUNCTION_ARGS)
{
	text	   *seqin = PG_GETARG_TEXT_P(0);
	RangeVar   *sequence;
	SeqTable	elm;
	Relation	seqrel;
	Buffer		buf;
	Page		page;
	Form_pg_sequence seq;
	int64		incby,
				maxv,
				minv,
				cache,
				log,
				fetch,
				last;
	int64		result,
				next,
				rescnt = 0;
	bool		logit = false;

	sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin,
															 "nextval"));

	/* open and AccessShareLock sequence */
	init_sequence(sequence, &elm, &seqrel);

	if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for sequence %s",
						sequence->relname)));

	if (elm->last != elm->cached)		/* some numbers were cached */
	{
		/* fast path: hand out the next cached value, no page access */
		elm->last += elm->increment;
		relation_close(seqrel, NoLock);
		PG_RETURN_INT64(elm->last);
	}

	/* lock the page's buffer and read the tuple */
	seq = read_info(elm, seqrel, &buf);
	page = BufferGetPage(buf);

	/*
	 * Working copies of the tuple fields:
	 *	 next	- running value, advanced on every loop iteration
	 *	 last	- last value that counts toward this session's cache
	 *	 result - value this call hands back
	 *	 fetch	- how many more values the loop should try to fetch
	 *	 log	- count stored back into log_cnt ("how much is logged")
	 *	 rescnt - number of values obtained for the cache so far
	 */
	last = next = result = seq->last_value;
	incby = seq->increment_by;
	maxv = seq->max_value;
	minv = seq->min_value;
	fetch = cache = seq->cache_value;
	log = seq->log_cnt;

	if (!seq->is_called)
	{
		/*
		 * The sequence has never been called: last_value itself is the
		 * first result, so count it as already obtained.
		 */
		rescnt++;				/* last_value if not called */
		fetch--;
		log--;
	}

	/*
	 * Decide whether we should emit a WAL log record.  If so, force up
	 * the fetch count to grab SEQ_LOG_VALS more values than we actually
	 * need to cache.  (These will then be usable without logging.)
	 *
	 * If this is the first nextval after a checkpoint, we must force a new
	 * WAL record to be written anyway, else replay starting from the
	 * checkpoint would fail to advance the sequence past the logged
	 * values.  In this case we may as well fetch extra values.
	 */
	if (log < fetch)
	{
		/* forced log to satisfy local demand for values */
		fetch = log = fetch + SEQ_LOG_VALS;
		logit = true;
	}
	else
	{
		XLogRecPtr	redoptr = GetRedoRecPtr();

		if (XLByteLE(PageGetLSN(page), redoptr))
		{
			/* last update of seq was before checkpoint */
			fetch = log = fetch + SEQ_LOG_VALS;
			logit = true;
		}
	}

	while (fetch)				/* try to fetch cache [+ log ] numbers */
	{
		/*
		 * Check MAXVALUE for ascending sequences and MINVALUE for
		 * descending sequences.  Each test is written in two forms,
		 * selected by the sign of the bound.
		 */
		if (incby > 0)
		{
			/* ascending sequence */
			if ((maxv >= 0 && next > maxv - incby) ||
				(maxv < 0 && next + incby > maxv))
			{
				/* hit the limit: keep what we already have, if anything */
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* NOTE: this local shadows the outer Buffer "buf" */
					char		buf[100];

					snprintf(buf, sizeof(buf), INT64_FORMAT, maxv);
					ereport(ERROR,
					  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					   errmsg("nextval: reached maximum value of sequence \"%s\" (%s)",
							  sequence->relname, buf)));
				}
				/* cycling sequence: wrap around to the minimum */
				next = minv;
			}
			else
				next += incby;
		}
		else
		{
			/* descending sequence */
			if ((minv < 0 && next < minv - incby) ||
				(minv >= 0 && next + incby < minv))
			{
				/* hit the limit: keep what we already have, if anything */
				if (rescnt > 0)
					break;		/* stop fetching */
				if (!seq->is_cycled)
				{
					/* NOTE: this local shadows the outer Buffer "buf" */
					char		buf[100];

					snprintf(buf, sizeof(buf), INT64_FORMAT, minv);
					ereport(ERROR,
					  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					   errmsg("nextval: reached minimum value of sequence \"%s\" (%s)",
							  sequence->relname, buf)));
				}
				/* cycling sequence: wrap around to the maximum */
				next = maxv;
			}
			else
				next += incby;
		}
		fetch--;
		/*
		 * Only the first "cache" values count toward the session cache;
		 * any further iterations just advance "next" for logging.
		 */
		if (rescnt < cache)
		{
			log--;
			rescnt++;
			last = next;
			if (rescnt == 1)	/* if it's first result - */
				result = next;	/* it's what to return */
		}
	}

	log -= fetch;				/* adjust for any unfetched numbers */
	Assert(log >= 0);

	/* save info in local cache */
	elm->last = result;			/* last returned number */
	elm->cached = last;			/* last fetched number */

	START_CRIT_SECTION();

	/* XLOG stuff: only when a record is due and the relation is not temp */
	if (logit && !seqrel->rd_istemp)
	{
		xl_seq_rec	xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		/* record part 1: the xl_seq_rec header identifying the relation */
		xlrec.node = seqrel->rd_node;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = sizeof(xl_seq_rec);
		rdata[0].next = &(rdata[1]);

		/*
		 * set values that will be saved in xlog: the logged image carries
		 * "next" (the furthest value fetched) rather than "last"; the
		 * tuple is overwritten with the real on-disk values further down
		 */
		seq->last_value = next;
		seq->is_called = true;
		seq->log_cnt = 0;

		/* record part 2: the page bytes from pd_upper up to pd_special */
		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
		rdata[1].len = ((PageHeader) page)->pd_special -
			((PageHeader) page)->pd_upper;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG | XLOG_NO_TRAN, rdata);

		/* stamp the page with the position of the record just written */
		PageSetLSN(page, recptr);
		PageSetSUI(page, ThisStartUpID);
	}

	/* update on-disk data */
	seq->last_value = last;		/* last fetched number */
	seq->is_called = true;
	seq->log_cnt = log;			/* how much is logged */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?