📄 messagequeue.cpp
字号:
// MessageQueue.cpp: implementation of the MessageQueue class.////////////////////////////////////////////////////////////////////////#include "MessageQueue.h"#include <stdio.h>//////////////////////////////////////////////////////////////////////// Construction/Destruction//////////////////////////////////////////////////////////////////////// Function name : MessageQueue::MessageQueue// Description : // Return type : MessageQueue::MessageQueue(){ int i; m_pHead = new MessageQueue::InternalNode[MSGQ_INITIALNUMNODES]; for (i=0; i<MSGQ_INITIALNUMNODES; i++) { m_pHead[i].available = TRUE; m_pHead[i].nextfree = &m_pHead[i+1]; } m_pHead[MSGQ_INITIALNUMNODES-1].nextfree = NULL; m_pNextAvailable = &m_pHead[1]; m_pAllocList = NULL; // Insert a node for 0 because I know it is the most common tag m_pHead->available = FALSE; m_pHead->link.tag = 0; m_pHead->link.list = NULL; m_pHead->link.posted = NULL; m_pHead->link.next = NULL; m_num_elements = MSGQ_ELEMENT_BLOCK; m_num_available = MSGQ_ELEMENT_BLOCK; m_cur_index = 0; m_pool_size = 1; char pszTemp[100]; if (GetEnvironmentVariable("MPICH_USE_POLLING", pszTemp, 100)) m_bUseEvent = false; else m_bUseEvent = true; m_pPool = new MsgQueueElement*; m_pPool[0] = new MsgQueueElement[MSGQ_ELEMENT_BLOCK]; for (i=0; i<MSGQ_ELEMENT_BLOCK; i++) { InitElement(&m_pPool[0][i]); } // Why is the following function protected in the header file by (_WIN32_WINNT >= 0x0500) // when the help files claim that they are implemented in NT 4? //InitializeCriticalSectionAndSpinCount(&m_CriticalSection, 4000); InitializeCriticalSection(&m_CriticalSection); m_pProgressPollFunction = NULL; m_nGCCount = 0; m_nGCMax = 10;}// Function name : MessageQueue::~MessageQueue// Description : // Return type : MessageQueue::~MessageQueue(){ int i; for (i=0; i<m_num_elements; i++) CloseElement(&m_pPool[i/MSGQ_ELEMENT_BLOCK][i%MSGQ_ELEMENT_BLOCK]); m_num_elements = 0; m_num_available = 0; for (i=0; i<m_pool_size; i++) delete m_pPool[i]; delete m_pPool; DeleteCriticalSection(&m_CriticalSection); delete m_pHead; m_pHead = NULL; AllocatedNode *pAllocNode; while (m_pAllocList) { delete m_pAllocList->pBuffer; pAllocNode = m_pAllocList; m_pAllocList = m_pAllocList->pNext; delete pAllocNode; }}// Function name : MessageQueue::allocElement// Description : // Return type : MessageQueue::MsgQueueElement* MessageQueue::MsgQueueElement* MessageQueue::allocElement(){ // Check to see if the allocated array needs to be expanded if (m_num_available == 0) { int i; MessageQueue::MsgQueueElement **p = new MessageQueue::MsgQueueElement*[m_pool_size+1]; for (i=0; i<m_pool_size; i++) p[i] = m_pPool[i]; p[m_pool_size] = new MessageQueue::MsgQueueElement[MSGQ_ELEMENT_BLOCK]; for (i=0; i<MSGQ_ELEMENT_BLOCK; i++) { InitElement(&p[m_pool_size][i]); } m_pool_size++; delete m_pPool; m_pPool = p; m_num_elements += MSGQ_ELEMENT_BLOCK; m_num_available = MSGQ_ELEMENT_BLOCK; } // Find the next un-used element while (m_pPool[m_cur_index/MSGQ_ELEMENT_BLOCK][m_cur_index%MSGQ_ELEMENT_BLOCK].bIn_use) ++m_cur_index %= m_num_elements; // Mark it as used m_pPool[m_cur_index/MSGQ_ELEMENT_BLOCK][m_cur_index%MSGQ_ELEMENT_BLOCK].bIn_use = true; m_num_available--; // Return it return &m_pPool[m_cur_index/MSGQ_ELEMENT_BLOCK][m_cur_index%MSGQ_ELEMENT_BLOCK];}// Function name : MessageQueue::freeElement// Description : // Return type : void // Argument : MessageQueue::MsgQueueElement *elementvoid MessageQueue::freeElement(MessageQueue::MsgQueueElement *element){ EnterCriticalSection(&m_CriticalSection); element->bIn_use = false; ResetElementEvent(element); m_num_available++; LeaveCriticalSection(&m_CriticalSection);}// Function name : MessageQueue::AllocNode// Description : // Return type : MessageQueue::InternalNode * MessageQueue::InternalNode * MessageQueue::AllocNode(){ InternalNode *pNode; EnterCriticalSection(&m_CriticalSection); pNode = m_pNextAvailable; m_pNextAvailable = m_pNextAvailable->nextfree; if (m_pNextAvailable == NULL) { InternalNode *n = new InternalNode[MSGQ_INITIALNUMNODES]; for (int i=0; i<MSGQ_INITIALNUMNODES; i++) { n[i].available = TRUE; n[i].nextfree = &n[i+1]; } n[MSGQ_INITIALNUMNODES-1].nextfree = NULL; m_pNextAvailable = n; AllocatedNode *pANode = new AllocatedNode; pANode->pBuffer = n; pANode->pNext = m_pAllocList; m_pAllocList = pANode; } pNode->available = FALSE; LeaveCriticalSection(&m_CriticalSection); return pNode;}// Function name : MessageQueue::FreeNode// Description : // Return type : void // Argument : MessageQueue::InternalNode *pNodevoid MessageQueue::FreeNode(MessageQueue::InternalNode *pNode){ EnterCriticalSection(&m_CriticalSection); pNode->available = TRUE; pNode->nextfree = m_pNextAvailable; m_pNextAvailable = pNode; LeaveCriticalSection(&m_CriticalSection);}// Function name : MessageQueue::GarbageCollect// Description : // Return type : void void MessageQueue::GarbageCollect(){ EnterCriticalSection(&m_CriticalSection); InternalNode *pNode, *pTrailer; // Leave the first node even if it is empty. This node contains // tag 0 which is used for control messages. Control messages // occur frequently. It would be meaningless to remove this node // because it would just be re-allocated with the next message. // So start with the second node. pNode = m_pHead->link.next; pTrailer = m_pHead; while (pNode) { if ((pNode->link.posted == NULL) && (pNode->link.list == NULL)) { pTrailer->link.next = pNode->link.next; pNode->available = TRUE; pNode->nextfree = m_pNextAvailable; m_pNextAvailable = pNode; } else { pTrailer = pTrailer->link.next; } pNode = pNode->link.next; } m_nGCCount = 0; LeaveCriticalSection(&m_CriticalSection);}// Function name : MessageQueue::FindNode// Description : // Return type : void // Argument : MessageQueue::InternalNode *&nodevoid MessageQueue::FindNode(MessageQueue::InternalNode *&node){ if (node == NULL) { node = AllocNode(); node->link.list = AllocNode(); find_buffer = node->link.list->list.buffer = new char[find_length]; node->link.list->list.length = find_length; node->link.list->list.from = find_from; find_pElement = node->link.list->list.element = allocElement(); node->link.list->list.next = NULL; node->link.posted = NULL; node->link.next = NULL; node->link.tag = find_tag; m_nGCCount++; return; } InternalNode *pNode = node; while (true) { if (find_tag == pNode->link.tag) { if (pNode->link.posted != NULL) { find_buffer = pNode->link.posted->list.buffer; if (find_length > pNode->link.posted->list.length) { printf("MessageQueue:FindNode:Error - find_length: %d, pNode->link.posted->list.length: %d\n", find_length, pNode->link.posted->list.length);fflush(stdout); pNode->link.posted->list.length = -1; } else pNode->link.posted->list.length = find_length; pNode->link.posted->list.from = find_from; find_pElement = pNode->link.posted->list.element; pNode->link.posted = pNode->link.posted->list.next; return; } InternalNode *list = pNode->link.list; if (list == NULL) { pNode->link.list = AllocNode(); find_buffer = pNode->link.list->list.buffer = new char[find_length]; pNode->link.list->list.length = find_length; pNode->link.list->list.from = find_from; find_pElement = pNode->link.list->list.element = allocElement(); pNode->link.list->list.next = NULL; return; } while (list->list.next != NULL) list = list->list.next; list->list.next = AllocNode(); list = list->list.next; find_buffer = list->list.buffer = new char[find_length]; list->list.length = find_length; list->list.from = find_from; find_pElement = list->list.element = allocElement(); list->list.next = NULL; return; } if (pNode->link.next == NULL) { pNode->link.next = AllocNode(); pNode->link.next->link.list = AllocNode(); find_buffer = pNode->link.next->link.list->list.buffer = new char[find_length]; pNode->link.next->link.list->list.length = find_length; pNode->link.next->link.list->list.from = find_from; find_pElement = pNode->link.next->link.list->list.element = allocElement(); pNode->link.next->link.list->list.next = NULL; pNode->link.next->link.posted = NULL; pNode->link.next->link.next = NULL; pNode->link.next->link.tag = find_tag; m_nGCCount++; return; } pNode = pNode->link.next; }}// Function name : MessageQueue::GetBufferToFill// Description : // Return type : void* // Argument : int tag// Argument : int length// Argument : int from// Argument : MessageQueue::MsgQueueElement **ppElementvoid* MessageQueue::GetBufferToFill(int tag, int length, int from, MessageQueue::MsgQueueElement **ppElement){ void *ret_val; EnterCriticalSection(&m_CriticalSection); find_tag = tag; find_length = length; find_from = from; FindNode(m_pHead); *ppElement = find_pElement; ret_val = find_buffer; LeaveCriticalSection(&m_CriticalSection); if (m_nGCCount > m_nGCMax) GarbageCollect(); return ret_val;}// Function name : MessageQueue::FillFindNode// Description : // Return type : void // Argument : MessageQueue::InternalNode *&nodevoid MessageQueue::FillFindNode(MessageQueue::InternalNode *&node){ if (node == NULL) { node = AllocNode(); node->link.posted = AllocNode(); node->link.posted->list.buffer = find_buffer; node->link.posted->list.length = find_length; node->link.posted->list.from = -1; find_pElement = node->link.posted->list.element = allocElement();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -