
tuplesort.c

Source code from PostgreSQL 8.1.4, an open-source database system for Linux.
Language: C
Page 1 of 5
		state->mergenext[srcTape] = state->memtupindex[tupIndex];
		if (state->mergenext[srcTape] == 0)
			state->mergelast[srcTape] = 0;
		state->memtupindex[tupIndex] = state->mergefreelist;
		state->mergefreelist = tupIndex;
		tuplesort_heap_insert(state, tup, srcTape, false);
	}

	/*
	 * When the heap empties, we're done.  Write an end-of-run marker on the
	 * output tape, and increment its count of real runs.
	 */
	markrunend(state, destTape);
	state->tp_runs[TAPERANGE]++;

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG, "finished merge step: %s",
			 pg_rusage_show(&state->ru_start));
#endif
}

/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[].  Then, load
 * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
 */
static void
beginmerge(Tuplesortstate *state)
{
	int			activeTapes;
	int			tapenum;
	int			srcTape;

	/* Heap should be empty here */
	Assert(state->memtupcount == 0);

	/* Clear merge-pass state variables */
	memset(state->mergeactive, 0, sizeof(state->mergeactive));
	memset(state->mergenext, 0, sizeof(state->mergenext));
	memset(state->mergelast, 0, sizeof(state->mergelast));
	memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
	state->mergefreelist = 0;			/* nothing in the freelist */
	state->mergefirstfree = MAXTAPES;	/* first slot available for preread */

	/* Adjust run counts and mark the active tapes */
	activeTapes = 0;
	for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
	{
		if (state->tp_dummy[tapenum] > 0)
			state->tp_dummy[tapenum]--;
		else
		{
			Assert(state->tp_runs[tapenum] > 0);
			state->tp_runs[tapenum]--;
			srcTape = state->tp_tapenum[tapenum];
			state->mergeactive[srcTape] = true;
			activeTapes++;
		}
	}

	/*
	 * Initialize space allocation to let each active input tape have an
	 * equal share of preread space.
	 */
	Assert(activeTapes > 0);
	state->spacePerTape = state->availMem / activeTapes;
	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		if (state->mergeactive[srcTape])
			state->mergeavailmem[srcTape] = state->spacePerTape;
	}

	/*
	 * Preread as many tuples as possible (and at least one) from each active
	 * tape
	 */
	mergepreread(state);

	/* Load the merge heap with the first tuple from each input tape */
	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		int			tupIndex = state->mergenext[srcTape];
		void	   *tup;

		if (tupIndex)
		{
			tup = state->memtuples[tupIndex];
			state->mergenext[srcTape] = state->memtupindex[tupIndex];
			if (state->mergenext[srcTape] == 0)
				state->mergelast[srcTape] = 0;
			state->memtupindex[tupIndex] = state->mergefreelist;
			state->mergefreelist = tupIndex;
			tuplesort_heap_insert(state, tup, srcTape, false);
		}
	}
}
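Both beginmerge() above and mergepreread() below keep each tape's preread tuples on a singly linked list threaded through the parallel memtupindex[] array: mergenext[] and mergelast[] hold the head and tail slot indexes per tape, index 0 means "empty", and recycled slots are chained onto mergefreelist. The following standalone sketch (hypothetical names, not PostgreSQL code) shows that index-linked-list idiom in isolation.

/*
 * Standalone sketch of a linked list threaded through parallel arrays.
 * slots[] stands in for memtuples[], next_idx[] for memtupindex[]; index 0
 * is reserved to mean "no slot", which is why firstfree starts at 1
 * (tuplesort starts it at MAXTAPES for the same reason).  Growth and error
 * handling are omitted.
 */
#include <stdio.h>

#define NSLOTS 16

static const char *slots[NSLOTS];	/* payloads */
static int	next_idx[NSLOTS];		/* links: index of next slot, 0 = end */
static int	freelist = 0;			/* head of recycled-slot chain */
static int	firstfree = 1;			/* first never-used slot */
static int	head = 0, tail = 0;		/* one list (one "tape") */

static int
alloc_slot(void)
{
	int		i = freelist;

	if (i)
		freelist = next_idx[i];		/* reuse a recycled slot */
	else
		i = firstfree++;			/* or take a fresh one */
	return i;
}

static void
push_back(const char *payload)
{
	int		i = alloc_slot();

	slots[i] = payload;
	next_idx[i] = 0;
	if (tail)
		next_idx[tail] = i;
	else
		head = i;
	tail = i;
}

static const char *
pop_front(void)
{
	int		i = head;
	const char *payload = slots[i];

	head = next_idx[i];
	if (head == 0)
		tail = 0;
	next_idx[i] = freelist;			/* recycle the slot */
	freelist = i;
	return payload;
}

int
main(void)
{
	push_back("apple");
	push_back("pear");
	push_back("plum");
	while (head)
		printf("%s\n", pop_front());
	return 0;
}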
/*
 * mergepreread - load tuples from merge input tapes
 *
 * This routine exists to improve sequentiality of reads during a merge pass,
 * as explained in the header comments of this file.  Load tuples from each
 * active source tape until the tape's run is exhausted or it has used up
 * its fair share of available memory.  In any case, we guarantee that there
 * is at least one preread tuple available from each unexhausted input tape.
 */
static void
mergepreread(Tuplesortstate *state)
{
	int			srcTape;
	unsigned int tuplen;
	void	   *tup;
	int			tupIndex;
	long		priorAvail,
				spaceUsed;

	for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
	{
		if (!state->mergeactive[srcTape])
			continue;

		/*
		 * Skip reading from any tape that still has at least half of its
		 * target memory filled with tuples (threshold fraction may need
		 * adjustment?).  This avoids reading just a few tuples when the
		 * incoming runs are not being consumed evenly.
		 */
		if (state->mergenext[srcTape] != 0 &&
			state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
			continue;

		/*
		 * Read tuples from this tape until it has used up its free memory,
		 * but ensure that we have at least one.
		 */
		priorAvail = state->availMem;
		state->availMem = state->mergeavailmem[srcTape];
		while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
		{
			/* read next tuple, if any */
			if ((tuplen = getlen(state, srcTape, true)) == 0)
			{
				state->mergeactive[srcTape] = false;
				break;
			}
			tup = READTUP(state, srcTape, tuplen);

			/* find or make a free slot in memtuples[] for it */
			tupIndex = state->mergefreelist;
			if (tupIndex)
				state->mergefreelist = state->memtupindex[tupIndex];
			else
			{
				tupIndex = state->mergefirstfree++;
				/* Might need to enlarge arrays! */
				if (tupIndex >= state->memtupsize)
				{
					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
					FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
					state->memtupsize *= 2;
					state->memtuples = (void **)
						repalloc(state->memtuples,
								 state->memtupsize * sizeof(void *));
					state->memtupindex = (int *)
						repalloc(state->memtupindex,
								 state->memtupsize * sizeof(int));
					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
					USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
				}
			}

			/* store tuple, append to list for its tape */
			state->memtuples[tupIndex] = tup;
			state->memtupindex[tupIndex] = 0;
			if (state->mergelast[srcTape])
				state->memtupindex[state->mergelast[srcTape]] = tupIndex;
			else
				state->mergenext[srcTape] = tupIndex;
			state->mergelast[srcTape] = tupIndex;
		}

		/* update per-tape and global availmem counts */
		spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
		state->mergeavailmem[srcTape] = state->availMem;
		state->availMem = priorAvail - spaceUsed;
	}
}
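A detail worth noticing in mergepreread() is the memory-accounting swap: the tape's private budget (mergeavailmem[srcTape]) is temporarily moved into state->availMem so that the existing LACKMEM() test works unchanged, and afterwards the amount actually consumed is charged back against the saved global total. Below is a minimal standalone sketch of that pattern, with hypothetical names, not PostgreSQL code.

#include <stdio.h>

static long avail_mem = 1000;			/* global budget, like state->availMem */

/*
 * Fill one "tape" from a private budget by temporarily swapping it into the
 * global counter, then charge the consumed amount back globally.  Returns
 * the number of bytes consumed.
 */
static long
fill_one_tape(long tape_budget, long tuple_size)
{
	long	prior_avail = avail_mem;	/* save the real global counter */
	long	space_used;

	avail_mem = tape_budget;			/* pretend the tape's share is all we have */
	while (avail_mem >= tuple_size)		/* plays the role of !LACKMEM() */
		avail_mem -= tuple_size;		/* "USEMEM" for one preread tuple */

	space_used = tape_budget - avail_mem;
	avail_mem = prior_avail - space_used;	/* charge the usage back globally */
	return space_used;
}

int
main(void)
{
	long	used = fill_one_tape(300, 80);

	printf("tape consumed %ld bytes, global budget now %ld\n", used, avail_mem);
	return 0;
}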
/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case,
 * since puttuple assumes it always has a tuple to compare to).
 *
 * When alltuples = true, dump everything currently in memory.
 * (This case is only used at end of input data.)
 *
 * If we empty the heap, close out the current run and return (this should
 * only happen at end of input data).  If we see that the tuple run number
 * at the top of the heap has changed, start a new run.
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
	while (alltuples ||
		   (LACKMEM(state) && state->memtupcount > 1))
	{
		/*
		 * Dump the heap's frontmost entry, and sift up to remove it from the
		 * heap.
		 */
		Assert(state->memtupcount > 0);
		WRITETUP(state, state->tp_tapenum[state->destTape],
				 state->memtuples[0]);
		tuplesort_heap_siftup(state, true);

		/*
		 * If the heap is empty *or* top run number has changed, we've
		 * finished the current run.
		 */
		if (state->memtupcount == 0 ||
			state->currentRun != state->memtupindex[0])
		{
			markrunend(state, state->tp_tapenum[state->destTape]);
			state->currentRun++;
			state->tp_runs[state->destTape]++;
			state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

#ifdef TRACE_SORT
			if (trace_sort)
				elog(LOG, "finished writing%s run %d: %s",
					 (state->memtupcount == 0) ? " final" : "",
					 state->currentRun,
					 pg_rusage_show(&state->ru_start));
#endif

			/*
			 * Done if heap is empty, else prepare for new run.
			 */
			if (state->memtupcount == 0)
				break;
			Assert(state->currentRun == state->memtupindex[0]);
			selectnewtape(state);
		}
	}
}
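dumptuples() detects a run boundary by comparing the run number stored at the top of the heap (memtupindex[0]) with currentRun. That works because, with checkIndex = true, the heap is ordered first by run number and only then by the tuple comparator; see HEAPCOMPARE() further down. The toy program below (hypothetical names, not PostgreSQL code) demonstrates the same replacement-selection idea with a small heap of (run, value) pairs. In tuplesort.c the run number is assigned when a tuple first enters the heap, in code not shown on this page.

#include <stdio.h>

#define HEAPSIZE 4

typedef struct { int run; int value; } Entry;

static Entry heap[HEAPSIZE];
static int	nheap = 0;

/* order by run number first, then by value (the HEAPCOMPARE idea) */
static int
entry_cmp(Entry a, Entry b)
{
	if (a.run != b.run)
		return a.run - b.run;
	return a.value - b.value;
}

static void
heap_insert(Entry e)
{
	int		j = nheap++;

	while (j > 0)
	{
		int		i = (j - 1) / 2;

		if (entry_cmp(e, heap[i]) >= 0)
			break;
		heap[j] = heap[i];
		j = i;
	}
	heap[j] = e;
}

static Entry
heap_remove_top(void)
{
	Entry	top = heap[0];
	Entry	e = heap[--nheap];		/* last entry must be reinserted */
	int		i = 0;

	for (;;)
	{
		int		j = 2 * i + 1;

		if (j >= nheap)
			break;
		if (j + 1 < nheap && entry_cmp(heap[j + 1], heap[j]) < 0)
			j++;
		if (entry_cmp(e, heap[j]) <= 0)
			break;
		heap[i] = heap[j];
		i = j;
	}
	heap[i] = e;
	return top;
}

int
main(void)
{
	int		input[] = {5, 9, 2, 7, 1, 8, 3, 6, 4, 0};
	int		ninput = 10, next = 0, current_run = 0, last = 0;

	/* prime the heap with the first HEAPSIZE inputs, all in run 0 */
	while (nheap < HEAPSIZE && next < ninput)
		heap_insert((Entry){0, input[next++]});

	while (nheap > 0)
	{
		Entry	top = heap_remove_top();

		if (top.run != current_run)
		{
			printf("  <- end of run %d\n", current_run);
			current_run = top.run;
		}
		printf(" %d", top.value);
		last = top.value;

		if (next < ninput)
		{
			int		v = input[next++];

			/* values smaller than the last output must wait for the next run */
			heap_insert((Entry){v >= last ? current_run : current_run + 1, v});
		}
	}
	printf("  <- end of run %d\n", current_run);
	return 0;
}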
/*
 * tuplesort_rescan		- rewind and replay the scan
 */
void
tuplesort_rescan(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		case TSS_SORTEDONTAPE:
			LogicalTapeRewind(state->tapeset,
							  state->result_tape,
							  false);
			state->eof_reached = false;
			state->markpos_block = 0L;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * tuplesort_markpos	- saves current position in the merged sort file
 */
void
tuplesort_markpos(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			state->markpos_offset = state->current;
			state->markpos_eof = state->eof_reached;
			break;
		case TSS_SORTEDONTAPE:
			LogicalTapeTell(state->tapeset,
							state->result_tape,
							&state->markpos_block,
							&state->markpos_offset);
			state->markpos_eof = state->eof_reached;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * tuplesort_restorepos - restores current position in merged sort file to
 *						  last saved position
 */
void
tuplesort_restorepos(Tuplesortstate *state)
{
	Assert(state->randomAccess);

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			state->current = state->markpos_offset;
			state->eof_reached = state->markpos_eof;
			break;
		case TSS_SORTEDONTAPE:
			if (!LogicalTapeSeek(state->tapeset,
								 state->result_tape,
								 state->markpos_block,
								 state->markpos_offset))
				elog(ERROR, "tuplesort_restorepos failed");
			state->eof_reached = state->markpos_eof;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * The heap lives in state->memtuples[], with parallel data storage
 * for indexes in state->memtupindex[].  If checkIndex is true, use
 * the tuple index as the front of the sort key; otherwise, no.
 */

#define HEAPCOMPARE(tup1,index1,tup2,index2) \
	(checkIndex && (index1 != index2) ? (index1) - (index2) : \
	 COMPARETUP(state, tup1, tup2))

/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant.
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
					  int tupleindex, bool checkIndex)
{
	void	  **memtuples;
	int		   *memtupindex;
	int			j;

	/*
	 * Make sure memtuples[] can handle another entry.
	 */
	if (state->memtupcount >= state->memtupsize)
	{
		FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
		FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
		state->memtupsize *= 2;
		state->memtuples = (void **)
			repalloc(state->memtuples,
					 state->memtupsize * sizeof(void *));
		state->memtupindex = (int *)
			repalloc(state->memtupindex,
					 state->memtupsize * sizeof(int));
		USEMEM(state, GetMemoryChunkSpace(state->memtuples));
		USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
	}
	memtuples = state->memtuples;
	memtupindex = state->memtupindex;

	/*
	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
	 * using 1-based array indexes, not 0-based.
	 */
	j = state->memtupcount++;
	while (j > 0)
	{
		int			i = (j - 1) >> 1;

		if (HEAPCOMPARE(tuple, tupleindex,
						memtuples[i], memtupindex[i]) >= 0)
			break;
		memtuples[j] = memtuples[i];
		memtupindex[j] = memtupindex[i];
		j = i;
	}
	memtuples[j] = tuple;
	memtupindex[j] = tupleindex;
}

/*
 * The tuple at state->memtuples[0] has been removed from the heap.
 * Decrement memtupcount, and sift up to maintain the heap invariant.
 */
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
	void	  **memtuples = state->memtuples;
	int		   *memtupindex = state->memtupindex;
	void	   *tuple;
	int			tupindex,
				i,
				n;

	if (--state->memtupcount <= 0)
		return;
	n = state->memtupcount;
	tuple = memtuples[n];		/* tuple that must be reinserted */
	tupindex = memtupindex[n];
	i = 0;						/* i is where the "hole" is */
	for (;;)
	{
		int			j = 2 * i + 1;

		if (j >= n)
			break;
		if (j + 1 < n &&
			HEAPCOMPARE(memtuples[j], memtupindex[j],
						memtuples[j + 1], memtupindex[j + 1]) > 0)
			j++;
		if (HEAPCOMPARE(tuple, tupindex,
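Both enlargement sites in this excerpt, one in mergepreread() and one in tuplesort_heap_insert(), grow memtuples[] and memtupindex[] in lockstep by doubling memtupsize and re-registering the allocations with the memory accounting. Here is a minimal standalone sketch of that parallel-array doubling pattern, with hypothetical names, not PostgreSQL code.

#include <stdio.h>
#include <stdlib.h>

static long accounted_bytes = 0;		/* plays the role of the availMem bookkeeping */

static void use_mem(long n)  { accounted_bytes += n; }
static void free_mem(long n) { accounted_bytes -= n; }

/* realloc that exits on failure, standing in for repalloc's error handling */
static void *
xrealloc(void *p, size_t n)
{
	void	   *q = realloc(p, n);

	if (q == NULL)
	{
		fprintf(stderr, "out of memory\n");
		exit(1);
	}
	return q;
}

struct tupstore
{
	void	  **tuples;		/* stands in for memtuples[] */
	int		   *links;		/* stands in for memtupindex[] */
	int			size;		/* stands in for memtupsize */
};

/* Grow both arrays together, keeping the byte accounting in step. */
static void
ensure_capacity(struct tupstore *ts, int needed)
{
	if (needed < ts->size)
		return;

	free_mem(ts->size * (long) (sizeof(void *) + sizeof(int)));
	while (ts->size <= needed)
		ts->size *= 2;
	ts->tuples = xrealloc(ts->tuples, ts->size * sizeof(void *));
	ts->links = xrealloc(ts->links, ts->size * sizeof(int));
	use_mem(ts->size * (long) (sizeof(void *) + sizeof(int)));
}

int
main(void)
{
	struct tupstore ts;

	ts.size = 4;
	ts.tuples = xrealloc(NULL, ts.size * sizeof(void *));
	ts.links = xrealloc(NULL, ts.size * sizeof(int));
	use_mem(ts.size * (long) (sizeof(void *) + sizeof(int)));

	ensure_capacity(&ts, 9);			/* grows 4 -> 16 */
	printf("size=%d, accounted=%ld bytes\n", ts.size, accounted_bytes);

	free(ts.tuples);
	free(ts.links);
	return 0;
}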
