⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 shmemlockedqueue.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
			pOtherQueue->m_pProgressPollFunction();			//pOtherQueue->RemoveNextInsert(&g_MsgQueue, false);	}	else		WaitForSingleObject(hRemoteEvent, INFINITE);	ResetEvent(hRemoteEvent);	ReleaseMutex(hRemoteMutex);	return true;}// Function name	: ShmemLockedQueue::RemoveNext// Description	    : // Return type		: bool // Argument         : unsigned char *buffer// Argument         : unsigned int *length// Argument         : int *tag// Argument         : int *frombool ShmemLockedQueue::RemoveNext(	unsigned char *buffer, unsigned int *length, int *tag, int *from){	ShmemLockedQueueHeader *pTail, *pHead, *pMessage;	// Get the next available entry in the queue	while(true)	{		// Get the queue mutex		lock(m_plQMutex);				// Wait for the queue to not be empty		while (SHMEM_Q_HEAD_OFFSET == 0)		{			unlock(m_plQMutex);			if (m_bUseEvent)			{				if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) 						!= WAIT_OBJECT_0)				{					printf("ShmemLockedQueue:RemoveNext:Wait for MsgAvailableEvent on an empty queue failed, error %d\n", GetLastError());fflush(stdout);					return false;				}			}			else				wait(m_plMsgAvailableTrigger);			lock(m_plQMutex);		}				// Search the queue for the next entry not being read by another thread		pMessage = SHMEM_Q_HEAD_PTR;		pTail = SHMEM_Q_TAIL_PTR;				while ((pMessage->state == SHMEM_Q_BEING_READ) && (pMessage < pTail))			pMessage = (ShmemLockedQueueHeader*)				((LPBYTE)pMessage + pMessage->next_offset);				// If we haven't reached the tail and the element we are on is not 		// currently being written, then successfully break out of this loop		if ( (pMessage < pTail) && (pMessage->state != SHMEM_Q_BEING_WRITTEN) )			break;		// All messages are being read or the next message in order is 		// not ready. I need to reset MsgAvailableEvent, wait for it to be 		// signalled and then start over		if (m_bUseEvent)		{			ResetEvent(m_hMsgAvailableEvent);			unlock(m_plQMutex);			if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) 					!= WAIT_OBJECT_0)			{				printf("ShmemLockedQueue:RemoveNext:Wait for MsgAvailableEvent failed, error %d\n", GetLastError());fflush(stdout);				return false;			}		}		else		{			resetevent(m_plMsgAvailableTrigger);			unlock(m_plQMutex);			wait(m_plMsgAvailableTrigger);		}	}	// Check that the buffer provided is large enough to hold the data	if (pMessage->length > *length)	{		printf("ShmemLockedQueue:RemoveNext:shmem message length %d > %d user buffer length\n", pMessage->length, *length);		unlock(m_plQMutex);		return false;	}	// Mark the message as being read	pMessage->state = SHMEM_Q_BEING_READ;	unlock(m_plQMutex);	// Read the data from the message	*tag = pMessage->tag;	*from = pMessage->from;	*length = pMessage->length;	memcpy(buffer, 		(LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), pMessage->length);	lock(m_plQMutex);	// Mark the message as having been read	pMessage->state = SHMEM_Q_READ;	// Update the head and tail pointers of the queue	pHead = SHMEM_Q_HEAD_PTR;	pTail = SHMEM_Q_TAIL_PTR;			// Advance the head pointer over all the read messages	while ( (pHead < pTail) && (pHead->state == SHMEM_Q_READ) )		pHead = (ShmemLockedQueueHeader*)((LPBYTE)pHead + pHead->next_offset);	if (pHead >= pTail)	{		// When the head catches up to the tail, 		// the queue is empty so reset the pointers and signal the queue empty		SHMEM_Q_HEAD_OFFSET = 0;		SHMEM_Q_TAIL_OFFSET = 2*sizeof(unsigned long);		if (m_bUseEvent)			ResetEvent(m_hMsgAvailableEvent);		else			resetevent(m_plMsgAvailableTrigger);		setevent(m_plQEmptyTrigger);	}	else		SHMEM_Q_HEAD_OFFSET = (unsigned long)((LPBYTE)pHead - (LPBYTE)m_pBase);	unlock(m_plQMutex);	return true;}// Function name	: ShmemLockedQueue::RemoveNextInsert// Description	    : // Return type		: bool // Argument         : MessageQueue *pMsgQueuebool ShmemLockedQueue::RemoveNextInsert(MessageQueue *pMsgQueue, bool bBlocking){	ShmemLockedQueueHeader *pTail, *pHead, *pMessage;	if (bBlocking)	{		// Get the next available entry in the queue		while(true)		{			// Get the queue mutex			lock(m_plQMutex);						// Wait for the queue to not be empty			while (SHMEM_Q_HEAD_OFFSET == 0)			{				unlock(m_plQMutex);				if (m_bUseEvent)				{					if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) 						!= WAIT_OBJECT_0)					{						printf("ShmemLockedQueue:RemoveNextInsert:Wait for MsgAvailableEvent on an empty queue failed, error %d\n", GetLastError());fflush(stdout);						return false;					}				}				else					wait(m_plMsgAvailableTrigger);				lock(m_plQMutex);			}						// Search the queue for the next available entry			pMessage = SHMEM_Q_HEAD_PTR;			pTail = SHMEM_Q_TAIL_PTR;						while ((pMessage->state == SHMEM_Q_BEING_READ) && (pMessage < pTail))				pMessage = (ShmemLockedQueueHeader*)				((LPBYTE)pMessage + pMessage->next_offset);						// If we haven't reached the tail and the element we are on is not 			// currently being written, then successfully break out of this loop			if ( (pMessage < pTail) && (pMessage->state != SHMEM_Q_BEING_WRITTEN) )				break;						// All messages are being read or the next message in order is not 			// ready. I need to reset MsgAvailableEvent, wait for it to be 			// signalled and then start over			if (m_bUseEvent)			{				ResetEvent(m_hMsgAvailableEvent);				unlock(m_plQMutex);				if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) 					!= WAIT_OBJECT_0)				{					printf("ShmemLockedQueue:RemoveNextInsert:Wait for MsgAvailableEvent failed, error %d\n", GetLastError());fflush(stdout);					return false;				}			}			else			{				resetevent(m_plMsgAvailableTrigger);				unlock(m_plQMutex);				wait(m_plMsgAvailableTrigger);			}		}	}	else	{		// Try to get the next available entry in the queue		// Get the queue mutex		lock(m_plQMutex);					// Wait for the queue to not be empty		if (SHMEM_Q_HEAD_OFFSET == 0)		{			unlock(m_plQMutex);			return false;		}					// Search the queue for the next available entry		pMessage = SHMEM_Q_HEAD_PTR;		pTail = SHMEM_Q_TAIL_PTR;					while ((pMessage->state == SHMEM_Q_BEING_READ) && (pMessage < pTail))			pMessage = (ShmemLockedQueueHeader*)						((LPBYTE)pMessage + pMessage->next_offset);					// If we haven't reached the tail and the element we are on is not 		// currently being written, then successfully break out of this loop		if ( (pMessage >= pTail) || (pMessage->state == SHMEM_Q_BEING_WRITTEN) )		{			// All messages are being read or the next message in order is not 			// ready. I need to reset MsgAvailableEvent, wait for it to be 			// signalled and then start over			if (m_bUseEvent)			{				ResetEvent(m_hMsgAvailableEvent);				unlock(m_plQMutex);				return false;			}			else			{				resetevent(m_plMsgAvailableTrigger);				unlock(m_plQMutex);				return false;			}		}	}	MessageQueue::MsgQueueElement *pElement;	if (pMessage->from == -1)	{		unlock(m_plQMutex);		return false;	}	if (pMessage->state == SHMEM_Q_SHP_AVAIL_FOR_READ)	{		shpData data;		memcpy(&data, (LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), 			sizeof(shpData));		void *pLocal = 			g_MsgQueue.GetBufferToFill(pMessage->tag, data.length, 										pMessage->from, &pElement);		if (!ReadProcessMemory(					g_hProcesses[pMessage->from], 				data.address, pLocal, data.length , NULL))			//nt_error("Unable to read remote memory", pMessage->from);			MakeErrMsg(GetLastError(), "Unable to read remote memory in process %d", pMessage->from);		SetEvent(g_hShpSendCompleteEvent[g_nIproc]);		g_MsgQueue.SetElementEvent(pElement);	}	else	{		void *pBuffer = pMsgQueue->GetBufferToFill(			pMessage->tag, pMessage->length, pMessage->from, &pElement);				// Mark the message as being read		pMessage->state = SHMEM_Q_BEING_READ;		unlock(m_plQMutex);				// Read the data from the message		memcpy(pBuffer, (LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), 				pMessage->length);		pMsgQueue->SetElementEvent(pElement);				lock(m_plQMutex);	}	// Mark the message as having been read	pMessage->state = SHMEM_Q_READ;	// Update the head and tail pointers of the queue	pHead = SHMEM_Q_HEAD_PTR;	pTail = SHMEM_Q_TAIL_PTR;			// Advance the head pointer over all the read messages	while ( (pHead < pTail) && (pHead->state == SHMEM_Q_READ) )		pHead = (ShmemLockedQueueHeader*)((LPBYTE)pHead + pHead->next_offset);	if (pHead >= pTail)	{		// When the head catches up to the tail, the queue is empty so reset 		// the pointers and signal the queue empty		SHMEM_Q_HEAD_OFFSET = 0;		SHMEM_Q_TAIL_OFFSET = 2*sizeof(unsigned long);		if (m_bUseEvent)			ResetEvent(m_hMsgAvailableEvent);		else			resetevent(m_plMsgAvailableTrigger);		setevent(m_plQEmptyTrigger);	}	else		SHMEM_Q_HEAD_OFFSET = (unsigned long)((LPBYTE)pHead - (LPBYTE)m_pBase);	unlock(m_plQMutex);	return true;}// Function name	: ShmemLockedQueue::SetProgressFunction// Description	    : // Return type		: void // Argument         : void (*ProgressPollFunctionvoid ShmemLockedQueue::SetProgressFunction(void (*ProgressPollFunction)()){	m_pProgressPollFunction = ProgressPollFunction;	//printf("ShmemLockedQueue::SetProgressFunction called\n");fflush(stdout);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -