📄 nodeagg.c
字号:
MemoryContextSwitchTo(oldContext); } if (haveOldVal && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); tuplesort_end(peraggstate->sortstate); peraggstate->sortstate = NULL;}/* * Compute the final value of one aggregate. * * The finalfunction will be run, and the result delivered, in the * output-tuple context; caller's CurrentMemoryContext does not matter. */static voidfinalize_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull){ MemoryContext oldContext; oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); /* * Apply the agg's finalfn if one is provided, else return transValue. */ if (OidIsValid(peraggstate->finalfn_oid)) { FunctionCallInfoData fcinfo; MemSet(&fcinfo, 0, sizeof(fcinfo)); fcinfo.flinfo = &peraggstate->finalfn; fcinfo.nargs = 1; fcinfo.arg[0] = pergroupstate->transValue; fcinfo.argnull[0] = pergroupstate->transValueIsNull; if (fcinfo.flinfo->fn_strict && pergroupstate->transValueIsNull) { /* don't call a strict function with NULL inputs */ *resultVal = (Datum) 0; *resultIsNull = true; } else { *resultVal = FunctionCallInvoke(&fcinfo); *resultIsNull = fcinfo.isnull; } } else { *resultVal = pergroupstate->transValue; *resultIsNull = pergroupstate->transValueIsNull; } /* * If result is pass-by-ref, make sure it is in the right context. */ if (!peraggstate->resulttypeByVal && !*resultIsNull && !MemoryContextContains(CurrentMemoryContext, DatumGetPointer(*resultVal))) *resultVal = datumCopy(*resultVal, peraggstate->resulttypeByVal, peraggstate->resulttypeLen); MemoryContextSwitchTo(oldContext);}/* * Initialize the hash table to empty. * * The hash table always lives in the aggcontext memory context. 
*/static voidbuild_hash_table(AggState *aggstate){ Agg *node = (Agg *) aggstate->ss.ps.plan; MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; Size entrysize; Assert(node->aggstrategy == AGG_HASHED); Assert(node->numGroups > 0); entrysize = sizeof(AggHashEntryData) + (aggstate->numaggs - 1) *sizeof(AggStatePerGroupData); aggstate->hashtable = BuildTupleHashTable(node->numCols, node->grpColIdx, aggstate->eqfunctions, aggstate->hashfunctions, node->numGroups, entrysize, aggstate->aggcontext, tmpmem);}/* * Find or create a hashtable entry for the tuple group containing the * given tuple. * * When called, CurrentMemoryContext should be the per-query context. */static AggHashEntrylookup_hash_entry(AggState *aggstate, TupleTableSlot *slot){ AggHashEntry entry; bool isnew; entry = (AggHashEntry) LookupTupleHashEntry(aggstate->hashtable, slot, &isnew); if (isnew) { /* initialize aggregates for new tuple group */ initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); } return entry;}/* * ExecAgg - * * ExecAgg receives tuples from its outer subplan and aggregates over * the appropriate attribute for each aggregate function use (Aggref * node) appearing in the targetlist or qual of the node. The number * of tuples to aggregate over depends on whether grouped or plain * aggregation is selected. In grouped aggregation, we produce a result * row for each group; in plain aggregation there's a single result row * for the whole query. In either case, the value of each aggregate is * stored in the expression context to be used when ExecProject evaluates * the result tuple. 
*/
TupleTableSlot *
ExecAgg(AggState *node)
{
	/* once the node has reported completion, keep returning NULL */
	if (node->agg_done)
		return NULL;

	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
	{
		/* hashed strategy: load the hash table once, then read groups from it */
		if (!node->table_filled)
			agg_fill_hash_table(node);
		return agg_retrieve_hash_table(node);
	}
	else
		return agg_retrieve_direct(node);
}

/*
 * ExecAgg for non-hashed case
 *
 * Reads input tuples from the outer plan, accumulates one group at a time,
 * and returns the projected result for the first group that passes the
 * node's qual.  Returns NULL when there are no more groups; agg_done is set
 * once the outer plan is exhausted.
 */
static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{
	Agg		   *node = (Agg *) aggstate->ss.ps.plan;
	PlanState  *outerPlan;
	ExprContext *econtext;
	ExprContext *tmpcontext;
	ProjectionInfo *projInfo;
	Datum	   *aggvalues;
	bool	   *aggnulls;
	AggStatePerAgg peragg;
	AggStatePerGroup pergroup;
	TupleTableSlot *outerslot;
	TupleTableSlot *firstSlot;
	int			aggno;

	/*
	 * get state info from node
	 */
	outerPlan = outerPlanState(aggstate);
	/* econtext is the per-output-tuple expression context */
	econtext = aggstate->ss.ps.ps_ExprContext;
	aggvalues = econtext->ecxt_aggvalues;
	aggnulls = econtext->ecxt_aggnulls;
	/* tmpcontext is the per-input-tuple expression context */
	tmpcontext = aggstate->tmpcontext;
	projInfo = aggstate->ss.ps.ps_ProjInfo;
	peragg = aggstate->peragg;
	pergroup = aggstate->pergroup;
	firstSlot = aggstate->ss.ss_ScanTupleSlot;

	/*
	 * We loop retrieving groups until we find one matching
	 * aggstate->ss.ps.qual
	 */
	while (!aggstate->agg_done)
	{
		/*
		 * If we don't already have the first tuple of the new group,
		 * fetch it from the outer plan.
		 */
		if (aggstate->grp_firstTuple == NULL)
		{
			outerslot = ExecProcNode(outerPlan);
			if (!TupIsNull(outerslot))
			{
				/*
				 * Make a copy of the first input tuple; we will use this
				 * for comparisons (in group mode) and for projection.
				 */
				aggstate->grp_firstTuple = heap_copytuple(outerslot->val);
			}
			else
			{
				/* outer plan produced no tuples at all */
				aggstate->agg_done = true;
				/* If we are grouping, we should produce no tuples too */
				if (node->aggstrategy != AGG_PLAIN)
					return NULL;
			}
		}

		/*
		 * Clear the per-output-tuple context for each group
		 */
		ResetExprContext(econtext);

		/*
		 * Initialize working state for a new input tuple group
		 */
		initialize_aggregates(aggstate, peragg, pergroup);

		if (aggstate->grp_firstTuple != NULL)
		{
			/*
			 * Store the copied first input tuple in the tuple table slot
			 * reserved for it.  The tuple will be deleted when it is
			 * cleared from the slot.
			 */
			ExecStoreTuple(aggstate->grp_firstTuple,
						   firstSlot,
						   InvalidBuffer,
						   true);
			aggstate->grp_firstTuple = NULL;	/* don't keep two pointers */

			/* set up for first advance_aggregates call */
			tmpcontext->ecxt_scantuple = firstSlot;

			/*
			 * Process each outer-plan tuple, and then fetch the next one,
			 * until we exhaust the outer plan or cross a group boundary.
			 */
			for (;;)
			{
				advance_aggregates(aggstate, pergroup);

				/* Reset per-input-tuple context after each tuple */
				ResetExprContext(tmpcontext);

				outerslot = ExecProcNode(outerPlan);
				if (TupIsNull(outerslot))
				{
					/* no more outer-plan tuples available */
					aggstate->agg_done = true;
					break;
				}
				/* set up for next advance_aggregates call */
				tmpcontext->ecxt_scantuple = outerslot;

				/*
				 * If we are grouping, check whether we've crossed a group
				 * boundary.
				 */
				if (node->aggstrategy == AGG_SORTED)
				{
					if (!execTuplesMatch(firstSlot->val,
										 outerslot->val,
										 firstSlot->ttc_tupleDescriptor,
										 node->numCols, node->grpColIdx,
										 aggstate->eqfunctions,
										 tmpcontext->ecxt_per_tuple_memory))
					{
						/*
						 * Save the first input tuple of the next group.
						 */
						aggstate->grp_firstTuple = heap_copytuple(outerslot->val);
						break;
					}
				}
			}
		}

		/*
		 * Done scanning input tuple group. Finalize each aggregate
		 * calculation, and stash results in the per-output-tuple context.
		 */
		for (aggno = 0; aggno < aggstate->numaggs; aggno++)
		{
			AggStatePerAgg peraggstate = &peragg[aggno];
			AggStatePerGroup pergroupstate = &pergroup[aggno];

			if (peraggstate->aggref->aggdistinct)
				process_sorted_aggregate(aggstate, peraggstate, pergroupstate);

			finalize_aggregate(aggstate, peraggstate, pergroupstate,
							   &aggvalues[aggno], &aggnulls[aggno]);
		}

		/*
		 * If we have no first tuple (ie, the outerPlan didn't return
		 * anything), create a dummy all-nulls input tuple for use by
		 * ExecQual/ExecProject. 99.44% of the time this is a waste of cycles,
		 * because ordinarily the projected output tuple's targetlist
		 * cannot contain any direct (non-aggregated) references to input
		 * columns, so the dummy tuple will not be referenced. However
		 * there are special cases where this isn't so --- in particular
		 * an UPDATE involving an aggregate will have a targetlist
		 * reference to ctid. We need to return a null for ctid in that
		 * situation, not coredump.
		 *
		 * The values returned for the aggregates will be the initial values
		 * of the transition functions.
		 */
		if (TupIsNull(firstSlot))
		{
			TupleDesc	tupType;

			/* Should only happen in non-grouped mode */
			Assert(node->aggstrategy == AGG_PLAIN);
			Assert(aggstate->agg_done);

			tupType = firstSlot->ttc_tupleDescriptor;
			/* watch out for zero-column input tuples, though... */
			if (tupType && tupType->natts > 0)
			{
				HeapTuple	nullsTuple;
				Datum	   *dvalues;
				char	   *dnulls;

				dvalues = (Datum *) palloc0(sizeof(Datum) * tupType->natts);
				dnulls = (char *) palloc(sizeof(char) * tupType->natts);
				/* flag every attribute with 'n' to build the all-nulls tuple */
				MemSet(dnulls, 'n', sizeof(char) * tupType->natts);
				nullsTuple = heap_formtuple(tupType, dvalues, dnulls);
				ExecStoreTuple(nullsTuple,
							   firstSlot,
							   InvalidBuffer,
							   true);
				pfree(dvalues);
				pfree(dnulls);
			}
		}

		/*
		 * Use the representative input tuple for any references to
		 * non-aggregated input columns in the qual and tlist.
		 */
		econtext->ecxt_scantuple = firstSlot;

		/*
		 * Check the qual (HAVING clause); if the group does not match,
		 * ignore it and loop back to try to process another group.
		 */
		if (ExecQual(aggstate->ss.ps.qual, econtext, false))
		{
			/*
			 * Form and return a projection tuple using the aggregate results
			 * and the representative input tuple.  Note we do not support
			 * aggregates returning sets ...
			 */
			return ExecProject(projInfo, NULL);
		}
	}

	/* No more groups */
	return NULL;
}

/*
 * ExecAgg for hashed case: phase 1, read input and build hash table
 *
 * Drains the outer plan completely, advancing the aggregates of each input
 * tuple's group, then sets table_filled and resets the hash iterator.
 */
static void
agg_fill_hash_table(AggState *aggstate)
{
	PlanState  *outerPlan;
	ExprContext *tmpcontext;
	AggHashEntry entry;
	TupleTableSlot *outerslot;

	/*
	 * get state info from node
	 */
	outerPlan = outerPlanState(aggstate);
	/* tmpcontext is the per-input-tuple expression context */
	tmpcontext = aggstate->tmpcontext;

	/*
	 * Process each outer-plan tuple, and then fetch the next one, until
	 * we exhaust the outer plan.
	 */
	for (;;)
	{
		outerslot = ExecProcNode(outerPlan);
		if (TupIsNull(outerslot))
			break;
		/* set up for advance_aggregates call */
		tmpcontext->ecxt_scantuple = outerslot;

		/* Find or build hashtable entry for this tuple's group */
		entry = lookup_hash_entry(aggstate, outerslot);

		/* Advance the aggregates */
		advance_aggregates(aggstate, entry->pergroup);

		/* Reset per-input-tuple context after each tuple */
		ResetExprContext(tmpcontext);
	}

	aggstate->table_filled = true;
	/* Initialize to walk the hash table */
	ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter);
}

/*
 * ExecAgg for hashed case: phase 2, retrieving groups from hash table
 */
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
	ExprContext *econtext;
	ProjectionInfo *projInfo;
	Datum	   *aggvalues;
	bool	   *aggnulls;
	AggStatePerAgg peragg;
	AggStatePerGroup pergroup;
	AggHashEntry entry;
	TupleTableSlot *firstSlot;
	int			aggno;

	/*
	 * get state info from node
	 */
	/* econtext is the per-output-tuple expression context */
	econtext = aggstate->ss.ps.ps_ExprContext;
	aggvalues = econtext->ecxt_aggvalues;
	aggnulls = econtext->ecxt_aggnulls;
	projInfo = aggstate->ss.ps.ps_ProjInfo;
	peragg = aggstate->peragg;
	firstSlot =
aggstate->ss.ss_ScanTupleSlot; /* * We loop retrieving groups until we find one satisfying * aggstate->ss.ps.qual */ while (!aggstate->agg_done) { /* * Find the next entry in the hash table */ entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter); if (entry == NULL) { /* No more entries in hashtable, so done */ aggstate->agg_done = TRUE; return NULL; } /* * Clear the per-output-tuple context for each group */ ResetExprContext(econtext); /* * Store the copied first input tuple in the tuple table slot * reserved for it, so that it can be used in ExecProject. */ ExecStoreTuple(entry->shared.firstTuple, firstSlot, InvalidBuffer, false); pergroup = entry->pergroup; /* * Finalize each aggregate calculation, and stash results in the * per-output-tuple context. */ for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; Assert(!peraggstate->aggref->aggdistinct); finalize_aggregate(aggstate, peraggstate, pergroupstate, &aggvalues[aggno], &aggnulls[aggno]); } /* * Use the representative input tuple for any references to * non-aggregated input columns in the qual and tlist. */ econtext->ecxt_scantuple = firstSlot; /* * Check the qual (HAVING clause); if the group does not match, * ignore it and loop back to try to process another group. */ if (ExecQual(aggstate->ss.ps.qual, econtext, false)) { /* * Form and return a projection tuple using the aggregate results * and the representative input tuple. Note we do not support * aggregates returning sets ... */ return ExecProject(projInfo, NULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -