⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 marscpulib.cu

📁 GPU实现的MapReduce framework,对于学习并行编程和cuda平台的编程方面有着极好的参考价值
💻 CU
📖 第 1 页 / 共 2 页
字号:

	//-----------------------------------------------
	//clean
	//-----------------------------------------------
CPU_MAP_EXIT:
	BenFree((char**)&keyValOffsets, sizeof(int2)*threadNum);
	BenFree((char**)&curIndex, sizeof(size_t)*threadNum);
	BenFree((char**)&g_map, sizeof(WorkerArg_t));

	BenFree((char**)&interKeySizePerThread, sizeof(size_t)*threadNum);
	BenFree((char**)&interValSizePerThread, sizeof(size_t)*threadNum);
	BenFree((char**)&interCountPerThread, sizeof(size_t)*threadNum);

	BenFree((char**)&g_mapcount, sizeof(CountArg_t));
	BenFree((char**)&tp, sizeof(BenThread_t)*threadNum);

	BenFree((char**)&psKeySizes, sizeof(size_t)*threadNum);
	BenFree((char**)&psValSizes, sizeof(size_t)*threadNum);
	BenFree((char**)&psCounts, sizeof(size_t)*threadNum);

	//LeaveFunc("StartCPUFunc");
}   

/*
 * CPU sort stage for sched->outputSmallChunk.
 *
 * Does nothing unless `mode` has MAP_SORT or MAP_SORT_REDUCE set. Otherwise it
 * resets the chunk's keyOffset when USE_FILE is also set, runs QuickSortMem and
 * then GroupByMem on the chunk, and finally sets rangeSize to the byte size of
 * one int2 per distinct key (diffKeyCount, presumably filled in by GroupByMem).
 */
void StartCPUSort(Schedule_t *sched, char mode)
{
	int sortRequested = (mode & MAP_SORT) || (mode & MAP_SORT_REDUCE);
	if (!sortRequested)
		return;

	if (mode & USE_FILE)
		sched->outputSmallChunk.keyOffset = 0;

	QuickSortMem(&sched->outputSmallChunk);
	GroupByMem(&sched->outputSmallChunk);

	// Size in bytes of the key-range table: one int2 per distinct key.
	sched->outputSmallChunk.rangeSize = sizeof(int2) * sched->outputSmallChunk.diffKeyCount;
}

/*
 * Accounts for one key/value pair that worker `index` will emit.
 *
 * Adds keySize and valSize to that worker's running key-byte and value-byte
 * totals and bumps its record count by one. This function only ever adds to
 * the three per-worker arrays, so they must start out zeroed.
 */
void cpuEmitCount(size_t		keySize,
	  size_t		valSize,
	  size_t*		outputKeysSizePerTask,
	  size_t*		outputValsSizePerTask,
	  size_t*		outputCountPerTask,
	  int			index)
{
	size_t *keyTotal = &outputKeysSizePerTask[index];
	size_t *valTotal = &outputValsSizePerTask[index];
	size_t *recTotal = &outputCountPerTask[index];

	*keyTotal = *keyTotal + keySize;
	*valTotal = *valTotal + valSize;
	*recTotal = *recTotal + 1;
}

/*
 * Stores one key/value pair emitted by worker `index`.
 *
 * The key is copied to outputKeys at psKeySizes[index] + keyValOffsets[index].x
 * and the value to outputVals at psValSizes[index] + keyValOffsets[index].y;
 * both running offsets are then advanced by the copied sizes.
 *
 * The record descriptor lives at outputOffsetSizes[psCounts[index] + curIndex[index]]:
 *   .x key offset, .y key size, .z value offset, .w value size.
 * For the worker's second and later records, .x/.z are derived from the
 * previous descriptor (offset + size). For the first record (curIndex == 0)
 * they are NOT written here and must already have been seeded by the caller.
 * Finally curIndex[index] is advanced by one.
 */
void cpuEmit  (char*		key, 
	   char*		val, 
	   size_t		keySize, 
	   size_t		valSize,
	   size_t*		psKeySizes, 
	   size_t*		psValSizes, 
	   size_t*		psCounts, 
	   int2*		keyValOffsets, 
	   char*		outputKeys,
	   char*		outputVals,
	   int4*		outputOffsetSizes,
	   size_t*		curIndex,
		int		index)
{
	char *keyDst = outputKeys + psKeySizes[index] + keyValOffsets[index].x;
	char *valDst = outputVals + psValSizes[index] + keyValOffsets[index].y;

	BenMemcpy(keyDst, key, keySize);
	BenMemcpy(valDst, val, valSize);

	keyValOffsets[index].x += keySize;
	keyValOffsets[index].y += valSize;

	int4 *rec = &outputOffsetSizes[psCounts[index] + curIndex[index]];
	if (curIndex[index] > 0)
	{
		// Chain from the previous record of this worker.
		const int4 *prev = rec - 1;
		rec->x = prev->x + prev->y;
		rec->z = prev->z + prev->w;
	}
	rec->y = keySize;
	rec->w = valSize;

	curIndex[index] += 1;
}

void *cpuReduceCount(void *i)
{
	int index = (int)i;

	size_t keyOffset = g_reducecount->keyOffset;
	size_t valOffset = g_reducecount->valOffset;

	for (int i = 0; i <= g_reducecount->recPerThread; i++)
	{
		int cindex = i*g_reducecount->threadNum+index;
		if (cindex >= g_reducecount->recCount) return 0;
	
		int valStartIndex = g_reducecount->inKeyListRange[cindex].x;
		int valCount = g_reducecount->inKeyListRange[cindex].y - g_reducecount->inKeyListRange[cindex].x;

		size_t keySize = g_reducecount->inIndex[valStartIndex].y;

		char *key = cpuGetRecordFromBuf(g_reducecount->inKeys, 
			g_reducecount->inIndex, valStartIndex, 0, keyOffset, valOffset);
		char *vals = cpuGetRecordFromBuf(g_reducecount->inVals, 
			g_reducecount->inIndex, valStartIndex, 1, keyOffset, valOffset);

		cpu_reduce_count(key,
				vals,
				keySize,
				valCount,
				g_reducecount->inIndex,
				g_reducecount->interKeySizePerThread,
				g_reducecount->interValSizePerThread,
				g_reducecount->interCountPerThread,
				index,
				valStartIndex);
	}
	return 0;
}

void *cpuReduce(void *args)
{
	int index = (int)args;

	g_reduce->outIndex[g_reduce->psCounts[index]].x = g_reduce->psKeySizes[index];
	g_reduce->outIndex[g_reduce->psCounts[index]].z = g_reduce->psValSizes[index];

	for (int i = 0; i <= g_reduce->recPerThread; i++)
	{
		int cindex = i*g_reduce->threadNum+index;
		if (cindex >= g_reduce->recCount) return 0;
	
		int valStartIndex = g_reduce->inKeyListRange[cindex].x;
		int valCount = g_reduce->inKeyListRange[cindex].y - g_reduce->inKeyListRange[cindex].x;

		size_t keySize = g_reduce->inIndex[g_reduce->inKeyListRange[cindex].x].y;

		char *key = cpuGetRecordFromBuf(g_reduce->inKeys, 
			g_reduce->inIndex, valStartIndex, 0, g_reducecount->keyOffset, g_reducecount->valOffset);
		char *vals = cpuGetRecordFromBuf(g_reduce->inVals, 
			g_reduce->inIndex, valStartIndex, 1,
			g_reducecount->keyOffset, g_reducecount->valOffset);

		cpu_reduce(key,
			   vals,
			   keySize,
			   valCount,
			   g_reduce->psKeySizes,
			   g_reduce->psValSizes,
			   g_reduce->psCounts,
			   g_reduce->keyValOffsets,
			   g_reduce->inIndex,
			   g_reduce->outKeys,
			   g_reduce->outVals,
			   g_reduce->outIndex,
			   g_reduce->curIndex, 
				index,
				valStartIndex);
	}
	return 0;
}
//-------------------------------------------------
// Exclusive prefix sum over n per-thread totals:
//   out[0] = 0, out[t] = in[0] + ... + in[t-1].
// Returns the grand total in[0] + ... + in[n-1].
// Every element of out is written, so it need not be pre-zeroed.
//-------------------------------------------------
static size_t cpuReduceExclusiveScan(const size_t *in, size_t *out, size_t n)
{
	size_t running = 0;
	for (size_t t = 0; t < n; t++)
	{
		out[t] = running;
		running += in[t];
	}
	return running;
}

//-------------------------------------------------
//main cpu reduce procedure
//-------------------------------------------------
// Runs the CPU reduce stage over sched->inputSmallChunk in two passes:
//   1. count pass - cpuReduceCount workers accumulate, per thread, the key
//      bytes, value bytes and record count they will emit;
//   2. emit pass  - after exclusive prefix sums give each thread its slice of
//      freshly allocated output buffers, cpuReduce workers fill them.
// The output buffers are stored in sched->outputSmallChunk and are not freed
// here. If the input has no key groups, or the count pass reports zero output
// records, sched->outputSmallChunk is left unchanged.
// `mode` is currently unused.
//-------------------------------------------------
void StartCPUReduce(Schedule_t *sched, char mode)
{
	(void)mode;
	BEN_ASSERT(sched != NULL);

	//-------------------------------------------------------
	//get reduce input data
	//-------------------------------------------------------
	size_t	interRecCount = sched->inputSmallChunk.diffKeyCount;
	if (interRecCount == 0) return;

	char *interKeys = sched->inputSmallChunk.keys;
	char *interVals = sched->inputSmallChunk.vals;
	int4 *interIndex = sched->inputSmallChunk.index;
	int2 *interKeyListRange = sched->inputSmallChunk.keyListRange;

	//----------------------------------------------
	//determine the number of threads to run
	//----------------------------------------------
	size_t threadNum = sched->cpuReduceThreadNum;
	// threadNum is used as a divisor and threadNum-1 as an index below.
	BEN_ASSERT(threadNum > 0);
	size_t recPerThread = interRecCount / threadNum;
	if (0 == recPerThread)
		recPerThread = 1;

	//----------------------------------------------
	//count pass: per-thread key/value buffer sizes
	//	and record counts
	//----------------------------------------------
	size_t*	outputKeySizePerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t*	outputValSizePerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t*	outputCountPerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	// The count workers only add to these, so start them explicitly at zero.
	for (size_t t = 0; t < threadNum; t++)
	{
		outputKeySizePerThread[t] = 0;
		outputValSizePerThread[t] = 0;
		outputCountPerThread[t] = 0;
	}

	g_reducecount = (CountArg_t*)BenMalloc(sizeof(CountArg_t));
	g_reducecount->inKeys = interKeys;
	g_reducecount->inVals = interVals;
	g_reducecount->inIndex = interIndex;
	g_reducecount->inKeyListRange = interKeyListRange;
	g_reducecount->interKeySizePerThread = outputKeySizePerThread;
	g_reducecount->interValSizePerThread = outputValSizePerThread;
	g_reducecount->interCountPerThread = outputCountPerThread;
	g_reducecount->recCount = interRecCount; 
	g_reducecount->recPerThread = recPerThread;
	g_reducecount->threadNum = threadNum;
	// NOTE(review): previously marked "!!!" with sched->inputSmallChunk.keyOffset /
	// .valOffset as the alternative; both are hard-coded to 0 - confirm intent.
	g_reducecount->keyOffset = 0;
	g_reducecount->valOffset = 0;

	// Thread ids are passed by value through the void* argument; using size_t
	// keeps the integer pointer-sized.
	BenThread_t *tp = (BenThread_t*)BenMalloc(sizeof(BenThread_t)*threadNum);
	for (size_t i = 0; i < threadNum; i++)
		tp[i] = BenNewThread(cpuReduceCount, (void*)i);
	BenWaitForMul(tp, threadNum);

	//-----------------------------------------------
	//prefix sum: start of each thread's output slice
	//-----------------------------------------------
	size_t *psKeySizes = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t *psValSizes = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t *psCounts = (size_t*)BenMalloc(sizeof(size_t)*threadNum);

	size_t allKeySize = cpuReduceExclusiveScan(outputKeySizePerThread, psKeySizes, threadNum);
	size_t allValSize = cpuReduceExclusiveScan(outputValSizePerThread, psValSizes, threadNum);
	size_t allCounts = cpuReduceExclusiveScan(outputCountPerThread, psCounts, threadNum);

	//-----------------------------------------------
	//emit pass (only when something will be emitted, so
	//	no output buffers are allocated and leaked otherwise)
	//-----------------------------------------------
	if (allCounts > 0)
	{
		char*	outputKeys = (char*)BenMalloc(allKeySize);
		char*	outputVals = (char*)BenMalloc(allValSize);
		int4*	outputIndex = (int4*)BenMalloc(sizeof(int4)*allCounts);

		// Per-thread write cursors advanced by the emit workers; start at zero.
		int2*	keyValOffsets = (int2*)BenMalloc(sizeof(int2)*threadNum);
		size_t*	curIndex = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
		for (size_t t = 0; t < threadNum; t++)
		{
			keyValOffsets[t].x = 0;
			keyValOffsets[t].y = 0;
			curIndex[t] = 0;
		}

		g_reduce = (WorkerArg_t*)BenMalloc(sizeof(WorkerArg_t));

		g_reduce->inKeys = interKeys;
		g_reduce->inVals = interVals;
		g_reduce->inIndex = interIndex;
		g_reduce->inKeyListRange = interKeyListRange;
		g_reduce->psKeySizes = psKeySizes;
		g_reduce->psValSizes = psValSizes;
		g_reduce->psCounts = psCounts;
		g_reduce->keyValOffsets = keyValOffsets;
		g_reduce->outKeys = outputKeys;
		g_reduce->outVals = outputVals;
		g_reduce->outIndex = outputIndex;
		g_reduce->curIndex = curIndex;
		g_reduce->recCount = interRecCount;
		g_reduce->recPerThread = recPerThread;
		g_reduce->threadNum = threadNum;

		// g_reducecount is still alive here: cpuReduce reads it as well.
		for (size_t i = 0; i < threadNum; i++)
			tp[i] = BenNewThread(cpuReduce, (void*)i);
		BenWaitForMul(tp, threadNum);

		//-----------------------------------------------
		//output
		//-----------------------------------------------
		sched->outputSmallChunk.keys = outputKeys;
		sched->outputSmallChunk.vals = outputVals;
		sched->outputSmallChunk.index = outputIndex;
		sched->outputSmallChunk.keySize = allKeySize;
		sched->outputSmallChunk.valSize = allValSize;
		sched->outputSmallChunk.indexSize = allCounts*sizeof(int4);
		sched->outputSmallChunk.recCount = allCounts;

		BenFree((char**)&keyValOffsets, sizeof(int2)*threadNum);
		BenFree((char**)&curIndex, sizeof(size_t)*threadNum);
		BenFree((char**)&g_reduce, sizeof(WorkerArg_t));
	}

	//-----------------------------------------------
	//clean
	//-----------------------------------------------
	BenFree((char**)&outputKeySizePerThread, sizeof(size_t)*threadNum);
	BenFree((char**)&outputValSizePerThread, sizeof(size_t)*threadNum);
	BenFree((char**)&outputCountPerThread, sizeof(size_t)*threadNum);

	BenFree((char**)&g_reducecount, sizeof(CountArg_t));
	BenFree((char**)&tp, sizeof(BenThread_t)*threadNum);

	BenFree((char**)&psKeySizes, sizeof(size_t)*threadNum);
	BenFree((char**)&psValSizes, sizeof(size_t)*threadNum);
	BenFree((char**)&psCounts, sizeof(size_t)*threadNum);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -