
marslib.cu

A GPU implementation of the MapReduce framework; a very good reference for learning parallel programming and programming on the CUDA platform.
/** 
 *This is the source code for Mars, a MapReduce framework on graphics
 *processors.
 *Author: Wenbin Fang (HKUST), Bingsheng He (HKUST)
 *Mentor: Naga K. Govindaraju (Microsoft Corp.), Qiong Luo (HKUST), Tuyong
 *Wang (Sina.com).
 *If you have any questions about the code, please contact us at {saven,
 *wenbin, luo}@cse.ust.hk.
 *The copyright is held by HKUST. Mars is provided "as is" without any 
 *guarantees of any kind.
 */
    
//scheduler
#include "MarsInc.h"
#include "MarsSchedMem.cu"
#include "MarsSchedFile.cu"
 
//------------------------------------------------------------
//validate the runtime configuration; unset or invalid fields
//fall back to their defaults
//------------------------------------------------------------
void InitMapReduce(Spec_t *spec)
{   
	BEN_ASSERT(spec != NULL);

	if (spec->mode & GPU)
		D_ANYDEVICE();
 
	//validate spec->mode
	if (!(spec->mode & USE_MEM) && !(spec->mode & USE_FILE))
		spec->mode |= USE_MEM;
	if (!(spec->mode & GPU) && !(spec->mode & CPU))
		spec->mode |= CPU;
	if (!(spec->mode & MAP_ONLY) && !(spec->mode & MAP_SORT) && !(spec->mode & MAP_SORT_REDUCE))
		spec->mode |= MAP_ONLY;
   
	//validate filename
	if (spec->interFile.keyFile == NULL)
		spec->interFile.keyFile = BenStrDup(DEFAULT_INTER_KEY_FILE);
	if (spec->interFile.valFile == NULL)
		spec->interFile.valFile = BenStrDup(DEFAULT_INTER_VAL_FILE);
	if (spec->interFile.indexFile == NULL)
		spec->interFile.indexFile = BenStrDup(DEFAULT_INTER_INDEX_FILE);
	if (spec->interFile.rangeFile == NULL)
		spec->interFile.rangeFile = BenStrDup(DEFAULT_INTER_RANGE_FILE);
	
	if (spec->outputFile.keyFile == NULL)
		spec->outputFile.keyFile = BenStrDup(DEFAULT_OUTPUT_KEY_FILE);
	if (spec->outputFile.valFile == NULL)
		spec->outputFile.valFile = BenStrDup(DEFAULT_OUTPUT_VAL_FILE);
	if (spec->outputFile.indexFile == NULL)
		spec->outputFile.indexFile = BenStrDup(DEFAULT_OUTPUT_INDEX_FILE);
	if (spec->outputFile.rangeFile == NULL)
		spec->outputFile.rangeFile = BenStrDup(DEFAULT_OUTPUT_RANGE_FILE);

	if (spec->tmpFile.keyFile == NULL)
		spec->tmpFile.keyFile = BenStrDup(DEFAULT_TMP_KEY_FILE);
	if (spec->tmpFile.valFile == NULL)
		spec->tmpFile.valFile = BenStrDup(DEFAULT_TMP_VAL_FILE);
	if (spec->tmpFile.indexFile == NULL)
		spec->tmpFile.indexFile = BenStrDup(DEFAULT_TMP_INDEX_FILE);
	if (spec->tmpFile.rangeFile == NULL)
		spec->tmpFile.rangeFile = BenStrDup(DEFAULT_TMP_RANGE_FILE);
 
	//validate thread number
	if (spec->cpuMapThreadNum <= 0) 
		spec->cpuMapThreadNum = DEFAULT_CPU_THREAD_NUM;
	if (spec->cpuReduceThreadNum <= 0) 
		spec->cpuReduceThreadNum = DEFAULT_CPU_THREAD_NUM;
	if (spec->gpuMapGridDim <= 0)
		spec->gpuMapGridDim = DEFAULT_GPU_DIM;
	if (spec->gpuMapBlockDim <= 0)
		spec->gpuMapBlockDim = DEFAULT_GPU_DIM;
	if (spec->gpuReduceGridDim <= 0)
		spec->gpuReduceGridDim = DEFAULT_GPU_DIM;
	if (spec->gpuReduceBlockDim <= 0)
		spec->gpuReduceBlockDim = DEFAULT_GPU_DIM;
 
	//validate gpu shared memory size; a request above the limit is reset to 0
	if (spec->gpuMapSharedMemSize > GPU_SHARED_MEM_SIZE)
		spec->gpuMapSharedMemSize = 0;
	if (spec->gpuReduceSharedMemSize > GPU_SHARED_MEM_SIZE)
		spec->gpuReduceSharedMemSize = 0;

	if (spec->flushThreshhold <= 0)
		spec->flushThreshhold = DEFAULT_THRESHHOLD;

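	//validate gpu input record ratio in all input records
	//range from 0.0 to 1.0; only relevant for CPU-GPU co-processing
	if ((spec->mode & GPU) &&
		(spec->mode & CPU))
	{   
		//use fabsf: where abs() resolves to the integer version,
		//the float difference would be truncated before the comparison
		if (fabsf(spec->gpuInputRatio - 0.0f) < FLOAT_TINY)
		{
			//ratio is 0: no input goes to the GPU, so run on the CPU only
			spec->mode &= ~GPU;
			spec->mode |= CPU;
		}
		else if (fabsf(spec->gpuInputRatio - 1.0f) < FLOAT_TINY)
		{
			//ratio is 1: all input goes to the GPU, so run on the GPU only
			spec->mode &= ~CPU;
			spec->mode |= GPU;
		}
		else if (spec->gpuInputRatio < 0.0f || spec->gpuInputRatio > 1.0f)
		{
			//out of range: fall back to the default split
			spec->gpuInputRatio = DEFAULT_GPU_RATIO;
		}
	}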
       
	FlushInputToDisk(spec);
}
 

//-----------------------------------------------------------
//Scheduler -- CPU or GPU processing, USE_MEM
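//Param: phase -- MAP, SORT, REDUCE, MERGE_INTER or MERGE_OUTPUT
//Note: REDUCE and MERGE_OUTPUT do work only in MAP_SORT_REDUCE mode
//Return: 1 if the phase is recognized, 0 otherwise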
//-----------------------------------------------------------
char ScheduleSingleMem(Spec_t *spec, char phase)
{ 
	if (phase == MAP)
	{
		SingleMapMem(spec);
		return 1; 
	}   
	else if (phase == SORT)
	{
		SingleSortMem(spec);
		return 1;
	}
	else if (phase == REDUCE)
	{     
		if (spec->mode & MAP_SORT_REDUCE)
			SingleReduceMem(spec);
		return 1;
	}   
	else if (phase == MERGE_INTER)
	{
		SingleMergeInterMem(spec);
		return 1;
	}
	else if (phase == MERGE_OUTPUT)
	{
		if (spec->mode & MAP_SORT_REDUCE)
			SingleMergeOutputMem(spec);
		return 1;
	}
	
	return 0;
}
  
//-----------------------------------------------------------
//Scheduler -- CPU-GPU co-processing, USE_MEM
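//Param: phase -- MAP, REDUCE, MERGE_INTER or MERGE_OUTPUT
//Note: REDUCE and MERGE_OUTPUT do work only in MAP_SORT_REDUCE mode
//Return: 1 if the phase is recognized, 0 otherwise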
//-----------------------------------------------------------
char ScheduleCoprocessMem(Spec_t *spec, char phase)
{ 
	if (phase == MAP)
	{
		CoprocessMapMem(spec);
		return 1;
	}
	else if (phase == REDUCE)
	{
		if (spec->mode & MAP_SORT_REDUCE)
			CoprocessReduceMem(spec);
		return 1;  
	} 
	else if (phase == MERGE_INTER)
	{  
		CoprocessMergeInterMem(spec);
 		return 1;
	}
	else if (phase == MERGE_OUTPUT)
	{
		if (spec->mode & MAP_SORT_REDUCE)
			CoprocessMergeOutputMem(spec);
		return 1;
	} 
	
	return 0;
} 
               
//-----------------------------------------------------------
//Scheduler -- CPU or GPU processing, USE_FILE
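//Param: phase -- MAP, REDUCE, MERGE_INTER or MERGE_OUTPUT
//Note: REDUCE and MERGE_OUTPUT do work only in MAP_SORT_REDUCE mode
//Return: 1 if the phase is recognized, 0 otherwise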
//-----------------------------------------------------------
char ScheduleSingleFile(Spec_t *spec, char phase)
{       
	if (phase == MAP)
	{
		SingleMapFile(spec);
		return 1;
	}
	else if (phase == REDUCE)
	{ 
		if (spec->mode & MAP_SORT_REDUCE)
			SingleReduceFile(spec);
		return 1; 
	} 
	else if (phase == MERGE_INTER)
	{ 
		SingleMergeInterFile(spec); 
		return 1; 
	}
	else if (phase == MERGE_OUTPUT)
	{
		if (spec->mode & MAP_SORT_REDUCE)
			SingleMergeOutputFile(spec);
		return 1;
	}

	return 0;
}  
  
//-----------------------------------------------------------
//Scheduler -- CPU-GPU co-processing, USE_FILE
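//Param: phase -- MAP, REDUCE, MERGE_INTER or MERGE_OUTPUT
//Note: REDUCE and MERGE_OUTPUT do work only in MAP_SORT_REDUCE mode
//Return: 1 if the phase is recognized, 0 otherwise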
//-----------------------------------------------------------
char ScheduleCoprocessFile(Spec_t *spec, char phase)
{ 
	if (phase == MAP)
	{   
		CoprocessMapFile(spec);
		return 1;
	}
	else if (phase == REDUCE)
	{   
		if (spec->mode & MAP_SORT_REDUCE)
			CoprocessReduceFile(spec);
		return 1; 
	}  
	else if (phase == MERGE_INTER)
	{     
		CoprocessMergeInterFile(spec);
		return 1;
	}   
	else if (phase == MERGE_OUTPUT)
	{
		if (spec->mode & MAP_SORT_REDUCE)
			CoprocessMergeOutputFile(spec);
		return 1;
	}
	 
	return 0;
}
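
The mode checks in InitMapReduce can be tried in isolation with the sketch below. It is only an illustration under stated assumptions: the flag values, FLOAT_TINY, DEFAULT_GPU_RATIO, the MiniSpec struct and the NormalizeMode function are hypothetical stand-ins (the real definitions live in MarsInc.h, which is not shown here). It assumes that a ratio of 0 means CPU only and a ratio of 1 means GPU only.

```cpp
#include <cassert>
#include <cmath>

// Hypothetical flag values; the real ones are defined in MarsInc.h.
enum ModeFlag {
    CPU             = 1 << 0,
    GPU             = 1 << 1,
    USE_MEM         = 1 << 2,
    USE_FILE        = 1 << 3,
    MAP_ONLY        = 1 << 4,
    MAP_SORT        = 1 << 5,
    MAP_SORT_REDUCE = 1 << 6
};

const float FLOAT_TINY        = 1e-6f;  // assumed tolerance
const float DEFAULT_GPU_RATIO = 0.5f;   // assumed default split

// Stand-in for Spec_t with only the two fields used here.
struct MiniSpec {
    int   mode;
    float gpuInputRatio;
};

// Fill in a default for every flag group that has no bit set, then
// collapse co-processing to a single device when the ratio is 0 or 1.
void NormalizeMode(MiniSpec *spec)
{
    assert(spec != nullptr);

    if (!(spec->mode & (USE_MEM | USE_FILE)))
        spec->mode |= USE_MEM;
    if (!(spec->mode & (GPU | CPU)))
        spec->mode |= CPU;
    if (!(spec->mode & (MAP_ONLY | MAP_SORT | MAP_SORT_REDUCE)))
        spec->mode |= MAP_ONLY;

    if ((spec->mode & GPU) && (spec->mode & CPU)) {
        if (std::fabs(spec->gpuInputRatio) < FLOAT_TINY) {
            spec->mode &= ~GPU;                      // nothing for the GPU
        } else if (std::fabs(spec->gpuInputRatio - 1.0f) < FLOAT_TINY) {
            spec->mode &= ~CPU;                      // nothing for the CPU
        } else if (spec->gpuInputRatio < 0.0f || spec->gpuInputRatio > 1.0f) {
            spec->gpuInputRatio = DEFAULT_GPU_RATIO; // out of range
        }
    }
}
```

Calling NormalizeMode on a MiniSpec whose mode is 0 leaves it with USE_MEM, CPU and MAP_ONLY set. Testing each flag group with a single mask keeps the three default checks symmetric and avoids mixing `&` and `&&` in one condition.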

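The four schedulers above share one signature, so a caller could pick one from spec->mode and drive the phases through a function pointer. The sketch below shows one possible way to do that; it is not taken from the Mars sources. SelectScheduler, RunPhases, MiniSpec, the flag and phase values, the phase order, and the rule that USE_FILE wins over USE_MEM are all assumptions made for illustration, and the four Schedule* functions here are stubs standing in for the real ones.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical values; the real ones are defined in MarsInc.h.
enum ModeFlag { CPU = 1, GPU = 2, USE_MEM = 4, USE_FILE = 8 };
enum Phase    { MAP, SORT, REDUCE, MERGE_INTER, MERGE_OUTPUT };

// Stand-in for Spec_t with only the field used here.
struct MiniSpec { int mode; };

typedef char (*Scheduler)(MiniSpec *spec, char phase);

// Stubs standing in for the four schedulers; each just reports success.
char ScheduleSingleMem(MiniSpec *, char)     { return 1; }
char ScheduleCoprocessMem(MiniSpec *, char)  { return 1; }
char ScheduleSingleFile(MiniSpec *, char)    { return 1; }
char ScheduleCoprocessFile(MiniSpec *, char) { return 1; }

// Pick a scheduler: co-processing when both devices are enabled,
// file-based when USE_FILE is set (assumed to win over USE_MEM).
Scheduler SelectScheduler(const MiniSpec *spec)
{
    assert(spec != nullptr);
    bool coprocess = (spec->mode & CPU) && (spec->mode & GPU);
    bool useFile   = (spec->mode & USE_FILE) != 0;
    if (useFile)
        return coprocess ? ScheduleCoprocessFile : ScheduleSingleFile;
    return coprocess ? ScheduleCoprocessMem : ScheduleSingleMem;
}

// Run the phases in an assumed order, stopping at the first phase
// the scheduler does not recognize (return value 0).
char RunPhases(MiniSpec *spec)
{
    static const char phases[] = { MAP, MERGE_INTER, REDUCE, MERGE_OUTPUT };
    Scheduler schedule = SelectScheduler(spec);
    for (std::size_t i = 0; i < sizeof(phases) / sizeof(phases[0]); ++i)
        if (!schedule(spec, phases[i]))
            return 0;
    return 1;
}
```

SORT is left out of the phase list because, in the code above, only ScheduleSingleMem handles it; the other three schedulers would return 0 for that phase.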