📄 nt_smp.cpp
字号:
#include "nt_global_cpp.h"#include "ShmemLockedQueue.h"#include "parsecliques.h"#include <stdlib.h>// Shared memory stuffint g_ShmemQSize = 1024*1024;//ShmemQueue **g_pShmemQueue = NULL;ShmemLockedQueue **g_pShmemQueue = NULL;int g_nMaxShmSendSize = 1024*15;HANDLE g_hShmRecvThread = NULL;int g_nNumShemQueues = 0;// Shared process stuffHANDLE *g_hShpMutex = NULL, *g_hShpSendCompleteEvent = NULL;HANDLE *g_hProcesses = NULL;// Function name : ShmRecvThread// Description : // Return type : void // Argument : ShmemQueue *pShmemQueuevoid ShmRecvThread(ShmemLockedQueue *pShmemLockedQueue){ while (true) { if (!pShmemLockedQueue->RemoveNextInsert(&g_MsgQueue)) ExitThread(0); } }// Function name : PollShmemQueue// Description : // Return type : void void PollShmemQueue(){ //* if (!g_pShmemQueue[g_nIproc]->RemoveNextInsert(&g_MsgQueue, false)) Sleep(0); /*/ for (int i=0; i<10; i++) { if (g_pShmemQueue[g_nIproc]->RemoveNextInsert(&g_MsgQueue, false)) return; } Sleep(0); //*/}// Function name : GetShmemClique// Description : Determine which processes this process can reach through shared memory// Return type : void int GetShmemClique(){ int nSMPLow, nSMPHigh; int nCount = 0; int i; char pszTemp[100]; for (i=0; i<g_nNproc; i++) g_pProcTable[i].shm = 0; try{ if (GetEnvironmentVariable("MPICH_SHM_CLICKS", pszTemp, 100) || GetEnvironmentVariable("MPICH_SHM_CLIQUES", pszTemp, 100)) { int *pMembers = NULL; if (ParseCliques(pszTemp, g_nIproc, g_nNproc, &nCount, &pMembers)) { nt_error("Unable to parse the SHM cliques", 1); return 0; } for (i=0; i<nCount; i++) { if ( (pMembers[i] >= 0) && (pMembers[i] < g_nNproc) ) { //printf("rank %d reachable by shared memory\n", pMembers[i]);fflush(stdout); g_pProcTable[pMembers[i]].shm = 1; } } if (pMembers != NULL) delete pMembers; } else { char pszSMPLow[10]="", pszSMPHigh[10]=""; if (GetEnvironmentVariable("MPICH_SHM_LOW", pszSMPLow, 10)) nSMPLow = atoi(pszSMPLow); else nSMPLow = g_nIproc; if (GetEnvironmentVariable("MPICH_SHM_HIGH", pszSMPHigh, 10)) nSMPHigh = atoi(pszSMPHigh); else nSMPHigh = g_nIproc; for (i=nSMPLow; i<=nSMPHigh; i++) { if ( i >= 0 && i < g_nNproc ) g_pProcTable[i].shm = 1; } nCount = (nSMPHigh - nSMPLow) + 1; } }catch(...) { nt_error("exception caught in GetShmemClique\n", 1); return 0; } return nCount;}// Function name : InitSMP// Description : // Return type : void void InitSMP(){ int i; char nameBuffer[256], pszTemp[100], pszSMPLow[10]="", pszSMPHigh[10]=""; g_nNumShemQueues = GetShmemClique(); if (g_nNumShemQueues < 2) return; // Initialize shared memory stuff if (GetEnvironmentVariable("MPICH_MAXSHMMSG", pszTemp, 100)) { g_nMaxShmSendSize = atoi(pszTemp); if (g_nMaxShmSendSize < 0) g_nMaxShmSendSize = 0; } if (GetEnvironmentVariable("MPICH_SHMQSIZE", pszTemp, 100)) { g_ShmemQSize = atoi(pszTemp); if (g_ShmemQSize < g_nMaxShmSendSize) g_ShmemQSize = g_nMaxShmSendSize; } g_pShmemQueue = new ShmemLockedQueue*[g_nNproc]; for (i=0; i<g_nNproc; i++) g_pShmemQueue[i] = NULL; for (i=0; i<g_nNproc; i++) { if (g_pProcTable[i].shm == 1) { /*printf("initializing shmem queue %d\n", i);fflush(stdout);*/ g_pShmemQueue[i] = new ShmemLockedQueue; sprintf(nameBuffer, "%s.shm%d", g_pszJobID, i); if (!g_pShmemQueue[i]->Init(nameBuffer, g_ShmemQSize)) nt_error("unable to initialize ShmemQueue", i); } } // Initialize shared process stuff g_hShpMutex = new HANDLE[g_nNproc]; g_hShpSendCompleteEvent = new HANDLE[g_nNproc]; g_hProcesses = new HANDLE[g_nNproc]; // Create all the named events and mutexes for (i=0; i<g_nNproc; i++) { if (g_pProcTable[i].shm == 1) { char pBuffer[100]; sprintf(pBuffer, "%s.shp%dMutex", g_pszJobID, i); g_hShpMutex[i] = CreateMutex(NULL, FALSE, pBuffer); if (g_hShpMutex[i] == NULL) MakeErrMsg(GetLastError(), "InitSMP: CreateMutex failed for g_hShmMutex[%d]", i); sprintf(pBuffer, "%s.shp%dSendComplete", g_pszJobID, i); g_hShpSendCompleteEvent[i] = CreateEvent(NULL, TRUE, FALSE, pBuffer); if (g_hShpSendCompleteEvent[i] == NULL) MakeErrMsg(GetLastError(), "InitSMP: CreateEvent failed for g_hShpSendCompleteEvent[%d]", i); } } unsigned long pid = GetCurrentProcessId(); // Send my information to the other processes for (i=0; i<g_nNproc; i++) { if (i != g_nIproc && g_pProcTable[i].shm == 1) { if (!g_pShmemQueue[i]->Insert((unsigned char *)&pid, sizeof(unsigned long), 0, g_nIproc)) nt_error("InitSMP: Unable to send pid info to remote process", i); } } // Get the information about the other processes for (i=0; i<g_nNproc; i++) { if (i != g_nIproc && g_pProcTable[i].shm == 1) { int tag, from; unsigned int length = sizeof(unsigned long); if (!g_pShmemQueue[g_nIproc]->RemoveNext((unsigned char *)&pid, &length, &tag, &from)) nt_error("InitSMP: Unable to receive pid information from remote processes", 0); /*printf("received pid %d from shmem queue %d\n", pid, from);fflush(stdout);*/ g_hProcesses[from] = OpenProcess(STANDARD_RIGHTS_REQUIRED | PROCESS_VM_READ | PROCESS_VM_WRITE | PROCESS_VM_OPERATION, FALSE, pid); } } pszTemp[0] = '\0'; GetEnvironmentVariable("MPICH_SHM_SINGLETHREAD", pszTemp, 100); if (pszTemp[0] == '1') { // Set the poll function so the shmem device will run single threaded. g_MsgQueue.SetProgressFunction(PollShmemQueue); //g_pShmemQueue[g_nIproc]->SetProgressFunction(PollShmemQueue); } else { // Start the shared memory receive thread DWORD dwThreadID; g_hShmRecvThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ShmRecvThread, g_pShmemQueue[g_nIproc], NT_THREAD_STACK_SIZE, &dwThreadID); if (g_hShmRecvThread == NULL) nt_error("InitSMP: Unable to create ShmRecvThread", 0); }}// Function name : EndSMP// Description : // Return type : void void EndSMP(){ int i; if (g_nNumShemQueues < 2) return; if (g_hShmRecvThread != NULL) { // Signal the shm thread to exit g_pShmemQueue[g_nIproc]->Insert(NULL, 0, 0, -1); WaitForSingleObject(g_hShmRecvThread, 5000); CloseHandle(g_hShmRecvThread); g_hShmRecvThread = NULL; } // Delete the shared memory queues for (i=0; i<g_nNproc; i++) { if (g_pProcTable[i].shm == 1) delete g_pShmemQueue[i]; } delete g_pShmemQueue; // Delete all the shared process stuff // Delete all the named events and mutexes for (i=0; i<g_nNproc; i++) { if (g_pProcTable[i].shm == 1) { CloseHandle(g_hShpMutex[i]); CloseHandle(g_hShpSendCompleteEvent[i]); CloseHandle(g_hProcesses[i]); } } delete g_hShpMutex; delete g_hShpSendCompleteEvent; delete g_hProcesses; g_hShpMutex = NULL; g_hShpSendCompleteEvent = NULL; g_hProcesses = NULL;}// Function name : NT_ShmSend// Description : // Return type : void // Argument : int type// Argument : void *buffer// Argument : int length// Argument : int tovoid NT_ShmSend(int type, void *buffer, int length, int to){ if (length > g_nMaxShmSendSize) { // Shared process send if (!g_pShmemQueue[to]->InsertSHP((unsigned char *)buffer, length, type, g_nIproc, g_hShpMutex[to], g_hShpSendCompleteEvent[to], g_pShmemQueue[g_nIproc])) { nt_error("shared process send failed", to); } } else { // Shared memory send if (!g_pShmemQueue[to]->Insert((unsigned char *)buffer, length, type, g_nIproc)) { nt_error("shared memory send failed", to); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -