⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 marssort.cu

📁 GPU实现的MapReduce framework,对于学习并行编程和cuda平台的编程方面有着极好的参考价值
💻 CU
📖 第 1 页 / 共 4 页
字号:
	size_t chunkSize = chunkRecCount / (sortInfo->realChunkCount+1);
	size_t *recOffsets = (size_t*)BenMalloc(sizeof(size_t)*sortInfo->realChunkCount);

	size_t totalKeyCount = 0;

	for (int i = 0; i < sortInfo->realChunkCount; i++)
	{
		//initialize small chunk
		GetSmallSortChunk(file, smallChunks, sortInfo->chunks, 
			chunkSize, bigChunkBitmap, i, recOffsets);
		totalKeyCount += sortInfo->chunks[i].diffKeyCount;
	}

	ChunkInfo_t *outputChunk = (ChunkInfo_t*)BenMalloc(sizeof(ChunkInfo_t));

	SortRec_t *minRec = (SortRec_t*)BenMalloc(sizeof(SortRec_t));
	char flag = 0;
	size_t which = 0;
	size_t only_one = 0;
	size_t finishedCount = 0;

	size_t actualRecCount = 0;
	SortRec_t tmpRec;
	while (1)
	{		
		finishedCount = 0;
		for (int i = 0; i < sortInfo->realChunkCount; i++)
		{
			if (bigChunkBitmap[i] == 0) 
			{
				finishedCount++;
				continue;
			}
			else
			{
				only_one = i;

				//run out small chunk?
				if (smallChunks[i].smallCursor >= smallChunks[i].diffKeyCount)
				{
					BenFree((char**)&smallChunks[i].keys, smallChunks[i].keySize);
					BenFree((char**)&smallChunks[i].vals, smallChunks[i].valSize);
					BenFree((char**)&smallChunks[i].index, smallChunks[i].indexSize);
					BenFree((char**)&smallChunks[i].keyListRange, smallChunks[i].rangeSize);

					if(!GetSmallSortChunk(file, smallChunks, sortInfo->chunks, 
						chunkSize, bigChunkBitmap, i, recOffsets))
						continue;
					smallChunks[i].smallCursor = 0;
				}

				if (flag == 0)
				{
					GetSortRec(minRec, smallChunks, i, recOffsets);
					flag = 1;
					which = i;
					continue;
				}
 
				//fetch record from small chunk
				GetSortRec(&tmpRec, smallChunks, i, recOffsets);

				//compare
				if (cpu_compare(minRec->key, minRec->keySize,
					tmpRec.key, tmpRec.keySize) <= 0)
				{ 
					continue;
				}
				else
				{
					BenMemcpy(minRec, &tmpRec, sizeof(SortRec_t));
					which = i;
				}
			}//if -- else --

		}//for

		if (finishedCount != (sortInfo->realChunkCount-1))
		{
			smallChunks[which].smallCursor++;	
			flag = 0;
		}
		else
		{
			break;
		}

		actualRecCount++;
		BenLog("*****************%d in %d chunk, \n", *(int*)minRec->key, which);

		//output records
		OutputAndGroupRecs(outputChunk, tmpfile, minRec, chunkSize);
	}//while(1)

	BenLog("actualRecCount:%d,totalKeyCount:%d\n", actualRecCount, totalKeyCount);

	//reminder
	if (actualRecCount < totalKeyCount)
	{
		do
		{
			if (smallChunks[only_one].smallCursor >= 
				smallChunks[only_one].diffKeyCount)
			{
				if(!GetSmallSortChunk(file, smallChunks, sortInfo->chunks, 
					chunkSize, bigChunkBitmap, only_one, recOffsets))
					break;
				smallChunks[only_one].smallCursor = 0;
			}
			
			for (int i = smallChunks[only_one].smallCursor; 
				i < smallChunks[only_one].diffKeyCount;
				i++)
			{
				actualRecCount++;
				GetSortRec(&tmpRec, smallChunks, only_one, recOffsets);
				smallChunks[only_one].smallCursor++;
				BenLog("*****************%d in %d chunk, \n", *(int*)minRec->key, which);

				OutputAndGroupRecs(outputChunk, tmpfile, &tmpRec, chunkSize);
			}

			BenFree((char**)&smallChunks[only_one].keys, smallChunks[only_one].keySize);
			BenFree((char**)&smallChunks[only_one].vals, smallChunks[only_one].valSize);
			BenFree((char**)&smallChunks[only_one].index, smallChunks[only_one].indexSize);
			BenFree((char**)&smallChunks[only_one].keyListRange, smallChunks[only_one].rangeSize);
		} while(smallChunks[only_one].cursor < sortInfo->chunks[only_one].diffKeyCount);
	}//if (actualRecCount < totalKeyCount)

	BenLog("actualRecCount:%d,totalKeyCount:%d,groupOffset:%d,totalRecCount:%d\n", 
		actualRecCount, totalKeyCount, groupOffset, totalRecCount);	

	FlushSortChunk(outputChunk, tmpfile);
	
	*totalOutputRecCount = totalRecCount;
	*totalOutputDiffKeyCount = totalGroupCount;

	BenFree((char**)&recOffsets, sizeof(size_t)*sortInfo->realChunkCount);
	BenFree((char**)&minRec, sizeof(SortRec_t));

	BenFree((char**)&outputChunk, sizeof(ChunkInfo_t));

	BenFree((char**)&bigChunkBitmap, sortInfo->realChunkCount);
	BenFree((char**)&smallChunks, 
		sizeof(SortChunk_t)*sortInfo->realChunkCount);

	//LeaveFunc("MergeSortFile");
}

//==============================================================
//GPU sort
//==============================================================
#define NUM_BLOCK_PER_CHUNK_BITONIC_SORT 512//b256
#define SHARED_MEM_INT2 256
#define NUM_BLOCKS_CHUNK 512
#define	NUM_THREADS_CHUNK 256//(256)
#define CHUNK_SIZE (NUM_BLOCKS_CHUNK*NUM_THREADS_CHUNK)
#define NUM_CHUNKS_R (NUM_RECORDS_R/CHUNK_SIZE)

__device__ int getCompareValue(void *d_rawData, cmp_type_t value1, cmp_type_t value2)
{
	int compareValue=0;
	int v1=value1.x;
	int v2=value2.x;
	if((v1==-1) || (v2==-1))
	{
		if(v1==v2)
			compareValue=0;
		else
			if(v1==-1)
				compareValue=-1;
			else
				compareValue=1;
	}
	else
		compareValue=gpu_compare((void*)(((char*)d_rawData)+v1), value1.y, (void*)(((char*)d_rawData)+v2), value2.y); 

	return compareValue;
}

void * s_qsRawData=NULL;


__global__ void
partBitonicSortKernel( void* d_rawData, int totalLenInBytes,cmp_type_t* d_R, unsigned int numRecords, int chunkIdx, int unitSize)
{
	__shared__ cmp_type_t shared[NUM_THREADS_CHUNK];

	int tx = threadIdx.x;
	int bx = blockIdx.x;

	//load the data
	int dataIdx = chunkIdx*CHUNK_SIZE+bx*blockDim.x+tx;
	int unitIdx = ((NUM_BLOCKS_CHUNK*chunkIdx + bx)/unitSize)&1;
	shared[tx] = d_R[dataIdx];
	__syncthreads();
	int ixj=0;
	int a=0;
	cmp_type_t temp1;
	cmp_type_t temp2;
	int k = NUM_THREADS_CHUNK;

	if(unitIdx == 0)
	{
		for (int j = (k>>1); j>0; j =(j>>1))
		{
			ixj = tx ^ j;
			//a = (shared[tx].y - shared[ixj].y);				
			temp1=shared[tx];
			temp2= shared[ixj];
			if (ixj > tx) {
				//a=temp1.y-temp2.y;
				//a=compareString((void*)(((char4*)d_rawData)+temp1.x),(void*)(((char4*)d_rawData)+temp2.x)); 
				a=getCompareValue(d_rawData, temp1, temp2);
				if ((tx & k) == 0) {
					if ( (a>0)) {
						shared[tx]=temp2;
						shared[ixj]=temp1;
					}
				}
				else {
					if ( (a<0)) {
						shared[tx]=temp2;
						shared[ixj]=temp1;
					}
				}
			}
				
			__syncthreads();
		}
	}
	else
	{
		for (int j = (k>>1); j>0; j =(j>>1))
		{
			ixj = tx ^ j;
			temp1=shared[tx];
			temp2= shared[ixj];
			
			if (ixj > tx) {					
				//a=temp1.y-temp2.y;					
				//a=compareString((void*)(((char4*)d_rawData)+temp1.x),(void*)(((char4*)d_rawData)+temp2.x));
				a=getCompareValue(d_rawData, temp1, temp2);
				if ((tx & k) == 0) {
					if( (a<0))
					{
						shared[tx]=temp2;
						shared[ixj]=temp1;
					}
				}
				else {
					if( (a>0))
					{
						shared[tx]=temp2;
						shared[ixj]=temp1;
					}
				}
			}
			
			__syncthreads();
		}
	}

	d_R[dataIdx] = shared[tx];
}

__global__ void
unitBitonicSortKernel(void* d_rawData, int totalLenInBytes, cmp_type_t* d_R, unsigned int numRecords, int chunkIdx )
{
	__shared__ cmp_type_t shared[NUM_THREADS_CHUNK];

	int tx = threadIdx.x;
	int bx = blockIdx.x;
	int unitIdx = (NUM_BLOCKS_CHUNK*chunkIdx + bx)&1;

	//load the data
	int dataIdx = chunkIdx*CHUNK_SIZE+bx*blockDim.x+tx;
	shared[tx] = d_R[dataIdx];
	__syncthreads();

	cmp_type_t temp1;
	cmp_type_t temp2;
	int ixj=0;
	int a=0;
	if(unitIdx == 0)
	{
		for (int k = 2; k <= NUM_THREADS_CHUNK; (k =k<<1))
		{
			// bitonic merge:
			for (int j = (k>>1); j>0; (j=j>>1))
			{
				ixj = tx ^ j;	
				temp1=shared[tx];
				temp2= shared[ixj];
				if (ixj > tx) {					
					//a=temp1.y-temp2.y;
					//a=compareString((void*)(((char4*)d_rawData)+temp1.x),(void*)(((char4*)d_rawData)+temp2.x));
					a=getCompareValue(d_rawData, temp1, temp2);
					if ((tx & k) == 0) {
						if ( (a>0)) {
							shared[tx]=temp2;
							shared[ixj]=temp1;
						}
					}
					else {
						if ( (a<0)) {
							shared[tx]=temp2;
							shared[ixj]=temp1;
						}
					}
				}
				
				__syncthreads();
			}
		}
	}
	else
	{
		for (int k = 2; k <= NUM_THREADS_CHUNK; (k =k<<1))
		{
			// bitonic merge:
			for (int j = (k>>1); j>0; (j=j>>1))
			{
				ixj = tx ^ j;
				temp1=shared[tx];
				temp2= shared[ixj];
				if (ixj > tx) {					
					//a=temp1.y-temp2.y;
					//a=compareString((void*)(((char4*)d_rawData)+temp1.x),(void*)(((char4*)d_rawData)+temp2.x));
					a=getCompareValue(d_rawData, temp1, temp2);
					if ((tx & k) == 0) {
						if( (a<0))
						{
							shared[tx]=temp2;
							shared[ixj]=temp1;
						}
					}
					else {
						if( (a>0))
						{
							shared[tx]=temp2;
							shared[ixj]=temp1;
						}
					}
				}
				
				__syncthreads();
			}
		}

	}

	d_R[dataIdx] = shared[tx];
}

__global__ void
bitonicKernel( void* d_rawData, int totalLenInBytes, cmp_type_t* d_R, unsigned int numRecords, int k, int j)
{
	int bx = blockIdx.x;
	int by = blockIdx.y;
	int tid = threadIdx.x;
	int dataIdx = by*gridDim.x*blockDim.x + bx*blockDim.x + tid;

	int ixj = dataIdx^j;

	if( ixj > dataIdx )
	{
		cmp_type_t tmpR = d_R[dataIdx];
		cmp_type_t tmpIxj = d_R[ixj];
		if( (dataIdx&k) == 0 )
		{
			//if( tmpR.y > tmpIxj.y )
			//if(compareString((void*)(((char4*)d_rawData)+tmpR.x),(void*)(((char4*)d_rawData)+tmpIxj.x))==1) 
			if(getCompareValue(d_rawData, tmpR, tmpIxj)==1)
			{
				d_R[dataIdx] = tmpIxj;
				d_R[ixj] = tmpR;
			}
		}
		else
		{
			//if( tmpR.y < tmpIxj.y )
			//if(compareString((void*)(((char4*)d_rawData)+tmpR.x),(void*)(((char4*)d_rawData)+tmpIxj.x))==-1) 
			if(getCompareValue(d_rawData, tmpR, tmpIxj)==-1)
			{
				d_R[dataIdx] = tmpIxj;
				d_R[ixj] = tmpR;
			}
		}
	}
}

__device__ inline void swap(cmp_type_t & a, cmp_type_t & b)
{
	// Alternative swap doesn't use a temporary register:
	// a ^= b;
	// b ^= a;
	// a ^= b;
	
    cmp_type_t tmp = a;
    a = b;
    b = tmp;
}

__global__ void bitonicSortSingleBlock_kernel(void* d_rawData, int totalLenInBytes, cmp_type_t * d_values, int rLen, cmp_type_t* d_output)
{
	__shared__ cmp_type_t bs_cmpbuf[SHARED_MEM_INT2];
	

    //const int by = blockIdx.y;
	//const int bx = blockIdx.x;
	const int tx = threadIdx.x;
	const int ty = threadIdx.y;	
	const int tid=tx+ty*blockDim.x;
	//const int bid=bx+by*gridDim.x;
	//const int numThread=blockDim.x;
	//const int resultID=(bx)*numThread+tid;
	
	if(tid<rLen)
	{
		bs_cmpbuf[tid] = d_values[tid];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -