⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 smpd_handle_command.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 5 页
字号:
#undef FCNAME#define FCNAME "smpd_handle_result"int smpd_handle_result(smpd_context_t *context){    int result, ret_val;    char str[SMPD_MAX_CMD_LENGTH];    char err_msg[SMPD_MAX_ERROR_LEN];    smpd_command_t *iter, *trailer, *cmd_ptr;    int match_tag;    char ctx_key[100];    int process_id;    smpd_context_t *pmi_context;    smpd_process_t *piter;    int rank;    MPIDU_Sock_t insock;    MPIDU_SOCK_NATIVE_FD stdin_fd;    smpd_context_t *context_in;#ifdef HAVE_WINDOWS_H    DWORD dwThreadID;    SOCKET hWrite;#endif#ifdef USE_PTHREAD_STDIN_REDIRECTION    int fd[2];#endif    char pg_id[SMPD_MAX_DBS_NAME_LEN+1] = "";    char pg_ctx[100] = "";    int pg_rank = -1;    smpd_process_group_t *pg;    char host_description[256];    int listener_port;    char context_str[20];    smpd_context_t *orig_context;    int num_decoded;    smpd_enter_fn(FCNAME);    if (context->type != SMPD_CONTEXT_PMI && MPIU_Str_get_string_arg(context->read_cmd.cmd, "ctx_key", ctx_key, 100) == MPIU_STR_SUCCESS)    {	process_id = atoi(ctx_key);	smpd_dbg_printf("forwarding the dbs result command to the pmi context %d.\n", process_id);	pmi_context = NULL;	piter = smpd_process.process_list;	while (piter)	{	    if (piter->id == process_id)	    {		pmi_context = piter->pmi;		break;	    }	    piter = piter->next;	}	if (pmi_context == NULL)	{	    smpd_err_printf("received dbs result for a pmi context that doesn't exist: unmatched id = %d\n", process_id);	    smpd_exit_fn(FCNAME);	    return SMPD_SUCCESS;	}	result = smpd_forward_command(context, pmi_context);	smpd_exit_fn(FCNAME);	return result;    }    if (MPIU_Str_get_int_arg(context->read_cmd.cmd, "cmd_tag", &match_tag) != MPIU_STR_SUCCESS)    {	smpd_err_printf("result command received without a cmd_tag field: '%s'\n", context->read_cmd.cmd);	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    trailer = iter = context->wait_list;    while (iter)    {	if (iter->tag == match_tag)	{	    ret_val = SMPD_SUCCESS;	    if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "result", str, SMPD_MAX_CMD_LENGTH) == MPIU_STR_SUCCESS)	    {		if (strcmp(iter->cmd_str, "connect") == 0)		{		    if (strcmp(str, SMPD_SUCCESS_STR) == 0)		    {			ret_val = SMPD_CONNECTED;			switch (context->state)			{			case SMPD_MPIEXEC_CONNECTING_TREE:			    smpd_dbg_printf("successful connect, state: connecting tree.\n");			    {				smpd_host_node_t *host_node_iter;				/* find the host node that this connect command was issued for and set the connect_to variable to to point to it */				host_node_iter = smpd_process.host_list;				context->connect_to = NULL;				while (host_node_iter != NULL)				{				    if (host_node_iter->connect_cmd_tag == match_tag)				    {					context->connect_to = host_node_iter;					break;				    }				    host_node_iter = host_node_iter->next;				}			    }			    break;			case SMPD_MPIEXEC_CONNECTING_SMPD:			    smpd_dbg_printf("successful connect, state: connecting smpd.\n");			    break;			case SMPD_CONNECTING:			    smpd_dbg_printf("successful connect, state: connecting.\n");			    break;			default:			    break;			}		    }		    else		    {			smpd_err_printf("connect failed: %s\n", str);			ret_val = SMPD_ABORT;		    }		}		else if (strcmp(iter->cmd_str, "launch") == 0)		{		    smpd_process.nproc_launched++;		    if (strcmp(str, SMPD_SUCCESS_STR) == 0)		    {			MPIU_Str_get_string_arg(context->read_cmd.cmd, "pg_id", pg_id, SMPD_MAX_DBS_NAME_LEN);			MPIU_Str_get_int_arg(context->read_cmd.cmd, "pg_rank", &pg_rank);			MPIU_Str_get_string_arg(context->read_cmd.cmd, "pg_ctx", pg_ctx, 100);			pg = smpd_process.pg_list;			while (pg)			{			    if (strcmp(pg->kvs, pg_id) == 0)			    {				MPIU_Strncpy(pg->processes[pg_rank].ctx_key, pg_ctx, 100);				break;			    }			    pg = pg->next;			}			if (context->spawn_context)			{			    smpd_dbg_printf("successfully spawned: '%s'\n", iter->cmd);			    /*printf("successfully spawned: '%s'\n", iter->cmd);fflush(stdout);*/			    context->spawn_context->num_outstanding_launch_cmds--;			    smpd_dbg_printf("%d outstanding spawns.\n", context->spawn_context->num_outstanding_launch_cmds);			    if (context->spawn_context->num_outstanding_launch_cmds == 0)			    {				/* add the result string */				result = smpd_add_command_arg(context->spawn_context->result_cmd, "result", SMPD_SUCCESS_STR);				if (result != SMPD_SUCCESS)				{				    smpd_err_printf("unable to add the result string to the result command.\n");				    smpd_exit_fn(FCNAME);				    return result;				}				/* send the spawn result command */				result = smpd_post_write_command(context, context->spawn_context->result_cmd);				if (result != SMPD_SUCCESS)				{				    smpd_err_printf("unable to post a write of the spawn result command.\n");				    smpd_exit_fn(FCNAME);				    return result;				}				smpd_process.spawning = SMPD_FALSE;				result = smpd_handle_delayed_spawn_command();				if (result != SMPD_SUCCESS)				{				    smpd_err_printf("An error occurred handling delayed spawn commands.\n");				    smpd_exit_fn(FCNAME);				    return result;				}			    }			}			else			{			    smpd_dbg_printf("successfully launched: '%s'\n", iter->cmd);			    if (!smpd_process.stdin_redirecting)			    {				rank = 0;				MPIU_Str_get_int_arg(iter->cmd, "i", &rank);				if (rank == 0)				{				    smpd_dbg_printf("root process launched, starting stdin redirection.\n");				    /* get a handle to stdin */#ifdef HAVE_WINDOWS_H				    result = smpd_make_socket_loop((SOCKET*)&stdin_fd, &hWrite);				    if (result)				    {					smpd_err_printf("Unable to make a local socket loop to forward stdin.\n");					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }#elif defined(USE_PTHREAD_STDIN_REDIRECTION)				    socketpair(AF_UNIX, SOCK_STREAM, 0, fd);				    smpd_process.stdin_read = fd[0];				    smpd_process.stdin_write = fd[1];				    stdin_fd = fd[0];#else				    stdin_fd = fileno(stdin);#endif				    /* convert the native handle to a sock */				    /*printf("stdin native sock %d\n", stdin_fd);fflush(stdout);*/				    result = MPIDU_Sock_native_to_sock(smpd_process.set, stdin_fd, NULL, &insock);				    if (result != MPI_SUCCESS)				    {					smpd_err_printf("unable to create a sock from stdin,\nsock error: %s\n", get_sock_error_string(result));					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    /* create a context for reading from stdin */				    result = smpd_create_context(SMPD_CONTEXT_MPIEXEC_STDIN, smpd_process.set, insock, -1, &context_in);				    if (result != SMPD_SUCCESS)				    {					smpd_err_printf("unable to create a context for stdin.\n");					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    MPIDU_Sock_set_user_ptr(insock, context_in);#ifdef HAVE_WINDOWS_H				    /* unfortunately, we cannot use stdin directly as a sock.  So, use a thread to read and forward				    stdin to a sock */				    smpd_process.hCloseStdinThreadEvent = CreateEvent(NULL, TRUE, FALSE, NULL);				    if (smpd_process.hCloseStdinThreadEvent == NULL)				    {					smpd_err_printf("Unable to create the stdin thread close event, error %d\n", GetLastError());					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    smpd_process.hStdinThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)smpd_stdin_thread, (void*)hWrite, 0, &dwThreadID);				    if (smpd_process.hStdinThread == NULL)				    {					smpd_err_printf("Unable to create a thread to read stdin, error %d\n", GetLastError());					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }#elif defined(USE_PTHREAD_STDIN_REDIRECTION)				    if (pthread_create(&smpd_process.stdin_thread, NULL, smpd_pthread_stdin_thread, NULL) != 0)				    {					smpd_err_printf("Unable to create a thread to read stdin, error %d\n", errno);					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    /*pthread_detach(smpd_process.stdin_thread);*/#endif				    /* set this variable first before posting the first read to avoid a race condition? */				    smpd_process.stdin_redirecting = SMPD_TRUE;				    /* post a read for a user command from stdin */				    context_in->read_state = SMPD_READING_STDIN;				    result = MPIDU_Sock_post_read(insock, context_in->read_cmd.cmd, 1, 1, NULL);				    if (result != MPI_SUCCESS)				    {					smpd_err_printf("unable to post a read on stdin for an incoming user command, error:\n%s\n",					    get_sock_error_string(result));					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				}			    }			}			if (pg && pg->processes[pg_rank].suspend_cmd != NULL)			{			    /* send the delayed suspend command */			    result = smpd_post_write_command(context, pg->processes[pg_rank].suspend_cmd);			    if (result != SMPD_SUCCESS)			    {				smpd_err_printf("unable to post a write of the suspend command.\n");				smpd_exit_fn(FCNAME);				return result;			    }			}		    }		    else		    {			/* FIXME: We shouldn't abort if a process fails to launch as part of a spawn command */			smpd_process.nproc_exited++;			if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "error", err_msg, SMPD_MAX_ERROR_LEN) == MPIU_STR_SUCCESS)			{			    smpd_err_printf("launch failed: %s\n", err_msg);			}			else			{			    smpd_err_printf("launch failed: %s\n", str);			}			ret_val = SMPD_ABORT;		    }		}		else if (strcmp(iter->cmd_str, "start_dbs") == 0)		{		    if (strcmp(str, SMPD_SUCCESS_STR) == 0)		    {			if (context->spawn_context)			{			    if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "kvs_name", context->spawn_context->kvs_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)			    {				if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "domain_name", context->spawn_context->domain_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)				{				    smpd_dbg_printf("start_dbs succeeded, kvs_name: '%s', domain_name: '%s'\n", context->spawn_context->kvs_name, context->spawn_context->domain_name);				    ret_val = smpd_launch_processes(context->spawn_context->launch_list, context->spawn_context->kvs_name, context->spawn_context->domain_name, context->spawn_context);				}				else				{				    smpd_err_printf("invalid start_dbs result returned, no domain_name specified: '%s'\n", context->read_cmd.cmd);				    ret_val = SMPD_FAIL;				}			    }			    else			    {				smpd_err_printf("invalid start_dbs result returned, no kvs_name specified: '%s'\n", context->read_cmd.cmd);				ret_val = SMPD_FAIL;			    }			}			else			{			    if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "kvs_name", smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)			    {				if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "domain_name", smpd_process.domain_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)				{				    smpd_dbg_printf("start_dbs succeeded, kvs_name: '%s', domain_name: '%s'\n", smpd_process.kvs_name, smpd_process.domain_name);				    if (smpd_process.launch_list != NULL)				    {					ret_val = smpd_launch_processes(smpd_process.launch_list, smpd_process.kvs_name, smpd_process.domain_name, NULL);				    }				    else				    {					/* mpiexec connected to an smpd without any processes to launch.  This means it is running -pmiserver */					create_process_group(smpd_process.nproc, smpd_process.kvs_name, &pg);					result = smpd_create_command("pmi_listen", 0, 1, SMPD_TRUE, &cmd_ptr);					if (result == SMPD_SUCCESS)					{					    result = smpd_add_command_int_arg(cmd_ptr, "nproc", smpd_process.nproc);					    if (result == SMPD_SUCCESS)					    {						result = smpd_post_write_command(smpd_process.left_context, cmd_ptr);						if (result != SMPD_SUCCESS)						{						    smpd_err_printf("unable to post a write for the pmi_listen command\n");						    ret_val = SMPD_FAIL;						}					    }					    else					    {						smpd_err_printf("unable to add the nproc to the pmi_listen command\n");						ret_val = SMPD_FAIL;					    }					}					else					{					    smpd_err_printf("unable to create a pmi_listen command.\n");					    ret_val = SMPD_FAIL;					}				    }				}				else				{				    smpd_err_printf("invalid start_dbs result returned, no domain_name specified: '%s'\n", context->read_cmd.cmd);				    ret_val = SMPD_FAIL;				}			    }			    else			    {				smpd_err_printf("invalid start_dbs result returned, no kvs_name specified: '%s'\n", context->read_cmd.cmd);				ret_val = SMPD_FAIL;			    }			}		    }		    else		    {			smpd_err_printf("start_dbs failed: %s\n", str);			ret_val = SMPD_ABORT;		    }		}		else if (strcmp(iter->cmd_str, "pmi_listen") == 0)		{		    MPIU_Str_get_string_arg(context->read_cmd.cmd, "host_description", host_description, 256);		    MPIU_Str_get_int_arg(context->read_cmd.cmd, "listener_port", &listener_port);            /* SINGLETON: If no port to connect back print this information else connect to the port              * and execute the SINGLETON INIT protocol              */            if(smpd_process.singleton_client_port > 0){                smpd_context_t *p_singleton_mpiexec_context;                char connect_to_host[SMPD_SINGLETON_MAX_HOST_NAME_LEN];                /* Create a context */                result = smpd_create_context(SMPD_CONTEXT_SINGLETON_INIT_MPIEXEC, context->set,                                             MPIDU_SOCK_INVALID_SOCK, -1, &p_singleton_mpiexec_context);                if(result != SMPD_SUCCESS){                    context->state = SMPD_DONE;                    smpd_err_printf("smpd_create_context failed, error = %d\n", result);                    return SMPD_FAIL;                }                p_singleton_mpiexec_context->state = SMPD_SINGLETON_MPIEXEC_CONNECTING;                p_singleton_mpiexec_context->singleton_init_pm_port = listener_port;                strncpy(p_singleton_mpiexec_context->singleton_init_hostname, host_description,                         SMPD_SINGLETON_MAX_HOST_NAME_LEN);                strncpy(p_singleton_mpiexec_context->singleton_init_kvsname, smpd_process.kvs_name,                        SMPD_SINGLETON_MAX_KVS_NAME_LEN);                /* Post a connect */                /* FIXME : mismatch btw size of connect_to->host and max host name len */                result = get_hostname(connect_to_host, SMPD_SINGLETON_MAX_HOST_NAME_LEN);                if(result != SMPD_SUCCESS){                    context->state = SMPD_DONE;                    smpd_err_printf("gethostname failed, error = %d\n", result);                    result = smpd_free_context(p_singleton_mpiexec_context);                    if(result != SMPD_SUCCESS){ 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -