⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 marsschedmem.cu

📁 GPU实现的MapReduce framework,对于学习并行编程和cuda平台的编程方面有着极好的参考价值
💻 CU
字号:
/**
 *This is the source code for Mars, a MapReduce framework on graphics
 *processors.
 *Author: Wenbin Fang (HKUST), Bingsheng He (HKUST)
 *Mentor: Naga K. Govindaraju (Microsoft Corp.), Qiong Luo (HKUST), Tuyong
 *Wang (Sina.com).
 *If you have any questions about the code, please contact us at {saven,
 *wenbin, luo}@cse.ust.hk.
 *The copyright is held by HKUST. Mars is provided "as is" without any 
 *guarantees of any kind.
 */

//---------------------------------------------------------
//for GPU-only or CPU-only
//---------------------------------------------------------
// Runs the map stage when exactly one processor is selected in spec->mode.
//
// GPU only: spec->gpuSched is zeroed, given the GPU map grid/block/shared
//           memory settings and a copy of *spec->inputChunk as its
//           inputSmallChunk, then passed to StartGPUMap.
// CPU only: spec->cpuSched is zeroed, given cpuMapThreadNum and the same
//           input copy, then passed to StartCPUMap.
// Both flags or neither flag set: an error is logged and nothing else is done.
static void SingleMapMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	const int gpuSelected = (spec->mode & GPU) != 0;
	const int cpuSelected = (spec->mode & CPU) != 0;

	// Exactly one of the two processor flags must be set.
	if (gpuSelected == cpuSelected)
	{
		BenLog("Error: please use single processor!\n");
		return;
	}

	if (gpuSelected)
	{
		BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
		spec->gpuSched->gpuMapGridDim = spec->gpuMapGridDim;
		spec->gpuSched->gpuMapBlockDim = spec->gpuMapBlockDim;
		spec->gpuSched->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;
		BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
		StartGPUMap(spec->gpuSched, spec->mode);
		return;
	}

	// Only the CPU flag is set.
	BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
	spec->cpuSched->cpuMapThreadNum = spec->cpuMapThreadNum;
	BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
	StartCPUMap(spec->cpuSched, spec->mode);
}

// Dispatches the sort stage for single-processor runs.
//
// The GPU flag is tested first: with it set, spec->cpuSort picks between
// StartGPUSort_cpu (cpuSort non-zero) and StartGPUSort (cpuSort zero), both
// operating on spec->gpuSched. Otherwise, if the CPU flag is set,
// StartCPUSort runs on spec->cpuSched. With neither flag set nothing happens.
//
// A MAP_SORT / MAP_SORT_REDUCE check on spec->mode used to guard this body
// but is disabled, so the dispatch happens for every mode; spec->mode is
// still forwarded to the Start*Sort routine.
static void SingleSortMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	if (spec->mode & GPU)
	{
		if (spec->cpuSort)
			StartGPUSort_cpu(spec->gpuSched, spec->mode);
		else
			StartGPUSort(spec->gpuSched, spec->mode);
		return;
	}

	if (spec->mode & CPU)
		StartCPUSort(spec->cpuSched, spec->mode);
}

// Publishes the output chunk of the single active schedule after map/sort.
//
// Exactly one of the GPU / CPU flags must be set in spec->mode; otherwise an
// error is logged and nothing is changed.
//
// If MAP_ONLY or MAP_SORT is set, the schedule's outputSmallChunk is copied
// into *spec->outputChunk and its recCount / diffKeyCount are recorded in
// spec->totalOutputRecCount / spec->totalDiffKeyCount. For any other mode the
// chunk is copied into *spec->interChunk and the totals are left alone.
// The copy is a plain struct copy (sizeof(ChunkInfo_t)).
static void SingleMergeInterMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	const int gpuSelected = (spec->mode & GPU) != 0;
	const int cpuSelected = (spec->mode & CPU) != 0;

	// Exactly one of the two processor flags must be set.
	if (gpuSelected == cpuSelected)
	{
		BenLog("Error: please use single processor!\n");
		return;
	}

	Schedule_t *active = gpuSelected ? spec->gpuSched : spec->cpuSched;

	const int mapOutputIsFinal = (spec->mode & MAP_ONLY) || (spec->mode & MAP_SORT);
	if (!mapOutputIsFinal)
	{
		BenMemcpy(spec->interChunk, &(active->outputSmallChunk), sizeof(ChunkInfo_t));
		return;
	}

	spec->totalOutputRecCount = active->outputSmallChunk.recCount;
	spec->totalDiffKeyCount = active->outputSmallChunk.diffKeyCount;
	BenMemcpy(spec->outputChunk, &(active->outputSmallChunk), sizeof(ChunkInfo_t));
}

// Runs the reduce stage when exactly one processor is selected in spec->mode.
//
// GPU only: spec->gpuSched is zeroed, given the GPU reduce grid/block/shared
//           memory settings and a copy of *spec->interChunk as its
//           inputSmallChunk, then passed to StartGPUReduce.
// CPU only: spec->cpuSched is zeroed, given cpuReduceThreadNum and the same
//           input copy, then passed to StartCPUReduce.
// Both flags or neither flag set: an error is logged and nothing else is done.
static void SingleReduceMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	const int gpuSelected = (spec->mode & GPU) != 0;
	const int cpuSelected = (spec->mode & CPU) != 0;

	// Exactly one of the two processor flags must be set.
	if (gpuSelected == cpuSelected)
	{
		BenLog("Error: please use single processor!\n");
		return;
	}

	if (gpuSelected)
	{
		BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
		spec->gpuSched->gpuReduceGridDim = spec->gpuReduceGridDim;
		spec->gpuSched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
		spec->gpuSched->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;
		BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
		StartGPUReduce(spec->gpuSched, spec->mode);
		return;
	}

	// Only the CPU flag is set.
	BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
	spec->cpuSched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
	BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
	StartCPUReduce(spec->cpuSched, spec->mode);
}

// Publishes the final output chunk of the single active schedule.
//
// Exactly one of the GPU / CPU flags must be set in spec->mode; otherwise an
// error is logged and nothing is changed. The selected schedule's
// outputSmallChunk is struct-copied into *spec->outputChunk and its recCount
// is stored in spec->totalOutputRecCount.
static void SingleMergeOutputMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	const int gpuSelected = (spec->mode & GPU) != 0;
	const int cpuSelected = (spec->mode & CPU) != 0;

	// Exactly one of the two processor flags must be set.
	if (gpuSelected == cpuSelected)
	{
		BenLog("Error: please use single processor!\n");
		return;
	}

	Schedule_t *active = gpuSelected ? spec->gpuSched : spec->cpuSched;

	spec->totalOutputRecCount = active->outputSmallChunk.recCount;
	BenMemcpy(spec->outputChunk, &(active->outputSmallChunk), sizeof(ChunkInfo_t));
}

//-------------------------------------------------------
//for gpu and cpu co-processing
//-------------------------------------------------------
// Thread entry point for the GPU half of co-processed map.
//
// args is the Spec_t* handed to BenNewThread. Runs StartGPUMap followed by
// StartGPUSort on spec->gpuSched, forwarding spec->mode to both.
// Always returns a null pointer.
static void *gpuMapWorkerMem(void *args)
{
	Spec_t *spec = (Spec_t*)args;
	BEN_ASSERT(spec != NULL);

	// Map first, then sort what the map produced, on the same schedule.
	StartGPUMap(spec->gpuSched, spec->mode);
	StartGPUSort(spec->gpuSched, spec->mode);

	return NULL;
}

// Thread entry point for the CPU half of co-processed map.
//
// args is the Spec_t* handed to BenNewThread. Runs StartCPUMap followed by
// StartCPUSort on spec->cpuSched, forwarding spec->mode to both.
// Always returns a null pointer.
static void *cpuMapWorkerMem(void *args)
{
	Spec_t *spec = (Spec_t*)args;
	BEN_ASSERT(spec != NULL);

	// Map first, then sort what the map produced, on the same schedule.
	StartCPUMap(spec->cpuSched, spec->mode);
	StartCPUSort(spec->cpuSched, spec->mode);

	return NULL;
}

// Splits the input between GPU and CPU and runs both map workers in parallel.
//
// Both schedules are zeroed and start from a copy of *spec->inputChunk.
// The GPU schedule keeps the leading records: its recCount is scaled by
// spec->gpuInputRatio. The CPU schedule takes the remainder: its index
// pointer is advanced past the GPU's records and its recCount is reduced by
// the GPU's share. Only index and recCount are adjusted here; the other
// chunk fields stay as copied from the input.
//
// gpuMapWorkerMem and cpuMapWorkerMem are then started on separate threads
// (GPU first) and joined with BenWaitForMul. Afterwards the recCount of each
// schedule's outputSmallChunk is added to spec->totalInterRecCount.
static void CoprocessMapMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	Schedule_t *gpuPart = spec->gpuSched;
	Schedule_t *cpuPart = spec->cpuSched;

	// GPU side: launch configuration plus the first slice of the input.
	BenMemset(gpuPart, 0, sizeof(Schedule_t));
	BenMemcpy(&(gpuPart->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
	gpuPart->inputSmallChunk.recCount *= spec->gpuInputRatio;
	gpuPart->gpuMapGridDim = spec->gpuMapGridDim;
	gpuPart->gpuMapBlockDim = spec->gpuMapBlockDim;
	gpuPart->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;

	// CPU side: thread count plus whatever the GPU did not take.
	BenMemset(cpuPart, 0, sizeof(Schedule_t));
	BenMemcpy(&(cpuPart->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
	cpuPart->inputSmallChunk.index += gpuPart->inputSmallChunk.recCount;
	cpuPart->inputSmallChunk.recCount -= gpuPart->inputSmallChunk.recCount;
	cpuPart->cpuMapThreadNum = spec->cpuMapThreadNum;

	// Run both halves concurrently and wait for both to finish.
	BenThread_t workers[2];
	workers[0] = BenNewThread(gpuMapWorkerMem, (void*)spec);
	workers[1] = BenNewThread(cpuMapWorkerMem, (void*)spec);
	BenWaitForMul(workers, 2);

	// Accumulate the intermediate record count from both halves.
	spec->totalInterRecCount += gpuPart->outputSmallChunk.recCount;
	spec->totalInterRecCount += cpuPart->outputSmallChunk.recCount;
}

// Concatenates two chunks into dest without sorting, then releases the
// sources' buffers.
//
// dest's keySize, valSize, indexSize and recCount become the sums of the
// sources' values. If the combined recCount is not positive the function
// returns at that point: nothing is allocated, copied or freed.
//
// Otherwise new keys/vals/index buffers are allocated for dest, with src1's
// data first and src2's data directly after it. Before src2's index is
// copied, the key offset (.x) and value offset (.z) of each of its entries
// are shifted by src1's keySize / valSize so they address the concatenated
// buffers; this rewrites src2->index in place.
//
// Finally the keys, vals and index buffers of both sources are handed to
// BenFree. The sources' size and count fields are not modified.
static void MergeUnsortMem(ChunkInfo_t *dest, ChunkInfo_t *src1, ChunkInfo_t *src2)
{
	BEN_ASSERT(dest != NULL);
	BEN_ASSERT(src1 != NULL);
	BEN_ASSERT(src2 != NULL);

	dest->keySize = src1->keySize + src2->keySize;
	dest->valSize = src1->valSize + src2->valSize;
	dest->indexSize = src1->indexSize + src2->indexSize;
	dest->recCount = src1->recCount + src2->recCount;

	// Nothing to merge.
	if (dest->recCount <= 0)
	{
		return;
	}

	dest->keys = (char*)BenMalloc(dest->keySize);
	dest->vals = (char*)BenMalloc(dest->valSize);
	dest->index = (int4*)BenMalloc(dest->indexSize);

	// Keys: src1 followed by src2.
	BenMemcpy(dest->keys, src1->keys, src1->keySize);
	BenMemcpy(dest->keys+src1->keySize, src2->keys, src2->keySize);

	// Values: src1 followed by src2.
	BenMemcpy(dest->vals, src1->vals, src1->valSize);
	BenMemcpy(dest->vals+src1->valSize, src2->vals, src2->valSize);

	// Index: src1's entries are valid as they are; src2's entries must be
	// rebased onto the concatenated key/value buffers before being copied.
	BenMemcpy(dest->index, src1->index, src1->indexSize);
	for (int i = 0; i < src2->recCount; i++)
	{
		int4 *entry = src2->index + i;
		entry->x += src1->keySize;
		entry->z += src1->valSize;
	}
	BenMemcpy(dest->index+src1->recCount, src2->index, src2->indexSize);

	// The sources' buffers are no longer needed.
	BenFree((char**)&src1->keys, src1->keySize);
	BenFree((char**)&src1->vals, src1->valSize);
	BenFree((char**)&src1->index, src1->indexSize);
	BenFree((char**)&src2->keys, src2->keySize);
	BenFree((char**)&src2->vals, src2->valSize);
	BenFree((char**)&src2->index, src2->indexSize);
}

// Merges two chunks into dest, re-sorts and re-groups the result, then
// releases the sources' buffers.
//
// dest's keySize, valSize, indexSize and recCount become the sums of the
// sources' values and new keys/vals/index buffers are allocated for it.
// Unlike MergeUnsortMem there is no early return for an empty result.
//
// Keys and values are concatenated (src1 first). The key offset (.x) and
// value offset (.z) of each src2 index entry are shifted by src1's keySize /
// valSize, in place, and the two index arrays are then concatenated. dest is
// passed to QuickSortMem and then GroupByMem; presumably these sort the index
// and rebuild keyListRange / diffKeyCount, but that should be confirmed
// against their definitions.
//
// NOTE: a merge that walked the two inputs' keyListRange arrays was drafted
// here but is disabled; the concatenated index is re-sorted from scratch.
//
// Finally the keys, vals, index and keyListRange buffers of both sources are
// handed to BenFree.
static void MergeSortMem(ChunkInfo_t *dest, ChunkInfo_t *src1, ChunkInfo_t *src2)
{
	BEN_ASSERT(dest != NULL);
	BEN_ASSERT(src1 != NULL);
	BEN_ASSERT(src2 != NULL);

	dest->keySize = src1->keySize + src2->keySize;
	dest->valSize = src1->valSize + src2->valSize;
	dest->indexSize = src1->indexSize + src2->indexSize;
	dest->recCount = src1->recCount + src2->recCount;

	dest->keys = (char*)BenMalloc(dest->keySize);
	dest->vals = (char*)BenMalloc(dest->valSize);
	dest->index = (int4*)BenMalloc(dest->indexSize);

	// Keys: src1 followed by src2.
	BenMemcpy(dest->keys, src1->keys, src1->keySize);
	BenMemcpy(dest->keys+src1->keySize, src2->keys, src2->keySize);

	// Values: src1 followed by src2.
	BenMemcpy(dest->vals, src1->vals, src1->valSize);
	BenMemcpy(dest->vals+src1->valSize, src2->vals, src2->valSize);

	// Rebase src2's index entries onto the concatenated key/value buffers.
	for (int i = 0; i < src2->recCount; i++)
	{
		int4 *entry = src2->index + i;
		entry->x += src1->keySize;
		entry->z += src1->valSize;
	}

	// Concatenate the index arrays, then sort and group the merged chunk.
	BenMemcpy(dest->index, src1->index, src1->indexSize);
	BenMemcpy(dest->index+src1->recCount, src2->index, src2->indexSize);
	QuickSortMem(dest);
	GroupByMem(dest);

	// The sources' buffers are no longer needed.
	BenFree((char**)&src1->keys, src1->keySize);
	BenFree((char**)&src1->vals, src1->valSize);
	BenFree((char**)&src1->index, src1->indexSize);
	BenFree((char**)&src1->keyListRange, src1->diffKeyCount*sizeof(int2));
	BenFree((char**)&src2->keys, src2->keySize);
	BenFree((char**)&src2->vals, src2->valSize);
	BenFree((char**)&src2->index, src2->indexSize);
	BenFree((char**)&src2->keyListRange, src2->diffKeyCount*sizeof(int2));
}

// Combines the GPU and CPU map outputs after a co-processed map stage.
//
// The mode flags are tested in this order and the first match wins:
//   MAP_ONLY        - MergeUnsortMem into spec->outputChunk;
//                     totalOutputRecCount = GPU recCount + CPU recCount.
//   MAP_SORT        - MergeSortMem into spec->outputChunk;
//                     totalOutputRecCount = spec->totalInterRecCount,
//                     totalDiffKeyCount = outputChunk->diffKeyCount.
//   MAP_SORT_REDUCE - MergeSortMem into spec->interChunk;
//                     totalOutputRecCount = GPU recCount + CPU recCount.
// With none of these flags set nothing is done.
//
// The record counts are read from the schedules' outputSmallChunk after the
// merge; the merge routines free the source buffers but leave recCount as is.
static void CoprocessMergeInterMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	if (spec->mode & MAP_ONLY)
	{
		MergeUnsortMem(spec->outputChunk,
			&spec->gpuSched->outputSmallChunk,
			&spec->cpuSched->outputSmallChunk);
		spec->totalOutputRecCount = spec->gpuSched->outputSmallChunk.recCount +
			spec->cpuSched->outputSmallChunk.recCount;
		return;
	}

	if (spec->mode & MAP_SORT)
	{
		MergeSortMem(spec->outputChunk,
			&spec->gpuSched->outputSmallChunk,
			&spec->cpuSched->outputSmallChunk);
		spec->totalOutputRecCount = spec->totalInterRecCount;
		spec->totalDiffKeyCount = spec->outputChunk->diffKeyCount;
		return;
	}

	if (spec->mode & MAP_SORT_REDUCE)
	{
		MergeSortMem(spec->interChunk,
			&spec->gpuSched->outputSmallChunk,
			&spec->cpuSched->outputSmallChunk);
		spec->totalOutputRecCount = spec->gpuSched->outputSmallChunk.recCount +
			spec->cpuSched->outputSmallChunk.recCount;
	}
}

// Thread entry point for the GPU half of co-processed reduce.
//
// args is the Spec_t* handed to BenNewThread. Runs StartGPUReduce on
// spec->gpuSched with spec->mode. Always returns a null pointer.
static void *gpuReduceWorkerMem(void *args)
{
	Spec_t *spec = (Spec_t*)args;
	BEN_ASSERT(spec != NULL);

	StartGPUReduce(spec->gpuSched, spec->mode);
	return NULL;
}

// Thread entry point for the CPU half of co-processed reduce.
//
// args is the Spec_t* handed to BenNewThread. Runs StartCPUReduce on
// spec->cpuSched with spec->mode. Always returns a null pointer.
static void *cpuReduceWorkerMem(void *args)
{
	Spec_t *spec = (Spec_t*)args;
	BEN_ASSERT(spec != NULL);

	StartCPUReduce(spec->cpuSched, spec->mode);
	return NULL;
}

// Splits the intermediate data between GPU and CPU by distinct key and runs
// both reduce workers in parallel.
//
// Both schedules are zeroed and start from a copy of *spec->interChunk.
// The GPU schedule keeps the leading keys: its diffKeyCount is scaled by
// spec->gpuInputRatio. The CPU schedule takes the remainder: its
// keyListRange pointer is advanced past the GPU's keys and its diffKeyCount
// is reduced by the GPU's share. Only keyListRange and diffKeyCount are
// adjusted here; the other chunk fields stay as copied.
//
// gpuReduceWorkerMem and cpuReduceWorkerMem are then started on separate
// threads (GPU first) and joined with BenWaitForMul.
static void CoprocessReduceMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	Schedule_t *gpuPart = spec->gpuSched;
	Schedule_t *cpuPart = spec->cpuSched;

	// GPU side: launch configuration plus the first slice of the keys.
	BenMemset(gpuPart, 0, sizeof(Schedule_t));
	BenMemcpy(&(gpuPart->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
	gpuPart->inputSmallChunk.diffKeyCount *= spec->gpuInputRatio;
	gpuPart->gpuReduceGridDim = spec->gpuReduceGridDim;
	gpuPart->gpuReduceBlockDim = spec->gpuReduceBlockDim;
	gpuPart->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;

	// CPU side: thread count plus the keys the GPU did not take.
	BenMemset(cpuPart, 0, sizeof(Schedule_t));
	BenMemcpy(&(cpuPart->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
	cpuPart->inputSmallChunk.keyListRange += gpuPart->inputSmallChunk.diffKeyCount;
	cpuPart->inputSmallChunk.diffKeyCount -= gpuPart->inputSmallChunk.diffKeyCount;
	cpuPart->cpuReduceThreadNum = spec->cpuReduceThreadNum;

	// Run both halves concurrently and wait for both to finish.
	BenThread_t workers[2];
	workers[0] = BenNewThread(gpuReduceWorkerMem, (void*)spec);
	workers[1] = BenNewThread(cpuReduceWorkerMem, (void*)spec);
	BenWaitForMul(workers, 2);
}

// Combines the GPU and CPU reduce outputs into spec->outputChunk.
//
// The two schedules' outputSmallChunk are concatenated with MergeUnsortMem
// (GPU output first), and the merged recCount is stored in
// spec->totalOutputRecCount.
static void CoprocessMergeOutputMem(Spec_t *spec)
{
	BEN_ASSERT(spec != NULL);

	MergeUnsortMem(spec->outputChunk,
		&spec->gpuSched->outputSmallChunk,
		&spec->cpuSched->outputSmallChunk);

	spec->totalOutputRecCount = spec->outputChunk->recCount;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -