tuplesort.c

来自「PostgreSQL7.4.6 for Linux」· C语言 代码 · 共 2,177 行 · 第 1/5 页

C
2,177
字号
				elog(ERROR, "bogus tuple length in backward scan");			tup = READTUP(state, state->result_tape, tuplen);			return tup;		case TSS_FINALMERGE:			Assert(forward);			*should_free = true;			/*			 * This code should match the inner loop of mergeonerun().			 */			if (state->memtupcount > 0)			{				int			srcTape = state->memtupindex[0];				Size		tuplen;				int			tupIndex;				void	   *newtup;				tup = state->memtuples[0];				/* returned tuple is no longer counted in our memory space */				tuplen = GetMemoryChunkSpace(tup);				state->availMem += tuplen;				state->mergeavailmem[srcTape] += tuplen;				tuplesort_heap_siftup(state, false);				if ((tupIndex = state->mergenext[srcTape]) == 0)				{					/*					 * out of preloaded data on this tape, try to read					 * more					 */					mergepreread(state);					/*					 * if still no data, we've reached end of run on this					 * tape					 */					if ((tupIndex = state->mergenext[srcTape]) == 0)						return tup;				}				/* pull next preread tuple from list, insert in heap */				newtup = state->memtuples[tupIndex];				state->mergenext[srcTape] = state->memtupindex[tupIndex];				if (state->mergenext[srcTape] == 0)					state->mergelast[srcTape] = 0;				state->memtupindex[tupIndex] = state->mergefreelist;				state->mergefreelist = tupIndex;				tuplesort_heap_insert(state, newtup, srcTape, false);				return tup;			}			return NULL;		default:			elog(ERROR, "invalid tuplesort state");			return NULL;		/* keep compiler quiet */	}}/* * Fetch the next Datum in either forward or back direction. * Returns FALSE if no more datums. * * If the Datum is pass-by-ref type, the returned value is freshly palloc'd * and is now owned by the caller. 
*/
bool
tuplesort_getdatum(Tuplesortstate *state, bool forward,
				   Datum *val, bool *isNull)
{
	bool		mustRelease;
	DatumTuple *fetched;

	/* Pull the next entry, in the requested direction, as a DatumTuple. */
	fetched = (DatumTuple *) tuplesort_gettuple(state, forward, &mustRelease);
	if (fetched == NULL)
		return false;

	if (!fetched->isNull && !state->datumTypeByVal)
	{
		/* non-null pass-by-reference value: hand the caller its own copy */
		*val = datumCopy(fetched->val, false, state->datumTypeLen);
		*isNull = false;
	}
	else
	{
		/* null or pass-by-value: the stored fields are returned as they are */
		*val = fetched->val;
		*isNull = fetched->isNull;
	}

	/* release the fetched wrapper only when tuplesort_gettuple asked us to */
	if (mustRelease)
		pfree(fetched);

	return true;
}

/*
 * inittapes - switch the sort over to tape-based operation.
 *
 * Per the original notes, this is called only if we have found we don't
 * have room to sort in memory.  It creates the tape set, allocates
 * memtupindex[], turns the tuples already held in memtuples[] into a heap
 * tagged with run number zero, and sets up the bookkeeping of Algorithm D
 * (step D1) before entering the TSS_BUILDRUNS state.
 */
static void
inittapes(Tuplesortstate *state)
{
	int			pending;
	int			slot;
	int			tape;

	state->tapeset = LogicalTapeSetCreate(MAXTAPES);

	/*
	 * memtupindex[] gets as many entries as memtuples[]; its chunk space
	 * is reported through USEMEM.
	 */
	state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));
	USEMEM(state, GetMemoryChunkSpace(state->memtupindex));

	/*
	 * Heapify the so-far unsorted memtuples[]: declare the heap empty and
	 * push every tuple back in, each one tagged with run number zero.
	 *
	 * checkIndex is passed as false because comparing indexes is pointless
	 * in this step, even though the indexes are meant to be part of the
	 * sort key.
	 */
	pending = state->memtupcount;
	state->memtupcount = 0;
	slot = 0;
	while (slot < pending)
	{
		tuplesort_heap_insert(state, state->memtuples[slot], 0, false);
		slot++;
	}
	Assert(state->memtupcount == pending);

	state->currentRun = 0;

	/*
	 * Step D1 of Algorithm D: initial level, destination tape and per-tape
	 * counters.  The entry at index TAPERANGE is overridden after the loop.
	 */
	state->Level = 1;
	state->destTape = 0;
	for (tape = 0; tape < MAXTAPES; tape++)
	{
		state->tp_tapenum[tape] = tape;
		state->tp_runs[tape] = 0;
		state->tp_fib[tape] = 1;
		state->tp_dummy[tape] = 1;
	}
	state->tp_dummy[TAPERANGE] = 0;
	state->tp_fib[TAPERANGE] = 0;

	state->status = TSS_BUILDRUNS;
}

/*
 * selectnewtape -- select new tape for new initial run.
 *
 * This is called after finishing a run when we know another run
 * must be started.  This implements steps D3, D4 of Algorithm D.
*/
static void
selectnewtape(Tuplesortstate *state)
{
	int			cur = state->destTape;
	int			base;
	int			k;

	/* Step D3: move on to the next tape while it still owes more dummies */
	if (state->tp_dummy[cur] < state->tp_dummy[cur + 1])
	{
		state->destTape = cur + 1;
		return;
	}

	if (state->tp_dummy[cur] == 0)
	{
		/* Step D4: go up one level and recompute the per-tape targets */
		state->Level++;
		base = state->tp_fib[0];
		for (k = 0; k < TAPERANGE; k++)
		{
			int			grown = base + state->tp_fib[k + 1];

			/* the dummy count uses the old tp_fib[k], so set it first */
			state->tp_dummy[k] = grown - state->tp_fib[k];
			state->tp_fib[k] = grown;
		}
	}

	/* both remaining outcomes restart distribution at the first tape */
	state->destTape = 0;
}

/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Algorithm D.  By the time we get here
 * all input data has been written to initial runs on tape (see
 * dumptuples).  On return the sort is either in TSS_SORTEDONTAPE state
 * with result_tape frozen, or in TSS_FINALMERGE state with the last merge
 * pass set up but not yet executed.
 */
static void
mergeruns(Tuplesortstate *state)
{
	int			t;
	int			heldTape,
				heldDummy,
				heldRuns;

	Assert(state->status == TSS_BUILDRUNS);
	Assert(state->memtupcount == 0);

	/*
	 * With only one initial run (quite likely if the total data volume is
	 * between 1X and 2X SortMem), that tape already is the finished
	 * output, so a merge would be useless.  Freeze the tape, which also
	 * rewinds it, and stop.
	 */
	if (state->currentRun == 1)
	{
		state->result_tape = state->tp_tapenum[state->destTape];
		LogicalTapeFreeze(state->tapeset, state->result_tape);
		state->status = TSS_SORTEDONTAPE;
		return;
	}

	/* End of step D2: rewind all output tapes to prepare for merging */
	t = 0;
	while (t < TAPERANGE)
	{
		LogicalTapeRewind(state->tapeset, t, false);
		t++;
	}

	for (;;)
	{
		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
		while (state->tp_runs[TAPERANGE - 1] != 0 ||
			   state->tp_dummy[TAPERANGE - 1] != 0)
		{
			bool		everyTapeDummy = true;
			bool		oneRunApiece = true;

			for (t = 0; t < TAPERANGE; t++)
			{
				int			dummies = state->tp_dummy[t];

				if (dummies == 0)
					everyTapeDummy = false;
				if (state->tp_runs[t] + dummies != 1)
					oneRunApiece = false;
			}

			/*
			 * When no materialized sorted tape is required, stop as soon
			 * as each tape is down to one real/dummy run and leave the
			 * last merge to be performed on the fly.
			 */
			if (oneRunApiece && !state->randomAccess)
			{
				Assert(!everyTapeDummy);
				/* Initialize for the final merge pass */
				beginmerge(state);
				state->status = TSS_FINALMERGE;
				return;
			}

			if (!everyTapeDummy)
				mergeonerun(state);
			else
			{
				/* nothing but dummies: move one dummy run from each input to T */
				state->tp_dummy[TAPERANGE]++;
				for (t = 0; t < TAPERANGE; t++)
					state->tp_dummy[t]--;
			}
		}

		/* Step D6: decrease level */
		state->Level--;
		if (state->Level == 0)
			break;

		/* rewind output tape T to use as new input */
		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
						  false);
		/* rewind used-up input tape P, and prepare it for write pass */
		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
						  true);
		state->tp_runs[TAPERANGE - 1] = 0;

		/*
		 * Reassign tape units per step D6 (A[] no longer matters): entry T
		 * moves to position 0 and every other entry shifts up one place.
		 * The shift runs from the top down so nothing is overwritten
		 * before it has been copied.
		 */
		heldTape = state->tp_tapenum[TAPERANGE];
		heldDummy = state->tp_dummy[TAPERANGE];
		heldRuns = state->tp_runs[TAPERANGE];
		t = TAPERANGE;
		while (t > 0)
		{
			state->tp_tapenum[t] = state->tp_tapenum[t - 1];
			state->tp_dummy[t] = state->tp_dummy[t - 1];
			state->tp_runs[t] = state->tp_runs[t - 1];
			t--;
		}
		state->tp_tapenum[0] = heldTape;
		state->tp_dummy[0] = heldDummy;
		state->tp_runs[0] = heldRuns;
	}

	/*
	 * Done.  Knuth says that the result is on TAPE[1], but the loop was
	 * left without performing the last iteration of step D6, so the tape
	 * units were not rearranged and the result is on TAPE[T].  Doing it
	 * this way lets us freeze the final output tape while rewinding it;
	 * the skipped iteration would have been wasted work anyway.
	 */
	state->result_tape = state->tp_tapenum[TAPERANGE];
	LogicalTapeFreeze(state->tapeset, state->result_tape);
	state->status = TSS_SORTEDONTAPE;
}

/*
 * Merge one run from each input tape, except ones with dummy runs.
 *
 * This is the inner loop of Algorithm D step D5.  The output always goes
 * to TAPE[T].
 */
static void
mergeonerun(Tuplesortstate *state)
{
	int			outTape = state->tp_tapenum[TAPERANGE];

	/*
	 * Set up the merge: beginmerge() loads the heap with one tuple from
	 * each active source tape and adjusts the input run/dummy run counts.
	 */
	beginmerge(state);

	/*
	 * Drain the heap: emit its lowest tuple, then refill from the tape
	 * that tuple came from, if that tape has another one.
	 */
	while (state->memtupcount > 0)
	{
		int			fromTape;
		int			nextIdx;
		long		availBefore;
		long		released;

		CHECK_FOR_INTERRUPTS();

		/* write the tuple to the output tape */
		fromTape = state->memtupindex[0];
		availBefore = state->availMem;
		WRITETUP(state, outTape, state->memtuples[0]);

		/* writetup adjusted total free space, now fix per-tape space */
		released = state->availMem - availBefore;
		state->mergeavailmem[fromTape] += released;

		/* compact the heap */
		tuplesort_heap_siftup(state, false);

		nextIdx = state->mergenext[fromTape];
		if (nextIdx == 0)
		{
			/* out of preloaded data on this tape, try to read more */
			mergepreread(state);
			nextIdx = state->mergenext[fromTape];
		}

		/* still zero means this tape has reached the end of its run */
		if (nextIdx != 0)
		{
			void	   *refill = state->memtuples[nextIdx];

			/* unlink the slot from the tape's preread list ... */
			state->mergenext[fromTape] = state->memtupindex[nextIdx];
			if (state->mergenext[fromTape] == 0)
				state->mergelast[fromTape] = 0;

			/* ... push it onto the free list, and insert its tuple in heap */
			state->memtupindex[nextIdx] = state->mergefreelist;
			state->mergefreelist = nextIdx;
			tuplesort_heap_insert(state, refill, fromTape, false);
		}
	}

	/*
	 * When the heap empties, we're done.  Write an end-of-run marker on
	 * the output tape, and increment its count of real runs.
	 */
	markrunend(state, outTape);
	state->tp_runs[TAPERANGE]++;
}

/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[].	Then, load
 * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
*/static voidbeginmerge(Tuplesortstate *state){	int			activeTapes;	int			tapenum;	int			srcTape;	/* Heap should be empty here */	Assert(state->memtupcount == 0);	/* Clear merge-pass state variables */	memset(state->mergeactive, 0, sizeof(state->mergeactive));	memset(state->mergenext, 0, sizeof(state->mergenext));	memset(state->mergelast, 0, sizeof(state->mergelast));	memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));	state->mergefreelist = 0;	/* nothing in the freelist */	state->mergefirstfree = MAXTAPES;	/* first slot available for										 * preread */	/* Adjust run counts and mark the active tapes */	activeTapes = 0;	for (tapenum = 0; tapenum < TAPERANGE; tapenum++)	{		if (state->tp_dummy[tapenum] > 0)			state->tp_dummy[tapenum]--;		else		{			Assert(state->tp_runs[tapenum] > 0);			state->tp_runs[tapenum]--;			srcTape = state->tp_tapenum[tapenum];			state->mergeactive[srcTape] = true;			activeTapes++;		}	}	/*	 * Initialize space allocation to let each active input tape have an	 * equal share of preread space.	 */	Assert(activeTapes > 0);	state->spacePerTape = state->availMem / activeTapes;	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)	{		if (state->mergeactive[srcTape])			state->mergeavailmem[srcTape] = state->spacePerTape;	}	/*	 * Preread as many tuples as possible (and at least one) from each	 * active tape	 */	mergepreread(state);	/* Load the merge heap with the first tuple from each input tape */	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)	{		int			tupIndex = state->mergenext[srcTape];		void	   *tup;		if (tupIndex)		{			tup = state->memtuples[tupIndex];			state->mergenext[srcTape] = state->memtupindex[tupIndex];			if (state->mergenext[srcTape] == 0)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?