⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpiexec.c

📁 fortran并行计算包
💻 C
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2001 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include <stdio.h>#include "mpiexec.h"#include "smpd.h"#include "smpd_implthread.h"#ifdef HAVE_WINDOWS_Hvoid timeout_thread(void *p){    MPIU_Size_t num_written;    char ch = -1;    SMPD_UNREFERENCED_ARG(p);    Sleep(smpd_process.timeout * 1000);    smpd_err_printf("\nmpiexec terminated job due to %d second timeout.\n", smpd_process.timeout);    if (smpd_process.timeout_sock != MPIDU_SOCK_INVALID_SOCK)    {	MPIDU_Sock_write(smpd_process.timeout_sock, &ch, 1, &num_written);	Sleep(30000); /* give the engine 30 seconds to shutdown and then force an exit */    }    ExitProcess((UINT)-1);}#else#ifdef SIGALRMvoid timeout_function(int signo){    MPIU_Size_t num_written;    char ch = -1;    static int alarmed = 0;    if (signo == SIGALRM)    {	if (smpd_process.timeout_sock != MPIDU_SOCK_INVALID_SOCK && alarmed == 0)	{	    alarmed = 1;	    smpd_err_printf("\nmpiexec terminated job due to %d second timeout.\n", smpd_process.timeout);	    MPIDU_Sock_write(smpd_process.timeout_sock, &ch, 1, &num_written);	    alarm(30); /* give the engine 30 seconds to shutdown and then force an exit */	}	else	{	    if (alarmed == 0)	    {		smpd_err_printf("\nmpiexec terminated job due to %d second timeout.\n", smpd_process.timeout);	    }	    exit(-1);	}    }}#else#ifdef HAVE_PTHREAD_Hvoid *timeout_thread(void *p){    MPIU_Size_t num_written;    char ch = -1;    sleep(smpd_process.timeout);    smpd_err_printf("\nmpiexec terminated job due to %d second timeout.\n", smpd_process.timeout);    if (smpd_process.timeout_sock != MPIDU_SOCK_INVALID_SOCK)    {	MPIDU_Sock_write(smpd_process.timeout_sock, &ch, 1, &num_written);	sleep(30); /* give the engine 30 seconds to shutdown and then force an exit */    }    exit(-1);}#endif#endif#endif#ifdef HAVE_WINDOWS_HBOOL WINAPI mpiexec_ctrl_handler(DWORD dwCtrlType){    static int first = 1;    char ch = -1;    MPIU_Size_t num_written;    /* This handler could be modified to send the event to the remote processes instead of killing the job. */    /* Doing so would require new command types for smpd */    switch (dwCtrlType)    {    case CTRL_LOGOFF_EVENT:	/* The logoff event could be a problem if two users are logged on to the same machine remotely.	 * If user A runs mpiexec and user B logs out of his session, this event will cause user A's	 * mpiexec command to kill the job and make user A mad.  But if we ignore the logoff event then 	 * when user A logs out mpiexec will hang?	 */	break;    case CTRL_C_EVENT:    case CTRL_BREAK_EVENT:    case CTRL_CLOSE_EVENT:    case CTRL_SHUTDOWN_EVENT:	if (smpd_process.mpiexec_abort_sock != MPIDU_SOCK_INVALID_SOCK)	{	    if (first == 1)	    {		/* The first break signal tries to abort the job with an abort message */		first = 0;		printf("mpiexec aborting job...\n");fflush(stdout);		num_written = 0;		MPIDU_Sock_write(smpd_process.mpiexec_abort_sock, &ch, 1, &num_written);		if (num_written == 1)		    return TRUE;	    }	    /* The second break signal unconditionally exits mpiexec */	}	if (smpd_process.hCloseStdinThreadEvent)	{	    SetEvent(smpd_process.hCloseStdinThreadEvent);	}	if (smpd_process.hStdinThread != NULL)	{	    /* close stdin so the input thread will exit */	    CloseHandle(GetStdHandle(STD_INPUT_HANDLE));	    if (WaitForSingleObject(smpd_process.hStdinThread, 3000) != WAIT_OBJECT_0)	    {		TerminateThread(smpd_process.hStdinThread, 321);	    }	    CloseHandle(smpd_process.hStdinThread);	}	smpd_exit(-1);	return TRUE;    }    return FALSE;}#endifint main(int argc, char* argv[]){    int result = SMPD_SUCCESS;    smpd_host_node_t *host_node_ptr;    smpd_launch_node_t *launch_node_ptr;    smpd_context_t *context;    MPIDU_Sock_set_t set;    MPIDU_Sock_t sock = MPIDU_SOCK_INVALID_SOCK;    smpd_state_t state;    smpd_enter_fn("main");    /* catch an empty command line */    if (argc < 2)    {	mp_print_options();	exit(0);    }    smpd_process.mpiexec_argv0 = argv[0];    /* initialize */    putenv("PMI_SMPD_FD=0");    result = PMPI_Init(&argc, &argv);    SMPD_CS_ENTER();    if (result != MPI_SUCCESS)    {	smpd_err_printf("MPI_Init failed,\nerror: %d\n", result);	smpd_exit_fn("main");	return result;    }    /* Initialization now done by [P]MPI_Init above    result = MPIDU_Sock_init();    if (result != MPI_SUCCESS)    {	smpd_err_printf("MPIDU_Sock_init failed,\nsock error: %s\n",		      get_sock_error_string(result));	smpd_exit_fn("main");	return result;    }    result = smpd_init_process();    if (result != SMPD_SUCCESS)    {	smpd_err_printf("smpd_init_process failed.\n");	goto quit_job;    }    */    smpd_process.dbg_state = SMPD_DBG_STATE_ERROUT;    /* parse the command line */    smpd_dbg_printf("parsing the command line.\n");    result = mp_parse_command_args(&argc, &argv);    if (result != SMPD_SUCCESS)    {	smpd_err_printf("Unable to parse the mpiexec command arguments.\n");	goto quit_job;    }    /* print and see what we've got */    /* debugging output *************/    smpd_dbg_printf("host tree:\n");    host_node_ptr = smpd_process.host_list;    if (!host_node_ptr)	smpd_dbg_printf("<none>\n");    while (host_node_ptr)    {	smpd_dbg_printf(" host: %s, parent: %d, id: %d\n",	    host_node_ptr->host,	    host_node_ptr->parent, host_node_ptr->id);	host_node_ptr = host_node_ptr->next;    }    smpd_dbg_printf("launch nodes:\n");    launch_node_ptr = smpd_process.launch_list;    if (!launch_node_ptr)	smpd_dbg_printf("<none>\n");    while (launch_node_ptr)    {	smpd_dbg_printf(" iproc: %d, id: %d, exe: %s\n",	    launch_node_ptr->iproc, launch_node_ptr->host_id,	    launch_node_ptr->exe);	launch_node_ptr = launch_node_ptr->next;    }    /* end debug output *************/    /* set the id of the mpiexec node to zero */    smpd_process.id = 0;    result = MPIDU_Sock_create_set(&set);    if (result != MPI_SUCCESS)    {	smpd_err_printf("MPIDU_Sock_create_set failed,\nsock error: %s\n", get_sock_error_string(result));	goto quit_job;    }    smpd_process.set = set;    /* Check to see if the user wants to use a remote shell mechanism for launching the processes     * instead of using the smpd process managers.     */    if (smpd_process.rsh_mpiexec == SMPD_TRUE)    {	/* Do rsh or localonly stuff */	result = mpiexec_rsh();	/* skip over the non-rsh code and go to the cleanup section */	goto quit_job;    }    /* Start the timeout mechanism if specified */    /* This code occurs after the rsh_mpiexec option check because the rsh code implementes timeouts differently */    if (smpd_process.timeout > 0)    {#ifdef HAVE_WINDOWS_H	/* create a Windows thread to sleep until the timeout expires */	if (smpd_process.timeout_thread == NULL)	{	    smpd_process.timeout_thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)timeout_thread, NULL, 0, NULL);	    if (smpd_process.timeout_thread == NULL)	    {		printf("Error: unable to create a timeout thread, errno %d.\n", GetLastError());		smpd_exit_fn("mp_parse_command_args");		return SMPD_FAIL;	    }	}#elif defined(SIGALRM)	/* create an alarm to signal mpiexec when the timeout expires */	smpd_signal(SIGALRM, timeout_function);	alarm(smpd_process.timeout);#elif defined(HAVE_PTHREAD_H)	/* create a pthread to sleep until the timeout expires */	result = pthread_create(&smpd_process.timeout_thread, NULL, timeout_thread, NULL);	if (result != 0)	{	    printf("Error: unable to create a timeout thread, errno %d.\n", result);	    smpd_exit_fn("mp_parse_command_args");	    return SMPD_FAIL;	}#else	/* no timeout mechanism available */#endif    }    /* make sure we have a passphrase to authenticate connections to the smpds */    if (smpd_process.passphrase[0] == '\0')	smpd_get_smpd_data("phrase", smpd_process.passphrase, SMPD_PASSPHRASE_MAX_LENGTH);    if (smpd_process.passphrase[0] == '\0')    {	if (smpd_process.noprompt)	{	    printf("Error: No smpd passphrase specified through the registry or .smpd file, exiting.\n");	    result = SMPD_FAIL;	    goto quit_job;	}	printf("Please specify an authentication passphrase for smpd: ");	fflush(stdout);	smpd_get_password(smpd_process.passphrase);    }    /* set the state to create a console session or a job session */    state = smpd_process.do_console ? SMPD_MPIEXEC_CONNECTING_SMPD : SMPD_MPIEXEC_CONNECTING_TREE;    result = smpd_create_context(SMPD_CONTEXT_LEFT_CHILD, set, sock, 1, &context);    if (result != SMPD_SUCCESS)    {	smpd_err_printf("unable to create a context for the first host in the tree.\n");	goto quit_job;    }#ifdef HAVE_WINDOWS_H    if (!smpd_process.local_root)    {#endif	/* start connecting the tree by posting a connect to the first host */	result = MPIDU_Sock_post_connect(set, context, smpd_process.host_list->host, smpd_process.port, &sock);	if (result != MPI_SUCCESS)	{	    smpd_err_printf("Unable to connect to '%s:%d',\nsock error: %s\n",		smpd_process.host_list->host, smpd_process.port, get_sock_error_string(result));	    goto quit_job;	}#ifdef HAVE_WINDOWS_H    }#endif    context->sock = sock;    context->state = state;    context->connect_to = smpd_process.host_list;#ifdef HAVE_WINDOWS_H    if (smpd_process.local_root)    {	int port;	smpd_context_t *rc_context;	/* The local_root option is implemented by having mpiexec act as the smpd	 * and launch the smpd manager.  Then mpiexec connects to this manager just	 * as if it had been created by a real smpd.  This causes all the processes	 * destined for the first smpd host to be launched by this child process of	 * mpiexec and not the smpd service.  This allows for these processes to	 * create windows that are visible to the interactive user.  It also means 	 * that the job cannot be run in the context of a user other than the user	 * running mpiexec. */	/* get the path to smpd.exe because pszExe is currently mpiexec.exe */	smpd_get_smpd_data("binary", smpd_process.pszExe, SMPD_MAX_EXE_LENGTH);	/* launch the manager process */	result = smpd_start_win_mgr(context, SMPD_FALSE);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to start the local smpd manager.\n");	    goto quit_job;	}	/* connect to the manager */	smpd_dbg_printf("connecting a new socket.\n");	port = atol(context->port_str);	if (port < 1)	{	    smpd_err_printf("Invalid reconnect port read: %d\n", port);	    goto quit_job;	}	result = smpd_create_context(context->type, context->set, MPIDU_SOCK_INVALID_SOCK, context->id, &rc_context);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to create a new context for the reconnection.\n");	    goto quit_job;	}	rc_context->state = context->state;	rc_context->write_state = SMPD_RECONNECTING;	context->state = SMPD_CLOSING;	rc_context->connect_to = context->connect_to;	rc_context->connect_return_id = context->connect_return_id;	rc_context->connect_return_tag = context->connect_return_tag;	strcpy(rc_context->host, context->host);	smpd_process.left_context = rc_context;	smpd_dbg_printf("posting a re-connect to %s:%d in %s context.\n", rc_context->connect_to->host, port, smpd_get_context_str(rc_context));	result = MPIDU_Sock_post_connect(rc_context->set, rc_context, rc_context->connect_to->host, port, &rc_context->sock);	if (result != MPI_SUCCESS)	{	    smpd_err_printf("Unable to post a connect to '%s:%d',\nsock error: %s\n",		rc_context->connect_to->host, port, get_sock_error_string(result));	    if (smpd_post_abort_command("Unable to connect to '%s:%d',\nsock error: %s\n",		rc_context->connect_to->host, port, get_sock_error_string(result)) != SMPD_SUCCESS)	    {		goto quit_job;	    }	}    }    else    {#endif	smpd_process.left_context = context;	result = MPIDU_Sock_set_user_ptr(sock, context);	if (result != MPI_SUCCESS)	{	    smpd_err_printf("unable to set the smpd sock user pointer,\nsock error: %s\n",		get_sock_error_string(result));	    goto quit_job;	}#ifdef HAVE_WINDOWS_H    }#endif#ifdef HAVE_WINDOWS_H    {	/* Create a break handler and a socket to handle aborting the job when mpiexec receives break signals */	smpd_context_t *reader_context;	MPIDU_Sock_t sock_reader;	MPIDU_SOCK_NATIVE_FD reader, writer;	smpd_make_socket_loop((SOCKET*)&reader, (SOCKET*)&writer);	result = MPIDU_Sock_native_to_sock(set, reader, NULL, &sock_reader);	result = MPIDU_Sock_native_to_sock(set, writer, NULL, &smpd_process.mpiexec_abort_sock);	result = smpd_create_context(SMPD_CONTEXT_MPIEXEC_ABORT, set, sock_reader, -1, &reader_context);	reader_context->read_state = SMPD_READING_MPIEXEC_ABORT;	result = MPIDU_Sock_post_read(sock_reader, &reader_context->read_cmd.cmd, 1, 1, NULL);	if (!SetConsoleCtrlHandler(mpiexec_ctrl_handler, TRUE))	{	    /* Don't error out; allow the job to run without a ctrl handler? */	    result = GetLastError();	    smpd_dbg_printf("unable to set a ctrl handler for mpiexec, error %d\n", result);	}    }#endif    result = smpd_enter_at_state(set, state);    if (result != SMPD_SUCCESS)    {	smpd_err_printf("state machine failed.\n");	goto quit_job;    }quit_job:    if ((result != SMPD_SUCCESS) && (smpd_process.mpiexec_exit_code == 0))    {	smpd_process.mpiexec_exit_code = -1;    }    /* finalize */    /*    smpd_dbg_printf("calling MPIDU_Sock_finalize\n");    result = MPIDU_Sock_finalize();    if (result != MPI_SUCCESS)    {	smpd_err_printf("MPIDU_Sock_finalize failed,\nsock error: %s\n", get_sock_error_string(result));    }    */    /* MPI_Finalize called in smpd_exit()    smpd_dbg_printf("calling MPI_Finalize\n");    result = PMPI_Finalize();    if (result != MPI_SUCCESS)    {	smpd_err_printf("MPI_Finalize failed,\nerror: %d\n", result);    }    */#ifdef HAVE_WINDOWS_H    if (smpd_process.hCloseStdinThreadEvent)	SetEvent(smpd_process.hCloseStdinThreadEvent);    if (smpd_process.hStdinThread != NULL)    {	/* close stdin so the input thread will exit */	CloseHandle(GetStdHandle(STD_INPUT_HANDLE));	if (WaitForSingleObject(smpd_process.hStdinThread, 3000) != WAIT_OBJECT_0)	{	    TerminateThread(smpd_process.hStdinThread, 321);	}	CloseHandle(smpd_process.hStdinThread);    }    if (smpd_process.hCloseStdinThreadEvent)    {	CloseHandle(smpd_process.hCloseStdinThreadEvent);	smpd_process.hCloseStdinThreadEvent = NULL;    }#elif defined(USE_PTHREAD_STDIN_REDIRECTION)    smpd_cancel_stdin_thread();#endif    smpd_exit_fn("main");    SMPD_CS_EXIT();    return smpd_exit(smpd_process.mpiexec_exit_code);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -