⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nt_ipvishm_priv.cpp

📁 MPICH是MPI的重要研究,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 3 页
字号:
#include "mpid.h"#include "ShmemLockedQueue.h"#include "nt_global_cpp.h"#include <windows.h>#include <stdio.h>#include <time.h>#include "Database.h"#include "bnrfunctions.h"#include <stdlib.h>#include "mpdutil.h"#define MPICH_MPD_TIMEOUT     20#define MPICH_SHORT_TIMEOUT   15000#define MPICH_MEDIUM_TIMEOUT  30000// Global variablesint g_nLastRecvFrom = 0;int g_nIproc = 0;int g_nNproc = 0;unsigned int g_nNicMask = 0;unsigned int g_nNicNet = 0;bool g_bMultinic = false;char g_pszHostName[NT_HOSTNAME_LEN] = "";char g_pszRootHostName[NT_HOSTNAME_LEN] = "";int g_nRootPort = 0;NT_ipvishm_ProcEntry *g_pProcTable = NULL;MessageQueue g_MsgQueue;char g_ErrMsg[1024];bool g_bInNT_ipvishm_End = false;LARGE_INTEGER g_nPerfFrequency;#ifdef DEBUG_OUTPUTbool g_bVerbose = false;#endifbool g_bMPIRunUsed = true;bool g_bUseDatabase = false;bool g_bUseBNR = false;Database g_Database;char g_pszJobID[100]="";char g_pszMPDHost[NT_HOSTNAME_LEN];char g_pszMPDPhrase[256];char g_pszMPDId[20];int g_nMPDPort;bool g_bMPDFinalize = false;extern "C" {int MPID_NT_ipvishm_is_shm( int );__declspec(dllexport) void GetMPICHVersion(char *str, int length);}// Function name	: GetMPICHVersion// Description	    : // Return type		: void // Argument         : char *str// Argument         : int length__declspec(dllexport) void GetMPICHVersion(char *str, int length){    //_snprintf(str, length, "%d.%d.%d %s", VERSION_RELEASE, VERSION_MAJOR, VERSION_MINOR, __DATE__);    _snprintf(str, length, "%s %s", MPICH_VERSION, __DATE__);}// Function name	: PollShmemQueue// Description	    : // Return type		: void void PollShmemAndViQueues(){	bool bSleep = true;	if (g_pShmemQueue[g_nIproc]->RemoveNextInsert(&g_MsgQueue, false))		bSleep = false;	if (ViWorkerThread(0))		bSleep = false;	if (bSleep)		Sleep(0);}// Function name	: MakeErrMsg// Description	    : // Return type		: void // Argument         : int error// Argument         : char *pFormat// Argument         : ...void MakeErrMsg(int error, char *pFormat, ...){	char chMsg[1024];	va_list pArg;		va_start(pArg, pFormat);	vsprintf(chMsg, pFormat, pArg);	va_end(pArg);	nt_error(chMsg, error);}// Function name	: ArgSqueeze// Description	    : Remove all null arguments from an arg vector; update the number of arguments.// Return type		: void // Argument         :  int *Argc// Argument         : char **argvvoid ArgSqueeze( int *Argc, char **argv ){    int argc, i, j;    // Compress out the eliminated args    argc = *Argc;    j    = 0;    i    = 0;    while (j < argc) 	{		while (argv[j] == 0 && j < argc) 			j++;		if (j < argc) argv[i++] = argv[j++];    }    // Back off the last value if it is null    if (!argv[i-1]) 		i--;    *Argc = i;}// Function name	: AbortInit// Description	    : // Return type		: void // Argument         : int error// Argument         : char *pFormat// Argument         : ...void AbortInit(int error, char *pFormat, ...){	char chMsg[1024];	va_list pArg;		va_start(pArg, pFormat);	vsprintf(chMsg, pFormat, pArg);	va_end(pArg);	nt_error(chMsg, error);}// Function name	: SetEnvironmentString// Description	    : // Return type		: void // Argument         : char *pszEnvvoid SetEnvironmentString(char *pszEnv){	char name[MAX_PATH]="", value[MAX_PATH]="";	char *pChar;	pChar = name;	while (*pszEnv != '\0')	{		if (*pszEnv == '=')		{			*pChar = '\0';			pChar = value;		}		else		if (*pszEnv == '|')		{			*pChar = '\0';			pChar = name;			SetEnvironmentVariable(name, value);		}		else		{			*pChar = *pszEnv;			pChar++;		}		pszEnv++;	}	*pChar = '\0';	SetEnvironmentVariable(name, value);}static unsigned int GetIP(char *pszIP){    unsigned int nIP;    unsigned int a,b,c,d;    sscanf(pszIP, "%u.%u.%u.%u", &a, &b, &c, &d);    //printf("mask: %u.%u.%u.%u\n", a, b, c, d);fflush(stdout);    nIP = (d << 24) | (c << 16) | (b << 8) | a;    return nIP;}static unsigned int GetMask(char *pszMask){    unsigned int nMask = 0;    if (strstr(pszMask, "."))    {	unsigned int a,b,c,d;	sscanf(pszMask, "%u.%u.%u.%u", &a, &b, &c, &d);	//printf("mask: %u.%u.%u.%u\n", a, b, c, d);fflush(stdout);	nMask = (d << 24) | (c << 16) | (b << 8) | a;    }    else    {	int nBits = atoi(pszMask);	for (int i=0; i<nBits; i++)	{	    nMask = nMask << 1;	    nMask = nMask | 0x1;	}    }    /*    unsigned int a, b, c, d;    a = ((unsigned char *)(&nMask))[0];    b = ((unsigned char *)(&nMask))[1];    c = ((unsigned char *)(&nMask))[2];    d = ((unsigned char *)(&nMask))[3];    printf("mask: %u.%u.%u.%u\n", a, b, c, d);fflush(stdout);    */    return nMask;}static int GetLocalIPs(unsigned int *pIP, int max){    char hostname[100], **hlist;    HOSTENT *h = NULL;    int error;    int n = 0;        if (gethostname(hostname, 100) == SOCKET_ERROR)    {	error = WSAGetLastError();	return 0;    }        h = gethostbyname(hostname);    if (h == NULL)    {	error = WSAGetLastError();	return 0;    }        hlist = h->h_addr_list;    while (*hlist != NULL && n<max)    {	pIP[n] = *(unsigned int*)(*hlist);	/*	unsigned int a, b, c, d;	a = ((unsigned char *)(&pIP[n]))[0];	b = ((unsigned char *)(&pIP[n]))[1];	c = ((unsigned char *)(&pIP[n]))[2];	d = ((unsigned char *)(&pIP[n]))[3];	printf("ip: %u.%u.%u.%u\n", a, b, c, d);fflush(stdout);	*/	hlist++;	n++;    }    return n;}bool PutRootPortInMPDDatabase(char *str, int port, char *barrier_name){	char dbname[100];	char pszStr[256];	SOCKET sock;	DWORD length = 100;	char *pszID;	pszID = getenv("MPD_ID");	if (pszID != NULL)	    strcpy(g_pszMPDId, pszID);	strcpy(pszStr, str);	str = strtok(pszStr, ":");	if (str == NULL)	    return false;	strcpy(dbname, str);	str = strtok(NULL, ":");	if (str == NULL)	    return false;	g_nMPDPort = atoi(str);	str = strtok(NULL, ":");	if (str == NULL)	    return false;	strcpy(g_pszMPDPhrase, str);	str = strtok(NULL, ":");	if (str != NULL)	    strcpy(g_pszMPDHost, str);	else	    GetComputerName(g_pszMPDHost, &length);	easy_socket_init();	if (ConnectToMPD(g_pszMPDHost, g_nMPDPort, g_pszMPDPhrase, &sock))	{		return false;	}	sprintf(pszStr, "dbput name=%s key=port value=%d", dbname, port);	if (WriteString(sock, pszStr) == SOCKET_ERROR)	{		printf("ERROR:PutRootPortInMPDDatabase: Unable to write '%s' to socket[%d]\n", pszStr, sock);		easy_closesocket(sock);		return false;	}	if (!ReadStringTimeout(sock, pszStr, MPICH_MPD_TIMEOUT))	{		printf("ERROR:PutRootPortInMPDDatabase: put failed: error %d", WSAGetLastError());		easy_closesocket(sock);		return false;	}	if (strnicmp(pszStr, "DBS_SUCCESS", 11) != 0)	{	    printf("ERROR:PutRootPortInMPDDatabase: putting the root port in the mpd database failed.\n%s", pszStr);	    WriteString(sock, "done");	    easy_closesocket(sock);	    return false;	}	sprintf(pszStr, "barrier name=%s count=2", barrier_name);	if (WriteString(sock, pszStr) == SOCKET_ERROR)	{		printf("ERROR:PutRootPortInMPDDatabase: Unable to write the barrier command: error %d", WSAGetLastError());		easy_closesocket(sock);		return false;	}	bool bBarrierContinue = true;	while (bBarrierContinue)	{	    if (!ReadStringTimeout(sock, pszStr, MPICH_MPD_TIMEOUT))	    {		printf("ERROR:PutRootPortInMPDDatabase: Unable to read the result of the barrier command: error %d", WSAGetLastError());		easy_closesocket(sock);		return false;	    }	    if (strncmp(pszStr, "SUCCESS", 8))	    {		// If it is not 'SUCCESS' then		if (strncmp(pszStr, "INFO", 4))		{		    // If it is not an 'INFO - ...' message then it is an error		    printf("ERROR:PutRootPortInMPDDatabase: barrier failed:\n%s", pszStr);		    easy_closesocket(sock);		    return false;		}	    }	    else	    {		bBarrierContinue = false;	    }	}	WriteString(sock, "done");	easy_closesocket(sock);	return true;}bool ParseMPDString(char *str){    char pszStr[1024];    char *pszID;    pszID = getenv("MPD_ID");    if (pszID == NULL)	return false;    strcpy(g_pszMPDId, pszID);    strcpy(pszStr, str);    str = strtok(pszStr, ":");    if (str == NULL)	return false;    strcpy(g_pszMPDHost, str);    str = strtok(NULL, ":");    if (str == NULL)	return false;    g_nMPDPort = atoi(str);    str = strtok(NULL, ":");    if (str == NULL)	return false;    strcpy(g_pszMPDPhrase, str);    return true;}bool UpdateMPIFinalizedInMPD(){    SOCKET sock;    char pszStr[256];    if (ConnectToMPD(g_pszMPDHost, g_nMPDPort, g_pszMPDPhrase, &sock))    {	printf("ConnectToMPD(%s:%d) failed preventing process %d from signalling that it has reached MPI_Finalize\n", g_pszMPDHost, g_nMPDPort, g_nIproc);	fflush(stdout);	return false;    }    sprintf(pszStr, "setMPIFinalized %s", g_pszMPDId);    if (WriteString(sock, pszStr) == SOCKET_ERROR)    {	printf("ERROR:UpdateMPIFinalized: Unable to write '%s' to socket[%d]\n", pszStr, sock);	fflush(stdout);	easy_closesocket(sock);	return false;    }    if (!ReadStringTimeout(sock, pszStr, MPICH_MPD_TIMEOUT))    {	printf("ERROR:UpdateMPIFinalized: Unable to read the result of the setMPIFinalized command\n");	fflush(stdout);	easy_closesocket(sock);	return false;    }    if (stricmp(pszStr, "SUCCESS") != 0)    {	printf("ERROR:UpdateMPIFinalized: setMPIFinalized failed.\n");	fflush(stdout);	WriteString(sock, "done");	easy_closesocket(sock);	return false;    }    WriteString(sock, "done");    easy_closesocket(sock);    return true;}// Function name	: MPID_NT_ipvishm_Init// Description	    : Launches all the processes and sets up a mechanism by which//                    any process can make a connection with any other process.// Return type		: void // Argument         :  int *argc// Argument         : char ***argvvoid MPID_NT_ipvishm_Init( int *argc, char ***argv ){	char pszIproc[10]="", pszNproc[10]="", pszRootPort[10]="";	char pszExtra[256]="", pszTemp[100];	char pszDBSHost[100]="", pszDBSPort[10]="";	bool bCommPortAvailable = true;	WSADATA wsaData;	int err, i;	try{#ifdef DEBUG_OUTPUT	char pszVerbose[100];	if (GetEnvironmentVariable("MPICH_VERBOSE", pszVerbose, 100))		g_bVerbose = true;#endif	//Start the Winsock dll	if ((err = WSAStartup( MAKEWORD( 2, 0 ), &wsaData )) != 0)		AbortInit(err, "Winsock2 dll not initialized");	// Attempt to use BNR	g_bUseBNR = LoadBNRFunctions();	if (g_bUseBNR)	{		BNR_Group parent_group, joint_group;		if (BNR_Init() == BNR_FAIL)		{			g_bUseBNR = false;		}		else		{			if (BNR_Get_group(&g_myBNRgroup) == BNR_FAIL)				AbortInit(1, "BNR_Get_group failed");			if (BNR_Get_parent(&parent_group) == BNR_FAIL)				AbortInit(1, "BNR_Get_parent failed");			try	{			if (BNR_Merge(g_myBNRgroup, parent_group, &joint_group) == BNR_FAIL)				AbortInit(1, "BNR_Merge failed");			}catch(...) { AbortInit(1, "Exception caught in BNR_Merge"); }			if (BNR_Fence(joint_group) == BNR_FAIL)				AbortInit(1, "BNR_Fence failed");			MPID_MyWorldRank = -1;			if (BNR_Get_rank(g_myBNRgroup, &MPID_MyWorldRank) == BNR_FAIL)				AbortInit(1, "BNR_Get_rank failed");						char pKey[100], pBuffer[4096] = "";			sprintf(pKey, "env%d", MPID_MyWorldRank);			if (BNR_Get(joint_group, pKey, pBuffer) == BNR_FAIL)				AbortInit(1, "BNR_Get %s failed", pKey);			SetEnvironmentString(pBuffer);						if (BNR_Free_group(parent_group) == BNR_FAIL)				AbortInit(1, "BNR_Free_group(parent_group) failed");			if (BNR_Free_group(joint_group) == BNR_FAIL)				AbortInit(1, "BNR_Free_group(joint_group) failed");		}	}	// Save the local host name	// For multihomed systems MPI_COMNIC can set the hostname to a specific nic	// else the default is whatever gethostname returns	g_pszHostName[0] = '\0';	GetEnvironmentVariable("MPICH_COMNIC", g_pszHostName, NT_HOSTNAME_LEN);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -