typecmds.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,044 行 · 第 1/4 页

C
2,044
字号
		{			int			i;			/* Test attributes that are of the domain */			for (i = 0; i < rtc->natts; i++)			{				int			attnum = rtc->atts[i];				Datum		d;				bool		isNull;				Datum		conResult;				d = heap_getattr(tuple, attnum, tupdesc, &isNull);				econtext->domainValue_datum = d;				econtext->domainValue_isNull = isNull;				conResult = ExecEvalExprSwitchContext(exprstate,													  econtext,													  &isNull, NULL);				if (!isNull && !DatumGetBool(conResult))					ereport(ERROR,							(errcode(ERRCODE_CHECK_VIOLATION),							 errmsg("column \"%s\" of table \"%s\" contains values that violate the new constraint",									NameStr(tupdesc->attrs[attnum - 1]->attname),									RelationGetRelationName(testrel))));			}			ResetExprContext(econtext);		}		heap_endscan(scan);		/* Hold relation lock till commit (XXX bad for concurrency) */		heap_close(testrel, NoLock);	}	FreeExecutorState(estate);	/* Clean up */	heap_close(typrel, RowExclusiveLock);}/* * get_rels_with_domain * * Fetch all relations / attributes which are using the domain * * The result is a list of RelToCheck structs, one for each distinct * relation, each containing one or more attribute numbers that are of * the domain type.  We have opened each rel and acquired the specified lock * type on it. * * XXX this is completely broken because there is no way to lock the domain * to prevent columns from being added or dropped while our command runs. * We can partially protect against column drops by locking relations as we * come across them, but there is still a race condition (the window between * seeing a pg_depend entry and acquiring lock on the relation it references). * Also, holding locks on all these relations simultaneously creates a non- * trivial risk of deadlock.  We can minimize but not eliminate the deadlock * risk by using the weakest suitable lock (ShareLock for most callers). * * XXX to support domains over domains, we'd need to make this smarter, * or make its callers smarter, so that we could find columns of derived * domains.  Arrays of domains would be a problem too. * * Generally used for retrieving a list of tests when adding * new constraints to a domain. */static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode){	List	   *result = NIL;	Relation	depRel;	ScanKeyData key[2];	SysScanDesc depScan;	HeapTuple	depTup;	/*	 * We scan pg_depend to find those things that depend on the domain.	 * (We assume we can ignore refobjsubid for a domain.)	 */	depRel = relation_openr(DependRelationName, AccessShareLock);	ScanKeyEntryInitialize(&key[0], 0x0,						   Anum_pg_depend_refclassid, F_OIDEQ,						   ObjectIdGetDatum(RelOid_pg_type));	ScanKeyEntryInitialize(&key[1], 0x0,						   Anum_pg_depend_refobjid, F_OIDEQ,						   ObjectIdGetDatum(domainOid));	depScan = systable_beginscan(depRel, DependReferenceIndex, true,								 SnapshotNow, 2, key);	while (HeapTupleIsValid(depTup = systable_getnext(depScan)))	{		Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);		RelToCheck *rtc = NULL;		List	   *rellist;		Form_pg_attribute pg_att;		int			ptr;		/* Ignore dependees that aren't user columns of tables */		/* (we assume system columns are never of domain types) */		if (pg_depend->classid != RelOid_pg_class ||			pg_depend->objsubid <= 0)			continue;		/* See if we already have an entry for this relation */		foreach(rellist, result)		{			RelToCheck *rt = (RelToCheck *) lfirst(rellist);			if (RelationGetRelid(rt->rel) == pg_depend->objid)			{				rtc = rt;				break;			}		}		if (rtc == NULL)		{			/* First attribute found for this relation */			Relation	rel;			/* Acquire requested lock on relation */			rel = heap_open(pg_depend->objid, lockmode);			/* Build the RelToCheck entry with enough space for all atts */			rtc = (RelToCheck *) palloc(sizeof(RelToCheck));			rtc->rel = rel;			rtc->natts = 0;			rtc->atts = (int *) palloc(sizeof(int) * RelationGetNumberOfAttributes(rel));			result = lcons(rtc, result);		}		/*		 * Confirm column has not been dropped, and is of the expected		 * type. This defends against an ALTER DROP COLUMN occuring just		 * before we acquired lock ... but if the whole table were		 * dropped, we'd still have a problem.		 */		if (pg_depend->objsubid > RelationGetNumberOfAttributes(rtc->rel))			continue;		pg_att = rtc->rel->rd_att->attrs[pg_depend->objsubid - 1];		if (pg_att->attisdropped || pg_att->atttypid != domainOid)			continue;		/*		 * Okay, add column to result.	We store the columns in		 * column-number order; this is just a hack to improve		 * predictability of regression test output ...		 */		Assert(rtc->natts < RelationGetNumberOfAttributes(rtc->rel));		ptr = rtc->natts++;		while (ptr > 0 && rtc->atts[ptr - 1] > pg_depend->objsubid)		{			rtc->atts[ptr] = rtc->atts[ptr - 1];			ptr--;		}		rtc->atts[ptr] = pg_depend->objsubid;	}	systable_endscan(depScan);	relation_close(depRel, AccessShareLock);	return result;}/* * domainOwnerCheck * * Throw an error if the current user doesn't have permission to modify * the domain in an ALTER DOMAIN statement, or if the type isn't actually * a domain. */static voiddomainOwnerCheck(HeapTuple tup, TypeName *typename){	Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup);	/* Check that this is actually a domain */	if (typTup->typtype != 'd')		ereport(ERROR,				(errcode(ERRCODE_WRONG_OBJECT_TYPE),				 errmsg("\"%s\" is not a domain",						TypeNameToString(typename))));	/* Permission check: must own type */	if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()))		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE,					   TypeNameToString(typename));}/* * domainAddConstraint - code shared between CREATE and ALTER DOMAIN */static char *domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,					int typMod, Constraint *constr,					int *counter, char *domainName){	Node	   *expr;	char	   *ccsrc;	char	   *ccbin;	ParseState *pstate;	CoerceToDomainValue *domVal;	/*	 * Assign or validate constraint name	 */	if (constr->name)	{		if (ConstraintNameIsUsed(CONSTRAINT_DOMAIN,								 domainOid,								 domainNamespace,								 constr->name))			ereport(ERROR,					(errcode(ERRCODE_DUPLICATE_OBJECT),			 errmsg("constraint \"%s\" for domain \"%s\" already exists",					constr->name, domainName)));	}	else		constr->name = GenerateConstraintName(CONSTRAINT_DOMAIN,											  domainOid,											  domainNamespace,											  counter);	/*	 * Convert the A_EXPR in raw_expr into an EXPR	 */	pstate = make_parsestate(NULL);	/*	 * Set up a CoerceToDomainValue to represent the occurrence of VALUE	 * in the expression.  Note that it will appear to have the type of	 * the base type, not the domain.  This seems correct since within the	 * check expression, we should not assume the input value can be	 * considered a member of the domain.	 */	domVal = makeNode(CoerceToDomainValue);	domVal->typeId = baseTypeOid;	domVal->typeMod = typMod;	pstate->p_value_substitute = (Node *) domVal;	expr = transformExpr(pstate, constr->raw_expr);	/*	 * Make sure it yields a boolean result.	 */	expr = coerce_to_boolean(pstate, expr, "CHECK");	/*	 * Make sure no outside relations are referred to.	 */	if (length(pstate->p_rtable) != 0)		ereport(ERROR,				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),				 errmsg("cannot use table references in domain check constraint")));	/*	 * Domains don't allow var clauses (this should be redundant with the	 * above check, but make it anyway)	 */	if (contain_var_clause(expr))		ereport(ERROR,				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),				 errmsg("cannot use table references in domain check constraint")));	/*	 * No subplans or aggregates, either...	 */	if (pstate->p_hasSubLinks)		ereport(ERROR,				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),				 errmsg("cannot use subquery in check constraint")));	if (pstate->p_hasAggs)		ereport(ERROR,				(errcode(ERRCODE_GROUPING_ERROR),				 errmsg("cannot use aggregate in check constraint")));	/*	 * Convert to string form for storage.	 */	ccbin = nodeToString(expr);	/*	 * Deparse it to produce text for consrc.	 *	 * Since VARNOs aren't allowed in domain constraints, relation context	 * isn't required as anything other than a shell.	 */	ccsrc = deparse_expression(expr,							   deparse_context_for(domainName,												   InvalidOid),							   false, false);	/*	 * Store the constraint in pg_constraint	 */	CreateConstraintEntry(constr->name, /* Constraint Name */						  domainNamespace,		/* namespace */						  CONSTRAINT_CHECK,		/* Constraint Type */						  false,	/* Is Deferrable */						  false,	/* Is Deferred */						  InvalidOid,	/* not a relation constraint */						  NULL,						  0,						  domainOid,	/* domain constraint */						  InvalidOid,	/* Foreign key fields */						  NULL,						  0,						  ' ',						  ' ',						  ' ',						  InvalidOid,						  expr, /* Tree form check constraint */						  ccbin,	/* Binary form check constraint */						  ccsrc);		/* Source form check constraint */	/*	 * Return the compiled constraint expression so the calling routine	 * can perform any additional required tests.	 */	return ccbin;}/* * GetDomainConstraints - get a list of the current constraints of domain * * Returns a possibly-empty list of DomainConstraintState nodes. * * This is called by the executor during plan startup for a CoerceToDomain * expression node.  The given constraints will be checked for each value * passed through the node. */List *GetDomainConstraints(Oid typeOid){	List	   *result = NIL;	bool		notNull = false;	Relation	conRel;	conRel = heap_openr(ConstraintRelationName, AccessShareLock);	for (;;)	{		HeapTuple	tup;		HeapTuple	conTup;		Form_pg_type typTup;		ScanKeyData key[1];		SysScanDesc scan;		tup = SearchSysCache(TYPEOID,							 ObjectIdGetDatum(typeOid),							 0, 0, 0);		if (!HeapTupleIsValid(tup))			elog(ERROR, "cache lookup failed for type %u", typeOid);		typTup = (Form_pg_type) GETSTRUCT(tup);		/* Test for NOT NULL Constraint */		if (typTup->typnotnull)			notNull = true;		/* Look for CHECK Constraints on this domain */		ScanKeyEntryInitialize(&key[0], 0x0,							   Anum_pg_constraint_contypid, F_OIDEQ,							   ObjectIdGetDatum(typeOid));		scan = systable_beginscan(conRel, ConstraintTypidIndex, true,								  SnapshotNow, 1, key);		while (HeapTupleIsValid(conTup = systable_getnext(scan)))		{			Form_pg_constraint c = (Form_pg_constraint) GETSTRUCT(conTup);			Datum		val;			bool		isNull;			Expr	   *check_expr;			DomainConstraintState *r;			/* Ignore non-CHECK constraints (presently, shouldn't be any) */			if (c->contype != CONSTRAINT_CHECK)				continue;			/*			 * Not expecting conbin to be NULL, but we'll test for it			 * anyway			 */			val = fastgetattr(conTup, Anum_pg_constraint_conbin,							  conRel->rd_att, &isNull);			if (isNull)				elog(ERROR, "domain \"%s\" constraint \"%s\" has NULL conbin",					 NameStr(typTup->typname), NameStr(c->conname));			check_expr = (Expr *)				stringToNode(DatumGetCString(DirectFunctionCall1(textout,																 val)));			/* ExecInitExpr assumes we already fixed opfuncids */			fix_opfuncids((Node *) check_expr);			r = makeNode(DomainConstraintState);			r->constrainttype = DOM_CONSTRAINT_CHECK;			r->name = pstrdup(NameStr(c->conname));			r->check_expr = ExecInitExpr(check_expr, NULL);			/*			 * use lcons() here because constraints of lower domains			 * should be applied earlier.			 */			result = lcons(r, result);		}		systable_endscan(scan);		if (typTup->typtype != 'd')		{			/* Not a domain, so done */			ReleaseSysCache(tup);			break;		}		/* else loop to next domain in stack */		typeOid = typTup->typbasetype;		ReleaseSysCache(tup);	}	heap_close(conRel, AccessShareLock);	/*	 * Only need to add one NOT NULL check regardless of how many domains	 * in the stack request it.	 */	if (notNull)	{		DomainConstraintState *r = makeNode(DomainConstraintState);		r->constrainttype = DOM_CONSTRAINT_NOTNULL;		r->name = pstrdup("NOT NULL");		r->check_expr = NULL;		/* lcons to apply the nullness check FIRST */		result = lcons(r, result);	}	return result;}/* * ALTER DOMAIN .. OWNER TO * * Eventually this should allow changing ownership of other kinds of types, * but some thought must be given to handling complex types.  (A table's * rowtype probably shouldn't be allowed as target, but what of a standalone * composite type?) * * Assumes that permission checks have been completed earlier. */voidAlterTypeOwner(List *names, AclId newOwnerSysId){	TypeName   *typename;	Oid			typeOid;	Relation	rel;	HeapTuple	tup;	Form_pg_type typTup;	/* Make a TypeName so we can use standard type lookup machinery */	typename = makeNode(TypeName);	typename->names = names;	typename->typmod = -1;	typename->arrayBounds = NIL;	/* Lock the type table */	rel = heap_openr(TypeRelationName, RowExclusiveLock);	/* Use LookupTypeName here so that shell types can be processed (why?) */	typeOid = LookupTypeName(typename);	if (!OidIsValid(typeOid))		ereport(ERROR,				(errcode(ERRCODE_UNDEFINED_OBJECT),				 errmsg("type \"%s\" does not exist",						TypeNameToString(typename))));	tup = SearchSysCacheCopy(TYPEOID,							 ObjectIdGetDatum(typeOid),							 0, 0, 0);	if (!HeapTupleIsValid(tup))		elog(ERROR, "cache lookup failed for type %u", typeOid);	typTup = (Form_pg_type) GETSTRUCT(tup);	/* Check that this is actually a domain */	if (typTup->typtype != 'd')		ereport(ERROR,				(errcode(ERRCODE_WRONG_OBJECT_TYPE),				 errmsg("\"%s\" is not a domain",						TypeNameToString(typename))));	/* Modify the owner --- okay to scribble on typTup because it's a copy */	typTup->typowner = newOwnerSysId;	simple_heap_update(rel, &tup->t_self, tup);	CatalogUpdateIndexes(rel, tup);	/* Clean up */	heap_close(rel, RowExclusiveLock);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?