⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 marsschedfile.cu

📁 GPU实现的MapReduce framework,对于学习并行编程和cuda平台的编程方面有着极好的参考价值
💻 CU
字号:
/**
 *This is the source code for Mars, a MapReduce framework on graphics
 *processors.
 *Author: Wenbin Fang (HKUST), Bingsheng He (HKUST)
 *Mentor: Naga K. Govindaraju (Microsoft Corp.), Qiong Luo (HKUST), Tuyong
 *Wang (Sina.com).
 *If you have any question on the code, please contact us at {saven,
 *wenbin, luo}@cse.ust.hk.
 *The copyright is held by HKUST. Mars is provided "as is" without any 
 *guarantees of any kind.
 */

//-------------------------------------------------------
//for gpu only or cpu only
//-------------------------------------------------------
static void FillSortInfo(SortInfo_t *sortInfo, ChunkInfo_t *chunk)
{
	BEN_ASSERT(sortInfo != NULL);
	BEN_ASSERT(chunk != NULL);

	if (chunk->recCount <= 0 || chunk->diffKeyCount <= 0) return;

	size_t cur = sortInfo->realChunkCount;
	static size_t cursorOffset = 0;

	if (cur != 0) 
		sortInfo->chunks[cur].keyOffset = sortInfo->chunks[cur-1].keyOffset +
			sortInfo->chunks[cur-1].keySize;
	else
		sortInfo->chunks[cur].keyOffset = 0;
	sortInfo->chunks[cur].keySize = chunk->keySize;

	if (cur != 0) 
		sortInfo->chunks[cur].valOffset = sortInfo->chunks[cur-1].valOffset +
			sortInfo->chunks[cur-1].valSize;
	else
		sortInfo->chunks[cur].valOffset = 0;
	sortInfo->chunks[cur].valSize = chunk->valSize;

	if (cur != 0) 
		sortInfo->chunks[cur].indexOffset = sortInfo->chunks[cur-1].indexOffset +
			sortInfo->chunks[cur-1].indexSize;
	else
		sortInfo->chunks[cur].indexOffset = 0;
	sortInfo->chunks[cur].indexSize = chunk->indexSize;
	sortInfo->chunks[cur].recCount = chunk->recCount;

	if (cur != 0) 
		sortInfo->chunks[cur].rangeOffset = sortInfo->chunks[cur-1].rangeOffset +
			sortInfo->chunks[cur-1].rangeSize;
	else
		sortInfo->chunks[cur].rangeOffset = 0;
	sortInfo->chunks[cur].rangeSize = chunk->rangeSize;
	sortInfo->chunks[cur].diffKeyCount = chunk->diffKeyCount;
	sortInfo->chunks[cur].cursor = cursorOffset;
	cursorOffset += chunk->diffKeyCount;

	sortInfo->realChunkCount++;
	if (sortInfo->realChunkCount >= sortInfo->fullChunkCount)
	{
		sortInfo->chunks = (SortChunk_t*)BenRealloc(sortInfo->chunks, 
			sortInfo->fullChunkCount*sizeof(SortChunk_t),
			(sortInfo->fullChunkCount*2)*sizeof(SortChunk_t));
		sortInfo->fullChunkCount *= 2;
	}
}

static void SingleMapFile(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	Schedule_t *sched = NULL;
	if ((spec->mode & GPU) &&
		!(spec->mode & CPU))
	{
		sched = spec->gpuSched;
		BenMemset(sched, 0, sizeof(Schedule_t));
		sched->gpuMapGridDim = spec->gpuMapGridDim;
		sched->gpuMapBlockDim = spec->gpuMapBlockDim;
		sched->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;
	}
	else if (!(spec->mode & GPU) &&
			 (spec->mode & CPU))
	{
		sched = spec->cpuSched;
		BenMemset(sched, 0, sizeof(Schedule_t));
		sched->cpuMapThreadNum = spec->cpuMapThreadNum;
	}
	else
	{
		BenLog("Error: please use single processor!\n");
		return ;
	}

	while(spec->inputChunk->fileCursor < spec->totalInputRecCount)
	{
		ReadChunkFromFile(spec->inputChunk, &spec->inputFile, spec->totalInputRecCount,
			spec->inputChunk->fileCursor, spec->flushThreshhold);
		BenMemcpy(&(sched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
		if (spec->mode & GPU)	
			StartGPUMap(sched, spec->mode);
		else
			StartCPUMap(sched, spec->mode);

		spec->inputChunk->fileCursor += spec->flushThreshhold;//spec->inputChunk->recCount;
		spec->inputChunk->keyOffset += spec->inputChunk->keySize;
		spec->inputChunk->valOffset += spec->inputChunk->valSize;

		if (!(spec->mode & MAP_ONLY))
		{
			RearrangeKeyVal(&(sched->outputSmallChunk));
			FillSortInfo(spec->sortInfo, &sched->outputSmallChunk);
			WriteChunkToFile(&(sched->outputSmallChunk), 
				&spec->interFile, &spec->totalInterRecCount, spec->mode);
		}
		else
		{
			WriteChunkToFile(&(sched->outputSmallChunk), 
				&spec->outputFile, &spec->totalOutputRecCount, spec->mode);
		}

		FreeChunk(&(sched->inputSmallChunk));
		sched->outputSmallChunk.recCount = 0;
	}
}

static void SwapFileName(FileName_t *file1, FileName_t *file2)
{
	char *tmp = NULL;

	tmp = file1->keyFile;
	file1->keyFile = file2->keyFile;
	file2->keyFile = tmp;

	tmp = file1->valFile;
	file1->valFile = file2->valFile;
	file2->valFile = tmp;

	tmp = file1->indexFile;
	file1->indexFile = file2->indexFile;
	file2->indexFile = tmp;

	tmp = file1->rangeFile;
	file1->rangeFile = file2->rangeFile;
	file2->rangeFile = tmp;
}

static void SingleMergeInterFile(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	if (spec->mode & MAP_SORT)
	{
		SwapFileName(&spec->interFile, &spec->outputFile);
		spec->totalOutputRecCount = spec->totalInterRecCount;
		MergeSortFile(&spec->outputFile, &spec->tmpFile, spec->sortInfo, 
			spec->flushThreshhold, &spec->totalOutputRecCount, &spec->totalDiffKeyCount);
		SwapFileName(&spec->outputFile, &spec->tmpFile);
		//!!!
		//delete tmpFile
		//!!!
	}
	else if (spec->mode & MAP_SORT_REDUCE)
	{
		MergeSortFile(&spec->interFile, &spec->tmpFile, spec->sortInfo, 
			spec->flushThreshhold, &spec->totalInterRecCount, &spec->totalDiffKeyCount);
		SwapFileName(&spec->interFile, &spec->tmpFile);
		//!!!
		//delete tmpFile
		//!!!
	}
}

static void SingleReduceFile(Spec_t *spec)
{
	BEN_ASSERT(spec);
	Schedule_t *sched = NULL;
	if ((spec->mode & GPU) &&
		!(spec->mode & CPU))
	{
		sched = spec->gpuSched;
		BenMemset(sched, 0, sizeof(Schedule_t));
		sched->gpuReduceGridDim = spec->gpuReduceGridDim;
		sched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
		sched->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;
	}
	else if (!(spec->mode & GPU) &&
			 (spec->mode & CPU))
	{
		sched = spec->cpuSched;
		BenMemset(sched, 0, sizeof(Schedule_t));
		sched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
	}
	else
	{
		BenLog("Error: please use single processor!\n");
		return ;
	}

	while(spec->interChunk->fileCursor < spec->totalDiffKeyCount)
	{
		ReadGroupChunkFromFile(spec->interChunk, &spec->interFile, spec->totalDiffKeyCount,
			spec->interChunk->fileCursor, spec->flushThreshhold);
		
		BenMemcpy(&(sched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
		if (spec->mode & GPU)	
			StartGPUReduce(sched, spec->mode);
		else
			StartCPUReduce(sched, spec->mode);

		spec->interChunk->fileCursor += spec->flushThreshhold;//spec->inputChunk->recCount;
		spec->interChunk->keyOffset += spec->interChunk->keySize;
		spec->interChunk->valOffset += spec->interChunk->valSize;

		if (!(spec->mode & MAP_ONLY))
		{
			RearrangeKeyVal(&(sched->outputSmallChunk));
			//FillSortInfo(spec->sortInfo, &sched->outputSmallChunk);
		}
		WriteChunkToFile(&(sched->outputSmallChunk), 
			&spec->outputFile, &spec->totalOutputRecCount, spec->mode);

		FreeChunk(spec->interChunk);
		sched->outputSmallChunk.recCount = 0;
	}
}

static void SingleMergeOutputFile(Spec_t *spec)
{
	BEN_ASSERT(spec);
}

//-------------------------------------------------------
//for gpu and cpu co-processing
//-------------------------------------------------------
static void CoprocessMapFile(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	while(spec->inputChunk->fileCursor < spec->totalInputRecCount)
	{
		//read from file
		ReadChunkFromFile(spec->inputChunk, &spec->inputFile, spec->totalInputRecCount,
			spec->inputChunk->fileCursor, spec->flushThreshhold);

		//set up gpu schedule configuration
		BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
		spec->gpuSched->gpuMapGridDim = spec->gpuMapGridDim;
		spec->gpuSched->gpuMapBlockDim = spec->gpuMapBlockDim;
		spec->gpuSched->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;
		BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
		spec->gpuSched->inputSmallChunk.recCount *= spec->gpuInputRatio;

		//set up cpu schedule configuration
		BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
		spec->cpuSched->cpuMapThreadNum = spec->cpuMapThreadNum;
		BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
		spec->cpuSched->inputSmallChunk.index += spec->gpuSched->inputSmallChunk.recCount;
		spec->cpuSched->inputSmallChunk.recCount -= spec->gpuSched->inputSmallChunk.recCount;

		//launch gpu and cpu processing
		BenThread_t tp[2];
		tp[0] = BenNewThread(gpuMapWorkerMem, (void*)spec);
		tp[1] = BenNewThread(cpuMapWorkerMem, (void*)spec);
		BenWaitForMul(tp, 2);
	
		//adjust cursors
		spec->inputChunk->fileCursor += spec->flushThreshhold;//spec->inputChunk->recCount;
		spec->inputChunk->keyOffset += spec->inputChunk->keySize;
		spec->inputChunk->valOffset += spec->inputChunk->valSize;

		PrintRecords(&(spec->gpuSched->outputSmallChunk), NULL,
				(spec->gpuSched->outputSmallChunk.recCount), spec, INT, INT, 100);

		PrintRecords(&(spec->cpuSched->outputSmallChunk), NULL,
				(spec->cpuSched->outputSmallChunk.recCount), spec, INT, INT, 100);
		//in memory merge to interChunk
		//unsorted
		if (spec->mode & MAP_ONLY)
		{
			MergeUnsortMem(spec->interChunk, 
				&spec->gpuSched->outputSmallChunk,
				&spec->cpuSched->outputSmallChunk);
		
			//write interChunk to output file
			WriteChunkToFile(spec->interChunk, 
				&spec->outputFile, &spec->totalOutputRecCount, spec->mode);	
		}
		//sorted
		else  if (spec->mode & MAP_SORT ||
				spec->mode & MAP_SORT_REDUCE)
		{
			MergeSortMem(spec->interChunk, 
				&spec->gpuSched->outputSmallChunk,
				&spec->cpuSched->outputSmallChunk);
			spec->interChunk->rangeSize = spec->interChunk->diffKeyCount*sizeof(int2);

			RearrangeKeyVal(spec->interChunk);
			FillSortInfo(spec->sortInfo, spec->interChunk);

			size_t bk = spec->interChunk->keyOffset;
			size_t bv = spec->interChunk->valOffset;
			spec->interChunk->keyOffset = 0;
			spec->interChunk->valOffset = 0;
			PrintRecords(spec->interChunk, NULL,
				spec->interChunk->recCount, spec, INT, INT, 100);
			spec->interChunk->keyOffset = bk;
			spec->interChunk->valOffset = bv;

			if (spec->mode & MAP_SORT_REDUCE)
			{
				WriteChunkToFile(spec->interChunk, 
					&spec->interFile, &spec->totalInterRecCount, spec->mode);
			}
			else if (spec->mode & MAP_SORT)
			{
				WriteChunkToFile(spec->interChunk, 
					&spec->outputFile, &spec->totalOutputRecCount, spec->mode);
			}
		}

		FreeChunk(spec->inputChunk);
		spec->inputChunk->recCount = 0;
		spec->gpuSched->outputSmallChunk.recCount = 0;
		spec->cpuSched->outputSmallChunk.recCount = 0;
	}//while*/
}

static void CoprocessMergeInterFile(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	if (spec->mode & MAP_SORT)
	{
		MergeSortFile(&spec->outputFile, &spec->tmpFile, spec->sortInfo, 
			spec->flushThreshhold, &spec->totalOutputRecCount, &spec->totalDiffKeyCount);
		SwapFileName(&spec->outputFile, &spec->tmpFile);
	}
	else if (spec->mode & MAP_SORT_REDUCE)
	{
		MergeSortFile(&spec->interFile, &spec->tmpFile, spec->sortInfo, 
			spec->flushThreshhold, &spec->totalInterRecCount, &spec->totalDiffKeyCount);
		SwapFileName(&spec->interFile, &spec->tmpFile);
	}
}

static void CoprocessReduceFile(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	//set up gpu schedule configuration
	while(spec->interChunk->fileCursor < spec->totalDiffKeyCount)
	{
		ReadGroupChunkFromFile(spec->interChunk, &spec->interFile, spec->totalDiffKeyCount,
			spec->interChunk->fileCursor, spec->flushThreshhold);

		BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
		spec->gpuSched->gpuReduceGridDim = spec->gpuReduceGridDim;
		spec->gpuSched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
		spec->gpuSched->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;
		BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
		spec->gpuSched->inputSmallChunk.diffKeyCount *= spec->gpuInputRatio;
			
		//set up cpu schedule configuration
		BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
		spec->cpuSched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
		BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
		spec->cpuSched->inputSmallChunk.keyListRange += spec->gpuSched->inputSmallChunk.diffKeyCount;
		spec->cpuSched->inputSmallChunk.diffKeyCount -= spec->gpuSched->inputSmallChunk.diffKeyCount;

		//launch gpu and cpu processing
		BenThread_t tp[2];
		tp[0] = BenNewThread(gpuReduceWorkerMem, (void*)spec);
		tp[1] = BenNewThread(cpuReduceWorkerMem, (void*)spec);
		BenWaitForMul(tp, 2);

//		gpuReduceWorkerMem(spec);
//		cpuReduceWorkerMem(spec);

		spec->interChunk->fileCursor += spec->flushThreshhold;//spec->inputChunk->recCount;
		spec->interChunk->keyOffset += spec->interChunk->keySize;
		spec->interChunk->valOffset += spec->interChunk->valSize;

		MergeUnsortMem(spec->outputChunk, 
			&spec->gpuSched->outputSmallChunk,
			&spec->cpuSched->outputSmallChunk);

		WriteChunkToFile(spec->outputChunk, 
			&spec->outputFile, &spec->totalOutputRecCount, spec->mode);
		
		FreeChunk(spec->interChunk);
	}//while
}

static void CoprocessMergeOutputFile(Spec_t *spec)
{

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -