📄 tuplesort.c
字号:
state->status = TSS_INITIAL; state->randomAccess = randomAccess; state->allowedMem = workMem * 1024L; state->availMem = state->allowedMem; state->tapeset = NULL; state->memtupcount = 0; state->memtupsize = 1024; /* initial guess */ state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *)); state->memtupindex = NULL; /* until and unless needed */ USEMEM(state, GetMemoryChunkSpace(state->memtuples)); state->currentRun = 0; /* Algorithm D variables will be initialized by inittapes, if needed */ state->result_tape = -1; /* flag that result tape has not been formed */ return state;}Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, Oid *sortOperators, AttrNumber *attNums, int workMem, bool randomAccess){ Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); int i; AssertArg(nkeys > 0);#ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c", nkeys, workMem, randomAccess ? 't' : 'f');#endif state->comparetup = comparetup_heap; state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; state->tupDesc = tupDesc; state->nKeys = nkeys; state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData)); state->sortFnKinds = (SortFunctionKind *) palloc0(nkeys * sizeof(SortFunctionKind)); for (i = 0; i < nkeys; i++) { RegProcedure sortFunction; AssertArg(sortOperators[i] != 0); AssertArg(attNums[i] != 0); /* select a function that implements the sort operator */ SelectSortFunction(sortOperators[i], &sortFunction, &state->sortFnKinds[i]); /* * We needn't fill in sk_strategy or sk_subtype since these scankeys * will never be passed to an index. */ ScanKeyInit(&state->scanKeys[i], attNums[i], InvalidStrategy, sortFunction, (Datum) 0); } return state;}Tuplesortstate *tuplesort_begin_index(Relation indexRel, bool enforceUnique, int workMem, bool randomAccess){ Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);#ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin index sort: unique = %c, workMem = %d, randomAccess = %c", enforceUnique ? 't' : 'f', workMem, randomAccess ? 't' : 'f');#endif state->comparetup = comparetup_index; state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; state->indexRel = indexRel; /* see comments below about btree dependence of this code... */ state->indexScanKey = _bt_mkscankey_nodata(indexRel); state->enforceUnique = enforceUnique; return state;}Tuplesortstate *tuplesort_begin_datum(Oid datumType, Oid sortOperator, int workMem, bool randomAccess){ Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); RegProcedure sortFunction; int16 typlen; bool typbyval;#ifdef TRACE_SORT if (trace_sort) elog(LOG, "begin datum sort: workMem = %d, randomAccess = %c", workMem, randomAccess ? 't' : 'f');#endif state->comparetup = comparetup_datum; state->copytup = copytup_datum; state->writetup = writetup_datum; state->readtup = readtup_datum; state->datumType = datumType; state->sortOperator = sortOperator; /* select a function that implements the sort operator */ SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind); /* and look up the function */ fmgr_info(sortFunction, &state->sortOpFn); /* lookup necessary attributes of the datum type */ get_typlenbyval(datumType, &typlen, &typbyval); state->datumTypeLen = typlen; state->datumTypeByVal = typbyval; return state;}/* * tuplesort_end * * Release resources and clean up. */voidtuplesort_end(Tuplesortstate *state){ int i;#ifdef TRACE_SORT long spaceUsed;#endif if (state->tapeset) {#ifdef TRACE_SORT spaceUsed = LogicalTapeSetBlocks(state->tapeset);#endif LogicalTapeSetClose(state->tapeset); } else {#ifdef TRACE_SORT spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;#endif } if (state->memtuples) { for (i = 0; i < state->memtupcount; i++) pfree(state->memtuples[i]); pfree(state->memtuples); } if (state->memtupindex) pfree(state->memtupindex); /* * this stuff might better belong in a variant-specific shutdown routine */ if (state->scanKeys) pfree(state->scanKeys); if (state->sortFnKinds) pfree(state->sortFnKinds);#ifdef TRACE_SORT if (trace_sort) { if (state->tapeset) elog(LOG, "external sort ended, %ld disk blocks used: %s", spaceUsed, pg_rusage_show(&state->ru_start)); else elog(LOG, "internal sort ended, %ld KB used: %s", spaceUsed, pg_rusage_show(&state->ru_start)); }#endif pfree(state);}/* * Accept one tuple while collecting input data for sort. * * Note that the input tuple is always copied; the caller need not save it. */voidtuplesort_puttuple(Tuplesortstate *state, void *tuple){ /* * Copy the given tuple into memory we control, and decrease availMem. * Then call the code shared with the Datum case. */ tuple = COPYTUP(state, tuple); puttuple_common(state, tuple);}/* * Accept one Datum while collecting input data for sort. * * If the Datum is pass-by-ref type, the value will be copied. */voidtuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull){ DatumTuple *tuple; /* * Build pseudo-tuple carrying the datum, and decrease availMem. */ if (isNull || state->datumTypeByVal) { tuple = (DatumTuple *) palloc(sizeof(DatumTuple)); tuple->val = val; tuple->isNull = isNull; } else { Size datalen; Size tuplelen; char *newVal; datalen = datumGetSize(val, false, state->datumTypeLen); tuplelen = datalen + MAXALIGN(sizeof(DatumTuple)); tuple = (DatumTuple *) palloc(tuplelen); newVal = ((char *) tuple) + MAXALIGN(sizeof(DatumTuple)); memcpy(newVal, DatumGetPointer(val), datalen); tuple->val = PointerGetDatum(newVal); tuple->isNull = false; } USEMEM(state, GetMemoryChunkSpace(tuple)); puttuple_common(state, (void *) tuple);}/* * Shared code for tuple and datum cases. */static voidputtuple_common(Tuplesortstate *state, void *tuple){ switch (state->status) { case TSS_INITIAL: /* * Save the copied tuple into the unsorted array. */ if (state->memtupcount >= state->memtupsize) { /* Grow the unsorted array as needed. */ FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); state->memtupsize *= 2; state->memtuples = (void **) repalloc(state->memtuples, state->memtupsize * sizeof(void *)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); } state->memtuples[state->memtupcount++] = tuple; /* * Done if we still fit in available memory. */ if (!LACKMEM(state)) return; /* * Nope; time to switch to tape-based operation. */ inittapes(state); /* * Dump tuples until we are back under the limit. */ dumptuples(state, false); break; case TSS_BUILDRUNS: /* * Insert the copied tuple into the heap, with run number * currentRun if it can go into the current run, else run number * currentRun+1. The tuple can go into the current run if it is * >= the first not-yet-output tuple. (Actually, it could go into * the current run if it is >= the most recently output tuple ... * but that would require keeping around the tuple we last output, * and it's simplest to let writetup free each tuple as soon as * it's written.) * * Note there will always be at least one tuple in the heap at * this point; see dumptuples. */ Assert(state->memtupcount > 0); if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0) tuplesort_heap_insert(state, tuple, state->currentRun, true); else tuplesort_heap_insert(state, tuple, state->currentRun + 1, true); /* * If we are over the memory limit, dump tuples till we're under. */ dumptuples(state, false); break; default: elog(ERROR, "invalid tuplesort state"); break; }}/* * All tuples have been provided; finish the sort. */voidtuplesort_performsort(Tuplesortstate *state){#ifdef TRACE_SORT if (trace_sort) elog(LOG, "performsort starting: %s", pg_rusage_show(&state->ru_start));#endif switch (state->status) { case TSS_INITIAL: /* * We were able to accumulate all the tuples within the allowed * amount of memory. Just qsort 'em and we're done. */ if (state->memtupcount > 1) { qsort_tuplesortstate = state; qsort((void *) state->memtuples, state->memtupcount, sizeof(void *), qsort_comparetup); } state->current = 0; state->eof_reached = false; state->markpos_offset = 0; state->markpos_eof = false; state->status = TSS_SORTEDINMEM; break; case TSS_BUILDRUNS: /* * Finish tape-based sort. First, flush all tuples remaining in * memory out to tape; then merge until we have a single remaining * run (or, if !randomAccess, one run per tape). Note that * mergeruns sets the correct state->status. */ dumptuples(state, true); mergeruns(state); state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; state->markpos_eof = false; break; default: elog(ERROR, "invalid tuplesort state"); break; }#ifdef TRACE_SORT if (trace_sort) elog(LOG, "performsort done%s: %s", (state->status == TSS_FINALMERGE) ? " (except final merge)" : "", pg_rusage_show(&state->ru_start));#endif}/* * Fetch the next tuple in either forward or back direction. * Returns NULL if no more tuples. If should_free is set, the * caller must pfree the returned tuple when done with it. */void *tuplesort_gettuple(Tuplesortstate *state, bool forward, bool *should_free){ unsigned int tuplen; void *tup; switch (state->status) { case TSS_SORTEDINMEM: Assert(forward || state->randomAccess); *should_free = false; if (forward) { if (state->current < state->memtupcount) return state->memtuples[state->current++]; state->eof_reached = true; return NULL; } else { if (state->current <= 0) return NULL; /* * if all tuples are fetched already then we return last * tuple, else - tuple before last returned. */ if (state->eof_reached) state->eof_reached = false; else { state->current--; /* last returned tuple */ if (state->current <= 0) return NULL; } return state->memtuples[state->current - 1];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -