typecmds.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,044 行 · 第 1/4 页
C
2,044 行
{ int i; /* Test attributes that are of the domain */ for (i = 0; i < rtc->natts; i++) { int attnum = rtc->atts[i]; Datum d; bool isNull; Datum conResult; d = heap_getattr(tuple, attnum, tupdesc, &isNull); econtext->domainValue_datum = d; econtext->domainValue_isNull = isNull; conResult = ExecEvalExprSwitchContext(exprstate, econtext, &isNull, NULL); if (!isNull && !DatumGetBool(conResult)) ereport(ERROR, (errcode(ERRCODE_CHECK_VIOLATION), errmsg("column \"%s\" of table \"%s\" contains values that violate the new constraint", NameStr(tupdesc->attrs[attnum - 1]->attname), RelationGetRelationName(testrel)))); } ResetExprContext(econtext); } heap_endscan(scan); /* Hold relation lock till commit (XXX bad for concurrency) */ heap_close(testrel, NoLock); } FreeExecutorState(estate); /* Clean up */ heap_close(typrel, RowExclusiveLock);}/* * get_rels_with_domain * * Fetch all relations / attributes which are using the domain * * The result is a list of RelToCheck structs, one for each distinct * relation, each containing one or more attribute numbers that are of * the domain type. We have opened each rel and acquired the specified lock * type on it. * * XXX this is completely broken because there is no way to lock the domain * to prevent columns from being added or dropped while our command runs. * We can partially protect against column drops by locking relations as we * come across them, but there is still a race condition (the window between * seeing a pg_depend entry and acquiring lock on the relation it references). * Also, holding locks on all these relations simultaneously creates a non- * trivial risk of deadlock. We can minimize but not eliminate the deadlock * risk by using the weakest suitable lock (ShareLock for most callers). * * XXX to support domains over domains, we'd need to make this smarter, * or make its callers smarter, so that we could find columns of derived * domains. Arrays of domains would be a problem too. * * Generally used for retrieving a list of tests when adding * new constraints to a domain. */static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode){ List *result = NIL; Relation depRel; ScanKeyData key[2]; SysScanDesc depScan; HeapTuple depTup; /* * We scan pg_depend to find those things that depend on the domain. * (We assume we can ignore refobjsubid for a domain.) */ depRel = relation_openr(DependRelationName, AccessShareLock); ScanKeyEntryInitialize(&key[0], 0x0, Anum_pg_depend_refclassid, F_OIDEQ, ObjectIdGetDatum(RelOid_pg_type)); ScanKeyEntryInitialize(&key[1], 0x0, Anum_pg_depend_refobjid, F_OIDEQ, ObjectIdGetDatum(domainOid)); depScan = systable_beginscan(depRel, DependReferenceIndex, true, SnapshotNow, 2, key); while (HeapTupleIsValid(depTup = systable_getnext(depScan))) { Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup); RelToCheck *rtc = NULL; List *rellist; Form_pg_attribute pg_att; int ptr; /* Ignore dependees that aren't user columns of tables */ /* (we assume system columns are never of domain types) */ if (pg_depend->classid != RelOid_pg_class || pg_depend->objsubid <= 0) continue; /* See if we already have an entry for this relation */ foreach(rellist, result) { RelToCheck *rt = (RelToCheck *) lfirst(rellist); if (RelationGetRelid(rt->rel) == pg_depend->objid) { rtc = rt; break; } } if (rtc == NULL) { /* First attribute found for this relation */ Relation rel; /* Acquire requested lock on relation */ rel = heap_open(pg_depend->objid, lockmode); /* Build the RelToCheck entry with enough space for all atts */ rtc = (RelToCheck *) palloc(sizeof(RelToCheck)); rtc->rel = rel; rtc->natts = 0; rtc->atts = (int *) palloc(sizeof(int) * RelationGetNumberOfAttributes(rel)); result = lcons(rtc, result); } /* * Confirm column has not been dropped, and is of the expected * type. This defends against an ALTER DROP COLUMN occuring just * before we acquired lock ... but if the whole table were * dropped, we'd still have a problem. */ if (pg_depend->objsubid > RelationGetNumberOfAttributes(rtc->rel)) continue; pg_att = rtc->rel->rd_att->attrs[pg_depend->objsubid - 1]; if (pg_att->attisdropped || pg_att->atttypid != domainOid) continue; /* * Okay, add column to result. We store the columns in * column-number order; this is just a hack to improve * predictability of regression test output ... */ Assert(rtc->natts < RelationGetNumberOfAttributes(rtc->rel)); ptr = rtc->natts++; while (ptr > 0 && rtc->atts[ptr - 1] > pg_depend->objsubid) { rtc->atts[ptr] = rtc->atts[ptr - 1]; ptr--; } rtc->atts[ptr] = pg_depend->objsubid; } systable_endscan(depScan); relation_close(depRel, AccessShareLock); return result;}/* * domainOwnerCheck * * Throw an error if the current user doesn't have permission to modify * the domain in an ALTER DOMAIN statement, or if the type isn't actually * a domain. */static voiddomainOwnerCheck(HeapTuple tup, TypeName *typename){ Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup); /* Check that this is actually a domain */ if (typTup->typtype != 'd') ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a domain", TypeNameToString(typename)))); /* Permission check: must own type */ if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE, TypeNameToString(typename));}/* * domainAddConstraint - code shared between CREATE and ALTER DOMAIN */static char *domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, int typMod, Constraint *constr, int *counter, char *domainName){ Node *expr; char *ccsrc; char *ccbin; ParseState *pstate; CoerceToDomainValue *domVal; /* * Assign or validate constraint name */ if (constr->name) { if (ConstraintNameIsUsed(CONSTRAINT_DOMAIN, domainOid, domainNamespace, constr->name)) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("constraint \"%s\" for domain \"%s\" already exists", constr->name, domainName))); } else constr->name = GenerateConstraintName(CONSTRAINT_DOMAIN, domainOid, domainNamespace, counter); /* * Convert the A_EXPR in raw_expr into an EXPR */ pstate = make_parsestate(NULL); /* * Set up a CoerceToDomainValue to represent the occurrence of VALUE * in the expression. Note that it will appear to have the type of * the base type, not the domain. This seems correct since within the * check expression, we should not assume the input value can be * considered a member of the domain. */ domVal = makeNode(CoerceToDomainValue); domVal->typeId = baseTypeOid; domVal->typeMod = typMod; pstate->p_value_substitute = (Node *) domVal; expr = transformExpr(pstate, constr->raw_expr); /* * Make sure it yields a boolean result. */ expr = coerce_to_boolean(pstate, expr, "CHECK"); /* * Make sure no outside relations are referred to. */ if (length(pstate->p_rtable) != 0) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot use table references in domain check constraint"))); /* * Domains don't allow var clauses (this should be redundant with the * above check, but make it anyway) */ if (contain_var_clause(expr)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot use table references in domain check constraint"))); /* * No subplans or aggregates, either... */ if (pstate->p_hasSubLinks) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use subquery in check constraint"))); if (pstate->p_hasAggs) ereport(ERROR, (errcode(ERRCODE_GROUPING_ERROR), errmsg("cannot use aggregate in check constraint"))); /* * Convert to string form for storage. */ ccbin = nodeToString(expr); /* * Deparse it to produce text for consrc. * * Since VARNOs aren't allowed in domain constraints, relation context * isn't required as anything other than a shell. */ ccsrc = deparse_expression(expr, deparse_context_for(domainName, InvalidOid), false, false); /* * Store the constraint in pg_constraint */ CreateConstraintEntry(constr->name, /* Constraint Name */ domainNamespace, /* namespace */ CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */ false, /* Is Deferred */ InvalidOid, /* not a relation constraint */ NULL, 0, domainOid, /* domain constraint */ InvalidOid, /* Foreign key fields */ NULL, 0, ' ', ' ', ' ', InvalidOid, expr, /* Tree form check constraint */ ccbin, /* Binary form check constraint */ ccsrc); /* Source form check constraint */ /* * Return the compiled constraint expression so the calling routine * can perform any additional required tests. */ return ccbin;}/* * GetDomainConstraints - get a list of the current constraints of domain * * Returns a possibly-empty list of DomainConstraintState nodes. * * This is called by the executor during plan startup for a CoerceToDomain * expression node. The given constraints will be checked for each value * passed through the node. */List *GetDomainConstraints(Oid typeOid){ List *result = NIL; bool notNull = false; Relation conRel; conRel = heap_openr(ConstraintRelationName, AccessShareLock); for (;;) { HeapTuple tup; HeapTuple conTup; Form_pg_type typTup; ScanKeyData key[1]; SysScanDesc scan; tup = SearchSysCache(TYPEOID, ObjectIdGetDatum(typeOid), 0, 0, 0); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", typeOid); typTup = (Form_pg_type) GETSTRUCT(tup); /* Test for NOT NULL Constraint */ if (typTup->typnotnull) notNull = true; /* Look for CHECK Constraints on this domain */ ScanKeyEntryInitialize(&key[0], 0x0, Anum_pg_constraint_contypid, F_OIDEQ, ObjectIdGetDatum(typeOid)); scan = systable_beginscan(conRel, ConstraintTypidIndex, true, SnapshotNow, 1, key); while (HeapTupleIsValid(conTup = systable_getnext(scan))) { Form_pg_constraint c = (Form_pg_constraint) GETSTRUCT(conTup); Datum val; bool isNull; Expr *check_expr; DomainConstraintState *r; /* Ignore non-CHECK constraints (presently, shouldn't be any) */ if (c->contype != CONSTRAINT_CHECK) continue; /* * Not expecting conbin to be NULL, but we'll test for it * anyway */ val = fastgetattr(conTup, Anum_pg_constraint_conbin, conRel->rd_att, &isNull); if (isNull) elog(ERROR, "domain \"%s\" constraint \"%s\" has NULL conbin", NameStr(typTup->typname), NameStr(c->conname)); check_expr = (Expr *) stringToNode(DatumGetCString(DirectFunctionCall1(textout, val))); /* ExecInitExpr assumes we already fixed opfuncids */ fix_opfuncids((Node *) check_expr); r = makeNode(DomainConstraintState); r->constrainttype = DOM_CONSTRAINT_CHECK; r->name = pstrdup(NameStr(c->conname)); r->check_expr = ExecInitExpr(check_expr, NULL); /* * use lcons() here because constraints of lower domains * should be applied earlier. */ result = lcons(r, result); } systable_endscan(scan); if (typTup->typtype != 'd') { /* Not a domain, so done */ ReleaseSysCache(tup); break; } /* else loop to next domain in stack */ typeOid = typTup->typbasetype; ReleaseSysCache(tup); } heap_close(conRel, AccessShareLock); /* * Only need to add one NOT NULL check regardless of how many domains * in the stack request it. */ if (notNull) { DomainConstraintState *r = makeNode(DomainConstraintState); r->constrainttype = DOM_CONSTRAINT_NOTNULL; r->name = pstrdup("NOT NULL"); r->check_expr = NULL; /* lcons to apply the nullness check FIRST */ result = lcons(r, result); } return result;}/* * ALTER DOMAIN .. OWNER TO * * Eventually this should allow changing ownership of other kinds of types, * but some thought must be given to handling complex types. (A table's * rowtype probably shouldn't be allowed as target, but what of a standalone * composite type?) * * Assumes that permission checks have been completed earlier. */voidAlterTypeOwner(List *names, AclId newOwnerSysId){ TypeName *typename; Oid typeOid; Relation rel; HeapTuple tup; Form_pg_type typTup; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeNode(TypeName); typename->names = names; typename->typmod = -1; typename->arrayBounds = NIL; /* Lock the type table */ rel = heap_openr(TypeRelationName, RowExclusiveLock); /* Use LookupTypeName here so that shell types can be processed (why?) */ typeOid = LookupTypeName(typename); if (!OidIsValid(typeOid)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("type \"%s\" does not exist", TypeNameToString(typename)))); tup = SearchSysCacheCopy(TYPEOID, ObjectIdGetDatum(typeOid), 0, 0, 0); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", typeOid); typTup = (Form_pg_type) GETSTRUCT(tup); /* Check that this is actually a domain */ if (typTup->typtype != 'd') ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a domain", TypeNameToString(typename)))); /* Modify the owner --- okay to scribble on typTup because it's a copy */ typTup->typowner = newOwnerSysId; simple_heap_update(rel, &tup->t_self, tup); CatalogUpdateIndexes(rel, tup); /* Clean up */ heap_close(rel, RowExclusiveLock);}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?