📄 shmemlockedqueue.cpp
字号:
pOtherQueue->m_pProgressPollFunction(); //pOtherQueue->RemoveNextInsert(&g_MsgQueue, false); } else WaitForSingleObject(hRemoteEvent, INFINITE); ResetEvent(hRemoteEvent); ReleaseMutex(hRemoteMutex); return true;}// Function name : ShmemLockedQueue::RemoveNext// Description : // Return type : bool // Argument : unsigned char *buffer// Argument : unsigned int *length// Argument : int *tag// Argument : int *frombool ShmemLockedQueue::RemoveNext( unsigned char *buffer, unsigned int *length, int *tag, int *from){ ShmemLockedQueueHeader *pTail, *pHead, *pMessage; // Get the next available entry in the queue while(true) { // Get the queue mutex lock(m_plQMutex); // Wait for the queue to not be empty while (SHMEM_Q_HEAD_OFFSET == 0) { unlock(m_plQMutex); if (m_bUseEvent) { if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) != WAIT_OBJECT_0) { printf("ShmemLockedQueue:RemoveNext:Wait for MsgAvailableEvent on an empty queue failed, error %d\n", GetLastError());fflush(stdout); return false; } } else wait(m_plMsgAvailableTrigger); lock(m_plQMutex); } // Search the queue for the next entry not being read by another thread pMessage = SHMEM_Q_HEAD_PTR; pTail = SHMEM_Q_TAIL_PTR; while ((pMessage->state == SHMEM_Q_BEING_READ) && (pMessage < pTail)) pMessage = (ShmemLockedQueueHeader*) ((LPBYTE)pMessage + pMessage->next_offset); // If we haven't reached the tail and the element we are on is not // currently being written, then successfully break out of this loop if ( (pMessage < pTail) && (pMessage->state != SHMEM_Q_BEING_WRITTEN) ) break; // All messages are being read or the next message in order is // not ready. I need to reset MsgAvailableEvent, wait for it to be // signalled and then start over if (m_bUseEvent) { ResetEvent(m_hMsgAvailableEvent); unlock(m_plQMutex); if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) != WAIT_OBJECT_0) { printf("ShmemLockedQueue:RemoveNext:Wait for MsgAvailableEvent failed, error %d\n", GetLastError());fflush(stdout); return false; } } else { resetevent(m_plMsgAvailableTrigger); unlock(m_plQMutex); wait(m_plMsgAvailableTrigger); } } // Check that the buffer provided is large enough to hold the data if (pMessage->length > *length) { printf("ShmemLockedQueue:RemoveNext:shmem message length %d > %d user buffer length\n", pMessage->length, *length); unlock(m_plQMutex); return false; } // Mark the message as being read pMessage->state = SHMEM_Q_BEING_READ; unlock(m_plQMutex); // Read the data from the message *tag = pMessage->tag; *from = pMessage->from; *length = pMessage->length; memcpy(buffer, (LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), pMessage->length); lock(m_plQMutex); // Mark the message as having been read pMessage->state = SHMEM_Q_READ; // Update the head and tail pointers of the queue pHead = SHMEM_Q_HEAD_PTR; pTail = SHMEM_Q_TAIL_PTR; // Advance the head pointer over all the read messages while ( (pHead < pTail) && (pHead->state == SHMEM_Q_READ) ) pHead = (ShmemLockedQueueHeader*)((LPBYTE)pHead + pHead->next_offset); if (pHead >= pTail) { // When the head catches up to the tail, // the queue is empty so reset the pointers and signal the queue empty SHMEM_Q_HEAD_OFFSET = 0; SHMEM_Q_TAIL_OFFSET = 2*sizeof(unsigned long); if (m_bUseEvent) ResetEvent(m_hMsgAvailableEvent); else resetevent(m_plMsgAvailableTrigger); setevent(m_plQEmptyTrigger); } else SHMEM_Q_HEAD_OFFSET = (unsigned long)((LPBYTE)pHead - (LPBYTE)m_pBase); unlock(m_plQMutex); return true;}// Function name : ShmemLockedQueue::RemoveNextInsert// Description : // Return type : bool // Argument : MessageQueue *pMsgQueuebool ShmemLockedQueue::RemoveNextInsert(MessageQueue *pMsgQueue, bool bBlocking){ ShmemLockedQueueHeader *pTail, *pHead, *pMessage; if (bBlocking) { // Get the next available entry in the queue while(true) { // Get the queue mutex lock(m_plQMutex); // Wait for the queue to not be empty while (SHMEM_Q_HEAD_OFFSET == 0) { unlock(m_plQMutex); if (m_bUseEvent) { if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) != WAIT_OBJECT_0) { printf("ShmemLockedQueue:RemoveNextInsert:Wait for MsgAvailableEvent on an empty queue failed, error %d\n", GetLastError());fflush(stdout); return false; } } else wait(m_plMsgAvailableTrigger); lock(m_plQMutex); } // Search the queue for the next available entry pMessage = SHMEM_Q_HEAD_PTR; pTail = SHMEM_Q_TAIL_PTR; while ((pMessage->state == SHMEM_Q_BEING_READ) && (pMessage < pTail)) pMessage = (ShmemLockedQueueHeader*) ((LPBYTE)pMessage + pMessage->next_offset); // If we haven't reached the tail and the element we are on is not // currently being written, then successfully break out of this loop if ( (pMessage < pTail) && (pMessage->state != SHMEM_Q_BEING_WRITTEN) ) break; // All messages are being read or the next message in order is not // ready. I need to reset MsgAvailableEvent, wait for it to be // signalled and then start over if (m_bUseEvent) { ResetEvent(m_hMsgAvailableEvent); unlock(m_plQMutex); if (WaitForSingleObject(m_hMsgAvailableEvent, INFINITE) != WAIT_OBJECT_0) { printf("ShmemLockedQueue:RemoveNextInsert:Wait for MsgAvailableEvent failed, error %d\n", GetLastError());fflush(stdout); return false; } } else { resetevent(m_plMsgAvailableTrigger); unlock(m_plQMutex); wait(m_plMsgAvailableTrigger); } } } else { // Try to get the next available entry in the queue // Get the queue mutex lock(m_plQMutex); // Wait for the queue to not be empty if (SHMEM_Q_HEAD_OFFSET == 0) { unlock(m_plQMutex); return false; } // Search the queue for the next available entry pMessage = SHMEM_Q_HEAD_PTR; pTail = SHMEM_Q_TAIL_PTR; while ((pMessage->state == SHMEM_Q_BEING_READ) && (pMessage < pTail)) pMessage = (ShmemLockedQueueHeader*) ((LPBYTE)pMessage + pMessage->next_offset); // If we haven't reached the tail and the element we are on is not // currently being written, then successfully break out of this loop if ( (pMessage >= pTail) || (pMessage->state == SHMEM_Q_BEING_WRITTEN) ) { // All messages are being read or the next message in order is not // ready. I need to reset MsgAvailableEvent, wait for it to be // signalled and then start over if (m_bUseEvent) { ResetEvent(m_hMsgAvailableEvent); unlock(m_plQMutex); return false; } else { resetevent(m_plMsgAvailableTrigger); unlock(m_plQMutex); return false; } } } MessageQueue::MsgQueueElement *pElement; if (pMessage->from == -1) { unlock(m_plQMutex); return false; } if (pMessage->state == SHMEM_Q_SHP_AVAIL_FOR_READ) { shpData data; memcpy(&data, (LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), sizeof(shpData)); void *pLocal = g_MsgQueue.GetBufferToFill(pMessage->tag, data.length, pMessage->from, &pElement); if (!ReadProcessMemory( g_hProcesses[pMessage->from], data.address, pLocal, data.length , NULL)) //nt_error("Unable to read remote memory", pMessage->from); MakeErrMsg(GetLastError(), "Unable to read remote memory in process %d", pMessage->from); SetEvent(g_hShpSendCompleteEvent[g_nIproc]); g_MsgQueue.SetElementEvent(pElement); } else { void *pBuffer = pMsgQueue->GetBufferToFill( pMessage->tag, pMessage->length, pMessage->from, &pElement); // Mark the message as being read pMessage->state = SHMEM_Q_BEING_READ; unlock(m_plQMutex); // Read the data from the message memcpy(pBuffer, (LPBYTE)pMessage + sizeof(ShmemLockedQueueHeader), pMessage->length); pMsgQueue->SetElementEvent(pElement); lock(m_plQMutex); } // Mark the message as having been read pMessage->state = SHMEM_Q_READ; // Update the head and tail pointers of the queue pHead = SHMEM_Q_HEAD_PTR; pTail = SHMEM_Q_TAIL_PTR; // Advance the head pointer over all the read messages while ( (pHead < pTail) && (pHead->state == SHMEM_Q_READ) ) pHead = (ShmemLockedQueueHeader*)((LPBYTE)pHead + pHead->next_offset); if (pHead >= pTail) { // When the head catches up to the tail, the queue is empty so reset // the pointers and signal the queue empty SHMEM_Q_HEAD_OFFSET = 0; SHMEM_Q_TAIL_OFFSET = 2*sizeof(unsigned long); if (m_bUseEvent) ResetEvent(m_hMsgAvailableEvent); else resetevent(m_plMsgAvailableTrigger); setevent(m_plQEmptyTrigger); } else SHMEM_Q_HEAD_OFFSET = (unsigned long)((LPBYTE)pHead - (LPBYTE)m_pBase); unlock(m_plQMutex); return true;}// Function name : ShmemLockedQueue::SetProgressFunction// Description : // Return type : void // Argument : void (*ProgressPollFunctionvoid ShmemLockedQueue::SetProgressFunction(void (*ProgressPollFunction)()){ m_pProgressPollFunction = ProgressPollFunction; //printf("ShmemLockedQueue::SetProgressFunction called\n");fflush(stdout);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -