📄 nodeagg.c
字号:
/*------------------------------------------------------------------------- * * nodeAgg.c * Routines to handle aggregate nodes. * * ExecAgg evaluates each aggregate in the following steps: * * transvalue = initcond * foreach input_value do * transvalue = transfunc(transvalue, input_value) * result = finalfunc(transvalue) * * If a finalfunc is not supplied then the result is just the ending * value of transvalue. * * If transfunc is marked "strict" in pg_proc and initcond is NULL, * then the first non-NULL input_value is assigned directly to transvalue, * and transfunc isn't applied until the second non-NULL input_value. * The agg's input type and transtype must be the same in this case! * * If transfunc is marked "strict" then NULL input_values are skipped, * keeping the previous transvalue. If transfunc is not strict then it * is called for every input tuple and must deal with NULL initcond * or NULL input_value for itself. * * If finalfunc is marked "strict" then it is not called when the * ending transvalue is NULL, instead a NULL result is created * automatically (this is just the usual handling of strict functions, * of course). A non-strict finalfunc can make its own choice of * what to return for a NULL ending transvalue. * * We compute aggregate input expressions and run the transition functions * in a temporary econtext (aggstate->tmpcontext). This is reset at * least once per input tuple, so when the transvalue datatype is * pass-by-reference, we have to be careful to copy it into a longer-lived * memory context, and free the prior value to avoid memory leakage. * We store transvalues in the memory context aggstate->aggcontext, * which is also used for the hashtable structures in AGG_HASHED mode. * The node's regular econtext (aggstate->csstate.cstate.cs_ExprContext) * is used to run finalize functions and compute the output tuple; * this context can be reset once per output tuple. * * * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.116.2.2 2004/07/10 18:39:44 tgl Exp $ * *------------------------------------------------------------------------- */#include "postgres.h"#include "access/heapam.h"#include "catalog/pg_aggregate.h"#include "catalog/pg_operator.h"#include "executor/executor.h"#include "executor/nodeAgg.h"#include "miscadmin.h"#include "optimizer/clauses.h"#include "parser/parse_agg.h"#include "parser/parse_coerce.h"#include "parser/parse_expr.h"#include "parser/parse_oper.h"#include "utils/acl.h"#include "utils/builtins.h"#include "utils/lsyscache.h"#include "utils/syscache.h"#include "utils/tuplesort.h"#include "utils/datum.h"/* * AggStatePerAggData - per-aggregate working state for the Agg scan */typedef struct AggStatePerAggData{ /* * These values are set up during ExecInitAgg() and do not change * thereafter: */ /* Links to Aggref expr and state nodes this working state is for */ AggrefExprState *aggrefstate; Aggref *aggref; /* Oids of transfer functions */ Oid transfn_oid; Oid finalfn_oid; /* may be InvalidOid */ /* * fmgr lookup data for transfer functions --- only valid when * corresponding oid is not InvalidOid. Note in particular that * fn_strict flags are kept here. */ FmgrInfo transfn; FmgrInfo finalfn; /* * Type of input data and Oid of sort operator to use for it; only * set/used when aggregate has DISTINCT flag. (These are not used * directly by nodeAgg, but must be passed to the Tuplesort object.) */ Oid inputType; Oid sortOperator; /* * fmgr lookup data for input type's equality operator --- only * set/used when aggregate has DISTINCT flag. */ FmgrInfo equalfn; /* * initial value from pg_aggregate entry */ Datum initValue; bool initValueIsNull; /* * We need the len and byval info for the agg's input, result, and * transition data types in order to know how to copy/delete values. */ int16 inputtypeLen, resulttypeLen, transtypeLen; bool inputtypeByVal, resulttypeByVal, transtypeByVal; /* * These values are working state that is initialized at the start of * an input tuple group and updated for each input tuple. * * For a simple (non DISTINCT) aggregate, we just feed the input values * straight to the transition function. If it's DISTINCT, we pass the * input values into a Tuplesort object; then at completion of the * input tuple group, we scan the sorted values, eliminate duplicates, * and run the transition function on the rest. */ Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */} AggStatePerAggData;/* * AggStatePerGroupData - per-aggregate-per-group working state * * These values are working state that is initialized at the start of * an input tuple group and updated for each input tuple. * * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these * structs (pointed to by aggstate->pergroup); we re-use the array for * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the * hash table contains an array of these structs for each tuple group. * * Logically, the sortstate field belongs in this struct, but we do not * keep it here for space reasons: we don't support DISTINCT aggregates * in AGG_HASHED mode, so there's no reason to use up a pointer field * in every entry of the hashtable. */typedef struct AggStatePerGroupData{ Datum transValue; /* current transition value */ bool transValueIsNull; bool noTransValue; /* true if transValue not set yet */ /* * Note: noTransValue initially has the same value as * transValueIsNull, and if true both are cleared to false at the same * time. They are not the same though: if transfn later returns a * NULL, we want to keep that NULL and not auto-replace it with a * later input value. Only the first non-NULL input will be * auto-substituted. */} AggStatePerGroupData;/* * To implement hashed aggregation, we need a hashtable that stores a * representative tuple and an array of AggStatePerGroup structs for each * distinct set of GROUP BY column values. We compute the hash key from * the GROUP BY columns. */typedef struct AggHashEntryData *AggHashEntry;typedef struct AggHashEntryData{ TupleHashEntryData shared; /* common header for hash table entries */ /* per-aggregate transition status array - must be last! */ AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */} AggHashEntryData; /* VARIABLE LENGTH STRUCT */static void initialize_aggregates(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup);static void advance_transition_function(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum newVal, bool isNull);static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup);static void process_sorted_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate);static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull);static void build_hash_table(AggState *aggstate);static AggHashEntry lookup_hash_entry(AggState *aggstate, TupleTableSlot *slot);static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);static void agg_fill_hash_table(AggState *aggstate);static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);static Datum GetAggInitVal(Datum textInitVal, Oid transtype);/* * Initialize all aggregates for a new group of input values. * * When called, CurrentMemoryContext should be the per-query context. */static voidinitialize_aggregates(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup){ int aggno; for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; Aggref *aggref = peraggstate->aggref; /* * Start a fresh sort operation for each DISTINCT aggregate. */ if (aggref->aggdistinct) { /* * In case of rescan, maybe there could be an uncompleted sort * operation? Clean it up if so. */ if (peraggstate->sortstate) tuplesort_end(peraggstate->sortstate); peraggstate->sortstate = tuplesort_begin_datum(peraggstate->inputType, peraggstate->sortOperator, false); } /* * If we are reinitializing after a group boundary, we have to free * any prior transValue to avoid memory leakage. We must check not * only the isnull flag but whether the pointer is NULL; since * pergroupstate is initialized with palloc0, the initial condition * has isnull = 0 and null pointer. */ if (!peraggstate->transtypeByVal && !pergroupstate->transValueIsNull && DatumGetPointer(pergroupstate->transValue) != NULL) pfree(DatumGetPointer(pergroupstate->transValue)); /* * (Re)set transValue to the initial value. * * Note that when the initial value is pass-by-ref, we must copy it * (into the aggcontext) since we will pfree the transValue later. */ if (peraggstate->initValueIsNull) pergroupstate->transValue = peraggstate->initValue; else { MemoryContext oldContext; oldContext = MemoryContextSwitchTo(aggstate->aggcontext); pergroupstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } pergroupstate->transValueIsNull = peraggstate->initValueIsNull; /* * If the initial value for the transition state doesn't exist in * the pg_aggregate table then we will let the first non-NULL * value returned from the outer procNode become the initial * value. (This is useful for aggregates like max() and min().) * The noTransValue flag signals that we still need to do this. */ pergroupstate->noTransValue = peraggstate->initValueIsNull; }}/* * Given a new input value, advance the transition function of an aggregate. * * It doesn't matter which memory context this is called in. */static voidadvance_transition_function(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, Datum newVal, bool isNull){ FunctionCallInfoData fcinfo; MemoryContext oldContext; if (peraggstate->transfn.fn_strict) { /* * For a strict transfn, nothing happens at a NULL input tuple; we * just keep the prior transValue. */ if (isNull) return; if (pergroupstate->noTransValue) { /* * transValue has not been initialized. This is the first * non-NULL input value. We use it as the initial value for * transValue. (We already checked that the agg's input type * is binary-compatible with its transtype, so straight copy * here is OK.) * * We must copy the datum into aggcontext if it is pass-by-ref. * We do not need to pfree the old transValue, since it's * NULL. */ oldContext = MemoryContextSwitchTo(aggstate->aggcontext); pergroupstate->transValue = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); pergroupstate->transValueIsNull = false; pergroupstate->noTransValue = false; MemoryContextSwitchTo(oldContext); return; } if (pergroupstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the * transfn is strict *and* returned a NULL on a prior cycle. * If that happens we will propagate the NULL all the way to * the end. */ return; } } /* We run the transition functions in per-input-tuple memory context */ oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); /* * OK to call the transition function * * This is heavily-used code, so manually zero just the necessary fields * instead of using MemSet(). Compare FunctionCall2(). */ /* MemSet(&fcinfo, 0, sizeof(fcinfo)); */ fcinfo.context = NULL; fcinfo.resultinfo = NULL; fcinfo.isnull = false; fcinfo.flinfo = &peraggstate->transfn; fcinfo.nargs = 2; fcinfo.arg[0] = pergroupstate->transValue; fcinfo.argnull[0] = pergroupstate->transValueIsNull; fcinfo.arg[1] = newVal; fcinfo.argnull[1] = isNull; newVal = FunctionCallInvoke(&fcinfo); /* * If pass-by-ref datatype, must copy the new value into aggcontext * and pfree the prior transValue. But if transfn returned a pointer * to its first input, we don't need to do anything. */ if (!peraggstate->transtypeByVal && DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) { if (!fcinfo.isnull) { MemoryContextSwitchTo(aggstate->aggcontext); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); } if (!pergroupstate->transValueIsNull) pfree(DatumGetPointer(pergroupstate->transValue)); } pergroupstate->transValue = newVal; pergroupstate->transValueIsNull = fcinfo.isnull; MemoryContextSwitchTo(oldContext);}/* * Advance all the aggregates for one input tuple. The input tuple * has been stored in tmpcontext->ecxt_scantuple, so that it is accessible * to ExecEvalExpr. pergroup is the array of per-group structs to use * (this might be in a hashtable entry). * * When called, CurrentMemoryContext should be the per-query context. */static voidadvance_aggregates(AggState *aggstate, AggStatePerGroup pergroup){ ExprContext *econtext = aggstate->tmpcontext; int aggno; for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; AggrefExprState *aggrefstate = peraggstate->aggrefstate; Aggref *aggref = peraggstate->aggref; Datum newVal; bool isNull; newVal = ExecEvalExprSwitchContext(aggrefstate->target, econtext, &isNull, NULL); if (aggref->aggdistinct) { /* in DISTINCT mode, we may ignore nulls */ if (isNull) continue; tuplesort_putdatum(peraggstate->sortstate, newVal, isNull); } else { advance_transition_function(aggstate, peraggstate, pergroupstate, newVal, isNull); } }}/* * Run the transition function for a DISTINCT aggregate. This is called * after we have completed entering all the input values into the sort * object. We complete the sort, read out the values in sorted order, * and run the transition function on each non-duplicate value. * * When called, CurrentMemoryContext should be the per-query context. */static voidprocess_sorted_aggregate(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate){ Datum oldVal = (Datum) 0; bool haveOldVal = false; MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; MemoryContext oldContext; Datum newVal; bool isNull; tuplesort_performsort(peraggstate->sortstate); /* * Note: if input type is pass-by-ref, the datums returned by the sort * are freshly palloc'd in the per-query context, so we must be * careful to pfree them when they are no longer needed. */ while (tuplesort_getdatum(peraggstate->sortstate, true, &newVal, &isNull)) { /* * DISTINCT always suppresses nulls, per SQL spec, regardless of * the transition function's strictness. */ if (isNull) continue; /* * Clear and select the working context for evaluation of the * equality function and transition function. */ MemoryContextReset(workcontext); oldContext = MemoryContextSwitchTo(workcontext); if (haveOldVal && DatumGetBool(FunctionCall2(&peraggstate->equalfn, oldVal, newVal))) { /* equal to prior, so forget this one */ if (!peraggstate->inputtypeByVal) pfree(DatumGetPointer(newVal)); } else { advance_transition_function(aggstate, peraggstate, pergroupstate, newVal, false); /* forget the old value, if any */ if (haveOldVal && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); /* and remember the new one for subsequent equality checks */ oldVal = newVal; haveOldVal = true; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -