📄 psort.c
字号:
{ tup = heap_copytuple(cr_slot->val); ExecClearTuple(cr_slot); PS(node)->tupcount++; } IncrProcessed(); USEMEM(node, tup->t_len); TRACEMEM(createrun); if (lasttuple != NULL && tuplecmp(tup, lasttuple, &PS(node)->treeContext)) { if (t_free <= 0) { t_free = 1000; memtuples = repalloc(memtuples, (t_last + t_free + 1) * sizeof(HeapTuple)); } t_last++; t_free--; memtuples[t_last] = tup; } else puttuple(&PS(node)->Tuples, tup, 0, &PS(node)->treeContext); } if (lasttuple != NULL) { FREEMEM(node, lasttuple->t_len); FREE(lasttuple); TRACEMEM(createrun); } dumptuples(file, node); ENDRUN(file); /* delimit the end of the run */ t_last++; /* put tuples for the next run into leftist tree */ if (t_last >= 1) { int t; PsortTupDesc = PS(node)->treeContext.tupDesc; PsortKeys = PS(node)->treeContext.scanKeys; PsortNkeys = PS(node)->treeContext.nKeys; qsort(memtuples, t_last, sizeof(HeapTuple), (int (*) (const void *, const void *)) _psort_cmp); for (t = t_last - 1; t >= 0; t--) puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext); } pfree(memtuples); return !foundeor;}/* * mergeruns - merges all runs from input tapes * (polyphase merge Alg.D(D6)--Knuth, Vol.3, p271) * * Returns: * file of tuples in order */static BufFile *mergeruns(Sort *node){ struct tape *tp; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); Assert(PS(node)->using_tape_files == true); tp = PS(node)->Tape + PS(node)->TapeRange; merge(node, tp); rewind(tp->tp_file); while (--PS(node)->Level != 0) { tp = tp->tp_prev; rewind(tp->tp_file); merge(node, tp); rewind(tp->tp_file); } return tp->tp_file;}/* * merge - handles a single merge of the tape * (polyphase merge Alg.D(D5)--Knuth, Vol.3, p271) */static voidmerge(Sort *node, struct tape * dest){ HeapTuple tup; struct tape *lasttp; /* (TAPE[P]) */ struct tape *tp; struct leftist *tuples; BufFile *destfile; int times; /* runs left to merge */ int outdummy; /* complete dummy runs */ short fromtape; unsigned int tuplen; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); Assert(PS(node)->using_tape_files == true); lasttp = dest->tp_prev; times = lasttp->tp_fib; for (tp = lasttp; tp != dest; tp = tp->tp_prev) tp->tp_fib -= times; tp->tp_fib += times; /* Tape[].tp_fib (A[]) is set to proper exit values */ if (PS(node)->TotalDummy < PS(node)->TapeRange) /* no complete dummy * runs */ outdummy = 0; else { outdummy = PS(node)->TotalDummy; /* a large positive number */ for (tp = lasttp; tp != dest; tp = tp->tp_prev) if (outdummy > tp->tp_dummy) outdummy = tp->tp_dummy; for (tp = lasttp; tp != dest; tp = tp->tp_prev) tp->tp_dummy -= outdummy; tp->tp_dummy += outdummy; PS(node)->TotalDummy -= outdummy * PS(node)->TapeRange; /* do not add the outdummy runs yet */ times -= outdummy; } destfile = dest->tp_file; while (times-- != 0) { /* merge one run */ tuples = NULL; if (PS(node)->TotalDummy == 0) for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev) { GETLEN(tuplen, tp->tp_file); tup = ALLOCTUP(tuplen); USEMEM(node, tuplen); TRACEMEM(merge); SETTUPLEN(tup, tuplen); GETTUP(node, tup, tuplen, tp->tp_file); puttuple(&tuples, tup, tp - PS(node)->Tape, &PS(node)->treeContext); } else { for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev) { if (tp->tp_dummy != 0) { tp->tp_dummy--; PS(node)->TotalDummy--; } else { GETLEN(tuplen, tp->tp_file); tup = ALLOCTUP(tuplen); USEMEM(node, tuplen); TRACEMEM(merge); SETTUPLEN(tup, tuplen); GETTUP(node, tup, tuplen, tp->tp_file); puttuple(&tuples, tup, tp - PS(node)->Tape, &PS(node)->treeContext); } } } while (tuples != NULL) { /* possible optimization by using count in tuples */ tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext); PUTTUP(node, tup, destfile); FREEMEM(node, tup->t_len); FREE(tup); TRACEMEM(merge); GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file); if (tuplen == 0) ; else { tup = ALLOCTUP(tuplen); USEMEM(node, tuplen); TRACEMEM(merge); SETTUPLEN(tup, tuplen); GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file); puttuple(&tuples, tup, fromtape, &PS(node)->treeContext); } } ENDRUN(destfile); } PS(node)->TotalDummy += outdummy;}/* * dumptuples - stores all the tuples in tree into file */static voiddumptuples(BufFile *file, Sort *node){ struct leftist *tp; struct leftist *newp; struct leftist **treep = &PS(node)->Tuples; LeftistContext context = &PS(node)->treeContext; HeapTuple tup; Assert(PS(node)->using_tape_files); tp = *treep; while (tp != NULL) { tup = tp->lt_tuple; if (tp->lt_dist == 1) /* lt_right == NULL */ newp = tp->lt_left; else newp = lmerge(tp->lt_left, tp->lt_right, context); pfree(tp); PUTTUP(node, tup, file); FREEMEM(node, tup->t_len); FREE(tup); tp = newp; } *treep = NULL;}/* * psort_grabtuple - gets a tuple from the sorted file and returns it. * If there are no tuples left, returns NULL. * Should not call psort_end unless this has returned * a NULL indicating the last tuple has been processed. */HeapTuplepsort_grabtuple(Sort *node, bool *should_free){ HeapTuple tup; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); if (PS(node)->using_tape_files == true) { unsigned int tuplen; *should_free = true; if (ScanDirectionIsForward(node->plan.state->es_direction)) { if (PS(node)->all_fetched) return NULL; if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0) { tup = ALLOCTUP(tuplen); SETTUPLEN(tup, tuplen); GETTUP(node, tup, tuplen, PS(node)->psort_grab_file); /* Update current merged sort file position */ PS(node)->psort_current += tuplen + sizeof(tlendummy); return tup; } else { PS(node)->all_fetched = true; return NULL; } } /* Backward */ if (PS(node)->psort_current <= sizeof(tlendummy)) return NULL; /* * if all tuples are fetched already then we return last tuple, * else - tuple before last returned. */ if (PS(node)->all_fetched) { /* * psort_current is pointing to the zero tuplen at the end of * file */ BufFileSeek(PS(node)->psort_grab_file, PS(node)->psort_current - sizeof(tlendummy), SEEK_SET); GETLEN(tuplen, PS(node)->psort_grab_file); if (PS(node)->psort_current < tuplen) elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan"); PS(node)->all_fetched = false; } else { /* move to position of end tlen of prev tuple */ PS(node)->psort_current -= sizeof(tlendummy); BufFileSeek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET); GETLEN(tuplen, PS(node)->psort_grab_file); /* get tlen of prev * tuple */ if (tuplen == 0) elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan"); if (PS(node)->psort_current <= tuplen + sizeof(tlendummy)) { /* prev tuple should be first one */ if (PS(node)->psort_current != tuplen) elog(ERROR, "psort_grabtuple: first tuple expected in backward scan"); PS(node)->psort_current = 0; BufFileSeek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET); return NULL; } /* * Get position of prev tuple. This tuple becomes current * tuple now and we have to return previous one. */ PS(node)->psort_current -= tuplen; /* move to position of end tlen of prev tuple */ BufFileSeek(PS(node)->psort_grab_file, PS(node)->psort_current - sizeof(tlendummy), SEEK_SET); GETLEN(tuplen, PS(node)->psort_grab_file); if (PS(node)->psort_current < tuplen + sizeof(tlendummy)) elog(ERROR, "psort_grabtuple: too big tuple len in backward scan"); } /* * move to prev (or last) tuple start position + sizeof(t_len) */ BufFileSeek(PS(node)->psort_grab_file, PS(node)->psort_current - tuplen, SEEK_SET); tup = ALLOCTUP(tuplen); SETTUPLEN(tup, tuplen); GETTUP(node, tup, tuplen, PS(node)->psort_grab_file); return tup; /* file position is equal to psort_current */ } else { *should_free = false; if (ScanDirectionIsForward(node->plan.state->es_direction)) { if (PS(node)->psort_current < PS(node)->tupcount) return PS(node)->memtuples[PS(node)->psort_current++]; else { PS(node)->all_fetched = true; return NULL; } } /* Backward */ if (PS(node)->psort_current <= 0) return NULL; /* * if all tuples are fetched already then we return last tuple, * else - tuple before last returned. */ if (PS(node)->all_fetched) PS(node)->all_fetched = false; else { PS(node)->psort_current--; /* last returned tuple */ if (PS(node)->psort_current <= 0) return NULL; } return PS(node)->memtuples[PS(node)->psort_current - 1]; }}/* * psort_markpos - saves current position in the merged sort file */voidpsort_markpos(Sort *node){ Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); PS(node)->psort_saved = PS(node)->psort_current;}/* * psort_restorepos- restores current position in merged sort file to * last saved position */voidpsort_restorepos(Sort *node){ Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); if (PS(node)->using_tape_files == true) BufFileSeek(PS(node)->psort_grab_file, PS(node)->psort_saved, SEEK_SET); PS(node)->psort_current = PS(node)->psort_saved;}/* * psort_end - unlinks the tape files, and cleans up. Should not be * called unless psort_grabtuple has returned a NULL. */voidpsort_end(Sort *node){ struct tape *tp; if (!node->cleaned) { /* * I'm changing this because if we are sorting a relation with no * tuples, psortstate is NULL. */ if (PS(node) != (Psortstate *) NULL) { if (PS(node)->using_tape_files == true) for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--) destroytape(tp->tp_file); else if (PS(node)->memtuples) pfree(PS(node)->memtuples); NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ); NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ); pfree((void *) node->psortstate); node->psortstate = NULL; node->cleaned = TRUE; } }}voidpsort_rescan(Sort *node){ /* * If subnode is to be rescanned then free our previous results */ if (((Plan *) node)->lefttree->chgParam != NULL) { psort_end(node); node->cleaned = false; } else if (PS(node) != (Psortstate *) NULL) { PS(node)->all_fetched = false; PS(node)->psort_current = 0; PS(node)->psort_saved = 0; if (PS(node)->using_tape_files == true) rewind(PS(node)->psort_grab_file); }}/* * gettape - returns an open stream for writing/reading * * Returns: * Open stream for writing/reading. * NULL if unable to open temporary file. * * There used to be a lot of cruft here to try to ensure that we destroyed * all the tape files; but it didn't really work. Now we rely on fd.c to * clean up temp files if an error occurs. */static BufFile *gettape(){ File tfile; tfile = OpenTemporaryFile(); Assert(tfile >= 0); return BufFileCreate(tfile);}/* * destroytape - unlinks the tape */static voiddestroytape(BufFile *file){ BufFileClose(file);}static int_psort_cmp(HeapTuple *ltup, HeapTuple *rtup){ Datum lattr, rattr; int nkey; int result = 0; bool isnull1, isnull2; for (nkey = 0; nkey < PsortNkeys && !result; nkey++) { lattr = heap_getattr(*ltup, PsortKeys[nkey].sk_attno, PsortTupDesc, &isnull1); rattr = heap_getattr(*rtup, PsortKeys[nkey].sk_attno, PsortTupDesc, &isnull2); if (isnull1) { if (!isnull2) result = 1; } else if (isnull2) result = -1; else if (PsortKeys[nkey].sk_flags & SK_COMMUTE) { if (!(result = -(long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (rattr, lattr))) result = (long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (lattr, rattr); } else if (!(result = -(long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (lattr, rattr))) result = (long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (rattr, lattr); } return result;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -