📄 smpd_state_machine.c
字号:
smpd_err_printf("unable to read the connect result, %s.\n", get_sock_error_string(event_ptr->error)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("read connect result: '%s'\n", context->pszChallengeResponse); context->read_state = SMPD_IDLE; if (strcmp(context->pszChallengeResponse, SMPD_AUTHENTICATION_ACCEPTED_STR)) { char post_message[100]; /* rejected connection, close */ smpd_dbg_printf("connection rejected, server returned - %s\n", context->pszChallengeResponse); context->read_state = SMPD_IDLE; context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("unable to close sock, error:\n%s\n", get_sock_error_string(result)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* connection failed, abort? */ /* when does a forming context get assinged it's global place? At creation? At connection? */ if (strcmp(context->pszChallengeResponse, SMPD_AUTHENTICATION_REJECTED_VERSION_STR) == 0) { /* Customize the abort message to state that the smpd version did not match */ strcpy(post_message, ", smpd version mismatch"); } else { post_message[0] = '\0'; } if (smpd_process.left_context == context) smpd_process.left_context = NULL; if (smpd_process.do_console && smpd_process.console_host[0] != '\0') result = smpd_post_abort_command("unable to connect to %s%s", smpd_process.console_host, post_message); else if (context->connect_to && context->connect_to->host[0] != '\0') result = smpd_post_abort_command("unable to connect to %s%s", context->connect_to->host, post_message); else { if (context->host[0] != '\0') { result = smpd_post_abort_command("unable to connect to %s%s", context->host, post_message); } else { result = smpd_post_abort_command("connection to smpd rejected%s", post_message); } } if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create the close command to tear down the job tree.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else { context->write_state = SMPD_WRITING_PROCESS_SESSION_REQUEST; context->target = SMPD_TARGET_PROCESS; switch (context->state) { case SMPD_MPIEXEC_CONNECTING_TREE: case SMPD_CONNECTING: strcpy(context->session, SMPD_PROCESS_SESSION_STR); break; case SMPD_MPIEXEC_CONNECTING_SMPD: if (smpd_process.use_process_session) { strcpy(context->session, SMPD_PROCESS_SESSION_STR); } else { context->target = SMPD_TARGET_SMPD; strcpy(context->session, SMPD_SMPD_SESSION_STR); context->write_state = SMPD_WRITING_SMPD_SESSION_REQUEST; } break; case SMPD_CONNECTING_RPMI: context->target = SMPD_TARGET_PMI; context->write_state = SMPD_WRITING_PMI_SESSION_REQUEST; strcpy(context->session, SMPD_PMI_SESSION_STR); break; default: strcpy(context->session, SMPD_PROCESS_SESSION_STR); break; } result = MPIDU_Sock_post_write(context->sock, context->session, SMPD_SESSION_REQUEST_LEN, SMPD_SESSION_REQUEST_LEN, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a write of the session request '%s',\nsock error: %s\n", context->session, get_sock_error_string(result)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_writing_challenge_string"int smpd_state_writing_challenge_string(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { smpd_err_printf("unable to write the challenge string, %s.\n", get_sock_error_string(event_ptr->error)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); result = (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; smpd_exit_fn(FCNAME); return result; } smpd_dbg_printf("wrote challenge string: '%s'\n", context->pszChallengeResponse); context->read_state = SMPD_READING_CHALLENGE_RESPONSE; context->write_state = SMPD_IDLE; result = MPIDU_Sock_post_read(context->sock, context->pszChallengeResponse, SMPD_AUTHENTICATION_STR_LEN, SMPD_AUTHENTICATION_STR_LEN, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("posting a read of the challenge response string failed,\nsock error: %s\n", get_sock_error_string(result)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); result = (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; smpd_exit_fn(FCNAME); return result; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_challenge_response"int smpd_state_reading_challenge_response(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { smpd_err_printf("unable to read the challenge response, %s.\n", get_sock_error_string(event_ptr->error)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_dbg_printf("read challenge response: '%s'\n", context->pszChallengeResponse); context->read_state = SMPD_IDLE; context->write_state = SMPD_WRITING_CONNECT_RESULT; if (strcmp(context->pszChallengeResponse, SMPD_VERSION_FAILURE) == 0) { strcpy(context->pszChallengeResponse, SMPD_AUTHENTICATION_REJECTED_VERSION_STR); } else if (strcmp(context->pszChallengeResponse, context->pszCrypt) == 0) { strcpy(context->pszChallengeResponse, SMPD_AUTHENTICATION_ACCEPTED_STR); } else { strcpy(context->pszChallengeResponse, SMPD_AUTHENTICATION_REJECTED_STR); } result = MPIDU_Sock_post_write(context->sock, context->pszChallengeResponse, SMPD_AUTHENTICATION_STR_LEN, SMPD_AUTHENTICATION_STR_LEN, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a write of the connect result '%s',\nsock error: %s\n", context->pszChallengeResponse, get_sock_error_string(result)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_writing_connect_result"int smpd_state_writing_connect_result(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { smpd_err_printf("unable to write the connect result, %s.\n", get_sock_error_string(event_ptr->error)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_dbg_printf("wrote connect result: '%s'\n", context->pszChallengeResponse); context->write_state = SMPD_IDLE; if (strcmp(context->pszChallengeResponse, SMPD_AUTHENTICATION_REJECTED_STR) == 0) { context->state = SMPD_CLOSING; smpd_dbg_printf("connection reject string written, closing sock.\n"); result = MPIDU_Sock_post_close(context->sock); if (result != MPI_SUCCESS) { smpd_err_printf("MPIDU_Sock_post_close failed, error:\n%s\n", get_sock_error_string(result)); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } context->read_state = SMPD_READING_SESSION_REQUEST; result = MPIDU_Sock_post_read(context->sock, context->session, SMPD_SESSION_REQUEST_LEN, SMPD_SESSION_REQUEST_LEN, NULL); if (result != MPI_SUCCESS) { smpd_err_printf("unable to post a read for the session header,\nsock error: %s\n", get_sock_error_string(result)); context->state = SMPD_CLOSING; result = MPIDU_Sock_post_close(context->sock); smpd_exit_fn(FCNAME); return (result == MPI_SUCCESS) ? SMPD_SUCCESS : SMPD_FAIL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_state_reading_stdin"int smpd_state_reading_stdin(smpd_context_t *context, MPIDU_Sock_event_t *event_ptr){ int result; smpd_command_t *cmd_ptr; MPIU_Size_t num_read; char buffer[SMPD_MAX_CMD_LENGTH]; int num_encoded; smpd_enter_fn(FCNAME); if (event_ptr->error != MPI_SUCCESS) { /*smpd_err_printf("unable to read from stdin, %s.\n", get_sock_error_string(event_ptr->error));*/ smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_dbg_printf("read from stdin\n"); if (context->type == SMPD_CONTEXT_MPIEXEC_STDIN) { smpd_dbg_printf("read from %s\n", smpd_get_context_str(context)); /* one byte read, attempt to read up to the buffer size */ result = MPIDU_Sock_read(context->sock, &context->read_cmd.cmd[1], SMPD_STDIN_PACKET_SIZE-1, &num_read); if (result != MPI_SUCCESS) { num_read = 0; smpd_dbg_printf("MPIDU_Sock_read(%d) failed (%s), assuming %s is closed.\n", MPIDU_Sock_get_sock_id(context->sock), get_sock_error_string(result), smpd_get_context_str(context)); } smpd_dbg_printf("%d bytes read from %s\n", num_read+1, smpd_get_context_str(context)); smpd_encode_buffer(buffer, SMPD_MAX_CMD_LENGTH, context->read_cmd.cmd, num_read+1, &num_encoded); buffer[num_encoded*2] = '\0'; /*smpd_dbg_printf("encoded %d characters: %d '%s'\n", num_encoded, strlen(buffer), buffer);*/ /* create an stdin command */ result = smpd_create_command("stdin", 0, 1 /* input always goes to node 1? */, SMPD_FALSE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create an stdin command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_add_command_arg(cmd_ptr, "data", buffer); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the data to the stdin command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* send the stdin command */ result = smpd_post_write_command(smpd_process.left_context, cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the stdin command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } else if (context->type == SMPD_CONTEXT_MPIEXEC_STDIN_RSH) { unsigned char *buf; MPIU_Size_t total, num_written; smpd_dbg_printf("read from %s\n", smpd_get_context_str(context)); /* one byte read, attempt to read up to the buffer size */ result = MPIDU_Sock_read(context->sock, &context->read_cmd.cmd[1], SMPD_STDIN_PACKET_SIZE-1, &num_read); if (result != MPI_SUCCESS) { num_read = 0; smpd_dbg_printf("MPIDU_Sock_read(%d) failed (%s), assuming %s is closed.\n", MPIDU_Sock_get_sock_id(context->sock), get_sock_error_string(result), smpd_get_context_str(context)); } smpd_dbg_printf("%d bytes read from %s\n", num_read+1, smpd_get_context_str(context)); /* write stdin to root rsh process */ total = num_read+1; buf = (unsigned char *)context->read_cmd.cmd; while (total > 0) { result = MPIDU_Sock_write(context->process->in->sock, buf, total, &num_written); if (result != MPI_SUCCESS) { num_read = 0; total = 0; smpd_dbg_printf("MPIDU_Sock_write(%d) failed (%s), assuming %s is closed.\n", MPIDU_Sock_get_sock_id(context->process->in->sock), get_sock_error_string(result), smpd_get_context_str(context)); } else { total = total - num_written; buf = buf + num_written; if (num_written == 0) { /* FIXME: what does 0 bytes written mean? * Does it mean that no bytes could be written at that moment * or does it mean that there is an error on the socket? */ } } } } else { if (context->read_cmd.stdin_read_offset == SMPD_STDIN_PACKET_SIZE || context->read_cmd.cmd[context->read_cmd.stdin_read_offset] == '\n') { if (context->read_cmd.cmd[context->read_cmd.stdin_read_offset] != '\n') smpd_err_printf("truncated command.\n"); context->read_cmd.cmd[context->read_cmd.stdin_read_offset] = '\0'; /* remove the \n character */ result = smpd_create_command("", -1, -1, SMPD_FALSE, &cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a command structure for the stdin command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } result = smpd_init_command(cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to initialize a command structure for the stdin command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } strcpy(cmd_ptr->cmd, context->read_cmd.cmd); if (MPIU_Str_get_int_arg(cmd_ptr->cmd, "src", &cmd_ptr->src) != MPIU_STR_SUCCESS) { result = smpd_add_command_int_arg(cmd_ptr, "src", 0); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the default src parameter to the stdin command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } if (MPIU_Str_get_int_arg(cmd_ptr->cmd, "dest", &cmd_ptr->dest) != MPIU_STR_SUCCESS) { result = smpd_add_command_int_arg(cmd_ptr, "dest", 1); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the default dest parameter to the stdin command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } } result = smpd_parse_command(cmd_ptr); if (result != SMPD_SUCCESS) { smpd_err_printf("invalid command read from stdin, ignoring: \"%s\"\n", context->read_cmd.cmd); } else { if (strcmp(cmd_ptr->cmd_str, "connect") == 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -