⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 shmemlockedqueue.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "ShmemLockedQueue.h"
#include <stdio.h>
#include "lock.h"

// Sentinel written into the first LONG of the mapping by the creating process
// once the queue is ready; attaching processes poll for it in Init().
#define SHM_Q_INITIALIZED 0x12345678

// Function name	: ShmemLockedQueue::ShmemLockedQueue
// Description	    : Puts the object into an unattached state (sizes zero, all
//                    handles and pointers NULL) and selects how message
//                    arrival is signalled: if the environment variable
//                    MPICH_USE_POLLING is set, m_bUseEvent is false and
//                    Insert() signals through the shared-memory trigger;
//                    otherwise the Win32 event is used.
// Return type		: 
ShmemLockedQueue::ShmemLockedQueue()
{
    m_dwMaxMsgSize = 0;
    m_dwSize = 0;
    m_hMapping = NULL;
    m_hMsgAvailableEvent = NULL;
    m_pBase = NULL;
    m_pBottom = NULL;
    m_pEnd = NULL;
    m_plMsgAvailableTrigger = NULL;
    m_plQEmptyTrigger = NULL;
    m_plQMutex = NULL;
    m_pProgressPollFunction = NULL;

    // Only the presence of the variable matters; its value is ignored.
    char pszTemp[100];
    if (GetEnvironmentVariable("MPICH_USE_POLLING", pszTemp, 100))
        m_bUseEvent = false;
    else
        m_bUseEvent = true;
}

// Function name	: ShmemLockedQueue::~ShmemLockedQueue
// Description	    : Unmaps the shared view, closes the mapping handle and
//                    closes the message-available event, if they exist.
// Return type		: 
ShmemLockedQueue::~ShmemLockedQueue()
{
    if (m_hMapping != NULL)
    {
        if (m_pBottom != NULL)
        {
            // m_pBottom points one LONG past the start of the view (Init()
            // skips the "initialized" field), so back up to the true
            // beginning before unmapping.
            UnmapViewOfFile((void*)(((LONG*)m_pBottom) - 1));
        }
        CloseHandle(m_hMapping);
    }
    if (m_hMsgAvailableEvent != NULL)
        CloseHandle(m_hMsgAvailableEvent);
    m_hMsgAvailableEvent = NULL;
}

// Function name	: ShmemLockedQueue::Init
// Description	    : Creates, or attaches to, the named page-file-backed
//                    mapping that holds the queue and the manual-reset event
//                    "<name>.event" used to signal message arrival.
//                    View layout, in order: one LONG "initialized" marker,
//                    three LONGs (queue mutex, queue-empty trigger,
//                    message-available trigger), then the region starting at
//                    m_pBase whose first two unsigned longs are set up by the
//                    creating process (head and tail, per the original
//                    comment).
//                    The creator publishes SHM_Q_INITIALIZED last; an
//                    attaching process waits up to 100 * 200 ms for it.
// Return type		: bool  - true when the queue is ready for use; false if
//                    the event name does not fit its buffer, a Win32 call
//                    fails, or the creator never marks the queue initialized.
// Argument         : char *name          - name of the shared mapping
// Argument         : unsigned long size  - maximum message payload size
bool ShmemLockedQueue::Init(char *name, unsigned long size)
{
    bool bFirst = true;
    LONG *pInitialized;
    char pszEventName[200];

    // Build the event name up front with a bounded write so an over-long name
    // is rejected before any resource is acquired (the original unbounded
    // sprintf could overrun pszEventName). _snprintf returns a negative value
    // on truncation and does not terminate the string when the output exactly
    // fills the buffer, so both cases are treated as failure.
    int nNameLen = _snprintf(pszEventName, sizeof(pszEventName), "%s.event", name);
    if (nNameLen < 0 || nNameLen >= (int)sizeof(pszEventName))
        return false;

    m_dwMaxMsgSize = size;
    size = size + sizeof(ShmemLockedQueueHeader) + 6*sizeof(LONG);

    // Create a mapping from the page file
    m_hMapping = CreateFileMapping(
        INVALID_HANDLE_VALUE,
        NULL,
        PAGE_READWRITE,
        0, size,
        name);
    // If the mapping already exists then we are attaching to it,
    // not creating it. This must be checked immediately after the call,
    // before anything else can change the last-error value.
    if (GetLastError() == ERROR_ALREADY_EXISTS)
        bFirst = false;
    if (m_hMapping == NULL)
        return false;

    // Map the file and save the pointer to the base of the mapped file
    m_pBottom = MapViewOfFile(
        m_hMapping,
        FILE_MAP_WRITE,
        0,0,
        size);
    if (m_pBottom == NULL)
    {
        CloseHandle(m_hMapping);
        m_hMapping = NULL;
        return false;
    }

    /* NEW Jan 9, 2003
       Added an initialized field before the m_pBottom pointer.
       This means that m_pBottom must be moved back in the finalize function
       before it can be released.
     */
    pInitialized = (LONG*)m_pBottom;
    m_pBottom = (void*)(((LONG*)m_pBottom) + 1);

    m_plQMutex = (LONG*)m_pBottom;
    m_plQEmptyTrigger = &((LONG*)m_pBottom)[1];
    m_plMsgAvailableTrigger = &((LONG*)m_pBottom)[2];
    // m_pBottom was advanced by one LONG, so subtracting it again makes
    // m_pEnd the exact end of the mapped view.
    m_pEnd = (LPBYTE)m_pBottom + size - sizeof(LONG);
    m_pBase = (LPBYTE)m_pBottom + 3*sizeof(LONG);
    m_dwSize = size - sizeof(LONG);

    // If this process is creating the mapping,
    // then set up the head and tail pointers
    if (bFirst)
    {
        ((unsigned long*)m_pBase)[0] = 0;
        ((unsigned long*)m_pBase)[1] = 2*sizeof(unsigned long);
        *m_plQMutex = 0;
        *m_plQEmptyTrigger = 0;
        *m_plMsgAvailableTrigger = 0;
    }

    // Create or Attach to the synchronization handles for this queue
    m_hMsgAvailableEvent =
        CreateEvent(NULL, TRUE, FALSE, pszEventName);
    if (m_hMsgAvailableEvent == NULL)
    {
        // Release the view and the mapping. m_plQMutex is a pointer into the
        // view, not a kernel handle, so it must not be passed to CloseHandle;
        // the view itself has to be unmapped here because the destructor only
        // unmaps when m_hMapping is still non-NULL.
        UnmapViewOfFile(pInitialized);
        CloseHandle(m_hMapping);
        m_hMapping = NULL;
        m_pBottom = NULL;
        m_pBase = NULL;
        m_pEnd = NULL;
        m_plQMutex = NULL;
        m_plQEmptyTrigger = NULL;
        m_plMsgAvailableTrigger = NULL;
        m_dwSize = 0;
        return false;
    }

    if (bFirst)
    {
        // mark the queue as initialized
        *pInitialized = SHM_Q_INITIALIZED;
    }
    else
    {
        // wait until the queue is initialized
        int retry = 100;
        while (*pInitialized != SHM_Q_INITIALIZED && retry)
        {
            Sleep(200);
            retry--;
        }
        // Resources stay attached on this failure; the destructor frees them.
        if (*pInitialized != SHM_Q_INITIALIZED)
            return false;
    }

    return true;
}

// Function name	: ShmemLockedQueue::Insert
// Description	    : Appends one message to the queue. Under the queue mutex
//                    it waits for enough contiguous room at the tail, claims
//                    the slot (state SHMEM_Q_BEING_WRITTEN) and advances the
//                    tail offset. The header and payload are then written
//                    without holding the mutex. Finally, under the mutex
//                    again, the slot is marked SHMEM_Q_AVAIL_FOR_READ, the
//                    message-available signal is raised (Win32 event or
//                    shared trigger, depending on m_bUseEvent) and the
//                    queue-empty trigger is reset.
//                    The SHMEM_Q_* macros and lock/unlock/test/wait/setevent/
//                    resetevent are defined outside this file.
// Return type		: bool  - false if length exceeds the maximum message size
//                    given to Init(), true once the message is published.
// Argument         : unsigned char *buffer - payload to copy into the queue
// Argument         : unsigned int length   - payload size in bytes
// Argument         : int tag               - stored in the message header
// Argument         : int from              - stored in the message header
bool ShmemLockedQueue::Insert(
    unsigned char *buffer, unsigned int length, int tag, int from)
{
    ShmemLockedQueueHeader *pMessage;

    // m_dwMaxMsgSize is a per-object value fixed by Init(), so an oversized
    // message can be rejected without taking the shared mutex.
    if (length > m_dwMaxMsgSize)
        return false;

    lock(m_plQMutex);

    // Wait for a contiguous block large enough to hold the header and data.
    // The remaining space is compared against header + length rather than
    // subtracting the header size first: when fewer than
    // sizeof(ShmemLockedQueueHeader) bytes remain, that subtraction wraps
    // around in unsigned arithmetic and would let the write run past m_pEnd.
    while ( ( SHMEM_Q_TAIL_PTR >= m_pEnd ) ||
            ( (unsigned long)m_pEnd -
              (unsigned long)SHMEM_Q_TAIL_PTR <
              sizeof(ShmemLockedQueueHeader) + length ) )
    {
        unlock(m_plQMutex);
        if (m_pProgressPollFunction)
        {
            // Keep the caller's progress function running while waiting.
            while (!test(m_plQEmptyTrigger))
                m_pProgressPollFunction();
        }
        else
            wait(m_plQEmptyTrigger);
        lock(m_plQMutex);
    }

    // Read the tail pointer
    pMessage = SHMEM_Q_TAIL_PTR;

    // If the head offset is 0, set it to the tail
    if ( SHMEM_Q_HEAD_OFFSET == 0 )
        SHMEM_Q_HEAD_OFFSET = SHMEM_Q_TAIL_OFFSET;

    // Set the state so the slot is not read while it is being filled
    pMessage->state = SHMEM_Q_BEING_WRITTEN;

    // Advance the tail offset past this message (offsets are relative to m_pBase)
    SHMEM_Q_TAIL_OFFSET =
        (unsigned long)(
            (unsigned long)pMessage +
            sizeof(ShmemLockedQueueHeader) +
            length - (unsigned long)m_pBase);

    unlock(m_plQMutex);

    // Write the header
    pMessage->tag = tag;
    pMessage->from = from;
    pMessage->length = length;
    pMessage->next_offset = (sizeof(ShmemLockedQueueHeader) + length);

    // Copy the data
    memcpy((LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), buffer, length);

    lock(m_plQMutex);

    // Signal data has arrived and release the mutex
    pMessage->state = SHMEM_Q_AVAIL_FOR_READ;
    if (m_bUseEvent)
        SetEvent(m_hMsgAvailableEvent);
    else
        setevent(m_plMsgAvailableTrigger);
    resetevent(m_plQEmptyTrigger);

    unlock(m_plQMutex);

    return true;
}

// NOTE: the SHP functions only work if this class is compiled with the 
// functions from nt_smp.cpp. Maybe in the future I will weave the SHP stuff
// into this class.

// Function name	: ShmemLockedQueue::InsertSHP
// Description	    : This function acquires the mutex of the remote queue, 
//                    inserts the local buffer and length into the remote shmem
//                    queue, and waits for the remote event to be signalled.  
//                    The remote process removes the message from its shmem //                    queue, uses the information to read the buffer from this//                    process, and then signals the event meaning that the //                    buffer has been read and the user is free to touch the //                    buffer again.// Return type		: bool // Argument         : unsigned char *buffer// Argument         : unsigned int length// Argument         : int tag// Argument         : int from#include "nt_global_cpp.h"struct shpData{	void *address;	int length;};bool ShmemLockedQueue::InsertSHP(	unsigned char *buffer, unsigned int length, int tag, int from, 	HANDLE hRemoteMutex, HANDLE hRemoteEvent, ShmemLockedQueue *pOtherQueue){	ShmemLockedQueueHeader *pMessage;	shpData data;	data.address = buffer;	data.length = length;		WaitForSingleObject(hRemoteMutex, INFINITE);	lock(m_plQMutex);	// Wait for a contiguous block large enough to hold the data 	while ( ( SHMEM_Q_TAIL_PTR >= m_pEnd ) || 			( (unsigned long)m_pEnd - 			  (unsigned long)SHMEM_Q_TAIL_PTR - 			   sizeof(ShmemLockedQueueHeader) < sizeof(shpData) ) )	{		unlock(m_plQMutex);		if (m_pProgressPollFunction)		{			while (!test(m_plQEmptyTrigger))				m_pProgressPollFunction();		}		else			wait(m_plQEmptyTrigger);		lock(m_plQMutex);	}	// Read the tail pointer	pMessage = SHMEM_Q_TAIL_PTR;	// If the head offset is 0, set it to the tail	if ( SHMEM_Q_HEAD_OFFSET == 0 )		SHMEM_Q_HEAD_OFFSET = SHMEM_Q_TAIL_OFFSET;	// Set the state and advance the tail offset	pMessage->state = SHMEM_Q_BEING_WRITTEN;	// Advance the tail offset	SHMEM_Q_TAIL_OFFSET = (unsigned long)(		(unsigned long)pMessage + 		sizeof(ShmemLockedQueueHeader) + 		sizeof(shpData) - (unsigned long)m_pBase);	unlock(m_plQMutex);	// Write the header	pMessage->tag = tag;	pMessage->from = from;	pMessage->length = sizeof(shpData);	pMessage->next_offset = (sizeof(ShmemLockedQueueHeader) + sizeof(shpData));		// Copy the data	memcpy((LPBYTE)pMessage + 
sizeof(ShmemLockedQueueHeader), 		&data, sizeof(shpData));	lock(m_plQMutex);	// Signal data has arrived and release the mutex	pMessage->state = SHMEM_Q_SHP_AVAIL_FOR_READ;	if (m_bUseEvent)		SetEvent(m_hMsgAvailableEvent);	else		setevent(m_plMsgAvailableTrigger);	resetevent(m_plQEmptyTrigger);	unlock(m_plQMutex);	if (g_MsgQueue.m_pProgressPollFunction)	{		while (WaitForSingleObject(hRemoteEvent, 0) != WAIT_OBJECT_0)			g_MsgQueue.m_pProgressPollFunction();	}	else if (pOtherQueue->m_pProgressPollFunction)	{		while (WaitForSingleObject(hRemoteEvent, 0) != WAIT_OBJECT_0)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -