⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 marsgpulib.cu

📁 GPU实现的MapReduce framework,对于学习并行编程和cuda平台的编程方面有着极好的参考价值
💻 CU
📖 第 1 页 / 共 2 页
字号:
void StartGPUSort(Schedule_t *sched, char mode)
{
	char *d_interKeys = sched->outputSmallChunk.keys;
	char *d_interVals = sched->outputSmallChunk.vals;
	int4 *d_interIndex = (int4*)sched->outputSmallChunk.index;

	size_t allKeySize = sched->outputSmallChunk.keySize;
	size_t allValSize = sched->outputSmallChunk.valSize;
	size_t allCounts = sched->outputSmallChunk.recCount;

	char *d_outputKeys = d_interKeys;
	char *d_outputVals = d_interVals;
	int4 *d_outputIndex = NULL;
	int2 **outputKeyListRange;

	size_t interDiffKeyCount = 0;

	if (mode & MAP_SORT || mode & MAP_SORT_REDUCE)
	{
		d_outputIndex = (int4*)D_MALLOC(sizeof(int4)*allCounts);
		outputKeyListRange = (int2**)BenMalloc(sizeof(int2*));

		interDiffKeyCount = 
			GPUBitonicSortMem (d_interKeys, 
					  allKeySize, 
					  d_interVals, 
					  allValSize, 
					  d_interIndex, 
					  allCounts, 
					  d_outputKeys, 
					  d_outputVals, 
					  d_outputIndex, 
					  outputKeyListRange);
	}

	//---------------------------------------------
	//output
	//---------------------------------------------
	char *interKeys = (char*)BenMalloc(allKeySize);
	char *interVals = (char*)BenMalloc(allValSize);
	int4 *interIndex = (int4*)BenMalloc(sizeof(int4)*allCounts);
	
	if (mode & MAP_SORT || mode & MAP_SORT_REDUCE)
	{
		D_MEMCPY_D2H(interKeys, d_outputKeys, allKeySize);
		D_MEMCPY_D2H(interVals, d_outputVals, allValSize);
		D_MEMCPY_D2H(interIndex, d_outputIndex, sizeof(int4)*allCounts);
	
		sched->outputSmallChunk.diffKeyCount = interDiffKeyCount;
		sched->outputSmallChunk.keyListRange = *outputKeyListRange;	
		BenFree((char**)&outputKeyListRange, sizeof(int2**));
		//D_FREE(d_outputKeys, allKeySize);
		//D_FREE(d_outputVals, allValSize);
		D_FREE(d_outputIndex, sizeof(int4)*allCounts);
	}
	else
	{
		D_MEMCPY_D2H(interKeys, d_interKeys, allKeySize);
		D_MEMCPY_D2H(interVals, d_interVals, allValSize);
		D_MEMCPY_D2H(interIndex, (void*)d_interIndex, sizeof(int4)*allCounts);
	}
	
	sched->outputSmallChunk.keys = interKeys;
	sched->outputSmallChunk.vals = interVals;
	sched->outputSmallChunk.index = interIndex;

	D_FREE(d_interKeys, allKeySize);
	D_FREE(d_interVals, allValSize);
	D_FREE(d_interIndex, sizeof(int4)*allCounts);
}

__global__	void gpuReduceCount(char*		interKeys,
							  char*	    interVals,
							  int4*	    interOffsetSizes,
							  int2*		interKeyListRange,
							  size_t*   outputKeysSizePerTask,
							  size_t*   outputValsSizePerTask,
							  size_t*   outputCountPerTask,
							  size_t    recordNum, 
							  size_t    recordsPerTask,
							  size_t	taskNum,
							  size_t	keyOffset,
							  size_t	valOffset)
{
	size_t index = (blockIdx.x * blockDim.x + threadIdx.x);

	for (int i = 0; i <= recordsPerTask; i++)
	{
		int cindex = i*taskNum+index;
		if (cindex >= recordNum) return;
	
		int valStartIndex = interKeyListRange[cindex].x;
		int valCount = interKeyListRange[cindex].y - interKeyListRange[cindex].x;

		size_t keySize = interOffsetSizes[interKeyListRange[cindex].x].y;

		char *key = gpuGetRecordFromBuf(interKeys, 
			interOffsetSizes, valStartIndex, 0, keyOffset, valOffset);
		char *vals = gpuGetRecordFromBuf(interVals, 
			interOffsetSizes, valStartIndex, 1, keyOffset, valOffset);

		gpu_reduce_count(key,
		             vals,
				     keySize,
					 valCount,
					 interOffsetSizes,
				     outputKeysSizePerTask,
				     outputValsSizePerTask,
				     outputCountPerTask,
					 valStartIndex);
	}
}

__global__ void gpuReduce(char*		interKeys,
						char*		interVals,
						int4*		interOffsetSizes,
						int2*		interKeyListRange,
					    size_t*		psKeySizes,
					    size_t*		psValSizes,
					    size_t*		psCounts,
						char*		outputKeys,
						char*		outputVals,
						int4*		outputOffsetSizes,
						int2*		keyValOffsets,
						size_t*		curIndex,
						size_t		recordNum, 
						size_t		recordsPerTask,
						size_t		taskNum,
						size_t		keyOffset,
						size_t		valOffset)
{
	size_t index = (blockIdx.x * blockDim.x + threadIdx.x);

	outputOffsetSizes[psCounts[index]].x = psKeySizes[index];
	outputOffsetSizes[psCounts[index]].z = psValSizes[index];

	for (int i = 0; i <= recordsPerTask; i++)
	{
		int cindex = i*taskNum+index;
		if (cindex >= recordNum) return;
	 
		int valStartIndex = interKeyListRange[cindex].x;
		int valCount = interKeyListRange[cindex].y - interKeyListRange[cindex].x;

		size_t keySize = interOffsetSizes[interKeyListRange[cindex].x].y;

		char *key = gpuGetRecordFromBuf(interKeys, 
			interOffsetSizes, valStartIndex, 0, keyOffset, valOffset);
		char *vals = gpuGetRecordFromBuf(interVals, 
			interOffsetSizes, valStartIndex, 1, keyOffset, valOffset);

		gpu_reduce(key,
			   vals,
			   keySize,
			   valCount,
			   psKeySizes,
			   psValSizes,
			   psCounts,
			   keyValOffsets,
			   interOffsetSizes,
			   outputKeys,
			   outputVals,
			   outputOffsetSizes,
			   curIndex,
			   valStartIndex);
	}
}

void StartGPUReduce(Schedule_t *sched, char mode)
{
	//D_ENTER_FUNC("StartGPUReduce");
	BEN_ASSERT(sched != NULL);

	//-------------------------------------------------------
	//get reduce input data
	//-------------------------------------------------------
	//!!!
	size_t	interRecCount = sched->inputSmallChunk.recCount;
	size_t	interDiffKeyCount = sched->inputSmallChunk.diffKeyCount;
	size_t	interKeySize = sched->inputSmallChunk.keySize;
	size_t	interValSize = sched->inputSmallChunk.valSize;
	size_t	interKeyOffset = sched->inputSmallChunk.keyOffset;
	size_t	interValOffset = sched->inputSmallChunk.valOffset;
	//!!!

	if (interRecCount <= 0) return;

	//!!!
	char *interKeys = sched->inputSmallChunk.keys;
	char *interVals = sched->inputSmallChunk.vals;
	int4 *interIndex = sched->inputSmallChunk.index;
	int2 *interKeyListRange = sched->inputSmallChunk.keyListRange;
	//!!!

	//----------------------------------------------
	//determine the number of threads to run
	//----------------------------------------------
	size_t gridDim = sched->gpuReduceGridDim;
	size_t blockDim = sched->gpuReduceBlockDim;
	size_t sharedMemSize = sched->gpuReduceSharedMemSize;
	size_t threadNum = gridDim*blockDim;
	size_t recPerThread = interRecCount / threadNum;
	if (0 == recPerThread)
		recPerThread = 1;

	//---------------------------------------------------
	//transfer data to gpu device memory
	//---------------------------------------------------
	char*	d_interKeys = D_MALLOC(interKeySize);
	D_MEMCPY_H2D(d_interKeys, interKeys, interKeySize);

	char*	d_interVals = D_MALLOC(interValSize);
	D_MEMCPY_H2D(d_interVals, interVals, interValSize);

	int4*	d_interIndex = (int4*)D_MALLOC(sizeof(int4)*interRecCount);
	D_MEMCPY_H2D(d_interIndex, interIndex, sizeof(int4)*interRecCount);

	int2*	d_interKeyListRange = (int2*)D_MALLOC(sizeof(int2)*interDiffKeyCount);
	D_MEMCPY_H2D(d_interKeyListRange, interKeyListRange, sizeof(int2)*interDiffKeyCount);

	//----------------------------------------------
	//calculate output data keys'buf size 
	//	 and values' buf size
	//----------------------------------------------
	size_t*	d_outputKeySizePerThread = (size_t*)D_MALLOC(sizeof(size_t)*threadNum);

	size_t*	d_outputValSizePerThread = (size_t*)D_MALLOC(sizeof(size_t)*threadNum);

	size_t*	d_outputCountPerThread = (size_t*)D_MALLOC(sizeof(size_t)*threadNum);

	gpuReduceCount<<<gridDim, blockDim, sharedMemSize>>>(d_interKeys,
										    d_interVals,
										    d_interIndex,
											d_interKeyListRange,
										    d_outputKeySizePerThread,
										    d_outputValSizePerThread,
										    d_outputCountPerThread,
										    interDiffKeyCount, 
										    recPerThread,
											threadNum,
											interKeyOffset,
											interValOffset);

	//-------------------------------------------
	//do prefix sum
	//-------------------------------------------
	size_t *d_psKeySizes = (size_t*)D_MALLOC(sizeof(size_t)*threadNum);
	size_t allKeySize = prefexSum((int*)d_outputKeySizePerThread, (int*)d_psKeySizes, threadNum);

	size_t *d_psValSizes = (size_t*)D_MALLOC(sizeof(size_t)*threadNum);
	size_t allValSize = prefexSum((int*)d_outputValSizePerThread, (int*)d_psValSizes, threadNum);

	size_t *d_psCounts = (size_t*)D_MALLOC(sizeof(size_t)*threadNum);
	size_t allCounts = prefexSum((int*)d_outputCountPerThread, (int*)d_psCounts, threadNum);

	//----------------------------------------------------
	//allocate output buffer
	//----------------------------------------------------
	char*	d_outputKeys = D_MALLOC(allKeySize);
	char*	d_outputVals = D_MALLOC(allValSize);
	int4*	d_outputIndex = (int4*)D_MALLOC(sizeof(int4)*allCounts);

	//----------------------------------------------------
	//start reduce
	//----------------------------------------------------		
	int2*	d_keyValOffsets = (int2*)D_MALLOC(sizeof(int2)*threadNum);

	size_t*	d_curIndex = (size_t*)D_MALLOC(sizeof(size_t)*threadNum);
	
	gpuReduce<<<gridDim, blockDim, sharedMemSize>>>(d_interKeys,
									   d_interVals,
									   d_interIndex,
									   d_interKeyListRange,
									   d_psKeySizes,
									   d_psValSizes,
									   d_psCounts,
									   d_outputKeys,
									   d_outputVals,
									   d_outputIndex,
									   d_keyValOffsets,
									   d_curIndex,
									   interDiffKeyCount, 
									   recPerThread,
									   threadNum,
									   interKeyOffset,
									   interValOffset);
 
	//----------------------------------------------------
	//output
	//----------------------------------------------------
	char *outputKeys = (char*)BenMalloc(allKeySize);
	char *outputVals = (char*)BenMalloc(allValSize);
	int4 *outputIndex = (int4*)BenMalloc(sizeof(int4)*allCounts);

	D_MEMCPY_D2H(outputKeys, d_outputKeys, allKeySize);
	D_MEMCPY_D2H(outputVals, d_outputVals, allValSize);
	D_MEMCPY_D2H(outputIndex, d_outputIndex, sizeof(int4)*allCounts);

	sched->outputSmallChunk.keys = outputKeys;
	sched->outputSmallChunk.vals = outputVals;
	sched->outputSmallChunk.index = outputIndex;
	sched->outputSmallChunk.keySize = allKeySize;
	sched->outputSmallChunk.valSize = allValSize;
	sched->outputSmallChunk.indexSize = allCounts*sizeof(int4);
	sched->outputSmallChunk.recCount = allCounts;
	sched->outputSmallChunk.rangeSize = sched->outputSmallChunk.diffKeyCount*sizeof(int2);

	//----------------------------------------------------
	//clean
	//----------------------------------------------------
	D_FREE(d_interKeys, interKeySize);
	D_FREE(d_interVals, interValSize);
	D_FREE(d_interIndex, interRecCount * sizeof(int4));
	D_FREE(d_interKeyListRange, sizeof(int2)*interDiffKeyCount);

	D_FREE(d_outputKeySizePerThread, sizeof(size_t)*threadNum);
	D_FREE(d_outputValSizePerThread, sizeof(size_t)*threadNum);
	D_FREE(d_outputCountPerThread, sizeof(size_t)*threadNum);

	D_FREE(d_psKeySizes, sizeof(size_t)*threadNum);
	D_FREE(d_psValSizes, sizeof(size_t)*threadNum);
	D_FREE(d_psCounts, sizeof(size_t)*threadNum);

	D_FREE(d_outputKeys, allKeySize);
	D_FREE(d_outputVals, allValSize);
	D_FREE(d_outputIndex, allCounts*sizeof(int4));

	D_FREE(d_keyValOffsets, sizeof(int2)*threadNum);
	D_FREE(d_curIndex, sizeof(size_t)*threadNum);

	//D_LEAVE_FUNC("StartGPUReduce");
}

__device__ void gpuEmitCount(size_t		keySize,
						  size_t		valSize,
						  size_t*		outputKeysSizePerTask,
						  size_t*		outputValsSizePerTask,
						  size_t*		outputCountPerTask)
{
	size_t index = (blockIdx.x * blockDim.x + threadIdx.x);	

	outputKeysSizePerTask[index] += keySize;
	outputValsSizePerTask[index] += valSize;
	outputCountPerTask[index]++;
}

__device__ void gpuEmit  (char*		key, 
					   char*		val, 
					   size_t		keySize, 
					   size_t		valSize,
					   size_t*		psKeySizes, 
					   size_t*		psValSizes, 
					   size_t*		psCounts, 
					   int2*		keyValOffsets, 
					   char*		outputKeys,
					   char*		outputVals,
					   int4*		outputOffsetSizes,
					   size_t*		curIndex)
{
	size_t index = (blockIdx.x * blockDim.x + threadIdx.x);

	char4 *pKeySet = (char4*)(outputKeys + psKeySizes[index] + keyValOffsets[index].x);
	char4 *pValSet = (char4*)(outputVals + psValSizes[index] + keyValOffsets[index].y);

	copyData(pKeySet, (char4*)key, keySize);
	copyData(pValSet, (char4*)val, valSize);

	keyValOffsets[index].x += keySize;
	keyValOffsets[index].y += valSize;

	if (curIndex[index] != 0)
	{
	outputOffsetSizes[psCounts[index] + curIndex[index]].x = 
		(outputOffsetSizes[psCounts[index] + curIndex[index] - 1].x + 
		 outputOffsetSizes[psCounts[index] + curIndex[index] - 1].y);
	outputOffsetSizes[psCounts[index] + curIndex[index]].z = 
		(outputOffsetSizes[psCounts[index] + curIndex[index] - 1].z + 
		 outputOffsetSizes[psCounts[index] + curIndex[index] - 1].w);
	}
	
	outputOffsetSizes[psCounts[index] + curIndex[index]].y = keySize;
	
	outputOffsetSizes[psCounts[index] + curIndex[index]].w = valSize;

	curIndex[index]++;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -