📄 pmi_spawn.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*  $Id: pmi_spawn.c,v 1.1 2002/10/07 22:12:45 toonen Exp $
 *
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
#include "pmiimpl.h"
#include "mpdutil.h"
#include "mpd.h"
#include "bsocket.h"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include "mpichinfo.h"
#include "redirectio.h"
#ifdef HAVE_STRING_H
#include <string.h>
#endif

#ifndef err_printf
#define err_printf printf
#endif

/* Per-process bookkeeping for one spawned process. */
typedef struct Spawn_node_st
{
    int pid;            /* compared against the pid parsed from exit-code replies */
    int launchid;       /* id used in "getexitcodewait" / "freeprocess" commands */
    char fwd_host[100]; /* host used in the "stopforwarder" command */
    int fwd_port;       /* port used in the "stopforwarder" command */
} Spawn_node;

/* One spawn operation: its processes, sockets and helper threads. */
typedef struct Spawn_struct_st
{
    int m_nNproc;                    /* number of entries in m_pNode */
    Spawn_node *m_pNode;             /* array of m_nNproc nodes (owned) */
    int m_bfd;                       /* socket used for WriteString/ReadString commands */
    int m_bfdStop;                   /* socket used to tell the redirection thread to stop */
    HANDLE m_hRedirectIOThread;      /* I/O redirection thread, or NULL */
    HANDLE m_hThread;                /* handle also stored in g_hJobThreads, or NULL */
    struct Spawn_struct_st *m_pNext; /* next entry in g_pSpawnList */
} Spawn_struct;

/* Singly linked list entry: a host name and how many processes to place on it. */
typedef struct HostNode_st
{
    char pszHost[100];
    int nSMPProcs;
    struct HostNode_st *pNext;
} HostNode;

Spawn_struct *g_pSpawnList = NULL;
char g_pszIOHost[100] = "";
int g_nIOPort = 0;
HANDLE g_hSpawnMutex = NULL;    /* guards g_pSpawnList and g_hJobThreads */
HANDLE g_hJobThreads[100];
int g_nNumJobThreads = 0;

/*
 * Allocate a Spawn_struct with no nodes, invalid sockets and NULL handles.
 *
 * Returns the new structure, or NULL if the allocation fails.
 * Release it with FreeSpawn_struct().
 */
Spawn_struct* CreateSpawn_struct()
{
    Spawn_struct *p = (Spawn_struct*)malloc(sizeof(Spawn_struct));
    if (p == NULL)
        return NULL;
    p->m_bfd = BFD_INVALID_SOCKET;
    p->m_bfdStop = BFD_INVALID_SOCKET;
    p->m_hRedirectIOThread = NULL;
    p->m_nNproc = 0;
    p->m_pNode = NULL;
    p->m_pNext = NULL;
    p->m_hThread = NULL;
    return p;
}

/*
 * Allocate a Spawn_struct together with an array of n nodes.
 *
 * Each node starts with an empty forwarder host, forwarder port 0,
 * launchid -1 and pid 0.
 *
 * Returns the new structure, or NULL if any allocation fails.
 */
Spawn_struct* CreateSpawn_structn(int n)
{
    int i;
    Spawn_struct *p = CreateSpawn_struct();
    if (p == NULL)
        return NULL;
    p->m_nNproc = n;
    if (n > 0)
    {
        p->m_pNode = (Spawn_node*)malloc(sizeof(Spawn_node)*n);
        if (p->m_pNode == NULL)
        {
            /* nothing but the structure itself has been acquired yet */
            free(p);
            return NULL;
        }
    }
    for (i=0; i<n; i++)
    {
        p->m_pNode[i].fwd_host[0] = '\0';
        p->m_pNode[i].fwd_port = 0;
        p->m_pNode[i].launchid = -1;
        p->m_pNode[i].pid = 0;
    }
    return p;
}

/*
 * Release everything owned by a Spawn_struct and the structure itself.
 *
 * If the redirection thread is still around it is asked to stop through
 * m_bfdStop and given 10 seconds before being terminated; without a stop
 * socket it is terminated immediately.  Both sockets are closed.
 * m_hThread is closed only when g_bPMIFinalizeWaiting is not set.
 * Passing NULL is a no-op.
 */
void FreeSpawn_struct(Spawn_struct *p)
{
    if (p == NULL)
        return;

    p->m_nNproc = 0;
    free(p->m_pNode);
    p->m_pNode = NULL;

    if (p->m_hRedirectIOThread != NULL)
    {
        if (p->m_bfdStop != BFD_INVALID_SOCKET)
        {
            beasy_send(p->m_bfdStop, "x", 1);
            if (WaitForSingleObject(p->m_hRedirectIOThread, 10000) == WAIT_TIMEOUT)
                TerminateThread(p->m_hRedirectIOThread, 0);
        }
        else
            TerminateThread(p->m_hRedirectIOThread, 0);
        CloseHandle(p->m_hRedirectIOThread);
    }
    p->m_hRedirectIOThread = NULL;

    if (p->m_bfd != BFD_INVALID_SOCKET)
    {
        beasy_closesocket(p->m_bfd);
        p->m_bfd = BFD_INVALID_SOCKET;
    }
    if (p->m_bfdStop != BFD_INVALID_SOCKET)
    {
        beasy_closesocket(p->m_bfdStop);
        p->m_bfdStop = BFD_INVALID_SOCKET;
    }
    p->m_pNext = NULL;

    /* don't touch the m_hThread member because PMI_Finalize may be waiting on it? */
    if (!g_bPMIFinalizeWaiting && p->m_hThread != NULL)
    {
        CloseHandle(p->m_hThread);
        p->m_hThread = NULL;
    }
    free(p);
}

/* Free every node of a HostNode list.  Passing NULL is a no-op. */
static void FreeHosts(HostNode *pNode)
{
    HostNode *n;
    while (pNode)
    {
        n = pNode;
        pNode = pNode->pNext;
        free(n);
    }
}

/*
 * Read a host file and append enough host entries to *ppNode to place
 * nNumWanted processes.
 *
 * File format, one entry per line:
 *   - blank lines and lines whose first non-blank character is '#' are skipped
 *   - "host", "host:count" or "host count"; a missing or < 1 count means 1
 * Host names longer than the HostNode buffer are truncated.
 * The hosts are used in file order, cycling back to the first host when the
 * file does not provide enough slots.
 *
 * pszFileName - path of the host file
 * ppNode      - in/out list head; generated nodes are appended to its tail
 * nNumWanted  - number of processes to place, must be >= 1
 *
 * Returns TRUE on success.  Returns FALSE (leaving *ppNode untouched) on bad
 * parameters, if the file cannot be opened, if it contains no hosts, or if
 * memory runs out.  The caller releases the list with FreeHosts().
 */
static BOOL GetHostsFromFile(char *pszFileName, HostNode **ppNode, int nNumWanted)
{
    FILE *fin;
    char buffer[1024] = "";
    char *pChar, *pChar2;
    HostNode *node = NULL, *list = NULL, *cur_node = NULL;
    int num_left;
    int nLen;
    HostNode *n, *target;

    /* check the parameters */
    if ((nNumWanted < 1) || (ppNode == NULL) || (pszFileName == NULL))
        return FALSE;

    /* open the file */
    fin = fopen(pszFileName, "r");
    if (fin == NULL)
    {
        err_printf("Error: unable to open file '%s', error %d\n", pszFileName, (int)GetLastError());
        return FALSE;
    }

    /* Read the host names from the file */
    while (fgets(buffer, sizeof(buffer), fin))
    {
        pChar = buffer;

        /* Advance over white space */
        while (*pChar != '\0' && isspace((unsigned char)*pChar))
            pChar++;
        if (*pChar == '#' || *pChar == '\0')
            continue;

        /* Trim trailing white space, never backing up past the first
           non-blank character */
        pChar2 = &buffer[strlen(buffer)-1];
        while ((pChar2 >= pChar) && isspace((unsigned char)*pChar2))
        {
            *pChar2 = '\0';
            pChar2--;
        }

        /* If there is anything left on the line, consider it a host name */
        if (*pChar != '\0')
        {
            node = (HostNode*)malloc(sizeof(HostNode));
            if (node == NULL)
            {
                err_printf("Error: out of memory reading host file '%s'\n", pszFileName);
                fclose(fin);
                FreeHosts(list);
                return FALSE;
            }
            node->nSMPProcs = 1;
            node->pNext = NULL;

            /* Copy the host name, truncating it to fit the node buffer but
               still consuming the whole token from the line */
            nLen = 0;
            while (*pChar != '\0' && !isspace((unsigned char)*pChar))
            {
                if (nLen < (int)sizeof(node->pszHost) - 1)
                {
                    node->pszHost[nLen] = *pChar;
                    nLen++;
                }
                pChar++;
            }
            node->pszHost[nLen] = '\0';

            /* "host:count" form: cut the name at ':' and parse what follows */
            (void)strtok(node->pszHost, ":");
            pChar2 = strtok(NULL, "\n");
            if (pChar2 != NULL)
            {
                node->nSMPProcs = atoi(pChar2);
                if (node->nSMPProcs < 1)
                    node->nSMPProcs = 1;
            }
            else
            {
                /* "host count" form */
                /* Advance over white space */
                while (*pChar != '\0' && isspace((unsigned char)*pChar))
                    pChar++;
                /* Get the number of SMP processes */
                if (*pChar != '\0')
                {
                    node->nSMPProcs = atoi(pChar);
                    if (node->nSMPProcs < 1)
                        node->nSMPProcs = 1;
                }
            }

            if (list == NULL)
            {
                list = node;
                cur_node = node;
            }
            else
            {
                cur_node->pNext = node;
                cur_node = node;
            }
        }
    }

    fclose(fin);

    if (list == NULL)
        return FALSE;

    /* Allocate the first host node */
    node = (HostNode*)malloc(sizeof(HostNode));
    if (node == NULL)
    {
        err_printf("Error: out of memory reading host file '%s'\n", pszFileName);
        FreeHosts(list);
        return FALSE;
    }
    num_left = nNumWanted;
    n = list;
    target = node;

    /* add the nodes to the target list, cycling if necessary */
    while (num_left)
    {
        target->pNext = NULL;
        strncpy(target->pszHost, n->pszHost, sizeof(target->pszHost) - 1);
        target->pszHost[sizeof(target->pszHost) - 1] = '\0';
        if (num_left <= n->nSMPProcs)
        {
            target->nSMPProcs = num_left;
            num_left = 0;
        }
        else
        {
            target->nSMPProcs = n->nSMPProcs;
            num_left = num_left - n->nSMPProcs;
        }

        if (num_left)
        {
            target->pNext = (HostNode*)malloc(sizeof(HostNode));
            if (target->pNext == NULL)
            {
                /* the partial target list is NULL terminated at this point */
                err_printf("Error: out of memory reading host file '%s'\n", pszFileName);
                FreeHosts(node);
                FreeHosts(list);
                return FALSE;
            }
            target = target->pNext;
        }

        n = n->pNext;
        if (n == NULL)
            n = list;
    }

    /* free the list created from the file */
    FreeHosts(list);

    /* add the generated list to the end of the list passed in */
    if (*ppNode == NULL)
    {
        *ppNode = node;
    }
    else
    {
        cur_node = *ppNode;
        while (cur_node->pNext != NULL)
            cur_node = cur_node->pNext;
        cur_node->pNext = node;
    }

    return TRUE;
}

/*
 * Look up the host that process number nRank (0 based) is placed on.
 *
 * Walks pList, consuming nSMPProcs slots per entry.  On success the host
 * name is copied into pszHost, which must have room for 100 bytes.  If the
 * list does not contain enough slots, pszHost is left unchanged.
 */
static void GetHost(HostNode *pList, int nRank, char *pszHost)
{
    if (pszHost == NULL)
        return;

    nRank++;
    while (nRank > 0)
    {
        if (pList == NULL)
            return;
        nRank = nRank - pList->nSMPProcs;
        if (nRank > 0)
            pList = pList->pNext;
    }
    if (pList == NULL)
        return;
    strncpy(pszHost, pList->pszHost, 100);
    pszHost[99] = '\0';
}

/*
 * Build the command line for process number nIproc (0 based).
 *
 * The process is mapped onto one of the count commands by consuming
 * maxprocs[i] slots per command.  The result is the quoted executable
 * followed by its arguments separated by single spaces, truncated to fit
 * pszCmd, which must have room for 1024 bytes.  If nIproc lies beyond the
 * last command, pszCmd is left unchanged.
 *
 * argvs may be NULL, and so may argvs[i]; otherwise each argvs[i] is walked
 * up to its terminating NULL entry.
 */
static void CreateCommand(int count, const int *maxprocs, const char **cmds, const char ***argvs, int nIproc, char *pszCmd)
{
    int i = 0;
    const char **ppArg;
    size_t nUsed;

    nIproc++;
    while (nIproc > 0)
    {
        if (i >= count)
            return;
        nIproc = nIproc - maxprocs[i];
        if (nIproc > 0)
            i++;
    }
    if (i >= count)
        return;

    snprintf(pszCmd, 1024, "\"%s\"", cmds[i]);
    /* snprintf may map to an implementation that does not terminate on
       truncation, so terminate explicitly */
    pszCmd[1023] = '\0';

    if (argvs == NULL)
        return;
    ppArg = argvs[i];
    if (ppArg == NULL)
        return;

    /* the argument vector ends at its NULL entry */
    for (; *ppArg != NULL; ppArg++)
    {
        nUsed = strlen(pszCmd);
        if (nUsed >= 1023)
            break; /* buffer is full */
        strncat(pszCmd, " ", 1023 - nUsed);
        nUsed++;
        /* strncat writes up to n characters plus the terminator */
        strncat(pszCmd, *ppArg, 1023 - nUsed);
    }
}

/*
 * Remove pSpawn->m_hThread from g_hJobThreads and close it.
 *
 * Does nothing when g_bPMIFinalizeWaiting is set.  The freed array slot is
 * filled with the last entry, so the array order is not preserved.
 */
static void RemoveSpawnThread(Spawn_struct *pSpawn)
{
    int i;

    if (g_bPMIFinalizeWaiting)
        return;

    WaitForSingleObject(g_hSpawnMutex, INFINITE);
    for (i=0; i<g_nNumJobThreads; i++)
    {
        if (g_hJobThreads[i] == pSpawn->m_hThread)
        {
            g_nNumJobThreads--;
            g_hJobThreads[i] = g_hJobThreads[g_nNumJobThreads];
            CloseHandle(pSpawn->m_hThread);
            pSpawn->m_hThread = NULL;
            break;
        }
    }
    ReleaseMutex(g_hSpawnMutex);
}

/*
 * Unlink pSpawn from g_pSpawnList while holding g_hSpawnMutex.
 *
 * The structure itself is not freed and its m_pNext is left as it was.
 * Nothing happens if pSpawn is not in the list or the list is empty.
 */
static void RemoveSpawnStruct(Spawn_struct *pSpawn)
{
    Spawn_struct **ppLink;

    WaitForSingleObject(g_hSpawnMutex, INFINITE);
    /* ppLink always points at the pointer that would have to be rewritten */
    for (ppLink = &g_pSpawnList; *ppLink != NULL; ppLink = &(*ppLink)->m_pNext)
    {
        if (*ppLink == pSpawn)
        {
            *ppLink = pSpawn->m_pNext;
            break;
        }
    }
    ReleaseMutex(g_hSpawnMutex);
}

/*
 * Wait for every process of a spawn to exit, then tear the spawn down.
 *
 * Sends one "getexitcodewait" per process over m_bfd, then reads one reply
 * per process.  The text after the first ':' of a reply is parsed as a pid;
 * every node with that pid gets a "freeprocess" command, preceded by a
 * "stopforwarder" command for node indices j with 0 < j < m_nNproc/2.
 * NOTE(review): that index range presumably matches the nodes that were
 * given a forwarder when the processes were launched - confirm against
 * PMI_Spawn_multiple.
 *
 * Afterwards the redirection thread is asked to stop (10 second grace
 * period, then terminated), "done" is sent and the socket closed.
 *
 * On every path, including socket errors, pSpawn is removed from the job
 * thread array and the spawn list and then freed, so the caller must not
 * use pSpawn after this function returns.
 */
void SpawnWaitThread(Spawn_struct *pSpawn)
{
    int i,j;
    char pszStr[1024];
    int nPid;
    char *token;

    for (i=0; i<pSpawn->m_nNproc; i++)
    {
        snprintf(pszStr, 1024, "getexitcodewait %d", pSpawn->m_pNode[i].launchid);
        if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)
        {
            err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError());
            goto cleanup;
        }
    }

    for (i=0; i<pSpawn->m_nNproc; i++)
    {
        if (!ReadString(pSpawn->m_bfd, pszStr))
        {
            err_printf("ReadString(exitcode) failed, error %d\n", WSAGetLastError());
            goto cleanup;
        }
        token = strtok(pszStr, ":");
        if (token == NULL)
            continue;
        token = strtok(NULL, "\n");
        if (token == NULL)
            continue;

        /* pszStr is reused for outgoing commands below, so the pid must be
           extracted before the first snprintf */
        nPid = atoi(token);
        for (j=0; j<pSpawn->m_nNproc; j++)
        {
            if (pSpawn->m_pNode[j].pid != nPid)
                continue;

            if ((j > 0) && ((pSpawn->m_nNproc/2) > j))
            {
                snprintf(pszStr, 1024, "stopforwarder host=%s port=%d abort=no", pSpawn->m_pNode[j].fwd_host, pSpawn->m_pNode[j].fwd_port);
                if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)
                {
                    err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError());
                    goto cleanup;
                }
            }
            snprintf(pszStr, 1024, "freeprocess %d", pSpawn->m_pNode[j].launchid);
            if (WriteString(pSpawn->m_bfd, pszStr) == SOCKET_ERROR)
            {
                err_printf("WriteString('%s') failed, error %d\n", pszStr, WSAGetLastError());
                goto cleanup;
            }
        }
    }

    if (pSpawn->m_bfdStop != BFD_INVALID_SOCKET)
    {
        /* tell the redirection thread to stop */
        pszStr[0] = 0;
        beasy_send(pSpawn->m_bfdStop, pszStr, 1);
    }
    if (pSpawn->m_hRedirectIOThread != NULL)
    {
        if (WaitForSingleObject(pSpawn->m_hRedirectIOThread, 10000) != WAIT_OBJECT_0)
        {
            TerminateThread(pSpawn->m_hRedirectIOThread, 0);
        }
        CloseHandle(pSpawn->m_hRedirectIOThread);
        pSpawn->m_hRedirectIOThread = NULL;
    }

    WriteString(pSpawn->m_bfd, "done");
    beasy_closesocket(pSpawn->m_bfd);
    /* mark the socket closed so FreeSpawn_struct does not close it again */
    pSpawn->m_bfd = BFD_INVALID_SOCKET;

cleanup:
    RemoveSpawnThread(pSpawn);
    RemoveSpawnStruct(pSpawn);
    FreeSpawn_struct(pSpawn);
}

int PMI_Spawn_multiple(int count, const char *cmds[], const char **argvs[], const int *maxprocs, const void *info, int *errors, int *same_domain, const void *preput_info)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -