⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 smpd_handle_command.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 5 页
字号:
	    if (piter->id == process_id)	    {		pmi_context = piter->pmi;		break;	    }	    piter = piter->next;	}	if (pmi_context == NULL)	{	    smpd_err_printf("received dbs result for a pmi context that doesn't exist: unmatched id = %d\n", process_id);	    smpd_exit_fn(FCNAME);	    return SMPD_SUCCESS;	}	result = smpd_forward_command(context, pmi_context);	smpd_exit_fn(FCNAME);	return result;    }    if (MPIU_Str_get_int_arg(context->read_cmd.cmd, "cmd_tag", &match_tag) != MPIU_STR_SUCCESS)    {	smpd_err_printf("result command received without a cmd_tag field: '%s'\n", context->read_cmd.cmd);	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    trailer = iter = context->wait_list;    while (iter)    {	if (iter->tag == match_tag)	{	    ret_val = SMPD_SUCCESS;	    if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "result", str, SMPD_MAX_CMD_LENGTH) == MPIU_STR_SUCCESS)	    {		if (strcmp(iter->cmd_str, "connect") == 0)		{		    if (strcmp(str, SMPD_SUCCESS_STR) == 0)		    {			ret_val = SMPD_CONNECTED;			switch (context->state)			{			case SMPD_MPIEXEC_CONNECTING_TREE:			    smpd_dbg_printf("successful connect, state: connecting tree.\n");			    {				smpd_host_node_t *host_node_iter;				/* find the host node that this connect command was issued for and set the connect_to variable to to point to it */				host_node_iter = smpd_process.host_list;				context->connect_to = NULL;				while (host_node_iter != NULL)				{				    if (host_node_iter->connect_cmd_tag == match_tag)				    {					context->connect_to = host_node_iter;					break;				    }				    host_node_iter = host_node_iter->next;				}			    }			    break;			case SMPD_MPIEXEC_CONNECTING_SMPD:			    smpd_dbg_printf("successful connect, state: connecting smpd.\n");			    break;			case SMPD_CONNECTING:			    smpd_dbg_printf("successful connect, state: connecting.\n");			    break;			default:			    break;			}		    }		    else		    {			smpd_err_printf("connect failed: %s\n", str);			ret_val = SMPD_ABORT;		    }		}		else if (strcmp(iter->cmd_str, "launch") == 0)		{		    smpd_process.nproc_launched++;		    if (strcmp(str, SMPD_SUCCESS_STR) == 0)		    {			MPIU_Str_get_string_arg(context->read_cmd.cmd, "pg_id", pg_id, SMPD_MAX_DBS_NAME_LEN);			MPIU_Str_get_int_arg(context->read_cmd.cmd, "pg_rank", &pg_rank);			MPIU_Str_get_string_arg(context->read_cmd.cmd, "pg_ctx", pg_ctx, 100);			pg = smpd_process.pg_list;			while (pg)			{			    if (strcmp(pg->kvs, pg_id) == 0)			    {				MPIU_Strncpy(pg->processes[pg_rank].ctx_key, pg_ctx, 100);				break;			    }			    pg = pg->next;			}			if (context->spawn_context)			{			    smpd_dbg_printf("successfully spawned: '%s'\n", iter->cmd);			    /*printf("successfully spawned: '%s'\n", iter->cmd);fflush(stdout);*/			    context->spawn_context->num_outstanding_launch_cmds--;			    smpd_dbg_printf("%d outstanding spawns.\n", context->spawn_context->num_outstanding_launch_cmds);			    if (context->spawn_context->num_outstanding_launch_cmds == 0)			    {				/* add the result string */				result = smpd_add_command_arg(context->spawn_context->result_cmd, "result", SMPD_SUCCESS_STR);				if (result != SMPD_SUCCESS)				{				    smpd_err_printf("unable to add the result string to the result command.\n");				    smpd_exit_fn(FCNAME);				    return result;				}				/* send the spawn result command */				result = smpd_post_write_command(context, context->spawn_context->result_cmd);				if (result != SMPD_SUCCESS)				{				    smpd_err_printf("unable to post a write of the spawn result command.\n");				    smpd_exit_fn(FCNAME);				    return result;				}				smpd_process.spawning = SMPD_FALSE;				result = smpd_handle_delayed_spawn_command();				if (result != SMPD_SUCCESS)				{				    smpd_err_printf("An error occurred handling delayed spawn commands.\n");				    smpd_exit_fn(FCNAME);				    return result;				}			    }			}			else			{			    smpd_dbg_printf("successfully launched: '%s'\n", iter->cmd);			    if (!smpd_process.stdin_redirecting)			    {				rank = 0;				MPIU_Str_get_int_arg(iter->cmd, "i", &rank);				if (rank == 0)				{				    smpd_dbg_printf("root process launched, starting stdin redirection.\n");				    /* get a handle to stdin */#ifdef HAVE_WINDOWS_H				    result = smpd_make_socket_loop((SOCKET*)&stdin_fd, &hWrite);				    if (result)				    {					smpd_err_printf("Unable to make a local socket loop to forward stdin.\n");					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }#elif defined(USE_PTHREAD_STDIN_REDIRECTION)				    socketpair(AF_UNIX, SOCK_STREAM, 0, fd);				    smpd_process.stdin_read = fd[0];				    smpd_process.stdin_write = fd[1];				    stdin_fd = fd[0];#else				    stdin_fd = fileno(stdin);#endif				    /* convert the native handle to a sock */				    /*printf("stdin native sock %d\n", stdin_fd);fflush(stdout);*/				    result = MPIDU_Sock_native_to_sock(smpd_process.set, stdin_fd, NULL, &insock);				    if (result != MPI_SUCCESS)				    {					smpd_err_printf("unable to create a sock from stdin,\nsock error: %s\n", get_sock_error_string(result));					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    /* create a context for reading from stdin */				    result = smpd_create_context(SMPD_CONTEXT_MPIEXEC_STDIN, smpd_process.set, insock, -1, &context_in);				    if (result != SMPD_SUCCESS)				    {					smpd_err_printf("unable to create a context for stdin.\n");					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    MPIDU_Sock_set_user_ptr(insock, context_in);#ifdef HAVE_WINDOWS_H				    /* unfortunately, we cannot use stdin directly as a sock.  So, use a thread to read and forward				    stdin to a sock */				    smpd_process.hCloseStdinThreadEvent = CreateEvent(NULL, TRUE, FALSE, NULL);				    if (smpd_process.hCloseStdinThreadEvent == NULL)				    {					smpd_err_printf("Unable to create the stdin thread close event, error %d\n", GetLastError());					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    smpd_process.hStdinThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)smpd_stdin_thread, (void*)hWrite, 0, &dwThreadID);				    if (smpd_process.hStdinThread == NULL)				    {					smpd_err_printf("Unable to create a thread to read stdin, error %d\n", GetLastError());					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }#elif defined(USE_PTHREAD_STDIN_REDIRECTION)				    if (pthread_create(&smpd_process.stdin_thread, NULL, smpd_pthread_stdin_thread, NULL) != 0)				    {					smpd_err_printf("Unable to create a thread to read stdin, error %d\n", errno);					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				    /*pthread_detach(smpd_process.stdin_thread);*/#endif				    /* set this variable first before posting the first read to avoid a race condition? */				    smpd_process.stdin_redirecting = SMPD_TRUE;				    /* post a read for a user command from stdin */				    context_in->read_state = SMPD_READING_STDIN;				    result = MPIDU_Sock_post_read(insock, context_in->read_cmd.cmd, 1, 1, NULL);				    if (result != MPI_SUCCESS)				    {					smpd_err_printf("unable to post a read on stdin for an incoming user command, error:\n%s\n",					    get_sock_error_string(result));					smpd_exit_fn(FCNAME);					return SMPD_FAIL;				    }				}			    }			}			if (pg && pg->processes[pg_rank].suspend_cmd != NULL)			{			    /* send the delayed suspend command */			    result = smpd_post_write_command(context, pg->processes[pg_rank].suspend_cmd);			    if (result != SMPD_SUCCESS)			    {				smpd_err_printf("unable to post a write of the suspend command.\n");				smpd_exit_fn(FCNAME);				return result;			    }			}		    }		    else		    {			/* FIXME: We shouldn't abort if a process fails to launch as part of a spawn command */			smpd_process.nproc_exited++;			if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "error", err_msg, SMPD_MAX_ERROR_LEN) == MPIU_STR_SUCCESS)			{			    smpd_err_printf("launch failed: %s\n", err_msg);			}			else			{			    smpd_err_printf("launch failed: %s\n", str);			}			ret_val = SMPD_ABORT;		    }		}		else if (strcmp(iter->cmd_str, "start_dbs") == 0)		{		    if (strcmp(str, SMPD_SUCCESS_STR) == 0)		    {			if (context->spawn_context)			{			    if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "kvs_name", context->spawn_context->kvs_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)			    {				if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "domain_name", context->spawn_context->domain_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)				{				    smpd_dbg_printf("start_dbs succeeded, kvs_name: '%s', domain_name: '%s'\n", context->spawn_context->kvs_name, context->spawn_context->domain_name);				    ret_val = smpd_launch_processes(context->spawn_context->launch_list, context->spawn_context->kvs_name, context->spawn_context->domain_name, context->spawn_context);				}				else				{				    smpd_err_printf("invalid start_dbs result returned, no domain_name specified: '%s'\n", context->read_cmd.cmd);				    ret_val = SMPD_FAIL;				}			    }			    else			    {				smpd_err_printf("invalid start_dbs result returned, no kvs_name specified: '%s'\n", context->read_cmd.cmd);				ret_val = SMPD_FAIL;			    }			}			else			{			    if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "kvs_name", smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)			    {				if (MPIU_Str_get_string_arg(context->read_cmd.cmd, "domain_name", smpd_process.domain_name, SMPD_MAX_DBS_NAME_LEN) == MPIU_STR_SUCCESS)				{				    smpd_dbg_printf("start_dbs succeeded, kvs_name: '%s', domain_name: '%s'\n", smpd_process.kvs_name, smpd_process.domain_name);				    if (smpd_process.launch_list != NULL)				    {					ret_val = smpd_launch_processes(smpd_process.launch_list, smpd_process.kvs_name, smpd_process.domain_name, NULL);				    }				    else				    {					/* mpiexec connected to an smpd without any processes to launch.  This means it is running -pmiserver */					create_process_group(smpd_process.nproc, smpd_process.kvs_name, &pg);					result = smpd_create_command("pmi_listen", 0, 1, SMPD_TRUE, &cmd_ptr);					if (result == SMPD_SUCCESS)					{					    result = smpd_add_command_int_arg(cmd_ptr, "nproc", smpd_process.nproc);					    if (result == SMPD_SUCCESS)					    {						result = smpd_post_write_command(smpd_process.left_context, cmd_ptr);						if (result != SMPD_SUCCESS)						{						    smpd_err_printf("unable to post a write for the pmi_listen command\n");						    ret_val = SMPD_FAIL;						}					    }					    else					    {						smpd_err_printf("unable to add the nproc to the pmi_listen command\n");						ret_val = SMPD_FAIL;					    }					}					else					{					    smpd_err_printf("unable to create a pmi_listen command.\n");					    ret_val = SMPD_FAIL;					}				    }				}				else				{				    smpd_err_printf("invalid start_dbs result returned, no domain_name specified: '%s'\n", context->read_cmd.cmd);				    ret_val = SMPD_FAIL;				}			    }			    else			    {				smpd_err_printf("invalid start_dbs result returned, no kvs_name specified: '%s'\n", context->read_cmd.cmd);				ret_val = SMPD_FAIL;			    }			}		    }		    else		    {			smpd_err_printf("start_dbs failed: %s\n", str);			ret_val = SMPD_ABORT;		    }		}		else if (strcmp(iter->cmd_str, "pmi_listen") == 0)		{		    MPIU_Str_get_string_arg(context->read_cmd.cmd, "host_description", host_description, 256);		    MPIU_Str_get_int_arg(context->read_cmd.cmd, "listener_port", &listener_port);		    printf("%s\n%d\n%s\n", host_description, listener_port, smpd_process.kvs_name);		    /*printf("%s %d %s\n", smpd_process.host_list->host, smpd_process.port, smpd_process.kvs_name);*/		    fflush(stdout);		}		else if (strcmp(iter->cmd_str, "spawn") == 0)		{		    smpd_dbg_printf("%s command result = %s\n", iter->cmd_str, str);		    ret_val = SMPD_DBS_RETURN;		}		else if (iter->cmd_str[0] == 'd' && iter->cmd_str[1] == 'b')		{		    smpd_dbg_printf("%s command result = %s\n", iter->cmd_str, str);		    ret_val = SMPD_DBS_RETURN;		}		else if ((strcmp(iter->cmd_str, "init") == 0) || (strcmp(iter->cmd_str, "finalize") == 0))		{		    smpd_dbg_printf("%s command result = %s\n", iter->cmd_str, str);		    ret_val = SMPD_DBS_RETURN;		}		else if (strcmp(iter->cmd_str, "barrier") == 0)		{		    smpd_dbg_printf("%s command result = %s\n", iter->cmd_str, str);		    ret_val = SMPD_DBS_RETURN;		}		else if (strcmp(iter->cmd_str, "validate") == 0)		{		    /* print the result of the validate command */		    printf("%s\n", str);		    /* close the session */		    ret_val = smpd_create_command("done", smpd_process.id, context->id, SMPD_FALSE, &cmd_ptr);		    if (ret_val == SMPD_SUCCESS)		    {			ret_val = smpd_post_write_command(context, cmd_ptr);			if (ret_val == SMPD_SUCCESS)			{			    ret_val = SMPD_CLOSE;			}			else			{			    smpd_err_printf("unable to post a write of a done command.\n");			}		    }		    else		    {			smpd_err_printf("unable to create a done command.\n");		    }		}		else if (strcmp(iter->cmd_str, "status") == 0)		{		    /* print the result of the status command */		    printf("smpd running on %s\n", smpd_process.console_host);		    if (strcmp(str, "none"))		    {			printf("dynamic hosts: %s\n", str);		    }		    ret_val = smpd_create_command("done", smpd_process.id, context->id, SMPD_FALSE, &cmd_ptr);		    if (ret_val == SMPD_SUCCESS)		    {			ret_val = smpd_post_write_command(context, cmd_ptr);			if (ret_val == SMPD_SUCCESS)			{			    ret_val = SMPD_CLOSE;			}			else			{			    smpd_err_printf("unable to post a write of a done command.\n");			}		    }		    else		    {			smpd_err_printf("unable to create a done command.\n");		    }		}		else if (strcmp(iter->cmd_str, "get") == 0)		{		    if (strcmp(str, SMPD_SUCCESS_STR) == 0)		    {			/* print the result of the get command */			char value[SMPD_MAX_VALUE_LENGTH];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -