⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 smpd_state_machine.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 5 页
字号:
		{		    if (MPIU_Str_get_int_arg(context->read_cmd.cmd, "tag", &cmd_ptr->tag) != MPIU_STR_SUCCESS)		    {			smpd_dbg_printf("adding tag %d to connect command.\n", smpd_process.cur_tag);			smpd_add_command_int_arg(cmd_ptr, "tag", smpd_process.cur_tag);			cmd_ptr->tag = smpd_process.cur_tag;			smpd_process.cur_tag++;		    }		    cmd_ptr->wait = SMPD_TRUE;		}		if (strcmp(cmd_ptr->cmd_str, "set") == 0 || strcmp(cmd_ptr->cmd_str, "delete") == 0 ||		    strcmp(cmd_ptr->cmd_str, "stat") == 0 || strcmp(cmd_ptr->cmd_str, "get") == 0)		{		    if (MPIU_Str_get_int_arg(context->read_cmd.cmd, "tag", &cmd_ptr->tag) != MPIU_STR_SUCCESS)		    {			smpd_dbg_printf("adding tag %d to %s command.\n", smpd_process.cur_tag, cmd_ptr->cmd_str);			smpd_add_command_int_arg(cmd_ptr, "tag", smpd_process.cur_tag);			cmd_ptr->tag = smpd_process.cur_tag;			smpd_process.cur_tag++;		    }		    cmd_ptr->wait = SMPD_TRUE;		}		smpd_dbg_printf("command read from stdin, forwarding to left_child smpd\n");		result = smpd_post_write_command(smpd_process.left_context, cmd_ptr);		if (result != SMPD_SUCCESS)		{		    smpd_err_printf("unable to post a write of the command read from stdin: \"%s\"\n", cmd_ptr->cmd);		    smpd_free_command(cmd_ptr);		    smpd_exit_fn(FCNAME);		    return SMPD_FAIL;		}		smpd_dbg_printf("posted write of command: \"%s\"\n", cmd_ptr->cmd);	    }	    context->read_cmd.stdin_read_offset = 0;	}	else	{	    context->read_cmd.stdin_read_offset++;	}    }    result = MPIDU_Sock_post_read(context->sock, &context->read_cmd.cmd[context->read_cmd.stdin_read_offset], 1, 1, NULL);    if (result != MPI_SUCCESS)    {	/*	if (result != SOCK_EOF)	{	    smpd_dbg_printf("MPIDU_Sock_post_read failed (%s), assuming %s is closed, calling sock_post_close(%d).\n",		get_sock_error_string(result), smpd_get_context_str(context), sock_getid(context->sock));	}	*/	context->state = SMPD_CLOSING;	result = MPIDU_Sock_post_close(context->sock);	if (result != MPI_SUCCESS)	{	    smpd_err_printf("unable to post a close on a broken %s context.\n", smpd_get_context_str(context));	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}    }    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_smpd_writing_data_to_stdin"int smpd_state_smpd_writing_data_to_stdin(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){    int result;    smpd_stdin_write_node_t *node;    smpd_enter_fn(FCNAME);    SMPD_UNREFERENCED_ARG(event_ptr);    node = context->process->stdin_write_list;    if (node == NULL)    {	smpd_err_printf("write completed to process stdin context with no write posted in the list.\n");	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    smpd_dbg_printf("wrote %d bytes to stdin of rank %d\n", node->length, context->process->rank);    MPIU_Free(node->buffer);    MPIU_Free(node);    context->process->stdin_write_list = context->process->stdin_write_list->next;    if (context->process->stdin_write_list != NULL)    {	context->process->in->write_state = SMPD_WRITING_DATA_TO_STDIN;	result = MPIDU_Sock_post_write(context->process->in->sock,	    node->buffer, node->length, node->length, NULL);	if (result != MPI_SUCCESS)	{	    smpd_err_printf("unable to post a write of %d bytes to stdin for rank %d\n",		node->length, context->process->rank);	    smpd_exit_fn(FCNAME);	}    }    else    {	context->process->in->write_state = SMPD_IDLE;    }    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_stdouterr"int smpd_state_reading_stdouterr(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){    int result;    smpd_command_t *cmd_ptr;    MPIU_Size_t num_read;    char buffer[SMPD_MAX_CMD_LENGTH];    int num_encoded;    SMPD_BOOL ends_in_cr = SMPD_FALSE;    smpd_enter_fn(FCNAME);    if (context->state == SMPD_CLOSING)    {	smpd_exit_fn(FCNAME);	return SMPD_SUCCESS;    }    if (event_ptr->error != MPI_SUCCESS)    {	if (MPIR_ERR_GET_CLASS(event_ptr->error) != MPIDU_SOCK_ERR_CONN_CLOSED)	{	    smpd_dbg_printf("reading failed(%s), assuming %s is closed.\n",		get_sock_error_string(event_ptr->error), smpd_get_context_str(context));	}	/* Return an error an then handle_sock_op_read will post a close	context->state = SMPD_CLOSING;	result = MPIDU_Sock_post_close(context->sock);	if (result != MPI_SUCCESS)	{	    smpd_err_printf("unable to post a close on a broken %s context.\n", smpd_get_context_str(context));	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}	*/	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    smpd_dbg_printf("read from %s\n", smpd_get_context_str(context));    /* one byte read, attempt to read up to the buffer size */    result = MPIDU_Sock_read(context->sock, &context->read_cmd.cmd[1], (SMPD_MAX_STDOUT_LENGTH/2)-2, &num_read);    if (result != MPI_SUCCESS)    {	num_read = 0;	smpd_dbg_printf("MPIDU_Sock_read(%d) failed (%s), assuming %s is closed.\n",	    MPIDU_Sock_get_sock_id(context->sock), get_sock_error_string(result), smpd_get_context_str(context));    }    /* Use num_read instead of num_read-1 because one byte was already read before increasing the buffer length by one */    if (context->read_cmd.cmd[num_read] == '\n')    {	ends_in_cr = SMPD_TRUE;    }    smpd_dbg_printf("%d bytes read from %s\n", num_read+1, smpd_get_context_str(context));    if (context->type == SMPD_CONTEXT_STDOUT_RSH || context->type == SMPD_CONTEXT_STDERR_RSH)    {	size_t total;	size_t num_written;	char *buffer;	total = num_read+1;	buffer = context->read_cmd.cmd;	while (total > 0)	{	    num_written = fwrite(buffer, 1, total, context->type == SMPD_CONTEXT_STDOUT_RSH ? stdout : stderr);	    if (num_written < 1)	    {		num_read = 0;		total = 0;		smpd_err_printf("fwrite failed: error %d\n", ferror(context->type == SMPD_CONTEXT_STDOUT_RSH ? stdout : stderr));	    }	    else	    {		total = total - num_written;		buffer = buffer + num_written;	    }	}    }    else    {	smpd_encode_buffer(buffer, SMPD_MAX_CMD_LENGTH, context->read_cmd.cmd, num_read+1, &num_encoded);	buffer[num_encoded*2] = '\0';	/*smpd_dbg_printf("encoded %d characters: %d '%s'\n", num_encoded, strlen(buffer), buffer);*/	/* create an output command */	result = smpd_create_command(	    smpd_get_context_str(context),	    smpd_process.id, 0 /* output always goes to node 0? */, SMPD_FALSE, &cmd_ptr);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to create an output command.\n");	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}	result = smpd_add_command_int_arg(cmd_ptr, "rank", context->rank);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the rank to the %s command.\n", smpd_get_context_str(context));	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}	/* Use the context->first_output_stdout/err flag to indicate that this is the first output since	 * an end of a line or the very first output of the application.  mpiexec uses this flag when the	 * -l option is specified to prefix lines with process information.  This flag only handles end of	 * line situations where the end of line is the last entry in the output.  mpiexec handles end of	 * line characters in the middle of the output.	 */	switch (context->type)	{	case SMPD_CONTEXT_STDOUT:	    if (context->first_output_stdout == SMPD_TRUE)	    {		result = smpd_add_command_int_arg(cmd_ptr, "first", 1);	    }	    else	    {		result = smpd_add_command_int_arg(cmd_ptr, "first", 0);	    }	    context->first_output_stdout = ends_in_cr;	    break;	case SMPD_CONTEXT_STDERR:	    if (context->first_output_stderr == SMPD_TRUE)	    {		result = smpd_add_command_int_arg(cmd_ptr, "first", 1);	    }	    else	    {		result = smpd_add_command_int_arg(cmd_ptr, "first", 0);	    }	    context->first_output_stderr = ends_in_cr;	    break;	default:	    result = smpd_add_command_int_arg(cmd_ptr, "first", 0);	    break;	}	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the first flag to the %s command.\n", smpd_get_context_str(context));	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}	result = smpd_add_command_arg(cmd_ptr, "data", buffer);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to add the data to the %s command.\n", smpd_get_context_str(context));	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}	/* send the stdout command */	result = smpd_post_write_command(smpd_process.parent_context, cmd_ptr);	if (result != SMPD_SUCCESS)	{	    smpd_err_printf("unable to post a write of the %s command.\n", smpd_get_context_str(context));	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}    }    /* post a read for the next byte of data */    result = MPIDU_Sock_post_read(context->sock, &context->read_cmd.cmd, 1, 1, NULL);    if (result != MPI_SUCCESS)    {	/*	if (result != SOCK_EOF)	{	    smpd_dbg_printf("MPIDU_Sock_post_read failed (%s), assuming %s is closed, calling sock_post_close(%d).\n",		get_sock_error_string(result), smpd_get_context_str(context), MPIDU_Sock_get_sock_id(context->sock));	}	*/	context->state = SMPD_CLOSING;	result = MPIDU_Sock_post_close(context->sock);	if (result != MPI_SUCCESS)	{	    smpd_err_printf("unable to post a close on a broken %s context.\n", smpd_get_context_str(context));	    smpd_exit_fn(FCNAME);	    return SMPD_FAIL;	}    }    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_cmd_header"int smpd_state_reading_cmd_header(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){    int result;    smpd_enter_fn(FCNAME);    if (event_ptr->error != MPI_SUCCESS)    {	smpd_err_printf("unable to read the cmd header on the %s context, %s.\n",	    smpd_get_context_str(context), get_sock_error_string(event_ptr->error));	if (smpd_process.root_smpd)	{	    context->state = SMPD_CLOSING;	    result = MPIDU_Sock_post_close(context->sock);	    smpd_exit_fn(FCNAME);	    return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL;	}	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    smpd_dbg_printf("read command header\n");    context->read_cmd.length = atoi(context->read_cmd.cmd_hdr_str);    if ((context->read_cmd.length < 1) || (context->read_cmd.length > SMPD_MAX_CMD_LENGTH))    {	smpd_err_printf("unable to read the command, invalid command length: %d\n", context->read_cmd.length);	if (smpd_process.root_smpd)	{	    context->state = SMPD_CLOSING;	    result = MPIDU_Sock_post_close(context->sock);	    smpd_exit_fn(FCNAME);	    return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL;	}	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    smpd_dbg_printf("command header read, posting read for data: %d bytes\n", context->read_cmd.length);    context->read_cmd.state = SMPD_CMD_READING_CMD;    context->read_state = SMPD_READING_CMD;    result = MPIDU_Sock_post_read(context->sock, context->read_cmd.cmd, context->read_cmd.length, context->read_cmd.length, NULL);    if (result != MPI_SUCCESS)    {	smpd_err_printf("unable to post a read for the command string,\nsock error: %s\n", get_sock_error_string(result));	if (smpd_process.root_smpd)	{	    context->state = SMPD_CLOSING;	    result = MPIDU_Sock_post_close(context->sock);	    smpd_exit_fn(FCNAME);	    return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL;	}	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    smpd_exit_fn(FCNAME);    return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_cmd"int smpd_state_reading_cmd(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){    int result;    smpd_command_t *cmd_ptr;    smpd_host_node_t *left, *right, *host_node;    smpd_enter_fn(FCNAME);    if (event_ptr->error != MPI_SUCCESS)    {	smpd_err_printf("unable to read the command, %s.\n", get_sock_error_string(event_ptr->error));	if (smpd_process.root_smpd)	{	    context->state = SMPD_CLOSING;	    result = MPIDU_Sock_post_close(context->sock);	    smpd_exit_fn(FCNAME);	    return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL;	}	smpd_exit_fn(FCNAME);	return SMPD_FAIL;    }    smpd_dbg_printf("read command\n");    result = smpd_parse_command(&context->read_cmd);    if (result != SMPD_SUCCESS)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -