⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 processwait.cpp

📁 MPICH是MPI的重要实现,提供了一系列的接口函数,为并行计算的实现提供了编程环境.
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include <stdio.h>#include <stdlib.h>#include "LaunchProcess.h"#include "global.h"#include "mpirun.h"#include "mpdutil.h"struct ProcessWaitThreadArg{    int n;    SOCKET *pSocket;    int *pId;    int *pRank;    SOCKET sockAbort;};struct ProcessWaitAbortThreadArg{    SOCKET sockAbort;    SOCKET sockStop;    int n;    SOCKET *pSocket;};// Function name	: ProcessWaitAbort// Description	    : // Return type		: void // Argument         : ProcessWaitAbortThreadArg *pArgvoid ProcessWaitAbort(ProcessWaitAbortThreadArg *pArg){    int n, i;    fd_set readset;    FD_ZERO(&readset);    FD_SET(pArg->sockAbort, &readset);    FD_SET(pArg->sockStop, &readset);    n = select(0, &readset, NULL, NULL, NULL);    if (n == SOCKET_ERROR)    {	PrintError(WSAGetLastError(), "bselect failed\n");fflush(stdout);	for (i=0; i<pArg->n; i++)	{	    easy_closesocket(pArg->pSocket[i]);	}	easy_closesocket(pArg->sockAbort);	easy_closesocket(pArg->sockStop);	return;    }    if (n == 0)    {	printf("ProcessWaitAbort: bselect returned zero sockets available\n");fflush(stdout);	for (i=0; i<pArg->n; i++)	{	    easy_closesocket(pArg->pSocket[i]);	}	easy_closesocket(pArg->sockAbort);	easy_closesocket(pArg->sockStop);	return;    }    if (FD_ISSET(pArg->sockAbort, &readset))    {	for (i=0; i<pArg->n; i++)	{	    easy_send(pArg->pSocket[i], "x", 1);	}    }    for (i=0; i<pArg->n; i++)    {	easy_closesocket(pArg->pSocket[i]);    }    easy_closesocket(pArg->sockAbort);    easy_closesocket(pArg->sockStop);}// Function name	: ProcessWait// Description	    : // Return type		: void // Argument         : ProcessWaitThreadArg *pArgvoid ProcessWait(ProcessWaitThreadArg *pArg){    int i, j, n;    fd_set totalset, readset;    char str[256];        FD_ZERO(&totalset);        FD_SET(pArg->sockAbort, &totalset);    for (i=0; i<pArg->n; i++)    {	FD_SET(pArg->pSocket[i], &totalset);    }        while (pArg->n)    {	readset = totalset;	n = select(0, &readset, NULL, NULL, NULL);	if (n == SOCKET_ERROR)	{	    
PrintError(WSAGetLastError(), "bselect failed\n");fflush(stdout);	    for (i=0, j=0; i<pArg->n; i++, j++)	    {		while (pArg->pSocket[j] == INVALID_SOCKET)		    j++;		easy_closesocket(pArg->pSocket[j]);		pArg->pSocket[j] = INVALID_SOCKET;	    }	    return;	}	if (n == 0)	{	    printf("ProcessWait: bselect returned zero sockets available");fflush(stdout);	    for (i=0, j=0; i<pArg->n; i++, j++)	    {		while (pArg->pSocket[j] == INVALID_SOCKET)		    j++;		easy_closesocket(pArg->pSocket[j]);		pArg->pSocket[j] = INVALID_SOCKET;	    }	    return;	}	if (FD_ISSET(pArg->sockAbort, &readset))	{	    for (i=0; pArg->n > 0; i++)	    {		while (pArg->pSocket[i] == INVALID_SOCKET)		    i++;		sprintf(str, "kill %d", pArg->pId[i]);		//printf("%d:%s\n", __LINE__, str);fflush(stdout);		WriteString(pArg->pSocket[i], str);		int nRank = pArg->pRank[i];		if (g_nNproc > FORWARD_NPROC_THRESHOLD)		{		    if (nRank > 0 && (g_nNproc/2) > nRank)		    {			//printf("rank %d(%d) stopping forwarder\n", nRank, g_pProcessLaunchId[i]);fflush(stdout);			sprintf(str, "stopforwarder port=%d abort=yes", g_pForwardHost[nRank].nPort);			WriteString(pArg->pSocket[i], str);		    }		}		sprintf(str, "freeprocess %d", pArg->pId[i]);		WriteString(pArg->pSocket[i], str);		ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT);		WriteString(pArg->pSocket[i], "done");		easy_closesocket(pArg->pSocket[i]);		pArg->pSocket[i] = INVALID_SOCKET;		pArg->n--;	    }	    return;	}	for (i=0; n>0; i++)	{	    while (pArg->pSocket[i] == INVALID_SOCKET)		i++;	    if (FD_ISSET(pArg->pSocket[i], &readset))	    {		if (ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT))		{		    int nRank = pArg->pRank[i];		    		    if (strnicmp(str, "FAIL", 4) == 0)		    {			// get the error			sprintf(str, "geterror %d", pArg->pId[i]);			WriteString(pArg->pSocket[i], str);			ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT);			printf("getexitcode(rank %d) failed: %s\n", nRank, str);fflush(stdout);			if 
(g_bUseJobHost)			{			    UpdateJobKeyValue(nRank, "error", str);			    			    // get the time the process exited			    sprintf(str, "getexittime %d", pArg->pId[i]);			    WriteString(pArg->pSocket[i], str);			    ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT);			    UpdateJobKeyValue(nRank, "exittime", str);			}						if (easy_send(g_sockBreak, "x", 1) == SOCKET_ERROR)			{			    printf("Hard abort.\n");fflush(stdout);			    ExitProcess(-1);			}		    }		    else		    {			if (g_bUseJobHost)			{			    strtok(str, ":"); // strip the extra data from the string			    UpdateJobKeyValue(nRank, "exitcode", str);			    			    char *temp;			    if (g_bOutputExitCodes)				temp = strdup(str);			    // get the time the process exited			    sprintf(str, "getexittime %d", pArg->pId[i]);			    WriteString(pArg->pSocket[i], str);			    ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT);			    UpdateJobKeyValue(nRank, "exittime", str);			    if (g_bOutputExitCodes)			    {				printf("[rank %d exit code: %s, time: %s]\n", nRank, temp, str);fflush(stdout);				free(temp);			    }			}			else			{			    if (g_bOutputExitCodes)			    {				strtok(str, ":"); // strip the extra data from the string				printf("[rank %d exit code: %s]\n", nRank, str);fflush(stdout);			    }			}			if (!g_bNoMPI)			{			    sprintf(str, "getmpifinalized %d", pArg->pId[i]);			    WriteString(pArg->pSocket[i], str);			    ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT);			    if (stricmp(str, "yes") != 0)			    {				if (stricmp(str, "no") != 0)				    printf("getmpifinalized returned: %s\n", str);				else				{				    if (!g_bSuppressErrorOutput)					printf("process %d exited without calling MPIFinalize\n", nRank);				}				fflush(stdout);				easy_send(g_sockBreak, "x", 1);			    }			}		    }		    		    if (g_nNproc > FORWARD_NPROC_THRESHOLD)		    {			if (nRank > 0 && (g_nNproc/2) > nRank)			{			    sprintf(str, "stopforwarder port=%d abort=no", 
g_pForwardHost[nRank].nPort);			    WriteString(pArg->pSocket[i], str);			}		    }		    		    sprintf(str, "freeprocess %d", pArg->pId[i]);		    WriteString(pArg->pSocket[i], str);		    ReadStringTimeout(pArg->pSocket[i], str, g_nMPIRUN_SHORT_TIMEOUT);		    		    WriteString(pArg->pSocket[i], "done");		    easy_closesocket(pArg->pSocket[i]);		    FD_CLR(pArg->pSocket[i], &totalset);		    pArg->pSocket[i] = INVALID_SOCKET;		    n--;		    pArg->n--;		}		else		{		    PrintError(WSAGetLastError(), "ProcessWait:Reading the exit code for process %d failed\n", i);fflush(stdout);		    easy_closesocket(pArg->pSocket[i]);		    FD_CLR(pArg->pSocket[i], &totalset);		    pArg->pSocket[i] = INVALID_SOCKET;		    n--;		    pArg->n--;		    if (easy_send(g_sockBreak, "x", 1) == SOCKET_ERROR)		    {			printf("Unable to abort processes.\n");fflush(stdout);			ExitProcess(-1);		    }		    //return;		}	    }	}    }}// Function name	: WaitForExitCommands// Description	    : // Return type		: void void WaitForExitCommands(){    bool bKillSent = false;    if (g_nNumProcessSockets < FD_SETSIZE)    {	int i, n;	fd_set totalset, readset;	char str[256];	SOCKET break_sock;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -