⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bnr_spawn.cpp

📁 MPICH是MPI标准的一个重要实现,提供了一系列的接口函数,为并行计算的实现提供了编程环境。
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "bnrimpl.h"
#include "mpdutil.h"
#include "mpd.h"
#include "bsocket.h"
#include <stdio.h>
#include "mpichinfo.h"
#include "redirectio.h"

// Mutex taken around every access to g_hJobThreads / g_nNumJobThreads / g_pSpawnList in this file.
HANDLE g_hSpawnMutex = NULL;
// Handles of the per-spawn threads; only the first g_nNumJobThreads entries are in use.
HANDLE g_hJobThreads[100];
int g_nNumJobThreads = 0;

// Bookkeeping for one spawn operation: the per-process records, the sockets
// and the thread handles that belong to it.  Instances are chained through
// m_pNext into the global list g_pSpawnList.
struct Spawn_struct
{
    Spawn_struct();
    Spawn_struct(int n);
    ~Spawn_struct();

    // Per-process record.
    struct Spawn_node
    {
        int pid;            // compared against the pid parsed from an exit-code reply
        int launchid;       // id sent in the "getexitcodewait" command
        char fwd_host[100]; // presumably the host of an I/O forwarder -- TODO confirm
        int fwd_port;       // presumably the port of an I/O forwarder -- TODO confirm
    };

    int m_nNproc;               // number of entries in m_pNode
    Spawn_node *m_pNode;        // array allocated with new[] (NULL when empty)
    int m_bfd;                  // socket used by SpawnWaitThread for commands and replies
    int m_bfdStop;              // socket on which the destructor sends one byte before waiting on m_hRedirectIOThread
    HANDLE m_hRedirectIOThread; // thread the destructor waits for and, on timeout, terminates
    HANDLE m_hThread;           // handle also stored in g_hJobThreads (see RemoveSpawnThread)
    Spawn_struct *m_pNext;      // next element of g_pSpawnList
};

// Creates an empty record: no nodes, invalid sockets, no threads.
Spawn_struct::Spawn_struct()
{
    m_bfd = BFD_INVALID_SOCKET;
    m_bfdStop = BFD_INVALID_SOCKET;
    m_hRedirectIOThread = NULL;
    m_nNproc = 0;
    m_pNode = NULL;
    m_pNext = NULL;
    m_hThread = NULL;
}

// Creates a record with n default-initialized process nodes.
Spawn_struct::Spawn_struct(int n)
{
    // Every member is set here explicitly.  Writing "Spawn_struct();" inside
    // a constructor body only builds and discards a temporary object; it does
    // not initialize *this, which would leave the sockets and handles holding
    // garbage for the destructor to act on.
    m_bfd = BFD_INVALID_SOCKET;
    m_bfdStop = BFD_INVALID_SOCKET;
    m_hRedirectIOThread = NULL;
    m_pNext = NULL;
    m_hThread = NULL;

    m_nNproc = n;
    m_pNode = new Spawn_node[n];
    for (int i = 0; i < n; i++)
    {
        m_pNode[i].fwd_host[0] = '\0';
        m_pNode[i].fwd_port = 0;
        m_pNode[i].launchid = -1;
        m_pNode[i].pid = 0;
    }
}

// Releases the node array, stops the redirect-IO thread and closes the sockets.
Spawn_struct::~Spawn_struct()
{
    m_nNproc = 0;
    // m_pNode comes from new[], so it must be released with delete[]
    // (delete[] on a NULL pointer is a no-op).
    delete [] m_pNode;
    m_pNode = NULL;

    if (m_hRedirectIOThread != NULL)
    {
        if (m_bfdStop != BFD_INVALID_SOCKET)
        {
            // Send one byte on the stop socket, then give the thread ten
            // seconds to finish before killing it.
            beasy_send(m_bfdStop, "x", 1);
            if (WaitForSingleObject(m_hRedirectIOThread, 10000) == WAIT_TIMEOUT)
                TerminateThread(m_hRedirectIOThread, 0);
        }
        else
            TerminateThread(m_hRedirectIOThread, 0);
        CloseHandle(m_hRedirectIOThread);
    }
    m_hRedirectIOThread = NULL;

    if (m_bfd != BFD_INVALID_SOCKET)
    {
        beasy_closesocket(m_bfd);
        m_bfd = BFD_INVALID_SOCKET;
    }
    if (m_bfdStop != BFD_INVALID_SOCKET)
    {
        beasy_closesocket(m_bfdStop);
        m_bfdStop = BFD_INVALID_SOCKET;
    }
    m_pNext = NULL;

    // don't touch the m_hThread member because BNR_Finalize may be waiting on it?
    if (!g_bBNRFinalizeWaiting && m_hThread != NULL)
    {
        CloseHandle(m_hThread);
        m_hThread = NULL;
    }
}

// Head of the list of live spawn records (guarded by g_hSpawnMutex).
Spawn_struct *g_pSpawnList = NULL;
char g_pszIOHost[100] = "";
int g_nIOPort = 0;

// One entry of a host list: a host name and how many processes go on it.
struct HostNode
{
    char pszHost[100];
    int nSMPProcs;
    HostNode *pNext;
};

// Reads host names from a text file and appends to *ppNode a newly allocated
// list that provides exactly nNumWanted process slots.
//
// File format, one host per line: "host", "host:count" or "host count".
// Blank lines and lines whose first non-blank character is '#' are skipped.
// A count below 1 is treated as 1.  If the file lists fewer slots than
// requested, the hosts are reused cyclically.
//
// pszFileName - path of the host file
// ppNode      - in/out: head of the caller's list; the new nodes are appended
// nNumWanted  - number of process slots to provide (must be >= 1)
//
// Returns false when a parameter is invalid, the file cannot be opened, or it
// contains no host; *ppNode is left untouched in those cases.
static bool GetHostsFromFile(char *pszFileName, HostNode **ppNode, int nNumWanted)
{
    FILE *fin;
    char buffer[1024] = "";
    char *pChar, *pChar2;
    HostNode *node = NULL, *list = NULL, *cur_node = NULL;

    // check the parameters
    if ((nNumWanted < 1) || (ppNode == NULL) || (pszFileName == NULL))
        return false;

    // open the file
    fin = fopen(pszFileName, "r");
    if (fin == NULL)
    {
        printf("Error: unable to open file '%s', error %d\n", pszFileName, GetLastError());
        return false;
    }

    // Read the host names from the file
    while (fgets(buffer, 1024, fin))
    {
        pChar = buffer;

        // Advance over white space (cast: isspace is undefined for negative char values)
        while (*pChar != '\0' && isspace(static_cast<unsigned char>(*pChar)))
            pChar++;
        if (*pChar == '#' || *pChar == '\0')
            continue;

        // Trim trailing white space, never moving in front of the first
        // non-blank character
        pChar2 = &buffer[strlen(buffer)-1];
        while ((pChar2 >= pChar) && isspace(static_cast<unsigned char>(*pChar2)))
        {
            *pChar2 = '\0';
            pChar2--;
        }

        // If there is anything left on the line, consider it a host name
        if (strlen(pChar) > 0)
        {
            node = new HostNode;
            node->nSMPProcs = 1;
            node->pNext = NULL;

            // Copy the host name.  A line can be far longer than pszHost, so
            // the copy is bounded; the rest of an over-long token is skipped
            // so that pChar still ends up just past the token.
            size_t nLen = 0;
            while (*pChar != '\0' && !isspace(static_cast<unsigned char>(*pChar)))
            {
                if (nLen < sizeof(node->pszHost) - 1)
                    node->pszHost[nLen++] = *pChar;
                pChar++;
            }
            node->pszHost[nLen] = '\0';

            // "host:count" form: strtok cuts the name at ':' and the second
            // call yields the count, if there is one
            pChar2 = strtok(node->pszHost, ":");
            pChar2 = strtok(NULL, "\n");
            if (pChar2 != NULL)
            {
                node->nSMPProcs = atoi(pChar2);
                if (node->nSMPProcs < 1)
                    node->nSMPProcs = 1;
            }
            else
            {
                // "host count" form
                // Advance over white space
                while (*pChar != '\0' && isspace(static_cast<unsigned char>(*pChar)))
                    pChar++;
                // Get the number of SMP processes
                if (*pChar != '\0')
                {
                    node->nSMPProcs = atoi(pChar);
                    if (node->nSMPProcs < 1)
                        node->nSMPProcs = 1;
                }
            }

            // Append to the list read from the file
            if (list == NULL)
            {
                list = node;
                cur_node = node;
            }
            else
            {
                cur_node->pNext = node;
                cur_node = node;
            }
        }
    }
    fclose(fin);

    if (list == NULL)
        return false;

    // Allocate the first host node
    node = new HostNode;
    int num_left = nNumWanted;
    HostNode *n = list, *target = node;

    // add the nodes to the target list, cycling if necessary
    while (num_left)
    {
        target->pNext = NULL;
        strcpy(target->pszHost, n->pszHost);
        if (num_left <= n->nSMPProcs)
        {
            // this host covers everything that is still needed
            target->nSMPProcs = num_left;
            num_left = 0;
        }
        else
        {
            target->nSMPProcs = n->nSMPProcs;
            num_left = num_left - n->nSMPProcs;
        }
        if (num_left)
        {
            target->pNext = new HostNode;
            target = target->pNext;
        }
        n = n->pNext;
        if (n == NULL)
            n = list;
    }

    // free the list created from the file
    while (list)
    {
        n = list;
        list = list->pNext;
        delete n;
    }

    // add the generated list to the end of the list passed in
    if (*ppNode == NULL)
    {
        *ppNode = node;
    }
    else
    {
        cur_node = *ppNode;
        while (cur_node->pNext != NULL)
            cur_node = cur_node->pNext;
        cur_node->pNext = node;
    }

    return true;
}

// Deletes every node of a host list.
static void FreeHosts(HostNode *pNode)
{
    HostNode *n;
    while (pNode)
    {
        n = pNode;
        pNode = pNode->pNext;
        delete n;
    }
}

// Copies into pszHost the name of the host that owns process slot nRank
// (0-based), counting nSMPProcs slots per list entry.  pszHost is left
// untouched when the list does not contain that many slots.  The destination
// size is not checked here; HostNode::pszHost holds at most 100 bytes.
static void GetHost(HostNode *pList, int nRank, char *pszHost)
{
    nRank++;
    while (nRank > 0)
    {
        if (pList == NULL)
            return;
        nRank = nRank - pList->nSMPProcs;
        if (nRank > 0)
            pList = pList->pNext;
    }
    if (pList == NULL)
        return;
    strcpy(pszHost, pList->pszHost);
}

// Builds the command line for process number nIproc (0-based) of a multi-
// command spawn: the quoted executable followed by its arguments, separated
// by single spaces.
//
// count    - number of entries in maxprocs / cmds / argvs
// maxprocs - number of processes started by each command
// cmds     - executable of each command
// argvs    - per-command argument vectors, or NULL for "no arguments"; each
//            vector is assumed to be NULL-terminated, as with
//            MPI_Comm_spawn_multiple -- confirm against callers
// nIproc   - index of the process whose command is wanted
// pszCmd   - output buffer; its size is not known here, so the caller must
//            make it large enough.  Left untouched when nIproc is beyond the
//            total number of processes.
static void CreateCommand(int count, int *maxprocs, char **cmds, char ***argvs, int nIproc, char *pszCmd)
{
    int i = 0;
    char **ppArg;

    // find the command whose process range contains nIproc
    nIproc++;
    while (nIproc > 0)
    {
        if (i >= count)
            return;
        nIproc = nIproc - maxprocs[i];
        if (nIproc > 0)
            i++;
    }
    if (i >= count)
        return;

    sprintf(pszCmd, "\"%s\"", cmds[i]);
    if (argvs == NULL)
        return;
    ppArg = argvs[i];
    if (ppArg == NULL)
        return;
    // Stop at the NULL entry that ends the vector.  Testing ppArg itself
    // never terminates, because incrementing a valid pointer does not make
    // it NULL.
    while (*ppArg != NULL)
    {
        strcat(pszCmd, " ");
        strcat(pszCmd, *ppArg);
        ppArg++;
    }
}

// Removes pSpawn->m_hThread from g_hJobThreads (swapping the last entry into
// its place), closes the handle and clears the member.  Does nothing while
// g_bBNRFinalizeWaiting is set.
static void RemoveSpawnThread(Spawn_struct *pSpawn)
{
    if (g_bBNRFinalizeWaiting)
        return;

    WaitForSingleObject(g_hSpawnMutex, INFINITE);
    for (int i = 0; i < g_nNumJobThreads; i++)
    {
        if (g_hJobThreads[i] == pSpawn->m_hThread)
        {
            g_nNumJobThreads--;
            g_hJobThreads[i] = g_hJobThreads[g_nNumJobThreads];
            CloseHandle(pSpawn->m_hThread);
            pSpawn->m_hThread = NULL;
            break;
        }
    }
    ReleaseMutex(g_hSpawnMutex);
}

// Unlinks pSpawn from g_pSpawnList under g_hSpawnMutex.  The record itself is
// not deleted.  Does nothing when the list is empty or pSpawn is not in it.
static void RemoveSpawnStruct(Spawn_struct *pSpawn)
{
    Spawn_struct *pTrailer;

    WaitForSingleObject(g_hSpawnMutex, INFINITE);
    if (g_pSpawnList != NULL && pSpawn != NULL)
    {
        if (pSpawn == g_pSpawnList)
        {
            g_pSpawnList = g_pSpawnList->m_pNext;
        }
        else
        {
            // walk with a trailing pointer so that the predecessor can be relinked
            for (pTrailer = g_pSpawnList; pTrailer->m_pNext != NULL; pTrailer = pTrailer->m_pNext)
            {
                if (pTrailer->m_pNext == pSpawn)
                {
                    pTrailer->m_pNext = pSpawn->m_pNext;
                    break;
                }
            }
        }
    }
    ReleaseMutex(g_hSpawnMutex);
}

// Sends a "getexitcodewait <launchid>" command for every process of pSpawn
// over m_bfd and then reads one reply per process.  If a write or read fails,
// the record is removed from the global bookkeeping and deleted.
// NOTE(review): the listing this was recovered from ends partway through this
// function; the visible part is reproduced without code changes.
void SpawnWaitThread(Spawn_struct *pSpawn)
{
    int i,j;
    char pszStr[1024];
    int nPid;

    for (i=0; i<pSpawn->m_nNproc; i++)
    {
        sprintf(pszStr, "getexitcodewait %d", pSpawn->m_pNode[i].launchid);
        if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)
        {
            printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError());
            RemoveSpawnThread(pSpawn);
            RemoveSpawnStruct(pSpawn);
            delete pSpawn;
            return;
        }
    }

    for (i=0; i<pSpawn->m_nNproc; i++)
    {
        if (!ReadString(pSpawn->m_bfd, pszStr))
        {
            printf("ReadString(exitcode) failed, error %d\n", WSAGetLastError());
            RemoveSpawnThread(pSpawn);
            RemoveSpawnStruct(pSpawn);
            delete pSpawn;
            return;
        }
        // the text after the first ':' of the reply is parsed as a pid
        char *token = strtok(pszStr, ":");
        if (token != NULL)
        {
            token = strtok(NULL, "\n");
            if (token != NULL)
            {
                nPid = atoi(token);
                for (j=0; j<pSpawn->m_nNproc; j++)
                {
                    if (pSpawn->m_pNode[j].pid == nPid)
                    {
                        if ((j > 0) && ((pSpawn->m_nNproc/2) > j))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -