⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tuplesort.c

📁 PostgreSQL 8.1.4的源码 适用于Linux下的开源数据库系统
💻 C
📖 第 1 页 / 共 5 页
字号:
			}			break;		case TSS_SORTEDONTAPE:			Assert(forward || state->randomAccess);			*should_free = true;			if (forward)			{				if (state->eof_reached)					return NULL;				if ((tuplen = getlen(state, state->result_tape, true)) != 0)				{					tup = READTUP(state, state->result_tape, tuplen);					return tup;				}				else				{					state->eof_reached = true;					return NULL;				}			}			/*			 * Backward.			 *			 * if all tuples are fetched already then we return last tuple,			 * else - tuple before last returned.			 */			if (state->eof_reached)			{				/*				 * Seek position is pointing just past the zero tuplen at the				 * end of file; back up to fetch last tuple's ending length				 * word.  If seek fails we must have a completely empty file.				 */				if (!LogicalTapeBackspace(state->tapeset,										  state->result_tape,										  2 * sizeof(unsigned int)))					return NULL;				state->eof_reached = false;			}			else			{				/*				 * Back up and fetch previously-returned tuple's ending length				 * word.  If seek fails, assume we are at start of file.				 */				if (!LogicalTapeBackspace(state->tapeset,										  state->result_tape,										  sizeof(unsigned int)))					return NULL;				tuplen = getlen(state, state->result_tape, false);				/*				 * Back up to get ending length word of tuple before it.				 */				if (!LogicalTapeBackspace(state->tapeset,										  state->result_tape,										  tuplen + 2 * sizeof(unsigned int)))				{					/*					 * If that fails, presumably the prev tuple is the first					 * in the file.  Back up so that it becomes next to read					 * in forward direction (not obviously right, but that is					 * what in-memory case does).					 */					if (!LogicalTapeBackspace(state->tapeset,											  state->result_tape,											  tuplen + sizeof(unsigned int)))						elog(ERROR, "bogus tuple length in backward scan");					return NULL;				}			}			tuplen = getlen(state, state->result_tape, false);			/*			 * Now we have the length of the prior tuple, back up and read it.			 * Note: READTUP expects we are positioned after the initial			 * length word of the tuple, so back up to that point.			 */			if (!LogicalTapeBackspace(state->tapeset,									  state->result_tape,									  tuplen))				elog(ERROR, "bogus tuple length in backward scan");			tup = READTUP(state, state->result_tape, tuplen);			return tup;		case TSS_FINALMERGE:			Assert(forward);			*should_free = true;			/*			 * This code should match the inner loop of mergeonerun().			 */			if (state->memtupcount > 0)			{				int			srcTape = state->memtupindex[0];				Size		tuplen;				int			tupIndex;				void	   *newtup;				tup = state->memtuples[0];				/* returned tuple is no longer counted in our memory space */				tuplen = GetMemoryChunkSpace(tup);				state->availMem += tuplen;				state->mergeavailmem[srcTape] += tuplen;				tuplesort_heap_siftup(state, false);				if ((tupIndex = state->mergenext[srcTape]) == 0)				{					/*					 * out of preloaded data on this tape, try to read more					 */					mergepreread(state);					/*					 * if still no data, we've reached end of run on this tape					 */					if ((tupIndex = state->mergenext[srcTape]) == 0)						return tup;				}				/* pull next preread tuple from list, insert in heap */				newtup = state->memtuples[tupIndex];				state->mergenext[srcTape] = state->memtupindex[tupIndex];				if (state->mergenext[srcTape] == 0)					state->mergelast[srcTape] = 0;				state->memtupindex[tupIndex] = state->mergefreelist;				state->mergefreelist = tupIndex;				tuplesort_heap_insert(state, newtup, srcTape, false);				return tup;			}			return NULL;		default:			elog(ERROR, "invalid tuplesort state");			return NULL;		/* keep compiler quiet */	}}/* * Fetch the next Datum in either forward or back direction. * Returns FALSE if no more datums. * * If the Datum is pass-by-ref type, the returned value is freshly palloc'd * and is now owned by the caller. */booltuplesort_getdatum(Tuplesortstate *state, bool forward,				   Datum *val, bool *isNull){	DatumTuple *tuple;	bool		should_free;	tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);	if (tuple == NULL)		return false;	if (tuple->isNull || state->datumTypeByVal)	{		*val = tuple->val;		*isNull = tuple->isNull;	}	else	{		*val = datumCopy(tuple->val, false, state->datumTypeLen);		*isNull = false;	}	if (should_free)		pfree(tuple);	return true;}/* * inittapes - initialize for tape sorting. * * This is called only if we have found we don't have room to sort in memory. */static voidinittapes(Tuplesortstate *state){	int			ntuples,				j;#ifdef TRACE_SORT	if (trace_sort)		elog(LOG, "switching to external sort: %s",			 pg_rusage_show(&state->ru_start));#endif	state->tapeset = LogicalTapeSetCreate(MAXTAPES);	/*	 * Allocate the memtupindex array, same size as memtuples.	 */	state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));	USEMEM(state, GetMemoryChunkSpace(state->memtupindex));	/*	 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is	 * marked as belonging to run number zero.	 *	 * NOTE: we pass false for checkIndex since there's no point in comparing	 * indexes in this step, even though we do intend the indexes to be part	 * of the sort key...	 */	ntuples = state->memtupcount;	state->memtupcount = 0;		/* make the heap empty */	for (j = 0; j < ntuples; j++)		tuplesort_heap_insert(state, state->memtuples[j], 0, false);	Assert(state->memtupcount == ntuples);	state->currentRun = 0;	/*	 * Initialize variables of Algorithm D (step D1).	 */	for (j = 0; j < MAXTAPES; j++)	{		state->tp_fib[j] = 1;		state->tp_runs[j] = 0;		state->tp_dummy[j] = 1;		state->tp_tapenum[j] = j;	}	state->tp_fib[TAPERANGE] = 0;	state->tp_dummy[TAPERANGE] = 0;	state->Level = 1;	state->destTape = 0;	state->status = TSS_BUILDRUNS;}/* * selectnewtape -- select new tape for new initial run. * * This is called after finishing a run when we know another run * must be started.  This implements steps D3, D4 of Algorithm D. */static voidselectnewtape(Tuplesortstate *state){	int			j;	int			a;	/* Step D3: advance j (destTape) */	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])	{		state->destTape++;		return;	}	if (state->tp_dummy[state->destTape] != 0)	{		state->destTape = 0;		return;	}	/* Step D4: increase level */	state->Level++;	a = state->tp_fib[0];	for (j = 0; j < TAPERANGE; j++)	{		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];		state->tp_fib[j] = a + state->tp_fib[j + 1];	}	state->destTape = 0;}/* * mergeruns -- merge all the completed initial runs. * * This implements steps D5, D6 of Algorithm D.  All input data has * already been written to initial runs on tape (see dumptuples). */static voidmergeruns(Tuplesortstate *state){	int			tapenum,				svTape,				svRuns,				svDummy;	Assert(state->status == TSS_BUILDRUNS);	Assert(state->memtupcount == 0);	/*	 * If we produced only one initial run (quite likely if the total data	 * volume is between 1X and 2X workMem), we can just use that tape as the	 * finished output, rather than doing a useless merge.	 */	if (state->currentRun == 1)	{		state->result_tape = state->tp_tapenum[state->destTape];		/* must freeze and rewind the finished output tape */		LogicalTapeFreeze(state->tapeset, state->result_tape);		state->status = TSS_SORTEDONTAPE;		return;	}	/* End of step D2: rewind all output tapes to prepare for merging */	for (tapenum = 0; tapenum < TAPERANGE; tapenum++)		LogicalTapeRewind(state->tapeset, tapenum, false);	for (;;)	{		/* Step D5: merge runs onto tape[T] until tape[P] is empty */		while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])		{			bool		allDummy = true;			bool		allOneRun = true;			for (tapenum = 0; tapenum < TAPERANGE; tapenum++)			{				if (state->tp_dummy[tapenum] == 0)					allDummy = false;				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)					allOneRun = false;			}			/*			 * If we don't have to produce a materialized sorted tape, quit as			 * soon as we're down to one real/dummy run per tape.			 */			if (!state->randomAccess && allOneRun)			{				Assert(!allDummy);				/* Initialize for the final merge pass */				beginmerge(state);				state->status = TSS_FINALMERGE;				return;			}			if (allDummy)			{				state->tp_dummy[TAPERANGE]++;				for (tapenum = 0; tapenum < TAPERANGE; tapenum++)					state->tp_dummy[tapenum]--;			}			else				mergeonerun(state);		}		/* Step D6: decrease level */		if (--state->Level == 0)			break;		/* rewind output tape T to use as new input */		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],						  false);		/* rewind used-up input tape P, and prepare it for write pass */		LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],						  true);		state->tp_runs[TAPERANGE - 1] = 0;		/*		 * reassign tape units per step D6; note we no longer care about A[]		 */		svTape = state->tp_tapenum[TAPERANGE];		svDummy = state->tp_dummy[TAPERANGE];		svRuns = state->tp_runs[TAPERANGE];		for (tapenum = TAPERANGE; tapenum > 0; tapenum--)		{			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];		}		state->tp_tapenum[0] = svTape;		state->tp_dummy[0] = svDummy;		state->tp_runs[0] = svRuns;	}	/*	 * Done.  Knuth says that the result is on TAPE[1], but since we exited	 * the loop without performing the last iteration of step D6, we have not	 * rearranged the tape unit assignment, and therefore the result is on	 * TAPE[T].  We need to do it this way so that we can freeze the final	 * output tape while rewinding it.	The last iteration of step D6 would be	 * a waste of cycles anyway...	 */	state->result_tape = state->tp_tapenum[TAPERANGE];	LogicalTapeFreeze(state->tapeset, state->result_tape);	state->status = TSS_SORTEDONTAPE;}/* * Merge one run from each input tape, except ones with dummy runs. * * This is the inner loop of Algorithm D step D5.  We know that the * output tape is TAPE[T]. */static voidmergeonerun(Tuplesortstate *state){	int			destTape = state->tp_tapenum[TAPERANGE];	int			srcTape;	int			tupIndex;	void	   *tup;	long		priorAvail,				spaceFreed;	/*	 * Start the merge by loading one tuple from each active source tape into	 * the heap.  We can also decrease the input run/dummy run counts.	 */	beginmerge(state);	/*	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it	 * out, and replacing it with next tuple from same tape (if there is	 * another one).	 */	while (state->memtupcount > 0)	{		CHECK_FOR_INTERRUPTS();		/* write the tuple to destTape */		priorAvail = state->availMem;		srcTape = state->memtupindex[0];		WRITETUP(state, destTape, state->memtuples[0]);		/* writetup adjusted total free space, now fix per-tape space */		spaceFreed = state->availMem - priorAvail;		state->mergeavailmem[srcTape] += spaceFreed;		/* compact the heap */		tuplesort_heap_siftup(state, false);		if ((tupIndex = state->mergenext[srcTape]) == 0)		{			/* out of preloaded data on this tape, try to read more */			mergepreread(state);			/* if still no data, we've reached end of run on this tape */			if ((tupIndex = state->mergenext[srcTape]) == 0)				continue;		}		/* pull next preread tuple from list, insert in heap */		tup = state->memtuples[tupIndex];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -