tuplesort.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,177 行 · 第 1/5 页
C
2,177 行
elog(ERROR, "bogus tuple length in backward scan"); tup = READTUP(state, state->result_tape, tuplen); return tup; case TSS_FINALMERGE: Assert(forward); *should_free = true; /* * This code should match the inner loop of mergeonerun(). */ if (state->memtupcount > 0) { int srcTape = state->memtupindex[0]; Size tuplen; int tupIndex; void *newtup; tup = state->memtuples[0]; /* returned tuple is no longer counted in our memory space */ tuplen = GetMemoryChunkSpace(tup); state->availMem += tuplen; state->mergeavailmem[srcTape] += tuplen; tuplesort_heap_siftup(state, false); if ((tupIndex = state->mergenext[srcTape]) == 0) { /* * out of preloaded data on this tape, try to read * more */ mergepreread(state); /* * if still no data, we've reached end of run on this * tape */ if ((tupIndex = state->mergenext[srcTape]) == 0) return tup; } /* pull next preread tuple from list, insert in heap */ newtup = state->memtuples[tupIndex]; state->mergenext[srcTape] = state->memtupindex[tupIndex]; if (state->mergenext[srcTape] == 0) state->mergelast[srcTape] = 0; state->memtupindex[tupIndex] = state->mergefreelist; state->mergefreelist = tupIndex; tuplesort_heap_insert(state, newtup, srcTape, false); return tup; } return NULL; default: elog(ERROR, "invalid tuplesort state"); return NULL; /* keep compiler quiet */ }}/* * Fetch the next Datum in either forward or back direction. * Returns FALSE if no more datums. * * If the Datum is pass-by-ref type, the returned value is freshly palloc'd * and is now owned by the caller. 
*/
bool
tuplesort_getdatum(Tuplesortstate *state, bool forward,
                   Datum *val, bool *isNull)
{
    DatumTuple *tuple;
    bool        should_free;

    /* Fetch the next sort entry; should_free reports whether we must pfree it */
    tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);

    if (tuple == NULL)
        return false;           /* no more datums in this direction */

    if (tuple->isNull || state->datumTypeByVal)
    {
        /* NULL or pass-by-value: the stored value can be handed back as is */
        *val = tuple->val;
        *isNull = tuple->isNull;
    }
    else
    {
        /* pass-by-reference: give the caller its own copy of the datum */
        *val = datumCopy(tuple->val, false, state->datumTypeLen);
        *isNull = false;
    }

    if (should_free)
        pfree(tuple);

    return true;
}

/*
 * inittapes - initialize for tape sorting.
 *
 * This is called only if we have found we don't have room to sort in memory.
 *
 * Creates the logical tape set, allocates memtupindex[], turns the tuples
 * currently held in memtuples[] into a heap belonging to run 0, sets up the
 * Algorithm D bookkeeping arrays, and switches the sort state to
 * TSS_BUILDRUNS.
 */
static void
inittapes(Tuplesortstate *state)
{
    int         ntuples,
                j;

    state->tapeset = LogicalTapeSetCreate(MAXTAPES);

    /*
     * Allocate the memtupindex array, same size as memtuples.
     */
    state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));

    /* charge the new array against the sort's memory budget */
    USEMEM(state, GetMemoryChunkSpace(state->memtupindex));

    /*
     * Convert the unsorted contents of memtuples[] into a heap. Each
     * tuple is marked as belonging to run number zero.
     *
     * NOTE: we pass false for checkIndex since there's no point in comparing
     * indexes in this step, even though we do intend the indexes to be
     * part of the sort key...
     */
    ntuples = state->memtupcount;
    state->memtupcount = 0;     /* make the heap empty */
    for (j = 0; j < ntuples; j++)
        tuplesort_heap_insert(state, state->memtuples[j], 0, false);
    Assert(state->memtupcount == ntuples);

    state->currentRun = 0;

    /*
     * Initialize variables of Algorithm D (step D1).
     */
    for (j = 0; j < MAXTAPES; j++)
    {
        state->tp_fib[j] = 1;
        state->tp_runs[j] = 0;
        state->tp_dummy[j] = 1;
        state->tp_tapenum[j] = j;
    }
    state->tp_fib[TAPERANGE] = 0;
    state->tp_dummy[TAPERANGE] = 0;

    state->Level = 1;
    state->destTape = 0;

    state->status = TSS_BUILDRUNS;
}

/*
 * selectnewtape -- select new tape for new initial run.
 *
 * This is called after finishing a run when we know another run
 * must be started.  This implements steps D3, D4 of Algorithm D.
*/
static void
selectnewtape(Tuplesortstate *state)
{
    int         j;
    int         a;

    /* Step D3: advance j (destTape) */
    if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
    {
        state->destTape++;
        return;
    }
    if (state->tp_dummy[state->destTape] != 0)
    {
        state->destTape = 0;
        return;
    }

    /* Step D4: increase level */
    state->Level++;
    a = state->tp_fib[0];
    for (j = 0; j < TAPERANGE; j++)
    {
        /* tp_dummy[j] must be computed before tp_fib[j] is overwritten */
        state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
        state->tp_fib[j] = a + state->tp_fib[j + 1];
    }
    state->destTape = 0;
}

/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Algorithm D.  All input data has
 * already been written to initial runs on tape (see dumptuples).
 *
 * On return, state->status is either TSS_SORTEDONTAPE (the sorted output
 * is on state->result_tape, which has been frozen) or TSS_FINALMERGE
 * (the last merge pass has been set up by beginmerge() but not run).
 */
static void
mergeruns(Tuplesortstate *state)
{
    int         tapenum,
                svTape,
                svRuns,
                svDummy;

    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);

    /*
     * If we produced only one initial run (quite likely if the total data
     * volume is between 1X and 2X SortMem), we can just use that tape as
     * the finished output, rather than doing a useless merge.
     */
    if (state->currentRun == 1)
    {
        state->result_tape = state->tp_tapenum[state->destTape];
        /* must freeze and rewind the finished output tape */
        LogicalTapeFreeze(state->tapeset, state->result_tape);
        state->status = TSS_SORTEDONTAPE;
        return;
    }

    /* End of step D2: rewind all output tapes to prepare for merging */
    for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
        LogicalTapeRewind(state->tapeset, tapenum, false);

    for (;;)
    {
        /* Step D5: merge runs onto tape[T] until tape[P] is empty */
        while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
        {
            bool        allDummy = true;
            bool        allOneRun = true;

            for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
            {
                if (state->tp_dummy[tapenum] == 0)
                    allDummy = false;
                if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
                    allOneRun = false;
            }

            /*
             * If we don't have to produce a materialized sorted tape,
             * quit as soon as we're down to one real/dummy run per tape.
             */
            if (!state->randomAccess && allOneRun)
            {
                Assert(!allDummy);
                /* Initialize for the final merge pass */
                beginmerge(state);
                state->status = TSS_FINALMERGE;
                return;
            }

            if (allDummy)
            {
                /* merging only dummy runs yields a dummy run on tape[T] */
                state->tp_dummy[TAPERANGE]++;
                for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
                    state->tp_dummy[tapenum]--;
            }
            else
                mergeonerun(state);
        }

        /* Step D6: decrease level */
        if (--state->Level == 0)
            break;

        /* rewind output tape T to use as new input */
        LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
                          false);

        /* rewind used-up input tape P, and prepare it for write pass */
        LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
                          true);
        state->tp_runs[TAPERANGE - 1] = 0;

        /*
         * reassign tape units per step D6; note we no longer care about
         * A[]
         */
        svTape = state->tp_tapenum[TAPERANGE];
        svDummy = state->tp_dummy[TAPERANGE];
        svRuns = state->tp_runs[TAPERANGE];
        for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
        {
            state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
            state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
            state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
        }
        state->tp_tapenum[0] = svTape;
        state->tp_dummy[0] = svDummy;
        state->tp_runs[0] = svRuns;
    }

    /*
     * Done.  Knuth says that the result is on TAPE[1], but since we
     * exited the loop without performing the last iteration of step D6,
     * we have not rearranged the tape unit assignment, and therefore the
     * result is on TAPE[T].  We need to do it this way so that we can
     * freeze the final output tape while rewinding it.  The last
     * iteration of step D6 would be a waste of cycles anyway...
     */
    state->result_tape = state->tp_tapenum[TAPERANGE];
    LogicalTapeFreeze(state->tapeset, state->result_tape);
    state->status = TSS_SORTEDONTAPE;
}

/*
 * Merge one run from each input tape, except ones with dummy runs.
 *
 * This is the inner loop of Algorithm D step D5.  We know that the
 * output tape is TAPE[T].
*/
static void
mergeonerun(Tuplesortstate *state)
{
    int         destTape = state->tp_tapenum[TAPERANGE];
    int         srcTape;
    int         tupIndex;
    void       *tup;
    long        priorAvail,
                spaceFreed;

    /*
     * Start the merge by loading one tuple from each active source tape
     * into the heap.  We can also decrease the input run/dummy run
     * counts.
     */
    beginmerge(state);

    /*
     * Execute merge by repeatedly extracting lowest tuple in heap,
     * writing it out, and replacing it with next tuple from same tape (if
     * there is another one).
     */
    while (state->memtupcount > 0)
    {
        CHECK_FOR_INTERRUPTS();

        /* write the tuple to destTape */
        priorAvail = state->availMem;
        srcTape = state->memtupindex[0];
        WRITETUP(state, destTape, state->memtuples[0]);

        /* writetup adjusted total free space, now fix per-tape space */
        spaceFreed = state->availMem - priorAvail;
        state->mergeavailmem[srcTape] += spaceFreed;

        /* compact the heap */
        tuplesort_heap_siftup(state, false);

        if ((tupIndex = state->mergenext[srcTape]) == 0)
        {
            /* out of preloaded data on this tape, try to read more */
            mergepreread(state);

            /* if still no data, we've reached end of run on this tape */
            if ((tupIndex = state->mergenext[srcTape]) == 0)
                continue;
        }

        /* pull next preread tuple from list, insert in heap */
        tup = state->memtuples[tupIndex];
        state->mergenext[srcTape] = state->memtupindex[tupIndex];
        if (state->mergenext[srcTape] == 0)
            state->mergelast[srcTape] = 0;

        /* push the now-unused slot onto the front of the free list */
        state->memtupindex[tupIndex] = state->mergefreelist;
        state->mergefreelist = tupIndex;

        tuplesort_heap_insert(state, tup, srcTape, false);
    }

    /*
     * When the heap empties, we're done.  Write an end-of-run marker on
     * the output tape, and increment its count of real runs.
     */
    markrunend(state, destTape);
    state->tp_runs[TAPERANGE]++;
}

/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[].
Then, load * as many tuples as we can from each active input tape, and finally * fill the merge heap with the first tuple from each active tape. */static voidbeginmerge(Tuplesortstate *state){ int activeTapes; int tapenum; int srcTape; /* Heap should be empty here */ Assert(state->memtupcount == 0); /* Clear merge-pass state variables */ memset(state->mergeactive, 0, sizeof(state->mergeactive)); memset(state->mergenext, 0, sizeof(state->mergenext)); memset(state->mergelast, 0, sizeof(state->mergelast)); memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem)); state->mergefreelist = 0; /* nothing in the freelist */ state->mergefirstfree = MAXTAPES; /* first slot available for * preread */ /* Adjust run counts and mark the active tapes */ activeTapes = 0; for (tapenum = 0; tapenum < TAPERANGE; tapenum++) { if (state->tp_dummy[tapenum] > 0) state->tp_dummy[tapenum]--; else { Assert(state->tp_runs[tapenum] > 0); state->tp_runs[tapenum]--; srcTape = state->tp_tapenum[tapenum]; state->mergeactive[srcTape] = true; activeTapes++; } } /* * Initialize space allocation to let each active input tape have an * equal share of preread space. */ Assert(activeTapes > 0); state->spacePerTape = state->availMem / activeTapes; for (srcTape = 0; srcTape < MAXTAPES; srcTape++) { if (state->mergeactive[srcTape]) state->mergeavailmem[srcTape] = state->spacePerTape; } /* * Preread as many tuples as possible (and at least one) from each * active tape */ mergepreread(state); /* Load the merge heap with the first tuple from each input tape */ for (srcTape = 0; srcTape < MAXTAPES; srcTape++) { int tupIndex = state->mergenext[srcTape]; void *tup; if (tupIndex) { tup = state->memtuples[tupIndex]; state->mergenext[srcTape] = state->memtupindex[tupIndex]; if (state->mergenext[srcTape] == 0)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?