⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 smpd_handle_command.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 5 页
字号:
    }    if (MPIU_Str_get_int_arg(cmd->cmd, "first", &first) != MPIU_STR_SUCCESS)    {	first = 0;	smpd_err_printf("no first flag in the stdout command: '%s'\n", cmd->cmd);    }    if (MPIU_Str_get_string_arg(cmd->cmd, "data", data, SMPD_MAX_STDOUT_LENGTH) == MPIU_STR_SUCCESS)    {	smpd_decode_buffer(data, data, SMPD_MAX_STDOUT_LENGTH, &num_decoded);	data[num_decoded] = '\0';	if (data[num_decoded-1] == '\n')	{	    ends_in_cr = SMPD_TRUE;	}	/*printf("[%d]", rank);*/	if (smpd_process.prefix_output == SMPD_TRUE)	{	    if (first)	    {		MPIU_Snprintf(prefix, 20, "[%d]", rank);		prefix_length = strlen(prefix);		write_to_stdout(prefix, prefix_length);	    }	    MPIU_Snprintf(prefix, 20, "\n[%d]", rank);	    prefix_length = strlen(prefix);	    token = strtok(data, "\r\n");	    while (token != NULL)	    {		write_to_stdout(token, strlen(token));		token = strtok(NULL, "\r\n");		if (token != NULL)		{		    write_to_stdout(prefix, prefix_length);		}		else		{		    if (ends_in_cr == SMPD_TRUE)		    {			write_to_stdout("\n", 1);		    }		}	    }	}	else	{	    write_to_stdout(data, num_decoded);	}    }    else    {	smpd_err_printf("unable to get the data from the stdout command: '%s'\n", cmd->cmd);    }    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "write_to_stderr"static int write_to_stderr(const char *buffer, size_t num_bytes){#ifdef HAVE_WINDOWS_H    HANDLE hStderr;    DWORD num_written;    smpd_enter_fn(FCNAME);    /* MT - This should acquire the hLaunchProcessMutex so that it doesn't acquire a redirected handle. */    /* But since the code is not multi-threaded yet this doesn't matter */    hStderr = GetStdHandle(STD_ERROR_HANDLE);    WriteFile(hStderr, buffer, num_bytes, &num_written, NULL);#else    smpd_enter_fn(FCNAME);    fwrite(buffer, 1, num_bytes, stderr);    fflush(stdout);#endif    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_handle_stderr_command"int smpd_handle_stderr_command(smpd_context_t *context){    int rank;    char data[SMPD_MAX_STDOUT_LENGTH];    smpd_command_t *cmd;    int num_decoded = 0;    int first;    char prefix[20];    size_t prefix_length;    char *token;    SMPD_BOOL ends_in_cr = SMPD_FALSE;    smpd_enter_fn(FCNAME);    cmd = &context->read_cmd;    if (MPIU_Str_get_int_arg(cmd->cmd, "rank", &rank) != MPIU_STR_SUCCESS)    {	rank = -1;	smpd_err_printf("no rank in the stderr command: '%s'\n", cmd->cmd);    }    if (MPIU_Str_get_int_arg(cmd->cmd, "first", &first) != MPIU_STR_SUCCESS)    {	first = 0;	smpd_err_printf("no first flag in the stderr command: '%s'\n", cmd->cmd);    }    if (MPIU_Str_get_string_arg(cmd->cmd, "data", data, SMPD_MAX_STDOUT_LENGTH) == MPIU_STR_SUCCESS)    {	smpd_decode_buffer(data, data, SMPD_MAX_STDOUT_LENGTH, &num_decoded);	data[num_decoded] = '\0';	if (data[num_decoded-1] == '\n')	{	    ends_in_cr = SMPD_TRUE;	}	/*fprintf(stderr, "[%d]", rank);*/	if (smpd_process.prefix_output == SMPD_TRUE)	{	    if (first)	    {		MPIU_Snprintf(prefix, 20, "[%d]", rank);		prefix_length = strlen(prefix);		write_to_stderr(prefix, prefix_length);	    }	    MPIU_Snprintf(prefix, 20, "\n[%d]", rank);	    prefix_length = strlen(prefix);	    token = strtok(data, "\r\n");	    while (token != NULL)	    {		write_to_stderr(token, strlen(token));		token = strtok(NULL, "\r\n");		if (token != NULL)		{		    write_to_stderr(prefix, prefix_length);		}		else		{		    write_to_stderr("\n", 1);		}	    }	}	else	{	    write_to_stderr(data, num_decoded);	}    }    else    {	smpd_err_printf("unable to get the data from the stderr command: '%s'\n", cmd->cmd);    }    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "create_process_group"static int create_process_group(int nproc, char *kvs_name, smpd_process_group_t **pg_pptr){    int i;    smpd_process_group_t *pg;    smpd_enter_fn(FCNAME);    /* initialize a new process group structure */    pg = (smpd_process_group_t*)MPIU_Malloc(sizeof(smpd_process_group_t));    if (pg == NULL)    {	smpd_err_printf("unable to allocate memory for a process group structure.\n");	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    pg->aborted = SMPD_FALSE;    pg->any_init_received = SMPD_FALSE;    pg->any_noinit_process_exited = SMPD_FALSE;    strncpy(pg->kvs, kvs_name, SMPD_MAX_DBS_NAME_LEN);    pg->num_procs = nproc;    pg->processes = (smpd_exit_process_t*)MPIU_Malloc(nproc * sizeof(smpd_exit_process_t));    if (pg->processes == NULL)    {	smpd_err_printf("unable to allocate an array of %d process exit structures.\n", nproc);	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    for (i=0; i<nproc; i++)    {	pg->processes[i].ctx_key[0] = '\0';	pg->processes[i].errmsg = NULL;	pg->processes[i].exitcode = -1;	pg->processes[i].exited = SMPD_FALSE;	pg->processes[i].finalize_called = SMPD_FALSE;	pg->processes[i].init_called = SMPD_FALSE;	pg->processes[i].suspended = SMPD_FALSE;	pg->processes[i].suspend_cmd = NULL;    }    /* add the process group to the global list */    pg->next = smpd_process.pg_list;    smpd_process.pg_list = pg;    *pg_pptr = pg;    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_launch_processes"int smpd_launch_processes(smpd_launch_node_t *launch_list, char *kvs_name, char *domain_name, smpd_spawn_context_t *spawn_context){    int result;    smpd_command_t *cmd_ptr;    smpd_launch_node_t *launch_node_ptr, *launch_iter;    smpd_map_drive_node_t *map_iter;    char drive_arg_str[20];    char drive_map_str[SMPD_MAX_EXE_LENGTH];    smpd_process_group_t *pg;    int i;    smpd_enter_fn(FCNAME);    launch_node_ptr = launch_list;    smpd_dbg_printf("creating a process group of size %d on node %d called %s\n", launch_node_ptr->nproc, smpd_process.id, kvs_name);    result = create_process_group(launch_list->nproc, kvs_name, &pg);    if (result != SMPD_SUCCESS)    {	smpd_err_printf("unable to create a process group.\n");	goto launch_failure;    }    launch_iter = launch_node_ptr;    for (i=0; i<launch_node_ptr->nproc; i++)    {	if (launch_iter == NULL)	{	    smpd_err_printf("number of launch nodes does not match number of processes: %d < %d\n", i, launch_node_ptr->nproc);	    goto launch_failure;	} 	pg->processes[launch_iter->iproc].node_id = launch_iter->host_id;	MPIU_Strncpy(pg->processes[launch_iter->iproc].host, launch_iter->hostname, SMPD_MAX_HOST_LENGTH);	launch_iter = launch_iter->next;    }    /* launch the processes */    smpd_dbg_printf("launching the processes.\n");    launch_node_ptr = launch_list;    while (launch_node_ptr)    {	/* create the launch command */	result = smpd_create_command("launch", 0, 	    launch_node_ptr->host_id,	    SMPD_TRUE, &cmd_ptr);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to create a launch command.\n");	    goto launch_failure;	}	result = smpd_add_command_arg(cmd_ptr, "c", launch_node_ptr->exe);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the command line to the launch command: '%s'\n", launch_node_ptr->exe);	    goto launch_failure;	}	result = smpd_add_command_int_arg(cmd_ptr, "s", spawn_context ? 1 : 0);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the spawn flag to the launch command: '%s'\n", launch_node_ptr->exe);	    goto launch_failure;	}	result = smpd_add_command_int_arg(cmd_ptr, "a", launch_node_ptr->appnum);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the application number to the launch command: '%s'\n", launch_node_ptr->exe);	    goto launch_failure;	}	if (launch_node_ptr->env[0] != '\0')	{	    result = smpd_add_command_arg(cmd_ptr, "e", launch_node_ptr->env);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the environment variables to the launch command: '%s'\n", launch_node_ptr->env);		goto launch_failure;	    }	}	if (launch_node_ptr->dir[0] != '\0')	{	    result = smpd_add_command_arg(cmd_ptr, "d", launch_node_ptr->dir);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the working directory to the launch command: '%s'\n", launch_node_ptr->dir);		goto launch_failure;	    }	}	if (launch_node_ptr->path[0] != '\0')	{	    result = smpd_add_command_arg(cmd_ptr, "p", launch_node_ptr->path);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the search path to the launch command: '%s'\n", launch_node_ptr->path);		goto launch_failure;	    }	}	if (launch_node_ptr->clique[0] != '\0')	{	    result = smpd_add_command_arg(cmd_ptr, "q", launch_node_ptr->clique);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the clique string to the launch command: '%s'\n", launch_node_ptr->clique);		goto launch_failure;	    }	}	/*printf("creating launch command for rank %d\n", launch_node_ptr->iproc);*/	result = smpd_add_command_int_arg(cmd_ptr, "i", launch_node_ptr->iproc);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the rank to the launch command: %d\n", launch_node_ptr->iproc);	    goto launch_failure;	}	result = smpd_add_command_int_arg(cmd_ptr, "n", launch_node_ptr->nproc);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the nproc field to the launch command: %d\n", launch_node_ptr->nproc);	    goto launch_failure;	}	result = smpd_add_command_arg(cmd_ptr, "k", kvs_name);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the kvs name('%s') to the launch command\n", kvs_name);	    goto launch_failure;	}	result = smpd_add_command_arg(cmd_ptr, "kd", domain_name);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the domain name('%s') to the launch command\n", domain_name);	    goto launch_failure;	}	if (launch_node_ptr->priority_class != SMPD_DEFAULT_PRIORITY_CLASS)	{	    result = smpd_add_command_int_arg(cmd_ptr, "pc", launch_node_ptr->priority_class);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the priority class field to the launch command: %d\n", launch_node_ptr->priority_class);		goto launch_failure;	    }	}	if (launch_node_ptr->priority_thread != SMPD_DEFAULT_PRIORITY)	{	    result = smpd_add_command_int_arg(cmd_ptr, "pt", launch_node_ptr->priority_thread);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the priority field to the launch command: %d\n", launch_node_ptr->priority_thread);		goto launch_failure;	    }	}	map_iter = launch_node_ptr->map_list;	i = 0;	while (map_iter)	{	    i++;	    map_iter = map_iter->next;	}	if (i > 0)	{	    result = smpd_add_command_int_arg(cmd_ptr, "mn", i);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the drive mapping arg to the launch command: 'mn=%d'\n", i);		goto launch_failure;	    }	}	map_iter = launch_node_ptr->map_list;	i = 0;	while (map_iter)	{	    sprintf(drive_arg_str, "m%d", i);	    i++;	    sprintf(drive_map_str, "%c:%s", map_iter->drive, map_iter->share);	    result = smpd_add_command_arg(cmd_ptr, drive_arg_str, drive_map_str);	    if (result != SMPD_SUCCESS)	    {		smpd_err_printf("unable to add the drive mapping to the launch command: '%s'\n", drive_map_str);		goto launch_failure;	    }	    map_iter = map_iter->next;	}	/* send the launch command */	result = smpd_post_write_command(smpd_process.left_context, cmd_ptr);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to post a write for the launch command:\n id: %d\n rank: %d\n cmd: '%s'\n",		launch_node_ptr->host_id, launch_node_ptr->iproc, launch_node_ptr->exe);	    goto launch_failure;	}	/* increment the number of launched processes */	smpd_process.nproc++;	if (spawn_context)	    spawn_context->num_outstanding_launch_cmds++;	/* move to the next node */	launch_node_ptr = launch_node_ptr->next;    }    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;launch_failure:    smpd_exit_fn(FCNAME);    return SMPD_FAIL;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -