/*
 * tuplesort.c
 *
 * From "PostgreSQL 7.4.6 for Linux" -- C source, 2,177 lines total,
 * page 1 of 5 of this capture.
 */
				state->mergelast[srcTape] = 0;
			state->memtupindex[tupIndex] = state->mergefreelist;
			state->mergefreelist = tupIndex;
			tuplesort_heap_insert(state, tup, srcTape, false);
		}
	}
}

/*
 * mergepreread - load tuples from merge input tapes
 *
 * This routine exists to improve sequentiality of reads during a merge pass,
 * as explained in the header comments of this file.  Load tuples from each
 * active source tape until the tape's run is exhausted or it has used up
 * its fair share of available memory.  In any case, we guarantee that there
 * is at least one preread tuple available from each unexhausted input tape.
 */
static void
mergepreread(Tuplesortstate *state)
{
	int			srcTape;
	unsigned int tuplen;
	void	   *tup;
	int			tupIndex;
	long		priorAvail,
				spaceUsed;

	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		/* only still-active input tapes need refilling */
		if (!state->mergeactive[srcTape])
			continue;

		/*
		 * Skip reading from any tape that still has at least half of its
		 * target memory filled with tuples (threshold fraction may need
		 * adjustment?).  This avoids reading just a few tuples when the
		 * incoming runs are not being consumed evenly.
		 */
		if (state->mergenext[srcTape] != 0 &&
			state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
			continue;

		/*
		 * Read tuples from this tape until it has used up its free
		 * memory, but ensure that we have at least one.
		 *
		 * Temporarily swap this tape's memory budget into state->availMem
		 * so that LACKMEM() below reflects only this tape's consumption;
		 * the global count is restored (minus what we used) afterwards.
		 */
		priorAvail = state->availMem;
		state->availMem = state->mergeavailmem[srcTape];
		while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
		{
			/* read next tuple, if any (zero length word marks end of run) */
			if ((tuplen = getlen(state, srcTape, true)) == 0)
			{
				state->mergeactive[srcTape] = false;
				break;
			}
			tup = READTUP(state, srcTape, tuplen);
			/* find or make a free slot in memtuples[] for it */
			tupIndex = state->mergefreelist;
			if (tupIndex)
				state->mergefreelist = state->memtupindex[tupIndex];
			else
			{
				tupIndex = state->mergefirstfree++;
				/* Might need to enlarge arrays! */
				if (tupIndex >= state->memtupsize)
				{
					/*
					 * Double both parallel arrays; the FREEMEM/USEMEM
					 * bracketing keeps the availMem accounting in sync
					 * with the actual chunk sizes across the repalloc.
					 */
					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
					FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
					state->memtupsize *= 2;
					state->memtuples = (void **)
						repalloc(state->memtuples,
								 state->memtupsize * sizeof(void *));
					state->memtupindex = (int *)
						repalloc(state->memtupindex,
								 state->memtupsize * sizeof(int));
					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
					USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
				}
			}
			/* store tuple, append to (singly-linked) list for its tape */
			state->memtuples[tupIndex] = tup;
			state->memtupindex[tupIndex] = 0;
			if (state->mergelast[srcTape])
				state->memtupindex[state->mergelast[srcTape]] = tupIndex;
			else
				state->mergenext[srcTape] = tupIndex;
			state->mergelast[srcTape] = tupIndex;
		}
		/* update per-tape and global availmem counts */
		spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
		state->mergeavailmem[srcTape] = state->availMem;
		state->availMem = priorAvail - spaceUsed;
	}
}

/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case,
 * since puttuple assumes it always has a tuple to compare to).
 *
 * When alltuples = true, dump everything currently in memory.
 * (This case is only used at end of input data.)
 *
 * If we empty the heap, close out the current run and return (this should
 * only happen at end of input data).  If we see that the tuple run number
 * at the top of the heap has changed, start a new run.
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
	while (alltuples ||
		   (LACKMEM(state) && state->memtupcount > 1))
	{
		/*
		 * Dump the heap's frontmost entry, and sift up to remove it from
		 * the heap.
		 
 */
		Assert(state->memtupcount > 0);
		WRITETUP(state, state->tp_tapenum[state->destTape],
				 state->memtuples[0]);
		tuplesort_heap_siftup(state, true);

		/*
		 * If the heap is empty *or* top run number has changed, we've
		 * finished the current run.
		 */
		if (state->memtupcount == 0 ||
			state->currentRun != state->memtupindex[0])
		{
			markrunend(state, state->tp_tapenum[state->destTape]);
			state->currentRun++;
			state->tp_runs[state->destTape]++;
			state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

			/*
			 * Done if heap is empty, else prepare for new run.
			 */
			if (state->memtupcount == 0)
				break;
			Assert(state->currentRun == state->memtupindex[0]);
			selectnewtape(state);
		}
	}
}

/*
 * tuplesort_rescan		- rewind and replay the scan
 *
 * Only valid for randomAccess sorts (asserted below).
 */
void
tuplesort_rescan(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			/* in-memory result: just reset read cursor and saved mark */
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		case TSS_SORTEDONTAPE:
			/* on-tape result: rewind the result tape for reading */
			LogicalTapeRewind(state->tapeset,
							  state->result_tape,
							  false);
			state->eof_reached = false;
			state->markpos_block = 0L;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * tuplesort_markpos	- saves current position in the merged sort file
 *
 * The saved position is later restored by tuplesort_restorepos.
 */
void
tuplesort_markpos(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			/* position is simply the current array index */
			state->markpos_offset = state->current;
			state->markpos_eof = state->eof_reached;
			break;
		case TSS_SORTEDONTAPE:
			/* ask the logical tape where we are */
			LogicalTapeTell(state->tapeset,
							state->result_tape,
							&state->markpos_block,
							&state->markpos_offset);
			state->markpos_eof = state->eof_reached;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * tuplesort_restorepos - restores current position in merged sort file to
 *						  last saved position
 */
void
tuplesort_restorepos(Tuplesortstate 
*state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			state->current = state->markpos_offset;
			state->eof_reached = state->markpos_eof;
			break;
		case TSS_SORTEDONTAPE:
			if (!LogicalTapeSeek(state->tapeset,
								 state->result_tape,
								 state->markpos_block,
								 state->markpos_offset))
				elog(ERROR, "tuplesort_restorepos failed");
			state->eof_reached = state->markpos_eof;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * The heap lives in state->memtuples[], with parallel data storage
 * for indexes in state->memtupindex[].  If checkIndex is true, use
 * the tuple index as the front of the sort key; otherwise, no.
 *
 * NB: HEAPCOMPARE evaluates index1/index2 more than once when checkIndex
 * is true; callers pass only simple variables or array elements, so that
 * is safe here.  checkIndex and state are taken from the caller's scope.
 */

#define HEAPCOMPARE(tup1,index1,tup2,index2) \
	(checkIndex && (index1 != index2) ? index1 - index2 : \
	 COMPARETUP(state, tup1, tup2))

/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant.
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
					  int tupleindex, bool checkIndex)
{
	void	  **memtuples;
	int		   *memtupindex;
	int			j;

	/*
	 * Make sure memtuples[] can handle another entry.
	 */
	if (state->memtupcount >= state->memtupsize)
	{
		/* double both parallel arrays, keeping memory accounting in sync */
		FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
		FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
		state->memtupsize *= 2;
		state->memtuples = (void **)
			repalloc(state->memtuples,
					 state->memtupsize * sizeof(void *));
		state->memtupindex = (int *)
			repalloc(state->memtupindex,
					 state->memtupsize * sizeof(int));
		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
		USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
	}
	memtuples = state->memtuples;
	memtupindex = state->memtupindex;

	/*
	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
	 * is using 1-based array indexes, not 0-based.
	 
 */
	j = state->memtupcount++;
	while (j > 0)
	{
		int			i = (j - 1) >> 1;	/* parent of slot j (0-based heap) */

		if (HEAPCOMPARE(tuple, tupleindex,
						memtuples[i], memtupindex[i]) >= 0)
			break;
		memtuples[j] = memtuples[i];
		memtupindex[j] = memtupindex[i];
		j = i;
	}
	memtuples[j] = tuple;
	memtupindex[j] = tupleindex;
}

/*
 * The tuple at state->memtuples[0] has been removed from the heap.
 * Decrement memtupcount, and sift up to maintain the heap invariant.
 */
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
	void	  **memtuples = state->memtuples;
	int		   *memtupindex = state->memtupindex;
	void	   *tuple;
	int			tupindex,
				i,
				n;

	if (--state->memtupcount <= 0)
		return;
	n = state->memtupcount;
	tuple = memtuples[n];		/* tuple that must be reinserted */
	tupindex = memtupindex[n];
	i = 0;						/* i is where the "hole" is */
	for (;;)
	{
		int			j = 2 * i + 1;	/* left child of the hole */

		if (j >= n)
			break;
		/* step to the right child if it sorts before the left one */
		if (j + 1 < n &&
			HEAPCOMPARE(memtuples[j], memtupindex[j],
						memtuples[j + 1], memtupindex[j + 1]) > 0)
			j++;
		if (HEAPCOMPARE(tuple, tupindex,
						memtuples[j], memtupindex[j]) <= 0)
			break;
		memtuples[i] = memtuples[j];
		memtupindex[i] = memtupindex[j];
		i = j;
	}
	memtuples[i] = tuple;
	memtupindex[i] = tupindex;
}

/*
 * Tape interface routines
 */

/*
 * getlen - read the next tuple-length word from the given tape.
 *
 * A zero length marks end of run/data; it is returned to the caller only
 * if eofOK, else we elog.  A short read is always an error.
 */
static unsigned int
getlen(Tuplesortstate *state, int tapenum, bool eofOK)
{
	unsigned int len;

	if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
						sizeof(len)) != sizeof(len))
		elog(ERROR, "unexpected end of tape");
	if (len == 0 && !eofOK)
		elog(ERROR, "unexpected end of data");
	return len;
}

/*
 * markrunend - write an end-of-run marker (a zero length word) on the tape.
 */
static void
markrunend(Tuplesortstate *state, int tapenum)
{
	unsigned int len = 0;

	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}

/*
 * qsort interface
 *
 * qsort_comparetup routes qsort(3) comparisons through COMPARETUP using
 * the global qsort_tuplesortstate, since qsort passes no extra argument.
 */
static int
qsort_comparetup(const void *a, const void *b)
{
	/* The passed pointers are pointers to void * ... */
	return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);
}

/*
 * This routine selects an appropriate sorting function to implement
 * a sort operator as efficiently as possible.	
The straightforward
 * method is to use the operator's implementation proc --- ie, "<"
 * comparison.	However, that way often requires two calls of the function
 * per comparison.	If we can find a btree three-way comparator function
 * associated with the operator, we can use it to do the comparisons
 * more efficiently.  We also support the possibility that the operator
 * is ">" (descending sort), in which case we have to reverse the output
 * of the btree comparator.
 *
 * Possibly this should live somewhere else (backend/catalog/, maybe?).
 */
void
SelectSortFunction(Oid sortOperator,
				   RegProcedure *sortFunction,
				   SortFunctionKind *kind)
{
	CatCList   *catlist;
	int			i;
	HeapTuple	tuple;
	Form_pg_operator optup;
	Oid			opclass = InvalidOid;

	/*
	 * Search pg_amop to see if the target operator is registered as the
	 * "<" or ">" operator of any btree opclass.  It's possible that it
	 * might be registered both ways (eg, if someone were to build a
	 * "reverse sort" opclass for some reason); prefer the "<" case if so.
	 * If the operator is registered the same way in multiple opclasses,
	 * assume we can use the associated comparator function from any one.
	 */
	catlist = SearchSysCacheList(AMOPOPID, 1,
								 ObjectIdGetDatum(sortOperator),
								 0, 0, 0);

	for (i = 0; i < catlist->n_members; i++)
	{
		Form_pg_amop aform;

		tuple = &catlist->members[i]->tuple;
		aform = (Form_pg_amop) GETSTRUCT(tuple);

		/* consider only btree opclasses */
		if (!opclass_is_btree(aform->amopclaid))
			continue;

		if (aform->amopstrategy == BTLessStrategyNumber)
		{
			/* "<": ascending sort, comparator usable directly */
			opclass = aform->amopclaid;
			*kind = SORTFUNC_CMP;
			break;				/* done looking */
		}
		else if (aform->amopstrategy == BTGreaterStrategyNumber)
		{

/*
 * Viewer keyboard shortcuts (capture artifact, not part of the source):
 *   Copy code: Ctrl+C    Search code: Ctrl+F    Fullscreen: F11
 *   Larger font: Ctrl+=   Smaller font: Ctrl+-   Show shortcuts: ?
 */