📄 nodeagg.c
字号:
} } /* No more groups */ return NULL;}/* ----------------- * ExecInitAgg * * Creates the run-time information for the agg node produced by the * planner and initializes its outer subtree * ----------------- */AggState *ExecInitAgg(Agg *node, EState *estate){ AggState *aggstate; AggStatePerAgg peragg; Plan *outerPlan; ExprContext *econtext; int numaggs, aggno; List *alist; /* * create state structure */ aggstate = makeNode(AggState); aggstate->ss.ps.plan = (Plan *) node; aggstate->ss.ps.state = estate; aggstate->aggs = NIL; aggstate->numaggs = 0; aggstate->eqfunctions = NULL; aggstate->hashfunctions = NULL; aggstate->peragg = NULL; aggstate->agg_done = false; aggstate->pergroup = NULL; aggstate->grp_firstTuple = NULL; aggstate->hashtable = NULL; /* * Create expression contexts. We need two, one for per-input-tuple * processing and one for per-output-tuple processing. We cheat a * little by using ExecAssignExprContext() to build both. */ ExecAssignExprContext(estate, &aggstate->ss.ps); aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; ExecAssignExprContext(estate, &aggstate->ss.ps); /* * We also need a long-lived memory context for holding hashtable data * structures and transition values. NOTE: the details of what is * stored in aggcontext and what is stored in the regular per-query * memory context are driven by a simple decision: we want to reset * the aggcontext in ExecReScanAgg to recover no-longer-wanted space. */ aggstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, "AggContext", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);#define AGG_NSLOTS 2 /* * tuple table initialization */ ExecInitScanTupleSlot(estate, &aggstate->ss); ExecInitResultTupleSlot(estate, &aggstate->ss.ps); /* * initialize child expressions * * Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs * contain other agg calls in their arguments. This would make no * sense under SQL semantics anyway (and it's forbidden by the spec). * Because that is true, we don't need to worry about evaluating the * aggs in any particular order. */ aggstate->ss.ps.targetlist = (List *) ExecInitExpr((Expr *) node->plan.targetlist, (PlanState *) aggstate); aggstate->ss.ps.qual = (List *) ExecInitExpr((Expr *) node->plan.qual, (PlanState *) aggstate); /* * initialize child nodes */ outerPlan = outerPlan(node); outerPlanState(aggstate) = ExecInitNode(outerPlan, estate); /* * initialize source tuple type. */ ExecAssignScanTypeFromOuterPlan(&aggstate->ss); /* * Initialize result tuple type and projection info. */ ExecAssignResultTypeFromTL(&aggstate->ss.ps); ExecAssignProjectionInfo(&aggstate->ss.ps); /* * get the count of aggregates in targetlist and quals */ numaggs = aggstate->numaggs; Assert(numaggs == length(aggstate->aggs)); if (numaggs <= 0) { /* * This is not an error condition: we might be using the Agg node * just to do hash-based grouping. Even in the regular case, * constant-expression simplification could optimize away all of * the Aggrefs in the targetlist and qual. So keep going, but * force local copy of numaggs positive so that palloc()s below * don't choke. */ numaggs = 1; } /* * If we are grouping, precompute fmgr lookup data for inner loop. We * need both equality and hashing functions to do it by hashing, but * only equality if not hashing. */ if (node->numCols > 0) { if (node->aggstrategy == AGG_HASHED) execTuplesHashPrepare(ExecGetScanType(&aggstate->ss), node->numCols, node->grpColIdx, &aggstate->eqfunctions, &aggstate->hashfunctions); else aggstate->eqfunctions = execTuplesMatchPrepare(ExecGetScanType(&aggstate->ss), node->numCols, node->grpColIdx); } /* * Set up aggregate-result storage in the output expr context, and * also allocate my private per-agg working storage */ econtext = aggstate->ss.ps.ps_ExprContext; econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs); econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); aggstate->peragg = peragg; if (node->aggstrategy == AGG_HASHED) { build_hash_table(aggstate); aggstate->table_filled = false; } else { AggStatePerGroup pergroup; pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); aggstate->pergroup = pergroup; } /* * Perform lookups of aggregate function info, and initialize the * unchanging fields of the per-agg data. We also detect duplicate * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). * When duplicates are detected, we only make an AggStatePerAgg struct * for the first one. The clones are simply pointed at the same * result entry by giving them duplicate aggno values. */ aggno = -1; foreach(alist, aggstate->aggs) { AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(alist); Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr; AggStatePerAgg peraggstate; Oid inputType; HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; AclResult aclresult; Oid transfn_oid, finalfn_oid; Expr *transfnexpr, *finalfnexpr; Datum textInitVal; int i; /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); /* Look for a previous duplicate aggregate */ for (i = 0; i <= aggno; i++) { if (equal(aggref, peragg[i].aggref) && !contain_volatile_functions((Node *) aggref)) break; } if (i <= aggno) { /* Found a match to an existing entry, so just mark it */ aggrefstate->aggno = i; continue; } /* Nope, so assign a new PerAgg record */ peraggstate = &peragg[++aggno]; /* Mark Aggref state node with assigned index in the result array */ aggrefstate->aggno = aggno; /* Fill in the peraggstate data */ peraggstate->aggrefstate = aggrefstate; peraggstate->aggref = aggref; /* * Get actual datatype of the input. We need this because it may * be different from the agg's declared input type, when the agg * accepts ANY (eg, COUNT(*)) or ANYARRAY or ANYELEMENT. */ inputType = exprType((Node *) aggref->target); aggTuple = SearchSysCache(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid), 0, 0, 0); if (!HeapTupleIsValid(aggTuple)) elog(ERROR, "cache lookup failed for aggregate %u", aggref->aggfnoid); aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); /* Check permission to call aggregate function */ aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; /* resolve actual type of transition state, if polymorphic */ aggtranstype = aggform->aggtranstype; if (aggtranstype == ANYARRAYOID || aggtranstype == ANYELEMENTOID) { /* have to fetch the agg's declared input type... */ Oid agg_arg_types[FUNC_MAX_ARGS]; int agg_nargs; (void) get_func_signature(aggref->aggfnoid, agg_arg_types, &agg_nargs); Assert(agg_nargs == 1); aggtranstype = resolve_generic_type(aggtranstype, inputType, agg_arg_types[0]); } /* build expression trees using actual argument & result types */ build_aggregate_fnexprs(inputType, aggtranstype, aggref->aggtype, transfn_oid, finalfn_oid, &transfnexpr, &finalfnexpr); fmgr_info(transfn_oid, &peraggstate->transfn); peraggstate->transfn.fn_expr = (Node *) transfnexpr; if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); peraggstate->finalfn.fn_expr = (Node *) finalfnexpr; } get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); /* * initval is potentially null, so don't try to access it as a * struct field. Must do it the hard way with SysCacheGetAttr. */ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitval, &peraggstate->initValueIsNull); if (peraggstate->initValueIsNull) peraggstate->initValue = (Datum) 0; else peraggstate->initValue = GetAggInitVal(textInitVal, aggtranstype); /* * If the transfn is strict and the initval is NULL, make sure * input type and transtype are the same (or at least binary- * compatible), so that it's OK to use the first input value as * the initial transValue. This should have been checked at agg * definition time, but just in case... */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { if (!IsBinaryCoercible(inputType, aggtranstype)) ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid))); } if (aggref->aggdistinct) { Oid eq_function; /* We don't implement DISTINCT aggs in the HASHED case */ Assert(node->aggstrategy != AGG_HASHED); peraggstate->inputType = inputType; get_typlenbyval(inputType, &peraggstate->inputtypeLen, &peraggstate->inputtypeByVal); eq_function = equality_oper_funcid(inputType); fmgr_info(eq_function, &(peraggstate->equalfn)); peraggstate->sortOperator = ordering_oper_opid(inputType); peraggstate->sortstate = NULL; } ReleaseSysCache(aggTuple); } /* Update numaggs to match number of unique aggregates found */ aggstate->numaggs = aggno + 1; return aggstate;}static DatumGetAggInitVal(Datum textInitVal, Oid transtype){ char *strInitVal; HeapTuple tup; Oid typinput, typelem; Datum initVal; strInitVal = DatumGetCString(DirectFunctionCall1(textout, textInitVal)); tup = SearchSysCache(TYPEOID, ObjectIdGetDatum(transtype), 0, 0, 0); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", transtype); typinput = ((Form_pg_type) GETSTRUCT(tup))->typinput; typelem = ((Form_pg_type) GETSTRUCT(tup))->typelem; ReleaseSysCache(tup); initVal = OidFunctionCall3(typinput, CStringGetDatum(strInitVal), ObjectIdGetDatum(typelem), Int32GetDatum(-1)); pfree(strInitVal); return initVal;}intExecCountSlotsAgg(Agg *node){ return ExecCountSlotsNode(outerPlan(node)) + ExecCountSlotsNode(innerPlan(node)) + AGG_NSLOTS;}voidExecEndAgg(AggState *node){ PlanState *outerPlan; int aggno; /* Make sure we have closed any open tuplesorts */ for (aggno = 0; aggno < node->numaggs; aggno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; if (peraggstate->sortstate) tuplesort_end(peraggstate->sortstate); } /* * Free both the expr contexts. */ ExecFreeExprContext(&node->ss.ps); node->ss.ps.ps_ExprContext = node->tmpcontext; ExecFreeExprContext(&node->ss.ps); /* clean up tuple table */ ExecClearTuple(node->ss.ss_ScanTupleSlot); MemoryContextDelete(node->aggcontext); outerPlan = outerPlanState(node); ExecEndNode(outerPlan);}voidExecReScanAgg(AggState *node, ExprContext *exprCtxt){ ExprContext *econtext = node->ss.ps.ps_ExprContext; int aggno; node->agg_done = false; if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { /* * In the hashed case, if we haven't yet built the hash table then * we can just return; nothing done yet, so nothing to undo. If * subnode's chgParam is not NULL then it will be re-scanned by * ExecProcNode, else no reason to re-scan it at all. */ if (!node->table_filled) return; /* * If we do have the hash table and the subplan does not have any * parameter changes, then we can just rescan the existing hash * table; no need to build it again. */ if (((PlanState *) node)->lefttree->chgParam == NULL) { ResetTupleHashIterator(node->hashtable, &node->hashiter); return; } } /* Make sure we have closed any open tuplesorts */ for (aggno = 0; aggno < node->numaggs; aggno++) { AggStatePerAgg peraggstate = &node->peragg[aggno]; if (peraggstate->sortstate) tuplesort_end(peraggstate->sortstate); peraggstate->sortstate = NULL; } /* Release first tuple of group, if we have made a copy */ if (node->grp_firstTuple != NULL) { heap_freetuple(node->grp_firstTuple); node->grp_firstTuple = NULL; } /* Forget current agg values */ MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs); MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs); /* Release all temp storage */ MemoryContextReset(node->aggcontext); if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { /* Rebuild an empty hash table */ build_hash_table(node); node->table_filled = false; } else { /* Reset the per-group state (in particular, mark transvalues null) */ MemSet(node->pergroup, 0, sizeof(AggStatePerGroupData) * node->numaggs); } /* * if chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ if (((PlanState *) node)->lefttree->chgParam == NULL) ExecReScan(((PlanState *) node)->lefttree, exprCtxt);}/* * aggregate_dummy - dummy execution routine for aggregate functions * * This function is listed as the implementation (prosrc field) of pg_proc * entries for aggregate functions. Its only purpose is to throw an error * if someone mistakenly executes such a function in the normal way. * * Perhaps someday we could assign real meaning to the prosrc field of * an aggregate? */Datumaggregate_dummy(PG_FUNCTION_ARGS){ elog(ERROR, "aggregate function %u called as normal function", fcinfo->flinfo->fn_oid); return (Datum) 0; /* keep compiler quiet */}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -