📄 nt_ipvishm_priv.cpp
字号:
#include "mpid.h"
#include "ShmemLockedQueue.h"
#include "nt_global_cpp.h"
#include <windows.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "Database.h"
#include "bnrfunctions.h"
#include "mpdutil.h"

#define MPICH_MPD_TIMEOUT 20
#define MPICH_SHORT_TIMEOUT 15000
#define MPICH_MEDIUM_TIMEOUT 30000

// Global variables
int g_nLastRecvFrom = 0;
int g_nIproc = 0;
int g_nNproc = 0;
unsigned int g_nNicMask = 0;
unsigned int g_nNicNet = 0;
bool g_bMultinic = false;
char g_pszHostName[NT_HOSTNAME_LEN] = "";
char g_pszRootHostName[NT_HOSTNAME_LEN] = "";
int g_nRootPort = 0;
NT_ipvishm_ProcEntry *g_pProcTable = NULL;
MessageQueue g_MsgQueue;
char g_ErrMsg[1024];
bool g_bInNT_ipvishm_End = false;
LARGE_INTEGER g_nPerfFrequency;
#ifdef DEBUG_OUTPUT
bool g_bVerbose = false;
#endif
bool g_bMPIRunUsed = true;
bool g_bUseDatabase = false;
bool g_bUseBNR = false;
Database g_Database;
char g_pszJobID[100]="";
char g_pszMPDHost[NT_HOSTNAME_LEN];
char g_pszMPDPhrase[256];
char g_pszMPDId[20];
int g_nMPDPort;
bool g_bMPDFinalize = false;

extern "C" {
int MPID_NT_ipvishm_is_shm( int );
__declspec(dllexport) void GetMPICHVersion(char *str, int length);
}

// Function name	: BoundedStrCopy
// Description	    : Copies pszSrc into pszDest, writing at most nDestSize bytes
//                    including the terminator. Input that does not fit is
//                    truncated; the destination is always null terminated.
//                    A NULL source is treated as the empty string.
// Return type		: void
// Argument         : char *pszDest
// Argument         : size_t nDestSize
// Argument         : const char *pszSrc
static void BoundedStrCopy(char *pszDest, size_t nDestSize, const char *pszSrc)
{
    if (pszDest == NULL || nDestSize == 0)
        return;
    if (pszSrc == NULL)
        pszSrc = "";

    size_t nLen = strlen(pszSrc);
    if (nLen >= nDestSize)
        nLen = nDestSize - 1;
    memcpy(pszDest, pszSrc, nLen);
    pszDest[nLen] = '\0';
}

// Function name	: ReportFormattedError
// Description	    : Formats a printf-style message into a fixed 1024 byte
//                    buffer (truncating if necessary) and hands it to nt_error
//                    together with the error code.
// Return type		: void
// Argument         : int error
// Argument         : const char *pFormat
// Argument         : va_list pArg
static void ReportFormattedError(int error, const char *pFormat, va_list pArg)
{
    char chMsg[1024];

    // _vsnprintf does not terminate the buffer when the output is truncated,
    // so reserve the last byte and terminate explicitly.
    _vsnprintf(chMsg, sizeof(chMsg) - 1, pFormat, pArg);
    chMsg[sizeof(chMsg) - 1] = '\0';
    nt_error(chMsg, error);
}

// Function name	: GetMPICHVersion
// Description	    : Writes "<MPICH_VERSION> <build date>" into str. The result
//                    is truncated to fit and is always null terminated.
//                    Nothing is written if str is NULL or length is not positive.
// Return type		: void
// Argument         : char *str
// Argument         : int length
__declspec(dllexport) void GetMPICHVersion(char *str, int length)
{
    if (str == NULL || length <= 0)
        return;

    _snprintf(str, length, "%s %s", MPICH_VERSION, __DATE__);
    // _snprintf leaves the buffer unterminated when the text does not fit.
    str[length - 1] = '\0';
}

// Function name	: PollShmemAndViQueues
// Description	    : Makes one polling pass: asks this process's shared memory
//                    queue to move its next message into g_MsgQueue and calls
//                    ViWorkerThread(0). If neither reports work, the thread
//                    yields with Sleep(0).
// Return type		: void
void PollShmemAndViQueues()
{
    bool bSleep = true;

    if (g_pShmemQueue[g_nIproc]->RemoveNextInsert(&g_MsgQueue, false))
        bSleep = false;
    if (ViWorkerThread(0))
        bSleep = false;
    if (bSleep)
        Sleep(0);
}

// Function name	: MakeErrMsg
// Description	    : Formats a printf-style message (truncated to 1023
//                    characters) and reports it through nt_error.
// Return type		: void
// Argument         : int error
// Argument         : char *pFormat
// Argument         : ...
void MakeErrMsg(int error, char *pFormat, ...)
{
    va_list pArg;

    va_start(pArg, pFormat);
    ReportFormattedError(error, pFormat, pArg);
    va_end(pArg);
}

// Function name	: ArgSqueeze
// Description	    : Remove all null arguments from an arg vector; update the number of arguments.
//                    The surviving pointers keep their relative order and are
//                    packed at the front of argv. Slots at and beyond the new
//                    count are left as they were.
// Return type		: void
// Argument         : int *Argc
// Argument         : char **argv
void ArgSqueeze( int *Argc, char **argv )
{
    int argc = *Argc;
    int nKept = 0;

    // Only indices below argc are ever read, and nothing is read when the
    // vector is empty or contains only null entries.
    for (int j = 0; j < argc; j++)
    {
        if (argv[j] != NULL)
            argv[nKept++] = argv[j];
    }

    *Argc = nKept;
}

// Function name	: AbortInit
// Description	    : Formats a printf-style message (truncated to 1023
//                    characters) and reports it through nt_error.
// Return type		: void
// Argument         : int error
// Argument         : char *pFormat
// Argument         : ...
void AbortInit(int error, char *pFormat, ...)
{
    va_list pArg;

    va_start(pArg, pFormat);
    ReportFormattedError(error, pFormat, pArg);
    va_end(pArg);
}

// Function name	: SetEnvironmentString
// Description	    : Parses a string of the form "name=value|name=value|..."
//                    and calls SetEnvironmentVariable for each pair. Names and
//                    values longer than MAX_PATH-1 characters are truncated.
//                    SetEnvironmentVariable is also called once after the end
//                    of the string with whatever name/value are current, which
//                    sets the final pair when the string does not end in '|'.
// Return type		: void
// Argument         : char *pszEnv
void SetEnvironmentString(char *pszEnv)
{
    char name[MAX_PATH]="", value[MAX_PATH]="";
    char *pChar = name;
    char *pLimit = name + MAX_PATH - 1;   // last slot is reserved for the terminator

    if (pszEnv == NULL)
        return;

    while (*pszEnv != '\0')
    {
        if (*pszEnv == '=')
        {
            // End of the name; following characters belong to the value.
            *pChar = '\0';
            pChar = value;
            pLimit = value + MAX_PATH - 1;
        }
        else if (*pszEnv == '|')
        {
            // End of the value; publish the pair and start the next name.
            *pChar = '\0';
            pChar = name;
            pLimit = name + MAX_PATH - 1;
            SetEnvironmentVariable(name, value);
        }
        else if (pChar < pLimit)
        {
            *pChar = *pszEnv;
            pChar++;
        }
        // else: the current field is full, drop the excess character
        pszEnv++;
    }
    *pChar = '\0';
    SetEnvironmentVariable(name, value);
}

// Function name	: GetIP
// Description	    : Parses a dotted quad "a.b.c.d" and packs it into an
//                    unsigned int with a in the lowest byte and d in the
//                    highest. Fields that cannot be parsed are taken as 0.
// Return type		: unsigned int
// Argument         : char *pszIP
static unsigned int GetIP(char *pszIP)
{
    // Zero-initialised so a short or malformed string cannot leave them indeterminate.
    unsigned int a = 0, b = 0, c = 0, d = 0;

    sscanf(pszIP, "%u.%u.%u.%u", &a, &b, &c, &d);
    return (d << 24) | (c << 16) | (b << 8) | a;
}

// Function name	: GetMask
// Description	    : Converts a network mask given either as a dotted quad
//                    ("a.b.c.d", packed with a in the lowest byte, as in GetIP)
//                    or as a bit count, in which case that many low-order bits
//                    are set. Unparsable dotted-quad fields are taken as 0.
// Return type		: unsigned int
// Argument         : char *pszMask
static unsigned int GetMask(char *pszMask)
{
    unsigned int nMask = 0;

    if (strstr(pszMask, "."))
    {
        unsigned int a = 0, b = 0, c = 0, d = 0;
        sscanf(pszMask, "%u.%u.%u.%u", &a, &b, &c, &d);
        nMask = (d << 24) | (c << 16) | (b << 8) | a;
    }
    else
    {
        int nBits = atoi(pszMask);
        for (int i=0; i<nBits; i++)
        {
            nMask = nMask << 1;
            nMask = nMask | 0x1;
        }
    }
    return nMask;
}

// Function name	: GetLocalIPs
// Description	    : Looks up the local host name with gethostname and
//                    gethostbyname and copies up to max of the returned
//                    addresses into pIP, in the byte layout the resolver
//                    supplies. Returns the number of addresses stored, or 0 if
//                    either lookup fails.
//                    NOTE(review): assumes each address is at least
//                    sizeof(unsigned int) bytes (IPv4) - confirm.
// Return type		: int
// Argument         : unsigned int *pIP
// Argument         : int max
static int GetLocalIPs(unsigned int *pIP, int max)
{
    char hostname[100], **hlist;
    HOSTENT *h = NULL;
    int n = 0;

    if (gethostname(hostname, sizeof(hostname)) == SOCKET_ERROR)
        return 0;

    h = gethostbyname(hostname);
    if (h == NULL)
        return 0;

    hlist = h->h_addr_list;
    while (*hlist != NULL && n<max)
    {
        // memcpy instead of a casted load: the address bytes need not be
        // aligned for unsigned int.
        memcpy(&pIP[n], *hlist, sizeof(pIP[n]));
        hlist++;
        n++;
    }
    return n;
}

// Function name	: PutRootPortInMPDDatabase
// Description	    : Parses str as "dbname:port:phrase[:host]", connects to the
//                    mpd at that host (the local computer name when no host is
//                    given), stores the root port under key "port" in database
//                    dbname and then waits on a two-party barrier named
//                    barrier_name. If the MPD_ID environment variable is set it
//                    is saved in g_pszMPDId. Fields that do not fit their
//                    destination buffers are truncated.
// Return type		: bool - true on success, false on a parse, connection,
//                    write, read or barrier failure
// Argument         : char *str
// Argument         : int port
// Argument         : char *barrier_name
bool PutRootPortInMPDDatabase(char *str, int port, char *barrier_name)
{
    char dbname[100];
    char pszStr[256];
    SOCKET sock;
    DWORD length = sizeof(g_pszMPDHost);   // must describe the buffer handed to GetComputerName
    char *pszID;

    if (str == NULL || barrier_name == NULL)
        return false;

    pszID = getenv("MPD_ID");
    if (pszID != NULL)
        BoundedStrCopy(g_pszMPDId, sizeof(g_pszMPDId), pszID);

    // Work on a private copy because strtok modifies the string it scans.
    BoundedStrCopy(pszStr, sizeof(pszStr), str);
    str = strtok(pszStr, ":");
    if (str == NULL)
        return false;
    BoundedStrCopy(dbname, sizeof(dbname), str);
    str = strtok(NULL, ":");
    if (str == NULL)
        return false;
    g_nMPDPort = atoi(str);
    str = strtok(NULL, ":");
    if (str == NULL)
        return false;
    BoundedStrCopy(g_pszMPDPhrase, sizeof(g_pszMPDPhrase), str);
    str = strtok(NULL, ":");
    if (str != NULL)
        BoundedStrCopy(g_pszMPDHost, sizeof(g_pszMPDHost), str);
    else
        GetComputerName(g_pszMPDHost, &length);

    easy_socket_init();

    // ConnectToMPD returns non-zero on failure.
    if (ConnectToMPD(g_pszMPDHost, g_nMPDPort, g_pszMPDPhrase, &sock))
    {
        return false;
    }

    _snprintf(pszStr, sizeof(pszStr) - 1, "dbput name=%s key=port value=%d", dbname, port);
    pszStr[sizeof(pszStr) - 1] = '\0';
    if (WriteString(sock, pszStr) == SOCKET_ERROR)
    {
        printf("ERROR:PutRootPortInMPDDatabase: Unable to write '%s' to socket[%d]\n", pszStr, (int)sock);
        easy_closesocket(sock);
        return false;
    }
    if (!ReadStringTimeout(sock, pszStr, MPICH_MPD_TIMEOUT))
    {
        printf("ERROR:PutRootPortInMPDDatabase: put failed: error %d", WSAGetLastError());
        easy_closesocket(sock);
        return false;
    }
    if (strnicmp(pszStr, "DBS_SUCCESS", 11) != 0)
    {
        printf("ERROR:PutRootPortInMPDDatabase: putting the root port in the mpd database failed.\n%s", pszStr);
        WriteString(sock, "done");
        easy_closesocket(sock);
        return false;
    }

    // A truncated barrier name would address a different barrier, so an
    // overlong name is reported as a failure rather than sent.
    if (_snprintf(pszStr, sizeof(pszStr) - 1, "barrier name=%s count=2", barrier_name) < 0)
    {
        printf("ERROR:PutRootPortInMPDDatabase: barrier name too long\n");
        WriteString(sock, "done");
        easy_closesocket(sock);
        return false;
    }
    pszStr[sizeof(pszStr) - 1] = '\0';
    if (WriteString(sock, pszStr) == SOCKET_ERROR)
    {
        printf("ERROR:PutRootPortInMPDDatabase: Unable to write the barrier command: error %d", WSAGetLastError());
        easy_closesocket(sock);
        return false;
    }

    // Keep reading until 'SUCCESS' arrives; 'INFO...' lines are skipped and
    // anything else is treated as a barrier failure.
    bool bBarrierContinue = true;
    while (bBarrierContinue)
    {
        if (!ReadStringTimeout(sock, pszStr, MPICH_MPD_TIMEOUT))
        {
            printf("ERROR:PutRootPortInMPDDatabase: Unable to read the result of the barrier command: error %d", WSAGetLastError());
            easy_closesocket(sock);
            return false;
        }
        if (strncmp(pszStr, "SUCCESS", 8))
        {
            // If it is not 'SUCCESS' then
            if (strncmp(pszStr, "INFO", 4))
            {
                // If it is not an 'INFO - ...' message then it is an error
                printf("ERROR:PutRootPortInMPDDatabase: barrier failed:\n%s", pszStr);
                easy_closesocket(sock);
                return false;
            }
        }
        else
        {
            bBarrierContinue = false;
        }
    }

    WriteString(sock, "done");
    easy_closesocket(sock);
    return true;
}

// Function name	: ParseMPDString
// Description	    : Reads the MPD_ID environment variable into g_pszMPDId and
//                    parses str as "host:port:phrase" into g_pszMPDHost,
//                    g_nMPDPort and g_pszMPDPhrase. Fields that do not fit
//                    their destination buffers are truncated.
// Return type		: bool - false if MPD_ID is not set, str is NULL or any of
//                    the three fields is missing
// Argument         : char *str
bool ParseMPDString(char *str)
{
    char pszStr[1024];
    char *pszID;

    pszID = getenv("MPD_ID");
    if (pszID == NULL)
        return false;
    BoundedStrCopy(g_pszMPDId, sizeof(g_pszMPDId), pszID);

    if (str == NULL)
        return false;

    // Work on a private copy because strtok modifies the string it scans.
    BoundedStrCopy(pszStr, sizeof(pszStr), str);
    str = strtok(pszStr, ":");
    if (str == NULL)
        return false;
    BoundedStrCopy(g_pszMPDHost, sizeof(g_pszMPDHost), str);
    str = strtok(NULL, ":");
    if (str == NULL)
        return false;
    g_nMPDPort = atoi(str);
    str = strtok(NULL, ":");
    if (str == NULL)
        return false;
    BoundedStrCopy(g_pszMPDPhrase, sizeof(g_pszMPDPhrase), str);
    return true;
}

// Function name	: UpdateMPIFinalizedInMPD
// Description	    : Connects to the mpd described by g_pszMPDHost, g_nMPDPort
//                    and g_pszMPDPhrase and sends "setMPIFinalized <g_pszMPDId>".
//                    Failures are printed to stdout.
// Return type		: bool - true only if the mpd answers "SUCCESS"
//                    (case-insensitive)
bool UpdateMPIFinalizedInMPD()
{
    SOCKET sock;
    char pszStr[256];

    // ConnectToMPD returns non-zero on failure.
    if (ConnectToMPD(g_pszMPDHost, g_nMPDPort, g_pszMPDPhrase, &sock))
    {
        printf("ConnectToMPD(%s:%d) failed preventing process %d from signalling that it has reached MPI_Finalize\n", g_pszMPDHost, g_nMPDPort, g_nIproc);
        fflush(stdout);
        return false;
    }

    _snprintf(pszStr, sizeof(pszStr) - 1, "setMPIFinalized %s", g_pszMPDId);
    pszStr[sizeof(pszStr) - 1] = '\0';
    if (WriteString(sock, pszStr) == SOCKET_ERROR)
    {
        printf("ERROR:UpdateMPIFinalized: Unable to write '%s' to socket[%d]\n", pszStr, (int)sock);
        fflush(stdout);
        easy_closesocket(sock);
        return false;
    }
    if (!ReadStringTimeout(sock, pszStr, MPICH_MPD_TIMEOUT))
    {
        printf("ERROR:UpdateMPIFinalized: Unable to read the result of the setMPIFinalized command\n");
        fflush(stdout);
        easy_closesocket(sock);
        return false;
    }
    if (stricmp(pszStr, "SUCCESS") != 0)
    {
        printf("ERROR:UpdateMPIFinalized: setMPIFinalized failed.\n");
        fflush(stdout);
        WriteString(sock, "done");
        easy_closesocket(sock);
        return false;
    }
    WriteString(sock, "done");
    easy_closesocket(sock);
    return true;
}

// Function name	: MPID_NT_ipvishm_Init
// Description	    : Launches all the processes and sets up a mechanism by which
//                    any process can make a connection with any other process.
//                    NOTE(review): this definition continues beyond the end of
//                    the visible source; the statements below are reproduced
//                    unchanged.
// Return type		: void
// Argument         : int *argc
// Argument         : char ***argv
void MPID_NT_ipvishm_Init( int *argc, char ***argv )
{
    char pszIproc[10]="", pszNproc[10]="", pszRootPort[10]="";
    char pszExtra[256]="", pszTemp[100];
    char pszDBSHost[100]="", pszDBSPort[10]="";
    bool bCommPortAvailable = true;
    WSADATA wsaData;
    int err, i;

    try{
#ifdef DEBUG_OUTPUT
    char pszVerbose[100];
    if (GetEnvironmentVariable("MPICH_VERBOSE", pszVerbose, 100))
        g_bVerbose = true;
#endif

    //Start the Winsock dll
    if ((err = WSAStartup( MAKEWORD( 2, 0 ), &wsaData )) != 0)
        AbortInit(err, "Winsock2 dll not initialized");

    // Attempt to use BNR
    g_bUseBNR = LoadBNRFunctions();
    if (g_bUseBNR)
    {
        BNR_Group parent_group, joint_group;

        if (BNR_Init() == BNR_FAIL)
        {
            g_bUseBNR = false;
        }
        else
        {
            if (BNR_Get_group(&g_myBNRgroup) == BNR_FAIL)
                AbortInit(1, "BNR_Get_group failed");
            if (BNR_Get_parent(&parent_group) == BNR_FAIL)
                AbortInit(1, "BNR_Get_parent failed");
            try {
            if (BNR_Merge(g_myBNRgroup, parent_group, &joint_group) == BNR_FAIL)
                AbortInit(1, "BNR_Merge failed");
            }catch(...)
            {
                AbortInit(1, "Exception caught in BNR_Merge");
            }
            if (BNR_Fence(joint_group) == BNR_FAIL)
                AbortInit(1, "BNR_Fence failed");
            MPID_MyWorldRank = -1;
            if (BNR_Get_rank(g_myBNRgroup, &MPID_MyWorldRank) == BNR_FAIL)
                AbortInit(1, "BNR_Get_rank failed");
            char pKey[100], pBuffer[4096] = "";
            sprintf(pKey, "env%d", MPID_MyWorldRank);
            if (BNR_Get(joint_group, pKey, pBuffer) == BNR_FAIL)
                AbortInit(1, "BNR_Get %s failed", pKey);
            SetEnvironmentString(pBuffer);
            if (BNR_Free_group(parent_group) == BNR_FAIL)
                AbortInit(1, "BNR_Free_group(parent_group) failed");
            if (BNR_Free_group(joint_group) == BNR_FAIL)
                AbortInit(1, "BNR_Free_group(joint_group) failed");
        }
    }

    // Save the local host name
    // For multihomed systems MPICH_COMNIC can set the hostname to a specific nic
    // else the default is whatever gethostname returns
    g_pszHostName[0] = '\0';
    GetEnvironmentVariable("MPICH_COMNIC", g_pszHostName, NT_HOSTNAME_LEN);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -