tuplesort.c
来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,177 行 · 第 1/5 页
C
2,177 行
/* --- tail of mergeprereadone(); the function's start precedes this excerpt --- */
		/*
		 * Detach the consumed tuple from this tape's preread list, return
		 * its memtuples[] slot to the merge free list, and push the tuple
		 * into the merge heap.  checkIndex = false: run numbers are not
		 * part of the sort key during merging.
		 */
		state->mergelast[srcTape] = 0;
		state->memtupindex[tupIndex] = state->mergefreelist;
		state->mergefreelist = tupIndex;
		tuplesort_heap_insert(state, tup, srcTape, false);
		}
	}
}

/*
 * mergepreread - load tuples from merge input tapes
 *
 * This routine exists to improve sequentiality of reads during a merge pass,
 * as explained in the header comments of this file.  Load tuples from each
 * active source tape until the tape's run is exhausted or it has used up
 * its fair share of available memory.  In any case, we guarantee that there
 * is at least one preread tuple available from each unexhausted input tape.
 */
static void
mergepreread(Tuplesortstate *state)
{
	int			srcTape;
	unsigned int tuplen;
	void	   *tup;
	int			tupIndex;
	long		priorAvail,
				spaceUsed;

	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		/* consider only tapes still participating in the current merge */
		if (!state->mergeactive[srcTape])
			continue;

		/*
		 * Skip reading from any tape that still has at least half of its
		 * target memory filled with tuples (threshold fraction may need
		 * adjustment?).  This avoids reading just a few tuples when the
		 * incoming runs are not being consumed evenly.
		 */
		if (state->mergenext[srcTape] != 0 &&
			state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
			continue;

		/*
		 * Read tuples from this tape until it has used up its free
		 * memory, but ensure that we have at least one.
		 *
		 * Trick: temporarily swap this tape's private memory budget into
		 * state->availMem so that LACKMEM() and the USEMEM/FREEMEM done
		 * inside READTUP charge against the per-tape allowance.  The
		 * global count is reconciled after the loop.
		 */
		priorAvail = state->availMem;
		state->availMem = state->mergeavailmem[srcTape];
		while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
		{
			/* read next tuple, if any; a zero length word marks run end */
			if ((tuplen = getlen(state, srcTape, true)) == 0)
			{
				state->mergeactive[srcTape] = false;
				break;
			}
			tup = READTUP(state, srcTape, tuplen);
			/* find or make a free slot in memtuples[] for it */
			tupIndex = state->mergefreelist;
			if (tupIndex)
				state->mergefreelist = state->memtupindex[tupIndex];
			else
			{
				/* no recycled slot: extend the used portion of the arrays */
				tupIndex = state->mergefirstfree++;
				/* Might need to enlarge arrays! */
				if (tupIndex >= state->memtupsize)
				{
					/*
					 * Doubling strategy; account the repalloc'd space
					 * against availMem so LACKMEM stays honest.
					 */
					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
					FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
					state->memtupsize *= 2;
					state->memtuples = (void **)
						repalloc(state->memtuples,
								 state->memtupsize * sizeof(void *));
					state->memtupindex = (int *)
						repalloc(state->memtupindex,
								 state->memtupsize * sizeof(int));
					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
					USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
				}
			}
			/* store tuple, append to list for its tape */
			state->memtuples[tupIndex] = tup;
			state->memtupindex[tupIndex] = 0;
			if (state->mergelast[srcTape])
				state->memtupindex[state->mergelast[srcTape]] = tupIndex;
			else
				state->mergenext[srcTape] = tupIndex;
			state->mergelast[srcTape] = tupIndex;
		}
		/* update per-tape and global availmem counts */
		spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
		state->mergeavailmem[srcTape] = state->availMem;
		state->availMem = priorAvail - spaceUsed;
	}
}

/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case,
 * since puttuple assumes it always has a tuple to compare to).
 *
 * When alltuples = true, dump everything currently in memory.
 * (This case is only used at end of input data.)
 *
 * If we empty the heap, close out the current run and return (this should
 * only happen at end of input data).  If we see that the tuple run number
 * at the top of the heap has changed, start a new run.
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
	while (alltuples ||
		   (LACKMEM(state) && state->memtupcount > 1))
	{
		/*
		 * Dump the heap's frontmost entry, and sift up to remove it from
		 * the heap.
*/ Assert(state->memtupcount > 0); WRITETUP(state, state->tp_tapenum[state->destTape], state->memtuples[0]); tuplesort_heap_siftup(state, true); /* * If the heap is empty *or* top run number has changed, we've * finished the current run. */ if (state->memtupcount == 0 || state->currentRun != state->memtupindex[0]) { markrunend(state, state->tp_tapenum[state->destTape]); state->currentRun++; state->tp_runs[state->destTape]++; state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ /* * Done if heap is empty, else prepare for new run. */ if (state->memtupcount == 0) break; Assert(state->currentRun == state->memtupindex[0]); selectnewtape(state); } }}/* * tuplesort_rescan - rewind and replay the scan */voidtuplesort_rescan(Tuplesortstate *state){ Assert(state->randomAccess); switch (state->status) { case TSS_SORTEDINMEM: state->current = 0; state->eof_reached = false; state->markpos_offset = 0; state->markpos_eof = false; break; case TSS_SORTEDONTAPE: LogicalTapeRewind(state->tapeset, state->result_tape, false); state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; state->markpos_eof = false; break; default: elog(ERROR, "invalid tuplesort state"); break; }}/* * tuplesort_markpos - saves current position in the merged sort file */voidtuplesort_markpos(Tuplesortstate *state){ Assert(state->randomAccess); switch (state->status) { case TSS_SORTEDINMEM: state->markpos_offset = state->current; state->markpos_eof = state->eof_reached; break; case TSS_SORTEDONTAPE: LogicalTapeTell(state->tapeset, state->result_tape, &state->markpos_block, &state->markpos_offset); state->markpos_eof = state->eof_reached; break; default: elog(ERROR, "invalid tuplesort state"); break; }}/* * tuplesort_restorepos - restores current position in merged sort file to * last saved position */voidtuplesort_restorepos(Tuplesortstate *state){ Assert(state->randomAccess); switch (state->status) { case TSS_SORTEDINMEM: state->current = state->markpos_offset; 
state->eof_reached = state->markpos_eof; break; case TSS_SORTEDONTAPE: if (!LogicalTapeSeek(state->tapeset, state->result_tape, state->markpos_block, state->markpos_offset)) elog(ERROR, "tuplesort_restorepos failed"); state->eof_reached = state->markpos_eof; break; default: elog(ERROR, "invalid tuplesort state"); break; }}/* * Heap manipulation routines, per Knuth's Algorithm 5.2.3H. * * The heap lives in state->memtuples[], with parallel data storage * for indexes in state->memtupindex[]. If checkIndex is true, use * the tuple index as the front of the sort key; otherwise, no. */#define HEAPCOMPARE(tup1,index1,tup2,index2) \ (checkIndex && (index1 != index2) ? index1 - index2 : \ COMPARETUP(state, tup1, tup2))/* * Insert a new tuple into an empty or existing heap, maintaining the * heap invariant. */static voidtuplesort_heap_insert(Tuplesortstate *state, void *tuple, int tupleindex, bool checkIndex){ void **memtuples; int *memtupindex; int j; /* * Make sure memtuples[] can handle another entry. */ if (state->memtupcount >= state->memtupsize) { FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); FREEMEM(state, GetMemoryChunkSpace(state->memtupindex)); state->memtupsize *= 2; state->memtuples = (void **) repalloc(state->memtuples, state->memtupsize * sizeof(void *)); state->memtupindex = (int *) repalloc(state->memtupindex, state->memtupsize * sizeof(int)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); USEMEM(state, GetMemoryChunkSpace(state->memtupindex)); } memtuples = state->memtuples; memtupindex = state->memtupindex; /* * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth * is using 1-based array indexes, not 0-based. 
*/ j = state->memtupcount++; while (j > 0) { int i = (j - 1) >> 1; if (HEAPCOMPARE(tuple, tupleindex, memtuples[i], memtupindex[i]) >= 0) break; memtuples[j] = memtuples[i]; memtupindex[j] = memtupindex[i]; j = i; } memtuples[j] = tuple; memtupindex[j] = tupleindex;}/* * The tuple at state->memtuples[0] has been removed from the heap. * Decrement memtupcount, and sift up to maintain the heap invariant. */static voidtuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex){ void **memtuples = state->memtuples; int *memtupindex = state->memtupindex; void *tuple; int tupindex, i, n; if (--state->memtupcount <= 0) return; n = state->memtupcount; tuple = memtuples[n]; /* tuple that must be reinserted */ tupindex = memtupindex[n]; i = 0; /* i is where the "hole" is */ for (;;) { int j = 2 * i + 1; if (j >= n) break; if (j + 1 < n && HEAPCOMPARE(memtuples[j], memtupindex[j], memtuples[j + 1], memtupindex[j + 1]) > 0) j++; if (HEAPCOMPARE(tuple, tupindex, memtuples[j], memtupindex[j]) <= 0) break; memtuples[i] = memtuples[j]; memtupindex[i] = memtupindex[j]; i = j; } memtuples[i] = tuple; memtupindex[i] = tupindex;}/* * Tape interface routines */static unsigned intgetlen(Tuplesortstate *state, int tapenum, bool eofOK){ unsigned int len; if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len, sizeof(len)) != sizeof(len)) elog(ERROR, "unexpected end of tape"); if (len == 0 && !eofOK) elog(ERROR, "unexpected end of data"); return len;}static voidmarkrunend(Tuplesortstate *state, int tapenum){ unsigned int len = 0; LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));}/* * qsort interface */static intqsort_comparetup(const void *a, const void *b){ /* The passed pointers are pointers to void * ... */ return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);}/* * This routine selects an appropriate sorting function to implement * a sort operator as efficiently as possible. 
The straightforward * method is to use the operator's implementation proc --- ie, "<" * comparison. However, that way often requires two calls of the function * per comparison. If we can find a btree three-way comparator function * associated with the operator, we can use it to do the comparisons * more efficiently. We also support the possibility that the operator * is ">" (descending sort), in which case we have to reverse the output * of the btree comparator. * * Possibly this should live somewhere else (backend/catalog/, maybe?). */voidSelectSortFunction(Oid sortOperator, RegProcedure *sortFunction, SortFunctionKind *kind){ CatCList *catlist; int i; HeapTuple tuple; Form_pg_operator optup; Oid opclass = InvalidOid; /* * Search pg_amop to see if the target operator is registered as the * "<" or ">" operator of any btree opclass. It's possible that it * might be registered both ways (eg, if someone were to build a * "reverse sort" opclass for some reason); prefer the "<" case if so. * If the operator is registered the same way in multiple opclasses, * assume we can use the associated comparator function from any one. */ catlist = SearchSysCacheList(AMOPOPID, 1, ObjectIdGetDatum(sortOperator), 0, 0, 0); for (i = 0; i < catlist->n_members; i++) { Form_pg_amop aform; tuple = &catlist->members[i]->tuple; aform = (Form_pg_amop) GETSTRUCT(tuple); if (!opclass_is_btree(aform->amopclaid)) continue; if (aform->amopstrategy == BTLessStrategyNumber) { opclass = aform->amopclaid; *kind = SORTFUNC_CMP; break; /* done looking */ } else if (aform->amopstrategy == BTGreaterStrategyNumber) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?