📄 tuplesort.c
字号:
state->mergenext[srcTape] = state->memtupindex[tupIndex];	/* unlink slot from srcTape's preread list */
		if (state->mergenext[srcTape] == 0)
			state->mergelast[srcTape] = 0;	/* list now empty: clear tail too */
		/* recycle the slot by pushing it onto the free list */
		state->memtupindex[tupIndex] = state->mergefreelist;
		state->mergefreelist = tupIndex;
		/* insert the tuple into the merge heap, tagged with its source tape */
		tuplesort_heap_insert(state, tup, srcTape, false);
	}

	/*
	 * When the heap empties, we're done.  Write an end-of-run marker on the
	 * output tape, and increment its count of real runs.
	 */
	markrunend(state, destTape);
	state->tp_runs[TAPERANGE]++;

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG, "finished merge step: %s",
			 pg_rusage_show(&state->ru_start));
#endif
}

/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[].  Then, load
 * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
 *
 * For each of the TAPERANGE input positions, this pass consumes one dummy
 * run if the tape has any, otherwise one real run; only tapes that give up
 * a real run are marked active.  The per-tape merge arrays (mergeactive[],
 * mergenext[], mergelast[], mergeavailmem[]) are indexed by the tape number
 * taken from tp_tapenum[], not by the loop position.
 */
static void
beginmerge(Tuplesortstate *state)
{
	int			activeTapes;
	int			tapenum;
	int			srcTape;

	/* Heap should be empty here */
	Assert(state->memtupcount == 0);

	/*
	 * Clear merge-pass state variables.
	 *
	 * NOTE(review): sizeof(state->mergeXXX) covers the whole array only
	 * because these fields are presumably declared as fixed-size arrays
	 * (indexed up to MAXTAPES below) inside Tuplesortstate; if any were
	 * changed to a pointer, the memset would silently clear just
	 * sizeof(pointer) bytes.
	 */
	memset(state->mergeactive, 0, sizeof(state->mergeactive));
	memset(state->mergenext, 0, sizeof(state->mergenext));
	memset(state->mergelast, 0, sizeof(state->mergelast));
	memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));

	/*
	 * Slot number 0 serves as the "no slot" link for both the free list and
	 * the per-tape preread lists.  The merge heap lives in
	 * memtuples[0 .. memtupcount-1] and, as loaded below, holds at most one
	 * tuple per source tape, so preread tuples are placed at MAXTAPES and up.
	 */
	state->mergefreelist = 0;	/* nothing in the freelist */
	state->mergefirstfree = MAXTAPES;	/* first slot available for preread */

	/* Adjust run counts and mark the active tapes */
	activeTapes = 0;
	for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
	{
		if (state->tp_dummy[tapenum] > 0)
			state->tp_dummy[tapenum]--;
		else
		{
			Assert(state->tp_runs[tapenum] > 0);
			state->tp_runs[tapenum]--;
			srcTape = state->tp_tapenum[tapenum];
			state->mergeactive[srcTape] = true;
			activeTapes++;
		}
	}

	/*
	 * Initialize space allocation to let each active input tape have an equal
	 * share of preread space.
	 */
	Assert(activeTapes > 0);
	state->spacePerTape = state->availMem / activeTapes;
	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		if (state->mergeactive[srcTape])
			state->mergeavailmem[srcTape] = state->spacePerTape;
	}

	/*
	 * Preread as many tuples as possible (and at least one) from each active
	 * tape
	 */
	mergepreread(state);

	/*
	 * Load the merge heap with the first tuple from each input tape.  Each
	 * head tuple is unlinked from its tape's preread list, its slot goes to
	 * the free list, and the heap entry's index is the source tape number
	 * (checkIndex = false, so that index is not part of the comparison).
	 */
	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		int			tupIndex = state->mergenext[srcTape];
		void	   *tup;

		if (tupIndex)
		{
			tup = state->memtuples[tupIndex];
			state->mergenext[srcTape] = state->memtupindex[tupIndex];
			if (state->mergenext[srcTape] == 0)
				state->mergelast[srcTape] = 0;
			state->memtupindex[tupIndex] = state->mergefreelist;
			state->mergefreelist = tupIndex;
			tuplesort_heap_insert(state, tup, srcTape, false);
		}
	}
}

/*
 * mergepreread - load tuples from merge input tapes
 *
 * This routine exists to improve sequentiality of reads during a merge pass,
 * as explained in the header comments of this file.  Load tuples from each
 * active source tape until the tape's run is exhausted or it has used up
 * its fair share of available memory.  In any case, we guarantee that there
 * is at least one preread tuple available from each unexhausted input tape.
 *
 * Each tape's preread tuples form a FIFO list threaded through
 * memtupindex[]: mergenext[tape] is the head slot, mergelast[tape] the tail
 * slot, and a link of 0 ends the list.  Free slots are taken from
 * mergefreelist (also linked through memtupindex[]) before fresh ones are
 * taken from mergefirstfree.  A tape whose run is exhausted is marked
 * inactive in mergeactive[].
 */
static void
mergepreread(Tuplesortstate *state)
{
	int			srcTape;
	unsigned int tuplen;
	void	   *tup;
	int			tupIndex;
	long		priorAvail,
				spaceUsed;

	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		if (!state->mergeactive[srcTape])
			continue;

		/*
		 * Skip reading from any tape that still has at least half of its
		 * target memory filled with tuples (threshold fraction may need
		 * adjustment?).  This avoids reading just a few tuples when the
		 * incoming runs are not being consumed evenly.
		 */
		if (state->mergenext[srcTape] != 0 &&
			state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
			continue;

		/*
		 * Read tuples from this tape until it has used up its free memory,
		 * but ensure that we have at least one.
		 *
		 * The global state->availMem is temporarily replaced by this tape's
		 * remaining budget, so LACKMEM() and any memory charged inside the
		 * loop are measured against the tape's share.  The array enlargement
		 * below charges via FREEMEM/USEMEM; READTUP is presumably expected to
		 * charge each tuple it reads the same way -- TODO confirm.  Because
		 * the budget is only checked before each read, and a read is forced
		 * when the tape's list is empty, the budget can end up overdrawn.
		 */
		priorAvail = state->availMem;
		state->availMem = state->mergeavailmem[srcTape];
		while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
		{
			/* read next tuple, if any */
			if ((tuplen = getlen(state, srcTape, true)) == 0)
			{
				/* zero length: this tape's run is exhausted */
				state->mergeactive[srcTape] = false;
				break;
			}
			tup = READTUP(state, srcTape, tuplen);
			/* find or make a free slot in memtuples[] for it */
			tupIndex = state->mergefreelist;
			if (tupIndex)
				state->mergefreelist = state->memtupindex[tupIndex];
			else
			{
				tupIndex = state->mergefirstfree++;
				/*
				 * Might need to enlarge arrays!  Both parallel arrays are
				 * doubled; the old chunk sizes are released from the memory
				 * count and the new ones charged, so the growth is billed to
				 * this tape's budget.
				 *
				 * NOTE(review): same growth code as tuplesort_heap_insert();
				 * a shared helper would keep the two in sync.
				 */
				if (tupIndex >= state->memtupsize)
				{
					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
					FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
					state->memtupsize *= 2;
					state->memtuples = (void **)
						repalloc(state->memtuples,
								 state->memtupsize * sizeof(void *));
					state->memtupindex = (int *)
						repalloc(state->memtupindex,
								 state->memtupsize * sizeof(int));
					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
					USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
				}
			}
			/* store tuple, append to list for its tape */
			state->memtuples[tupIndex] = tup;
			state->memtupindex[tupIndex] = 0;
			if (state->mergelast[srcTape])
				state->memtupindex[state->mergelast[srcTape]] = tupIndex;
			else
				state->mergenext[srcTape] = tupIndex;
			state->mergelast[srcTape] = tupIndex;
		}
		/*
		 * update per-tape and global availmem counts: whatever this tape's
		 * budget went down by is also deducted from the saved global value
		 */
		spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
		state->mergeavailmem[srcTape] = state->availMem;
		state->availMem = priorAvail - spaceUsed;
	}
}

/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case,
 * since puttuple assumes it always has a tuple to compare to).
 *
 * When alltuples = true, dump everything currently in memory.
 * (This case is only used at end of input data.)  The loop is then entered
 * unconditionally and ends only via the "heap is empty" break, so the heap
 * must be non-empty on entry (see the Assert).
 *
 * If we empty the heap, close out the current run and return (this should
 * only happen at end of input data).  If we see that the tuple run number
 * at the top of the heap has changed, start a new run.
 *
 * During run building, each heap entry's memtupindex[] value is its run
 * number (compared with currentRun below), and the heap is sifted with
 * checkIndex = true so lower run numbers come out first.
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
	while (alltuples ||
		   (LACKMEM(state) && state->memtupcount > 1))
	{
		/*
		 * Dump the heap's frontmost entry, and sift up to remove it from the
		 * heap.
		 */
		Assert(state->memtupcount > 0);
		WRITETUP(state, state->tp_tapenum[state->destTape],
				 state->memtuples[0]);
		tuplesort_heap_siftup(state, true);

		/*
		 * If the heap is empty *or* top run number has changed, we've
		 * finished the current run.
		 */
		if (state->memtupcount == 0 ||
			state->currentRun != state->memtupindex[0])
		{
			markrunend(state, state->tp_tapenum[state->destTape]);
			state->currentRun++;
			state->tp_runs[state->destTape]++;
			state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

#ifdef TRACE_SORT
			if (trace_sort)
				elog(LOG, "finished writing%s run %d: %s",
					 (state->memtupcount == 0) ? " final" : "",
					 state->currentRun,
					 pg_rusage_show(&state->ru_start));
#endif

			/*
			 * Done if heap is empty, else prepare for new run.  Since the
			 * heap yields lower run numbers first, the new top must belong to
			 * the run just started.  selectnewtape() presumably picks the
			 * destTape for that run -- it is defined elsewhere.
			 */
			if (state->memtupcount == 0)
				break;
			Assert(state->currentRun == state->memtupindex[0]);
			selectnewtape(state);
		}
	}
}

/*
 * tuplesort_rescan - rewind and replay the scan
 *
 * Requires a randomAccess sort (checked only by Assert) whose status is
 * TSS_SORTEDINMEM or TSS_SORTEDONTAPE; any other status raises ERROR.
 * Also resets the saved mark position to the beginning.
 */
void
tuplesort_rescan(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		case TSS_SORTEDONTAPE:
			LogicalTapeRewind(state->tapeset,
							  state->result_tape,
							  false);
			state->eof_reached = false;
			state->markpos_block = 0L;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * tuplesort_markpos - saves current position in the merged sort file
 *
 * For an in-memory sort, the mark is the array position state->current;
 * for an on-tape sort, it is the block/offset that LogicalTapeTell reports
 * for result_tape.  The eof_reached flag is saved in both cases.  Requires
 * randomAccess (Assert only); any other status raises ERROR.
 */
void
tuplesort_markpos(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			state->markpos_offset = state->current;
			state->markpos_eof = state->eof_reached;
			break;
		case TSS_SORTEDONTAPE:
			LogicalTapeTell(state->tapeset,
							state->result_tape,
							&state->markpos_block,
							&state->markpos_offset);
			state->markpos_eof = state->eof_reached;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * tuplesort_restorepos - restores current position in merged sort file to
 * last saved position
 *
 * Inverse of tuplesort_markpos.  Raises ERROR if LogicalTapeSeek reports
 * failure, or if the status is neither TSS_SORTEDINMEM nor
 * TSS_SORTEDONTAPE.  Requires randomAccess (Assert only).
 */
void
tuplesort_restorepos(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			state->current = state->markpos_offset;
			state->eof_reached = state->markpos_eof;
			break;
		case TSS_SORTEDONTAPE:
			if (!LogicalTapeSeek(state->tapeset,
								 state->result_tape,
								 state->markpos_block,
								 state->markpos_offset))
				elog(ERROR, "tuplesort_restorepos failed");
			state->eof_reached = state->markpos_eof;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * The heap lives in state->memtuples[], with parallel data storage
 * for indexes in state->memtupindex[].  If checkIndex is true, use
 * the tuple index as the front of the sort key; otherwise, no.
 *
 * The smallest entry is kept at memtuples[0] (tuplesort_heap_insert stops
 * sifting once the new entry compares >= its parent).
 *
 * HEAPCOMPARE yields a comparator-style result (<0, 0, >0).  When checkIndex
 * is true and the indexes differ, their difference decides; otherwise
 * COMPARETUP decides.  It expands to references to the caller's local
 * variables "state" and "checkIndex", so it is only usable inside functions
 * that have both.
 *
 * NOTE(review): index1/index2 are evaluated more than once and appear
 * unparenthesized in "index1 != index2"; this is safe for the plain lvalue
 * arguments used below but would not be for expressions with side effects
 * or low-precedence operators.
 */
#define HEAPCOMPARE(tup1,index1,tup2,index2) \
	(checkIndex && (index1 != index2) ? (index1) - (index2) : \
	 COMPARETUP(state, tup1, tup2))

/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant.
 *
 * tupleindex is stored in memtupindex[] alongside the tuple: in the callers
 * visible here it is the source tape number during merging, while
 * run-building code treats it as the tuple's run number.  The parallel
 * arrays are doubled (with memory accounting) when full.
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
					  int tupleindex, bool checkIndex)
{
	void	  **memtuples;
	int		   *memtupindex;
	int			j;

	/*
	 * Make sure memtuples[] can handle another entry.
	 *
	 * NOTE(review): growth is unbounded doubling, limited only by whatever
	 * repalloc itself enforces on request size -- confirm that is the
	 * intended failure mode for very large sorts.
	 */
	if (state->memtupcount >= state->memtupsize)
	{
		FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
		FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
		state->memtupsize *= 2;
		state->memtuples = (void **)
			repalloc(state->memtuples,
					 state->memtupsize * sizeof(void *));
		state->memtupindex = (int *)
			repalloc(state->memtupindex,
					 state->memtupsize * sizeof(int));
		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
		USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
	}
	memtuples = state->memtuples;
	memtupindex = state->memtupindex;

	/*
	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16.  Note that Knuth is
	 * using 1-based array indexes, not 0-based.
	 *
	 * j is the "hole" (starting at the new last slot) and i its parent;
	 * parents larger than the new entry are moved down into the hole.
	 */
	j = state->memtupcount++;
	while (j > 0)
	{
		int			i = (j - 1) >> 1;

		if (HEAPCOMPARE(tuple, tupleindex,
						memtuples[i], memtupindex[i]) >= 0)
			break;
		memtuples[j] = memtuples[i];
		memtupindex[j] = memtupindex[i];
		j = i;
	}
	memtuples[j] = tuple;
	memtupindex[j] = tupleindex;
}

/*
 * The tuple at state->memtuples[0] has been removed from the heap.
 * Decrement memtupcount, and sift up to maintain the heap invariant.
 *
 * (Knuth's "sift up" terminology: in array terms the hole left at slot 0
 * moves downward while the former last entry is reinserted.)
 */
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
	void	  **memtuples = state->memtuples;
	int		   *memtupindex = state->memtupindex;
	void	   *tuple;
	int			tupindex,
				i,
				n;

	/* if the removal emptied the heap, there is nothing to reinsert */
	if (--state->memtupcount <= 0)
		return;
	n = state->memtupcount;
	tuple = memtuples[n];		/* tuple that must be reinserted */
	tupindex = memtupindex[n];
	i = 0;						/* i is where the "hole" is */
	for (;;)
	{
		int			j = 2 * i + 1;	/* left child of the hole */

		if (j >= n)
			break;
		/* pick the smaller of the two children */
		if (j + 1 < n &&
			HEAPCOMPARE(memtuples[j], memtupindex[j],
						memtuples[j + 1], memtupindex[j + 1]) > 0)
			j++;
		if (HEAPCOMPARE(tuple, tupindex,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -