⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 marsutils.cu

📁 GPU实现的MapReduce framework,对于学习并行编程和cuda平台的编程方面有着极好的参考价值
💻 CU
📖 第 1 页 / 共 3 页
字号:
		d_keyChunkSize = 0;
		d_valChunkSize = 0;
		d_indexChunkSize = 0;
	}
}
//---------------------------------------------------------
//main mapreduce procedure
//---------------------------------------------------------
double map_time = 0.0;
double merge_inter_time = 0.0;
double reduce_time = 0.0;
double merge_output_time = 0.0;
double group_time = 0.0;

void MapReduce(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	if (spec->mode & CPU)
	{
		spec->cpuSched->cpuMapThreadNum = spec->cpuMapThreadNum;
		spec->cpuSched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
	}
	if (spec->mode & GPU)
	{
		spec->gpuSched->gpuMapGridDim = spec->gpuMapGridDim;
		spec->gpuSched->gpuMapBlockDim = spec->gpuMapBlockDim;
		spec->gpuSched->gpuReduceGridDim = spec->gpuReduceGridDim;
		spec->gpuSched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
	}

	if (spec->mode & USE_MEM)
	{
		if (((spec->mode & CPU) &&
			!(spec->mode & GPU)) ||
			(!(spec->mode & CPU) &&
			(spec->mode & GPU)))
		{
			BenSetup(0);
			BenStart(0);
			if(!ScheduleSingleMem(spec, MAP)) return;
			BenStop(0);
			map_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if (!ScheduleSingleMem(spec, SORT)) return;
			BenStop(0);
			group_time += BenGetElapsedTime(0);
			
			BenReset(0);
			BenStart(0);
			if(!ScheduleSingleMem(spec, MERGE_INTER)) return;
			BenStop(0);
			merge_inter_time += BenGetElapsedTime(0);
			
			BenReset(0);
			BenStart(0);
			if(!ScheduleSingleMem(spec, REDUCE)) return;
			BenStop(0);
			reduce_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleSingleMem(spec, MERGE_OUTPUT)) return;
			BenStop(0);
			merge_output_time += BenGetElapsedTime(0);
		}
		else if ((spec->mode & CPU) && 
				(spec->mode & GPU))
		{
			BenSetup(0);
			BenStart(0);
			if(!ScheduleCoprocessMem(spec, MAP)) return;
			BenStop(0);
			map_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleCoprocessMem(spec, MERGE_INTER)) return;
			BenStop(0);
			merge_inter_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleCoprocessMem(spec, REDUCE)) return;
			BenStop(0);
			reduce_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleCoprocessMem(spec, MERGE_OUTPUT)) return;
			BenStop(0);
			merge_output_time += BenGetElapsedTime(0);
		}
	}
	else if (spec->mode & USE_FILE)
	{
		if (((spec->mode & CPU) &&
			!(spec->mode & GPU)) ||
			(!(spec->mode & CPU) &&
			(spec->mode & GPU)))
		{
			BenSetup(0);
			BenStart(0);
			if(!ScheduleSingleFile(spec, MAP)) return;
			BenStop(0);
			map_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleSingleFile(spec, MERGE_INTER)) return;
			BenStop(0);
			merge_inter_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleSingleFile(spec, REDUCE)) return;
			BenStop(0);
			reduce_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleSingleFile(spec, MERGE_OUTPUT)) return;
			BenStop(0);
			merge_output_time += BenGetElapsedTime(0);
		}
		else if ((spec->mode & CPU) && 
				(spec->mode & GPU))
		{
			BenSetup(0);
			BenStart(0);
			if(!ScheduleCoprocessFile(spec, MAP)) return;
			BenStop(0);
			map_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);
			if(!ScheduleCoprocessFile(spec, MERGE_INTER)) return;
			BenStop(0);
			merge_inter_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);			
			if(!ScheduleCoprocessFile(spec, REDUCE)) return;
			BenStop(0);
			reduce_time += BenGetElapsedTime(0);

			BenReset(0);
			BenStart(0);			
			if(!ScheduleCoprocessFile(spec, MERGE_OUTPUT)) return;
			BenStop(0);
			merge_output_time += BenGetElapsedTime(0);
		}
	}
	else 
	{
		BenLog("Error: please specify USE_MEM or USE_FILE\n");
		return ;
	}
}

//---------------------------------------------------------
//Clear buffer
//---------------------------------------------------------
void FinishMapReduce(Spec_t *spec)
{
	//EnterFunc("FinishMapReduce");
	BEN_ASSERT(spec != NULL);

	BenFree((char**)&(spec->inputFile.keyFile), BenStrLen(spec->inputFile.keyFile)+1);
	BenFree((char**)&(spec->inputFile.valFile), BenStrLen(spec->inputFile.valFile)+1);
	BenFree((char**)&(spec->inputFile.indexFile), BenStrLen(spec->inputFile.indexFile)+1);

	BenFree((char**)&(spec->interFile.keyFile), BenStrLen(spec->interFile.keyFile)+1);
	BenFree((char**)&(spec->interFile.valFile), BenStrLen(spec->interFile.valFile)+1);
	BenFree((char**)&(spec->interFile.indexFile), BenStrLen(spec->interFile.indexFile)+1);
	BenFree((char**)&(spec->interFile.rangeFile), BenStrLen(spec->interFile.rangeFile)+1);
	
	BenFree((char**)&(spec->outputFile.keyFile), BenStrLen(spec->outputFile.keyFile)+1);
	BenFree((char**)&(spec->outputFile.valFile), BenStrLen(spec->outputFile.valFile)+1);
	BenFree((char**)&(spec->outputFile.indexFile), BenStrLen(spec->outputFile.indexFile)+1);
	BenFree((char**)&(spec->outputFile.rangeFile), BenStrLen(spec->outputFile.rangeFile)+1);

	BenFree((char**)&(spec->tmpFile.keyFile), BenStrLen(spec->tmpFile.keyFile)+1);
	BenFree((char**)&(spec->tmpFile.valFile), BenStrLen(spec->tmpFile.valFile)+1);
	BenFree((char**)&(spec->tmpFile.indexFile), BenStrLen(spec->tmpFile.indexFile)+1);
	BenFree((char**)&(spec->tmpFile.rangeFile), BenStrLen(spec->tmpFile.rangeFile)+1);
	 	
	BenFree((char**)&(spec->inputChunk->keys), d_keyChunkSize);
	BenFree((char**)&(spec->inputChunk->vals), d_valChunkSize);
	BenFree((char**)&(spec->inputChunk->index), d_indexChunkSize);
	
	if (spec->mode & MAP_SORT)
	{
		BenFree((char**)&(spec->outputChunk->keyListRange), sizeof(int2)*spec->outputChunk->diffKeyCount);
	//	BenFree((char**)&(spec->outputChunk->groupInfo), sizeof(int4)*spec->outputChunk->recCount);
	}
	else if (spec->mode & MAP_SORT_REDUCE)
	{
		BenFree((char**)&(spec->interChunk->keyListRange), sizeof(int2)*spec->interChunk->diffKeyCount);	
//		BenFree((char**)&(spec->interChunk->groupInfo), sizeof(int4)*spec->outputChunk->recCount);	
	}
	BenFree((char**)&(spec->outputChunk->keys), spec->outputChunk->keySize);
	BenFree((char**)&spec->outputChunk->vals, spec->outputChunk->valSize);
	BenFree((char**)&spec->outputChunk->index, spec->outputChunk->indexSize);

	BenFree((char**)&(spec->interChunk->keys), spec->interChunk->keySize);
	BenFree((char**)&(spec->interChunk->vals), spec->interChunk->valSize);
	BenFree((char**)&(spec->interChunk->index), spec->interChunk->indexSize);
	
	BenFree((char**)&(spec->inputChunk), sizeof(ChunkInfo_t));
	BenFree((char**)&(spec->interChunk), sizeof(ChunkInfo_t));
	BenFree((char**)&(spec->outputChunk), sizeof(ChunkInfo_t));

	BenFree((char**)&spec->sortInfo->chunks, sizeof(SortChunk_t)*spec->sortInfo->fullChunkCount);
	BenFree((char**)&spec->sortInfo, sizeof(SortInfo_t));

	BenFree((char**)&spec->cpuSched, sizeof(Schedule_t));
	BenFree((char**)&spec->gpuSched, sizeof(Schedule_t));
	BenFree((char**)&spec, sizeof(Spec_t));

	d_keyChunkSize = 0;
	d_valChunkSize = 0;
	d_indexChunkSize = 0;

	//LeaveFunc("FinishMapReduce");
}

//-------------------------------------------------------
//for the ease to free key/val/index buffers in a chunk
//-------------------------------------------------------
void FreeChunk(ChunkInfo_t *chunk)
{
	BEN_ASSERT(chunk != NULL);
	BenFree((char**)&chunk->keys, chunk->keySize);
	BenFree((char**)&chunk->vals, chunk->valSize);
	BenFree((char**)&chunk->index, chunk->indexSize);
	BenFree((char**)&chunk->keyListRange, chunk->rangeSize);
	chunk->keySize = 0;
	chunk->valSize = 0;
	chunk->indexSize = 0;
	chunk->rangeSize = 0;
	chunk->diffKeyCount = 0;
	chunk->recCount = 0;
}

//========================================================
//Iterators
//========================================================
//--------------------------------------------------------
//input iterator
//--------------------------------------------------------
RecIterator_t *InitIterator(ChunkInfo_t *chunk, FileName_t *file, size_t totalRec, Spec_t *spec)
{
	RecIterator_t *it = (RecIterator_t*)BenMalloc(sizeof(RecIterator_t));
	
	//read from file
	if (spec->mode & USE_FILE)
	{
		it->file = file;
		it->chunk = chunk;
		it->totalRecCount = totalRec;
		it->chunkSize = spec->flushThreshhold;
	}
	//read from main memory
	else
	{
		it->chunk = chunk;
		it->totalRecCount = totalRec;
	}
	return it;
}

GroupIterator_t *InitGroupIterator(ChunkInfo_t *chunk, FileName_t *file, size_t totalGroup, Spec_t *spec)
{
	GroupIterator_t *it = (GroupIterator_t*)BenMalloc(sizeof(GroupIterator_t));
	//read from file
	if (spec->mode & USE_FILE)
	{
		it->file = file;
		it->chunk = chunk;
		it->chunkSize = spec->flushThreshhold;
		it->totalGroup = totalGroup;
	}
	//read from main memory
	else
	{
		it->chunk = chunk;
	}
	return it;
}

//---------------------------------------------------------
//get small chunk from file
//@param: totalRec -- total number of records in file
//@param: cursor -- current record cursor in index file
//@param: chunkSize -- the number of records to read
//---------------------------------------------------------
bool ReadChunkFromFile(ChunkInfo_t *chunk, FileName_t *file, 
					  size_t totalRec, size_t cursor, size_t chunkSize)
{
	BEN_ASSERT(file != NULL);

	if (cursor >= totalRec) return false;

	//get indexSize
	size_t recCount = chunkSize;
	if (recCount + cursor >= totalRec)
		recCount = totalRec - cursor;
	chunk->recCount = recCount;

	//read from index file
	size_t indexOffset = cursor * sizeof(int4);
	size_t indexSize = recCount * sizeof(int4);
	//chunk->index = (int4*)BenMalloc(indexSize);
	chunk->index = (int4*)BenReadFile(file->indexFile, indexOffset, indexSize);
	chunk->indexSize = indexSize;

	//get keySize & keyOffset & allocate key buffer
	size_t keyOffset = chunk->index[0].x;
	size_t keySize = chunk->index[recCount-1].x + 
		chunk->index[recCount-1].y - chunk->index[0].x;
	//chunk->keys = (char*)BenMalloc(keySize);
	chunk->keys = BenReadFile(file->keyFile, keyOffset, keySize);
	chunk->keySize = keySize;
	chunk->keyOffset = keyOffset;

	//get valSize & valOffset & allocate val buffer
	size_t valOffset = chunk->index[0].z;
	size_t valSize = chunk->index[recCount-1].z +
		chunk->index[recCount-1].w - chunk->index[0].z;
	//chunk->vals = (char*)BenMalloc(valSize);
	chunk->vals = BenReadFile(file->valFile, valOffset, valSize);
	chunk->valSize = valSize;
	chunk->valOffset = valOffset;

	return true;
}

//---------------------------------------------------------
//get small group chunk from file
//@param: totalRec -- total number of records in file
//@param: cursor -- current record cursor in index file
//@param: chunkSize -- the number of records to read
//---------------------------------------------------------
bool ReadGroupChunkFromFile(ChunkInfo_t *chunk, FileName_t *file, 
					  size_t totalGroup, size_t cursor, size_t chunkSize)
{
	BEN_ASSERT(file != NULL);

	if (cursor >= totalGroup) return false;

	//get indexSize
	size_t groupCount = chunkSize;
	if (groupCount + cursor >= totalGroup)
		groupCount = totalGroup - cursor;
	chunk->diffKeyCount = groupCount;

	//read from range file
	size_t rangeOffset = cursor * sizeof(int2);
	size_t rangeSize = groupCount * sizeof(int2);
	chunk->keyListRange = (int2*)BenReadFile(file->rangeFile, rangeOffset, rangeSize);
	chunk->rangeSize = rangeSize;

	//read from index file
	size_t recCount = chunk->keyListRange[groupCount-1].y - chunk->keyListRange->x;
	size_t indexOffset = chunk->keyListRange->x * sizeof(int4);
	size_t indexSize = recCount * sizeof(int4);
	chunk->index = (int4*)BenReadFile(file->indexFile, indexOffset, indexSize);
	chunk->indexSize = indexSize;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -