📄 psort.c
字号:
/* * psort.c * Polyphase merge sort. * * Copyright (c) 1994, Regents of the University of California * * $Id: psort.c,v 1.53.2.1 1999/08/02 05:25:18 scrappy Exp $ * * NOTES * Sorts the first relation into the second relation. * * The old psort.c's routines formed a temporary relation from the merged * sort files. This version keeps the files around instead of generating the * relation from them, and provides interface functions to the file so that * you can grab tuples, mark a position in the file, restore a position in the * file. You must now explicitly call an interface function to end the sort, * psort_end, when you are done. * Now most of the global variables are stuck in the Sort nodes, and * accessed from there (they are passed to all the psort routines) so that * each sort running has its own separate state. This is facilitated by having * the Sort nodes passed in to all the interface functions. * The one global variable that all the sorts still share is SortMemory. * You should now be allowed to run two or more psorts concurrently, * so long as the memory they eat up is not greater than SORTMEM, the initial * value of SortMemory. -Rex 2.15.1995 * * Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future. * * Arguments? Variables? * MAXMERGE, MAXTAPES * */#include <math.h>#include <sys/types.h>#include <unistd.h>#include "postgres.h"#include "access/heapam.h"#include "executor/execdebug.h"#include "executor/executor.h"#include "miscadmin.h"#include "utils/psort.h"static bool createfirstrun(Sort *node);static bool createrun(Sort *node, BufFile *file);static void destroytape(BufFile *file);static void dumptuples(BufFile *file, Sort *node);static BufFile *gettape(void);static void initialrun(Sort *node);static void inittapes(Sort *node);static void merge(Sort *node, struct tape * dest);static BufFile *mergeruns(Sort *node);static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);/* * tlenzero used to delimit runs; both vars below must have * the same size as HeapTuple->t_len */static unsigned int tlenzero = 0;static unsigned int tlendummy;/* these are used by _psort_cmp, and are set just before calling qsort() */static TupleDesc PsortTupDesc;static ScanKey PsortKeys;static int PsortNkeys;/* * old psort global variables * * (These are the global variables from the old psort. They are still used, * but are now accessed from Sort nodes using the PS macro. Note that while * these variables will be accessed by PS(node)->whatever, they will still * be called by their original names within the comments! -Rex 2.10.1995) * * LeftistContextData treeContext; * * static int TapeRange; number of tapes - 1 (T) * static int Level; (l) * static int TotalDummy; summation of tp_dummy * static struct tape *Tape; * * static int BytesRead; to keep track of # of IO * static int BytesWritten; * * struct leftist *Tuples; current tuples in memory * * BufFile *psort_grab_file; this holds tuples grabbed * from merged sort runs * long psort_current; current file position * long psort_saved; file position saved for * mark and restore *//* * PS - Macro to access and cast psortstate from a Sort node */#define PS(N) ((Psortstate *)N->psortstate)/* * psort_begin - polyphase merge sort entry point. Sorts the subplan * into a temporary file psort_grab_file. After * this is called, calling the interface function * psort_grabtuple iteratively will get you the sorted * tuples. psort_end then finishes the sort off, after * all the tuples have been grabbed. * * Allocates and initializes sort node's psort state. */boolpsort_begin(Sort *node, int nkeys, ScanKey key){ node->psortstate = (struct Psortstate *) palloc(sizeof(struct Psortstate)); AssertArg(nkeys >= 1); AssertArg(key[0].sk_attno != 0); AssertArg(key[0].sk_procedure != 0); PS(node)->BytesRead = 0; PS(node)->BytesWritten = 0; PS(node)->treeContext.tupDesc = ExecGetTupType(outerPlan((Plan *) node)); PS(node)->treeContext.nKeys = nkeys; PS(node)->treeContext.scanKeys = key; PS(node)->treeContext.sortMem = SortMem * 1024; PS(node)->Tuples = NULL; PS(node)->tupcount = 0; PS(node)->using_tape_files = false; PS(node)->all_fetched = false; PS(node)->psort_grab_file = NULL; PS(node)->memtuples = NULL; initialrun(node); if (PS(node)->tupcount == 0) return false; if (PS(node)->using_tape_files && PS(node)->psort_grab_file == NULL) PS(node)->psort_grab_file = mergeruns(node); PS(node)->psort_current = 0; PS(node)->psort_saved = 0; return true;}/* * inittapes - initializes the tapes * - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270) * Returns: * number of allocated tapes */static voidinittapes(Sort *node){ int i; struct tape *tp; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); /* * ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, "inittapes: Invalid * number of tapes to initialize.\n"); */ tp = PS(node)->Tape; for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++) { tp->tp_dummy = 1; tp->tp_fib = 1; tp->tp_prev = tp - 1; tp++; } PS(node)->TapeRange = --tp - PS(node)->Tape; tp->tp_dummy = 0; tp->tp_fib = 0; PS(node)->Tape[0].tp_prev = tp; if (PS(node)->TapeRange <= 1) elog(ERROR, "inittapes: Could only allocate %d < 3 tapes\n", PS(node)->TapeRange + 1); PS(node)->Level = 1; PS(node)->TotalDummy = PS(node)->TapeRange; PS(node)->using_tape_files = true;}/* * PUTTUP - writes the next tuple * ENDRUN - mark end of run * GETLEN - reads the length of the next tuple * ALLOCTUP - returns space for the new tuple * SETTUPLEN - stores the length into the tuple * GETTUP - reads the tuple * * Note: * LEN field must be as HeapTuple->t_len; FP is a stream */#define PUTTUP(NODE, TUP, FP) \( \ (TUP)->t_len += HEAPTUPLESIZE, \ ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \ BufFileWrite(FP, (char *)TUP, (TUP)->t_len), \ BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), \ (TUP)->t_len -= HEAPTUPLESIZE \)#define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))#define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))#define FREE(x) pfree((char *) x)#define GETTUP(NODE, TUP, LEN, FP) \( \ IncrProcessed(), \ ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), \ BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), \ (TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \ BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) \)#define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)#define rewind(FP) BufFileSeek(FP, 0L, SEEK_SET) /* * USEMEM - record use of memory FREEMEM - record * freeing of memory FULLMEM - 1 iff a tuple will fit */#define USEMEM(NODE,AMT) PS(node)->treeContext.sortMem -= (AMT)#define FREEMEM(NODE,AMT) PS(node)->treeContext.sortMem += (AMT)#define LACKMEM(NODE) (PS(node)->treeContext.sortMem <= BLCKSZ) /* not accurate */#define TRACEMEM(FUNC)#define TRACEOUT(FUNC, TUP)/* * initialrun - distributes tuples from the relation * - (replacement selection(R2-R3)--Knuth, Vol.3, p.257) * - (polyphase merge Alg.D(D2-D4)--Knuth, Vol.3, p.271) * * Explanation: * Tuples are distributed to the tapes as in Algorithm D. * A "tuple" with t_size == 0 is used to mark the end of a run. * * Note: * The replacement selection algorithm has been modified * to go from R1 directly to R3 skipping R2 the first time. * * Maybe should use closer(rdesc) before return * Perhaps should adjust the number of tapes if less than n. * used--v. likely to have problems in mergeruns(). * Must know if should open/close files before each * call to psort()? If should--messy?? * * Possible optimization: * put the first xxx runs in quickly--problem here since * I (perhaps prematurely) combined the 2 algorithms. * Also, perhaps allocate tapes when needed. Split into 2 funcs. */static voidinitialrun(Sort *node){ /* struct tuple *tup; */ struct tape *tp; int baseruns; /* D:(a) */ int extrapasses; /* EOF */ Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); tp = PS(node)->Tape; if (createfirstrun(node)) { Assert(PS(node)->using_tape_files); extrapasses = 0; } else/* all tuples fetched */ { if (!PS(node)->using_tape_files) /* empty or sorted in * memory */ return; /* * if PS(node)->Tuples == NULL then we have single (sorted) run * which can be used as result grab file! So, we may avoid * mergeruns - it will just copy this run to new file. */ if (PS(node)->Tuples == NULL) { PS(node)->psort_grab_file = PS(node)->Tape->tp_file; rewind(PS(node)->psort_grab_file); return; } extrapasses = 2; } for (;;) { tp->tp_dummy--; PS(node)->TotalDummy--; if (tp->tp_dummy < (tp + 1)->tp_dummy) tp++; else { if (tp->tp_dummy != 0) tp = PS(node)->Tape; else { PS(node)->Level++; baseruns = PS(node)->Tape[0].tp_fib; for (tp = PS(node)->Tape; tp - PS(node)->Tape < PS(node)->TapeRange; tp++) { PS(node)->TotalDummy += (tp->tp_dummy = baseruns + (tp + 1)->tp_fib - tp->tp_fib); tp->tp_fib = baseruns + (tp + 1)->tp_fib; } tp = PS(node)->Tape; /* D4 */ } /* D3 */ } if (extrapasses) { if (--extrapasses) { dumptuples(tp->tp_file, node); ENDRUN(tp->tp_file); continue; } else break; } if ((bool) createrun(node, tp->tp_file) == false) extrapasses = 1 + (PS(node)->Tuples != NULL); /* D2 */ } for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--) rewind(tp->tp_file); /* D. */}/* * createfirstrun - tries to sort tuples in memory using qsort * until LACKMEM; if not enough memory then switches * to tape method * * Returns: * FALSE iff process through end of relation * Tuples contains the tuples for the following run upon exit */static boolcreatefirstrun(Sort *node){ HeapTuple tup; bool foundeor = false; HeapTuple *memtuples; int t_last = -1; int t_free = 1000; TupleTableSlot *cr_slot; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); Assert(!PS(node)->using_tape_files); Assert(PS(node)->memtuples == NULL); Assert(PS(node)->tupcount == 0); if (LACKMEM(node)) elog(ERROR, "psort: LACKMEM in createfirstrun"); memtuples = palloc(t_free * sizeof(HeapTuple)); for (;;) { if (LACKMEM(node)) break; /* * About to call ExecProcNode, it can mess up the state if it * eventually calls another Sort node. So must stow it away here * for the meantime. -Rex * 2.2.1995 */ cr_slot = ExecProcNode(outerPlan((Plan *) node), (Plan *) node); if (TupIsNull(cr_slot)) { foundeor = true; break; } tup = heap_copytuple(cr_slot->val); ExecClearTuple(cr_slot); IncrProcessed(); USEMEM(node, tup->t_len); TRACEMEM(createfirstrun); if (t_free <= 0) { t_free = 1000; memtuples = repalloc(memtuples, (t_last + t_free + 1) * sizeof(HeapTuple)); } t_last++; t_free--; memtuples[t_last] = tup; } if (t_last < 0) /* empty */ { Assert(foundeor); pfree(memtuples); return false; } t_last++; PS(node)->tupcount = t_last; PsortTupDesc = PS(node)->treeContext.tupDesc; PsortKeys = PS(node)->treeContext.scanKeys; PsortNkeys = PS(node)->treeContext.nKeys; qsort(memtuples, t_last, sizeof(HeapTuple), (int (*) (const void *, const void *)) _psort_cmp); if (LACKMEM(node)) /* in-memory sort is impossible */ { int t; Assert(!foundeor); inittapes(node); /* put tuples into leftist tree for createrun */ for (t = t_last - 1; t >= 0; t--) puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext); pfree(memtuples); foundeor = !createrun(node, PS(node)->Tape->tp_file); } else { Assert(foundeor); PS(node)->memtuples = memtuples; } return !foundeor;}/* * createrun - places the next run on file, grabbing the tuples by * executing the subplan passed in * * Uses: * Tuples, which should contain any tuples for this run * * Returns: * FALSE iff process through end of relation * Tuples contains the tuples for the following run upon exit */static boolcreaterun(Sort *node, BufFile *file){ HeapTuple lasttuple; HeapTuple tup; TupleTableSlot *cr_slot; HeapTuple *memtuples; int t_last = -1; int t_free = 1000; bool foundeor = false; short junk; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); Assert(PS(node)->using_tape_files); lasttuple = NULL; memtuples = palloc(t_free * sizeof(HeapTuple)); for (;;) { while (LACKMEM(node) && PS(node)->Tuples != NULL) { if (lasttuple != NULL) { FREEMEM(node, lasttuple->t_len); FREE(lasttuple); TRACEMEM(createrun); } lasttuple = gettuple(&PS(node)->Tuples, &junk, &PS(node)->treeContext); PUTTUP(node, lasttuple, file); TRACEOUT(createrun, lasttuple); } if (LACKMEM(node)) break; /* * About to call ExecProcNode, it can mess up the state if it * eventually calls another Sort node. So must stow it away here * for the meantime. -Rex * 2.2.1995 */ cr_slot = ExecProcNode(outerPlan((Plan *) node), (Plan *) node); if (TupIsNull(cr_slot)) { foundeor = true; break; } else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -