📄 tuplesort.c
字号:
} break; case TSS_SORTEDONTAPE: Assert(forward || state->randomAccess); *should_free = true; if (forward) { if (state->eof_reached) return NULL; if ((tuplen = getlen(state, state->result_tape, true)) != 0) { tup = READTUP(state, state->result_tape, tuplen); return tup; } else { state->eof_reached = true; return NULL; } } /* * Backward. * * if all tuples are fetched already then we return last tuple, * else - tuple before last returned. */ if (state->eof_reached) { /* * Seek position is pointing just past the zero tuplen at the * end of file; back up to fetch last tuple's ending length * word. If seek fails we must have a completely empty file. */ if (!LogicalTapeBackspace(state->tapeset, state->result_tape, 2 * sizeof(unsigned int))) return NULL; state->eof_reached = false; } else { /* * Back up and fetch previously-returned tuple's ending length * word. If seek fails, assume we are at start of file. */ if (!LogicalTapeBackspace(state->tapeset, state->result_tape, sizeof(unsigned int))) return NULL; tuplen = getlen(state, state->result_tape, false); /* * Back up to get ending length word of tuple before it. */ if (!LogicalTapeBackspace(state->tapeset, state->result_tape, tuplen + 2 * sizeof(unsigned int))) { /* * If that fails, presumably the prev tuple is the first * in the file. Back up so that it becomes next to read * in forward direction (not obviously right, but that is * what in-memory case does). */ if (!LogicalTapeBackspace(state->tapeset, state->result_tape, tuplen + sizeof(unsigned int))) elog(ERROR, "bogus tuple length in backward scan"); return NULL; } } tuplen = getlen(state, state->result_tape, false); /* * Now we have the length of the prior tuple, back up and read it. * Note: READTUP expects we are positioned after the initial * length word of the tuple, so back up to that point. 
*/ if (!LogicalTapeBackspace(state->tapeset, state->result_tape, tuplen)) elog(ERROR, "bogus tuple length in backward scan"); tup = READTUP(state, state->result_tape, tuplen); return tup; case TSS_FINALMERGE: Assert(forward); *should_free = true; /* * This code should match the inner loop of mergeonerun(). */ if (state->memtupcount > 0) { int srcTape = state->memtupindex[0]; Size tuplen; int tupIndex; void *newtup; tup = state->memtuples[0]; /* returned tuple is no longer counted in our memory space */ tuplen = GetMemoryChunkSpace(tup); state->availMem += tuplen; state->mergeavailmem[srcTape] += tuplen; tuplesort_heap_siftup(state, false); if ((tupIndex = state->mergenext[srcTape]) == 0) { /* * out of preloaded data on this tape, try to read more */ mergepreread(state); /* * if still no data, we've reached end of run on this tape */ if ((tupIndex = state->mergenext[srcTape]) == 0) return tup; } /* pull next preread tuple from list, insert in heap */ newtup = state->memtuples[tupIndex]; state->mergenext[srcTape] = state->memtupindex[tupIndex]; if (state->mergenext[srcTape] == 0) state->mergelast[srcTape] = 0; state->memtupindex[tupIndex] = state->mergefreelist; state->mergefreelist = tupIndex; tuplesort_heap_insert(state, newtup, srcTape, false); return tup; } return NULL; default: elog(ERROR, "invalid tuplesort state"); return NULL; /* keep compiler quiet */ }}/* * Fetch the next Datum in either forward or back direction. * Returns FALSE if no more datums. * * If the Datum is pass-by-ref type, the returned value is freshly palloc'd * and is now owned by the caller. 
*/booltuplesort_getdatum(Tuplesortstate *state, bool forward, Datum *val, bool *isNull){ DatumTuple *tuple; bool should_free; tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free); if (tuple == NULL) return false; if (tuple->isNull || state->datumTypeByVal) { *val = tuple->val; *isNull = tuple->isNull; } else { *val = datumCopy(tuple->val, false, state->datumTypeLen); *isNull = false; } if (should_free) pfree(tuple); return true;}/* * inittapes - initialize for tape sorting. * * This is called only if we have found we don't have room to sort in memory. */static voidinittapes(Tuplesortstate *state){ int ntuples, j;#ifdef TRACE_SORT if (trace_sort) elog(LOG, "switching to external sort: %s", pg_rusage_show(&state->ru_start));#endif state->tapeset = LogicalTapeSetCreate(MAXTAPES); /* * Allocate the memtupindex array, same size as memtuples. */ state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int)); USEMEM(state, GetMemoryChunkSpace(state->memtupindex)); /* * Convert the unsorted contents of memtuples[] into a heap. Each tuple is * marked as belonging to run number zero. * * NOTE: we pass false for checkIndex since there's no point in comparing * indexes in this step, even though we do intend the indexes to be part * of the sort key... */ ntuples = state->memtupcount; state->memtupcount = 0; /* make the heap empty */ for (j = 0; j < ntuples; j++) tuplesort_heap_insert(state, state->memtuples[j], 0, false); Assert(state->memtupcount == ntuples); state->currentRun = 0; /* * Initialize variables of Algorithm D (step D1). */ for (j = 0; j < MAXTAPES; j++) { state->tp_fib[j] = 1; state->tp_runs[j] = 0; state->tp_dummy[j] = 1; state->tp_tapenum[j] = j; } state->tp_fib[TAPERANGE] = 0; state->tp_dummy[TAPERANGE] = 0; state->Level = 1; state->destTape = 0; state->status = TSS_BUILDRUNS;}/* * selectnewtape -- select new tape for new initial run. * * This is called after finishing a run when we know another run * must be started. 
This implements steps D3, D4 of Algorithm D.
 */
static void
selectnewtape(Tuplesortstate *state)
{
	int			j;
	int			a;

	/* Step D3: advance j (destTape) */
	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
	{
		state->destTape++;
		return;
	}
	if (state->tp_dummy[state->destTape] != 0)
	{
		state->destTape = 0;
		return;
	}

	/* Step D4: increase level */
	state->Level++;

	/* save tp_fib[0] first: the loop overwrites it on its first iteration */
	a = state->tp_fib[0];
	for (j = 0; j < TAPERANGE; j++)
	{
		/* tp_dummy[j] must be computed from the old tp_fib[j], so do it first */
		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
		state->tp_fib[j] = a + state->tp_fib[j + 1];
	}
	state->destTape = 0;
}

/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Algorithm D.  All input data has
 * already been written to initial runs on tape (see dumptuples).
 *
 * Must be entered in state TSS_BUILDRUNS with an empty heap.  On return the
 * state is either TSS_SORTEDONTAPE, with state->result_tape frozen, or
 * TSS_FINALMERGE, when random access is not required and every input tape
 * is down to exactly one real-or-dummy run.
 */
static void
mergeruns(Tuplesortstate *state)
{
	int			tapenum,
				svTape,
				svRuns,
				svDummy;

	Assert(state->status == TSS_BUILDRUNS);
	Assert(state->memtupcount == 0);

	/*
	 * If we produced only one initial run (quite likely if the total data
	 * volume is between 1X and 2X workMem), we can just use that tape as the
	 * finished output, rather than doing a useless merge.
	 */
	if (state->currentRun == 1)
	{
		state->result_tape = state->tp_tapenum[state->destTape];
		/* must freeze and rewind the finished output tape */
		LogicalTapeFreeze(state->tapeset, state->result_tape);
		state->status = TSS_SORTEDONTAPE;
		return;
	}

	/* End of step D2: rewind all output tapes to prepare for merging */
	for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
		LogicalTapeRewind(state->tapeset, tapenum, false);

	for (;;)
	{
		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
		while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
		{
			bool		allDummy = true;
			bool		allOneRun = true;

			/* survey the input tapes: any real run? exactly one run each? */
			for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
			{
				if (state->tp_dummy[tapenum] == 0)
					allDummy = false;
				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
					allOneRun = false;
			}

			/*
			 * If we don't have to produce a materialized sorted tape, quit as
			 * soon as we're down to one real/dummy run per tape.
			 */
			if (!state->randomAccess && allOneRun)
			{
				Assert(!allDummy);
				/* Initialize for the final merge pass */
				beginmerge(state);
				state->status = TSS_FINALMERGE;
				return;
			}

			if (allDummy)
			{
				/*
				 * Every input tape has a dummy run: merging them yields a
				 * dummy run on the output tape, so just adjust the counts.
				 */
				state->tp_dummy[TAPERANGE]++;
				for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
					state->tp_dummy[tapenum]--;
			}
			else
				mergeonerun(state);
		}

		/* Step D6: decrease level */
		if (--state->Level == 0)
			break;

		/* rewind output tape T to use as new input */
		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
						  false);
		/* rewind used-up input tape P, and prepare it for write pass */
		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
						  true);
		state->tp_runs[TAPERANGE - 1] = 0;

		/*
		 * reassign tape units per step D6; note we no longer care about A[]
		 */
		svTape = state->tp_tapenum[TAPERANGE];
		svDummy = state->tp_dummy[TAPERANGE];
		svRuns = state->tp_runs[TAPERANGE];

		/* shift every slot up by one, walking downward so nothing is clobbered */
		for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
		{
			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
		}

		/* the saved former output tape becomes slot 0 */
		state->tp_tapenum[0] = svTape;
		state->tp_dummy[0] = svDummy;
		state->tp_runs[0] = svRuns;
	}

	/*
	 * Done.  Knuth says that the result is on TAPE[1], but since we exited
	 * the loop without performing the last iteration of step D6, we have not
	 * rearranged the tape unit assignment, and therefore the result is on
	 * TAPE[T].  We need to do it this way so that we can freeze the final
	 * output tape while rewinding it.  The last iteration of step D6 would be
	 * a waste of cycles anyway...
	 */
	state->result_tape = state->tp_tapenum[TAPERANGE];
	LogicalTapeFreeze(state->tapeset, state->result_tape);
	state->status = TSS_SORTEDONTAPE;
}

/*
 * Merge one run from each input tape, except ones with dummy runs.
 *
 * This is the inner loop of Algorithm D step D5.  We know that the
 * output tape is TAPE[T].
 */
static void
mergeonerun(Tuplesortstate *state)
{
	int			destTape = state->tp_tapenum[TAPERANGE];
	int			srcTape;
	int			tupIndex;
	void	   *tup;
	long		priorAvail,
				spaceFreed;

	/*
	 * Start the merge by loading one tuple from each active source tape into
	 * the heap.  We can also decrease the input run/dummy run counts.
	 */
	beginmerge(state);

	/*
	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
	 * out, and replacing it with next tuple from same tape (if there is
	 * another one).
*/ while (state->memtupcount > 0) { CHECK_FOR_INTERRUPTS(); /* write the tuple to destTape */ priorAvail = state->availMem; srcTape = state->memtupindex[0]; WRITETUP(state, destTape, state->memtuples[0]); /* writetup adjusted total free space, now fix per-tape space */ spaceFreed = state->availMem - priorAvail; state->mergeavailmem[srcTape] += spaceFreed; /* compact the heap */ tuplesort_heap_siftup(state, false); if ((tupIndex = state->mergenext[srcTape]) == 0) { /* out of preloaded data on this tape, try to read more */ mergepreread(state); /* if still no data, we've reached end of run on this tape */ if ((tupIndex = state->mergenext[srcTape]) == 0) continue; } /* pull next preread tuple from list, insert in heap */ tup = state->memtuples[tupIndex];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -