📄 shmemlockedqueue.cpp
字号:
#include "ShmemLockedQueue.h"#include <stdio.h>#include "lock.h"#define SHM_Q_INITIALIZED 0x12345678// Function name : ShmemLockedQueue::ShmemLockedQueue// Description : // Return type : ShmemLockedQueue::ShmemLockedQueue(){ m_dwMaxMsgSize = 0; m_dwSize = 0; m_hMapping = NULL; m_hMsgAvailableEvent = NULL; m_pBase = NULL; m_pBottom = NULL; m_pEnd = NULL; m_plMsgAvailableTrigger = NULL; m_plQEmptyTrigger = NULL; m_plQMutex = NULL; m_pProgressPollFunction = NULL; char pszTemp[100]; if (GetEnvironmentVariable("MPICH_USE_POLLING", pszTemp, 100)) m_bUseEvent = false; else m_bUseEvent = true;}// Function name : ShmemLockedQueue::~ShmemLockedQueue// Description : // Return type : ShmemLockedQueue::~ShmemLockedQueue(){ if (m_hMapping != NULL) { if (m_pBottom != NULL) //UnmapViewOfFile(m_pBottom); UnmapViewOfFile((void*)(((LONG*)m_pBottom) - 1)); // back up over the initialized field to the true beginning CloseHandle(m_hMapping); } if (m_hMsgAvailableEvent != NULL) CloseHandle(m_hMsgAvailableEvent); m_hMsgAvailableEvent = NULL;}// Function name : ShmemLockedQueue::Init// Description : // Return type : bool // Argument : char *name// Argument : unsigned long sizebool ShmemLockedQueue::Init(char *name, unsigned long size){ bool bFirst = true; HANDLE hInitEvent = NULL; LONG *pInitialized; m_dwMaxMsgSize = size; size = size + sizeof(ShmemLockedQueueHeader) + 6*sizeof(LONG); // Create a mapping from the page file m_hMapping = CreateFileMapping( INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, size, name); // If the mapping already exists then we are attaching to it, // not creating it if (GetLastError() == ERROR_ALREADY_EXISTS) bFirst = false; if (m_hMapping == NULL) return false; // Map the file and save the pointer to the base of the mapped file m_pBottom = MapViewOfFile( m_hMapping, FILE_MAP_WRITE, 0,0, size); if (m_pBottom == NULL) { CloseHandle(m_hMapping); m_hMapping = NULL; return false; } /* NEW Jan 9, 2003 Added an initialized field before the m_pBottom pointer This means that m_pBottom must be moved back in the finalize function before it can be released */ pInitialized = (LONG*)m_pBottom; m_pBottom = (void*)(((LONG*)m_pBottom) + 1); m_plQMutex = (LONG*)m_pBottom; m_plQEmptyTrigger = &((LONG*)m_pBottom)[1]; m_plMsgAvailableTrigger = &((LONG*)m_pBottom)[2]; m_pEnd = (LPBYTE)m_pBottom + size - sizeof(LONG); m_pBase = (LPBYTE)m_pBottom + 3*sizeof(LONG); m_dwSize = size - sizeof(LONG); // If this process is creating the mapping, // then set up the head and tail pointers if (bFirst) { ((unsigned long*)m_pBase)[0] = 0; ((unsigned long*)m_pBase)[1] = 2*sizeof(unsigned long); *m_plQMutex = 0; *m_plQEmptyTrigger = 0; *m_plMsgAvailableTrigger = 0; } // Create or Attach to the synchronization handles for this queue char pszEventName[200]; sprintf(pszEventName, "%s.event", name); m_hMsgAvailableEvent = CreateEvent(NULL, TRUE, FALSE, pszEventName); if (m_hMsgAvailableEvent == NULL) { CloseHandle(m_hMapping); CloseHandle(m_plQMutex); m_hMapping = NULL; m_plQMutex = NULL; return false; } if (bFirst) { // mark the queue as initialized *pInitialized = SHM_Q_INITIALIZED; } else { // wait until the queue is initialized int retry = 100; while (*pInitialized != SHM_Q_INITIALIZED && retry) { Sleep(200); retry--; } if (*pInitialized != SHM_Q_INITIALIZED) return false; } return true;}// Function name : ShmemLockedQueue::Insert// Description : // Return type : bool // Argument : unsigned char *buffer// Argument : unsigned int length// Argument : int tag// Argument : int frombool ShmemLockedQueue::Insert( unsigned char *buffer, unsigned int length, int tag, int from){ ShmemLockedQueueHeader *pMessage; lock(m_plQMutex); if (length > m_dwMaxMsgSize) { unlock(m_plQMutex); return false; } // Wait for a contiguous block large enough to hold the data while ( ( SHMEM_Q_TAIL_PTR >= m_pEnd ) || ( (unsigned long)m_pEnd - (unsigned long)SHMEM_Q_TAIL_PTR - sizeof(ShmemLockedQueueHeader) < length) ) { unlock(m_plQMutex); if (m_pProgressPollFunction) { while (!test(m_plQEmptyTrigger)) m_pProgressPollFunction(); } else wait(m_plQEmptyTrigger); lock(m_plQMutex); } // Read the tail pointer pMessage = SHMEM_Q_TAIL_PTR; // If the head offset is 0, set it to the tail if ( SHMEM_Q_HEAD_OFFSET == 0 ) SHMEM_Q_HEAD_OFFSET = SHMEM_Q_TAIL_OFFSET; // Set the state and advance the tail offset pMessage->state = SHMEM_Q_BEING_WRITTEN; // Advance the tail offset SHMEM_Q_TAIL_OFFSET = (unsigned long)( (unsigned long)pMessage + sizeof(ShmemLockedQueueHeader) + length - (unsigned long)m_pBase); unlock(m_plQMutex); // Write the header pMessage->tag = tag; pMessage->from = from; pMessage->length = length; pMessage->next_offset = (sizeof(ShmemLockedQueueHeader) + length); // Copy the data memcpy((LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), buffer, length); lock(m_plQMutex); // Signal data has arrived and release the mutex pMessage->state = SHMEM_Q_AVAIL_FOR_READ; if (m_bUseEvent) SetEvent(m_hMsgAvailableEvent); else setevent(m_plMsgAvailableTrigger); resetevent(m_plQEmptyTrigger); unlock(m_plQMutex); return true;}// NOTE: the SHP functions only work if this class is compiled with the // functions from nt_smp.cpp. Maybe in the future I will weave the SHP stuff// into this class.// Function name : ShmemLockedQueue::InsertSHP// Description : This function acquires the mutex of the remote queue, // inserts the local buffer and length into the remote shmem// queue, and waits for the remote event to be signalled. // The remote process removes the message from its shmem // queue, uses the information to read the buffer from this// process, and then signals the event meaning that the // buffer has been read and the user is free to touch the // buffer again.// Return type : bool // Argument : unsigned char *buffer// Argument : unsigned int length// Argument : int tag// Argument : int from#include "nt_global_cpp.h"struct shpData{ void *address; int length;};bool ShmemLockedQueue::InsertSHP( unsigned char *buffer, unsigned int length, int tag, int from, HANDLE hRemoteMutex, HANDLE hRemoteEvent, ShmemLockedQueue *pOtherQueue){ ShmemLockedQueueHeader *pMessage; shpData data; data.address = buffer; data.length = length; WaitForSingleObject(hRemoteMutex, INFINITE); lock(m_plQMutex); // Wait for a contiguous block large enough to hold the data while ( ( SHMEM_Q_TAIL_PTR >= m_pEnd ) || ( (unsigned long)m_pEnd - (unsigned long)SHMEM_Q_TAIL_PTR - sizeof(ShmemLockedQueueHeader) < sizeof(shpData) ) ) { unlock(m_plQMutex); if (m_pProgressPollFunction) { while (!test(m_plQEmptyTrigger)) m_pProgressPollFunction(); } else wait(m_plQEmptyTrigger); lock(m_plQMutex); } // Read the tail pointer pMessage = SHMEM_Q_TAIL_PTR; // If the head offset is 0, set it to the tail if ( SHMEM_Q_HEAD_OFFSET == 0 ) SHMEM_Q_HEAD_OFFSET = SHMEM_Q_TAIL_OFFSET; // Set the state and advance the tail offset pMessage->state = SHMEM_Q_BEING_WRITTEN; // Advance the tail offset SHMEM_Q_TAIL_OFFSET = (unsigned long)( (unsigned long)pMessage + sizeof(ShmemLockedQueueHeader) + sizeof(shpData) - (unsigned long)m_pBase); unlock(m_plQMutex); // Write the header pMessage->tag = tag; pMessage->from = from; pMessage->length = sizeof(shpData); pMessage->next_offset = (sizeof(ShmemLockedQueueHeader) + sizeof(shpData)); // Copy the data memcpy((LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), &data, sizeof(shpData)); lock(m_plQMutex); // Signal data has arrived and release the mutex pMessage->state = SHMEM_Q_SHP_AVAIL_FOR_READ; if (m_bUseEvent) SetEvent(m_hMsgAvailableEvent); else setevent(m_plMsgAvailableTrigger); resetevent(m_plQEmptyTrigger); unlock(m_plQMutex); if (g_MsgQueue.m_pProgressPollFunction) { while (WaitForSingleObject(hRemoteEvent, 0) != WAIT_OBJECT_0) g_MsgQueue.m_pProgressPollFunction(); } else if (pOtherQueue->m_pProgressPollFunction) { while (WaitForSingleObject(hRemoteEvent, 0) != WAIT_OBJECT_0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -