📄 nt_ipvishm_priv.cpp
字号:
if (strlen(g_pszHostName) < 1) { if (gethostname(g_pszHostName, NT_HOSTNAME_LEN) == SOCKET_ERROR) { err = WSAGetLastError(); if (err == WSAEFAULT) AbortInit(err, "Cannot handle hostnames longer than 100 characters"); else AbortInit(err, "gethostname failed"); } // Convert the host name to an ip string to make connection establishment more robust NT_Tcp_get_ip_string(g_pszHostName, g_pszHostName); } if (g_bUseBNR) { char pBuffer[100]; if (MPID_MyWorldRank == 0) BNR_Put(g_myBNRgroup, "MPICH_ROOT", g_pszHostName, -1); BNR_Fence(g_myBNRgroup); BNR_Get(g_myBNRgroup, "MPICH_ROOT", pBuffer); SetEnvironmentVariable("MPICH_ROOT", pBuffer); // Remove this line later sprintf(pszIproc, "%d", MPID_MyWorldRank); } // Read in the variables passed in the environment if (GetEnvironmentVariable("MPICH_JOBID", g_pszJobID, 100) == 0) g_bMPIRunUsed = false; g_bUseDatabase = false; if (GetEnvironmentVariable("MPICH_DBS", pszTemp, 100)) { char *token; token = strtok(pszTemp, ":"); if (token != NULL) strcpy(pszDBSHost, token); token = strtok(NULL, " \n"); if (token != NULL) strcpy(pszDBSPort, token); g_bUseDatabase = true; } else { if ( GetEnvironmentVariable("MPICH_DBS_HOST", pszDBSHost, 100) && GetEnvironmentVariable("MPICH_DBS_PORT", pszDBSPort, 10) ) { g_bUseDatabase = true; } } if (g_bUseDatabase) { char pszEnv[1024]; int length = 1024; g_Database.SetID(g_pszJobID); g_Database.Init(); if (GetEnvironmentVariable("MPICH_IPROC", pszIproc, 10) == 0) { // If there is no iproc variable in the environment then get a // generic environment from the dbs server. g_Database.Get("env", pszEnv, &length); SetEnvironmentString(pszEnv); GetEnvironmentVariable("MPICH_IPROC", pszIproc, 10); } else { if (GetEnvironmentVariable("MPICH_NPROC", pszNproc, 10) == 0) { // If there is an iproc but no nproc envrionment variable // then get the environment specific to this process from // the dbs server. char pszEnvKey[100]; sprintf(pszEnvKey, "env%d", atoi(pszIproc)); g_Database.Get(pszEnvKey, pszEnv, &length); SetEnvironmentString(pszEnv); } // If there is an iproc and nproc environment variable then get // nothing from the dbs server. } } else { if (GetEnvironmentVariable("MPICH_IPROC", pszIproc, 10) == 0) { // If an application is run without MPIRun then it is the first // and only process strcpy(pszIproc, "0"); g_bMPIRunUsed = false; } if (GetEnvironmentVariable("MPICH_ROOT", pszTemp, 100)) { char *token; token = strtok(pszTemp, ":"); if (token != NULL) strcpy(g_pszRootHostName, token); token = strtok(NULL, " \n"); if (token != NULL) strcpy(pszRootPort, token); } else { if (GetEnvironmentVariable("MPICH_ROOTHOST", g_pszRootHostName, 100) == 0) { unsigned long size = 100; GetComputerName(g_pszRootHostName, &size); g_bMPIRunUsed = false; } if (GetEnvironmentVariable("MPICH_ROOTPORT", pszRootPort, 10) == 0) { strcpy(pszRootPort, "-1"); g_bMPIRunUsed = false; } } g_nRootPort = atoi(pszRootPort); GetEnvironmentVariable("MPICH_EXTRA", pszExtra, 100); } if (GetEnvironmentVariable("MPICH_NPROC", pszNproc, 10) == 0) { // If an application is run without MPIRun then it is the only process strcpy(pszNproc, "1"); g_bMPIRunUsed = false; } if (GetEnvironmentVariable("MPICH_NUMCOMMPORTS", pszTemp, 100)) g_NumCommPortThreads = atoi(pszTemp); if (GetEnvironmentVariable("MPICH_NOCOMMPORT", pszTemp, 100)) bCommPortAvailable = false; MPID_MyWorldRank = g_nIproc = atoi(pszIproc); MPID_MyWorldSize = g_nNproc = atoi(pszNproc); // Save the high performance counter frequency QueryPerformanceFrequency(&g_nPerfFrequency); if (g_nIproc == 0) ClearLog(); if (g_nNproc < 1) AbortInit(1, "Invalid number of processes: %d", g_nNproc); g_pProcTable = new NT_ipvishm_ProcEntry[g_nNproc]; if (g_pProcTable == NULL) AbortInit(1, "Unable to allocate memory for the proc table in MPID_Init"); for (i=0; i<g_nNproc; i++) { g_pProcTable[i].exename[0] = '\0'; g_pProcTable[i].host[0] = '\0'; g_pProcTable[i].listen_port = 0; g_pProcTable[i].control_port = 0; g_pProcTable[i].pid = 0; g_pProcTable[i].sock = INVALID_SOCKET; g_pProcTable[i].sock_event = NULL; g_pProcTable[i].hConnectLock = NULL; g_pProcTable[i].hValidDataEvent = CreateEvent(NULL, TRUE, FALSE, NULL); g_pProcTable[i].shm = 0; g_pProcTable[i].via = 0; g_pProcTable[i].msg.ovl.hEvent = NULL; g_pProcTable[i].msg.state = NT_MSG_READING_TAG; g_pProcTable[i].multinic = FALSE; g_pProcTable[i].num_nics = 0; } g_pProcTable[g_nIproc].num_nics = GetLocalIPs(g_pProcTable[g_nIproc].nic_ip, MAX_NUM_NICS); if (g_pProcTable[g_nIproc].num_nics > 1) g_pProcTable[g_nIproc].multinic = TRUE; char pszNetMask[50]; if (GetEnvironmentVariable("MPICH_NETMASK", pszNetMask, 50)) { char *token = strtok(pszNetMask, "/"); if (token != NULL) { token = strtok(NULL, "\n"); if (token != NULL) { g_nNicNet = GetIP(pszNetMask); g_nNicMask = GetMask(token); g_bMultinic = true; } } } else { g_nNicNet = 0; g_nNicMask = 0; g_bMultinic = false; } bool bFixedPortUsed = false; if (g_nRootPort > 0 && g_nIproc == 0) { g_pProcTable[0].control_port = g_nRootPort; // If a specific port was provided through the environment then // don't write the port out to a file. bFixedPortUsed = true; } // The executable name is the full path to the executable HMODULE hModule = GetModuleHandle(NULL); if (!GetModuleFileName(hModule, g_pProcTable[g_nIproc].exename, NT_EXENAME_LEN)) strcpy(g_pProcTable[g_nIproc].exename, "unknown.exe"); strcpy(g_pProcTable[g_nIproc].host, g_pszHostName); g_pProcTable[g_nIproc].pid = (long)GetCurrentProcessId(); // If all the processes can reach each other through shared memory then there is // no need to create the socket completion port threads. int nNumShmQueues = GetShmemClique(); if (nNumShmQueues == g_nNproc) bCommPortAvailable = false; /* // If all the processes can reach each other through VI's then there is // no need to create the socket completion port threads. GetViClique(); if (everyonecantalkvi) bCommPortAvailable = false; //*/ if (bCommPortAvailable) // If there is no completion port available (Win9x) then socket communication is not available { DWORD dwThreadID; HANDLE hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); // Start the communication thread hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); if (hReadyEvent == NULL) AbortInit(1, "Unable to create an event in MPID_Init"); g_hCommPortThread = CreateThread( NULL, 0, (LPTHREAD_START_ROUTINE) CommPortThread, hReadyEvent, NT_THREAD_STACK_SIZE, &dwThreadID); if (g_hCommPortThread == NULL) AbortInit(GetLastError(), "Unable to spawn CommPortThread"); if (WaitForSingleObject(hReadyEvent, MPICH_SHORT_TIMEOUT) == WAIT_TIMEOUT) AbortInit(1, "Communication thread setup timed out"); CloseHandle(hReadyEvent); } else g_hCommPortThread = NULL; if (g_bUseBNR) { char pszKey[100], pszValue[MAX_PATH]; sprintf(pszKey, "ListenPort%d", g_nIproc); sprintf(pszValue, "%d", g_pProcTable[g_nIproc].listen_port); BNR_Put(g_myBNRgroup, pszKey, pszValue, -1); sprintf(pszKey, "ListenHost%d", g_nIproc); strcpy(pszValue, g_pProcTable[g_nIproc].host); BNR_Put(g_myBNRgroup, pszKey, pszValue, -1); sprintf(pszKey, "Executable%d", g_nIproc); strcpy(pszValue, g_pProcTable[g_nIproc].exename); BNR_Put(g_myBNRgroup, pszKey, pszValue, -1); sprintf(pszKey, "pid%d", g_nIproc); sprintf(pszValue, "%d", g_pProcTable[g_nIproc].pid); BNR_Put(g_myBNRgroup, pszKey, pszValue, -1); // Put anything for VI ??? } else if (g_bUseDatabase) { char pszKey[100], pszValue[MAX_PATH]; sprintf(pszKey, "ListenPort%d", g_nIproc); sprintf(pszValue, "%d", g_pProcTable[g_nIproc].listen_port); g_Database.Put(pszKey, pszValue, strlen(pszValue)+1); sprintf(pszKey, "ListenHost%d", g_nIproc); strcpy(pszValue, g_pProcTable[g_nIproc].host); g_Database.Put(pszKey, pszValue, strlen(pszValue)+1); sprintf(pszKey, "Executable%d", g_nIproc); strcpy(pszValue, g_pProcTable[g_nIproc].exename); g_Database.Put(pszKey, pszValue, strlen(pszValue)+1); sprintf(pszKey, "pid%d", g_nIproc); sprintf(pszValue, "%d", g_pProcTable[g_nIproc].pid); g_Database.Put(pszKey, pszValue, strlen(pszValue)+1); //g_Database.Put(pszKey, &g_pProcTable[g_nIproc].pid, sizeof(int)); // Put anything for VI ??? } else { DWORD dwThreadID; HANDLE hReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL); // Start the control thread ResetEvent(hReadyEvent); g_hControlLoopThread = CreateThread( NULL, 0, (LPTHREAD_START_ROUTINE) ControlLoopThread, hReadyEvent, NT_THREAD_STACK_SIZE, &dwThreadID); if (g_hControlLoopThread == NULL) AbortInit(GetLastError(), "Unable to spawn ControlLoopThread"); if (WaitForSingleObject(hReadyEvent, MPICH_SHORT_TIMEOUT) == WAIT_TIMEOUT) AbortInit(1, "Control thread setup timed out"); if (g_nIproc == 0) { ResetEvent(hReadyEvent); // Why do I use a global variable instead of just using // g_pProcTable[0].control_port? g_nRootPort = g_pProcTable[0].control_port; if (g_bMPIRunUsed && !bFixedPortUsed) { if (strnicmp(pszExtra, "shm:", 4) == 0) { // Write the port number to the temporary memory mapped file // described by pszExtra HANDLE hMapping; LONG *pMapping; SECURITY_ATTRIBUTES saAttr; saAttr.nLength = sizeof(SECURITY_ATTRIBUTES); saAttr.lpSecurityDescriptor = NULL; saAttr.bInheritHandle = FALSE; // Create a mapping from the page file hMapping = CreateFileMapping( INVALID_HANDLE_VALUE, &saAttr, //NULL, PAGE_READWRITE, 0, sizeof(LONG), &pszExtra[4]); if (hMapping == NULL) AbortInit(GetLastError(), "Unable to create a memory mapping for inter-process communication"); if (GetLastError() != ERROR_ALREADY_EXISTS) AbortInit(1, "MPIRun has not created the memory mapping to place the root port number in"); // Map the file and save the pointer to the base of the mapped file pMapping = (LONG *)MapViewOfFile( hMapping, FILE_MAP_WRITE, 0,0, sizeof(LONG)); if (pMapping == NULL) AbortInit(GetLastError(), "Unable to memory map the view of the ipc file"); // Write the listening port to the ipc shared memory file *pMapping = g_pProcTable[0].control_port; // Wait for the launcher to read the data before closing the mapping while (*pMapping != 0) Sleep(200); UnmapViewOfFile(pMapping); CloseHandle(hMapping); } else if (strnicmp(pszExtra, "mpd:", 4) == 0) { // use mpd to get the root port back to mpirun if (!PutRootPortInMPDDatabase(&pszExtra[4], g_pProcTable[0].control_port, g_pszJobID)) AbortInit(-1, "Unable to put the root listening port in the mpd database"); g_bMPDFinalize = true; } else { // Write the port number to the temporary file // described by pszExtra char str[100]; DWORD num_written; HANDLE hFile = CreateFile(pszExtra, GENERIC_WRITE, FILE_SHARE_READ, NULL, OPEN_EXISTING, 0, NULL); if (hFile == INVALID_HANDLE_VALUE) { int error = GetLastError(); LogMsg("CreateFile failed: error %d, file '%s'\n", error, pszExtra); AbortInit(error, "CreateFile failed: %s", pszExtra); } sprintf(str, "%d\n", g_pProcTable[0].control_port); if (!WriteFile(hFile, str, strlen(str)+2, &num_written, NULL)) { int error = GetLastError(); LogMsg("WriteFile failed of the root control port, Error: %d", error); CloseHandle(hFile); AbortInit(error, "WriteFile(%s, root port) failed", pszExtra); } CloseHandle(hFile); } } SendInitDataToRoot(); // Wait for g_hControlLoop(Client)Thread(s) to signal that all // processes have connected. For now, this is set to INFINITE. // What value would be appropriate to give the processes time to // launch and connect back to the root? WaitForSingleObject(g_hEveryoneConnectedEvent, INFINITE); } else { if (strnicmp(pszExtra, "mpd:", 4) == 0) { if (!ParseMPDString(&pszExtra[4])) AbortInit(-1, "Unable to parse the mpd host and port\n"); easy_socket_init(); g_bMPDFinalize = true; } // Send the root process or the database server information so it // can inform other processes how to connect to this process SendInitDataToRoot(); } CloseHandle(hReadyEvent); } pszTemp[0] = '\0'; GetEnvironmentVariable("MPICH_SINGLETHREAD", pszTemp, 100); if (pszTemp[0] == '1') { SetEnvironmentVariable("MPICH_SHM_SINGLETHREAD", "1"); SetEnvironmentVariable("MPICH_VI_SINGLETHREAD", "1"); } // Initialize the shared memory stuff try { InitSMP(); }catch(...) { nt_error("exception thrown in InitSMP caught in Init", 1); } try{ // Initialize the VIA stuff if (InitVI()) { pszTemp[0] = '\0'; pszTemp[1] = '\0'; GetEnvironmentVariable("MPICH_SHM_SINGLETHREAD", pszTemp, 100); GetEnvironmentVariable("MPICH_VI_SINGLETHREAD", &pszTemp[1], 99); if (pszTemp[0] == '1' && pszTemp[1] == '1' && g_pShmemQueue) g_MsgQueue.SetProgressFunction(PollShmemAndViQueues); } }catch(...) { nt_error("exception thrown in InitVi caught in Init", 1); } }catch(...) { nt_error("Exception caught in Init.", 1); }}// Function name : MPID_NT_ipvishm_End// Description : Finishes any outstanding IO, closes all connections, and waits for // everyone else to finish?// Return type : void void MPID_NT_ipvishm_End(){ //DPRINTF(("MPID_NT_ipvishm_End() called.\n")); g_bInNT_ipvishm_End = true; if (g_bMPDFinalize) UpdateMPIFinalizedInMPD(); if (g_nNproc > 1) { if (g_bUseBNR) { char pKey[100], pValue[100]; sprintf(pKey, "InDone%d", g_nIproc); BNR_Put(g_myBNRgroup, pKey, "yes", 0); BNR_Fence(g_myBNRgroup); if (g_nIproc == 0) { for (int i=0; i<g_nNproc; i++) { sprintf(pKey, "InDone%d", i); BNR_Get(g_myBNRgroup, pKey, pValue); } BNR_Put(g_myBNRgroup, "AllDone", "yes", -1); } BNR_Fence(g_myBNRgroup); BNR_Get(g_myBNRgroup, "AllDone", pValue); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -