📄 smpd_handle_spawn.c
字号:
/*printf("sending first connect command to add new hosts for the spawn command.\n");fflush(stdout);*/ /* post a write of the command */ result = smpd_post_write_command(context, cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the connect command.\n"); goto spawn_failed; } context->spawn_context->result_cmd = temp_cmd; smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } host_iter = host_iter->next; }#endif { SMPD_BOOL first = SMPD_TRUE; host_iter = smpd_process.host_list; while (host_iter) { if (host_iter->connected) { if (host_iter->left != NULL && !host_iter->left->connected) { context->connect_to = host_iter->left; /* create a connect command to be sent to the parent */ result = smpd_create_command("connect", 0, context->connect_to->parent, SMPD_TRUE, &cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a connect command.\n"); goto spawn_failed; } context->connect_to->connect_cmd_tag = cmd->tag; result = smpd_add_command_arg(cmd, "host", context->connect_to->host); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the host parameter to the connect command for host %s\n", context->connect_to->host); goto spawn_failed; } result = smpd_add_command_int_arg(cmd, "id", context->connect_to->id); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the id parameter to the connect command for host %s\n", context->connect_to->host); goto spawn_failed; } if (smpd_process.plaintext) { /* propagate the plaintext option to the manager doing the connect */ result = smpd_add_command_arg(cmd, "plaintext", "yes"); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the plaintext parameter to the connect command for host %s\n", context->connect_to->host); goto spawn_failed; } } smpd_dbg_printf("sending connect command to add new hosts for the spawn command.\n"); /*printf("sending first connect command to add new hosts for the spawn command.\n");fflush(stdout);*/ /* post a write of the command */ result = smpd_post_write_command(context, cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the connect command.\n"); goto spawn_failed; } if (first) { context->spawn_context->result_cmd = temp_cmd; first = SMPD_FALSE; } } if (host_iter->right != NULL && !host_iter->right->connected) { context->connect_to = host_iter->right; /* create a connect command to be sent to the parent */ result = smpd_create_command("connect", 0, context->connect_to->parent, SMPD_TRUE, &cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a connect command.\n"); goto spawn_failed; } context->connect_to->connect_cmd_tag = cmd->tag; result = smpd_add_command_arg(cmd, "host", context->connect_to->host); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the host parameter to the connect command for host %s\n", context->connect_to->host); goto spawn_failed; } result = smpd_add_command_int_arg(cmd, "id", context->connect_to->id); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the id parameter to the connect command for host %s\n", context->connect_to->host); goto spawn_failed; } if (smpd_process.plaintext) { /* propagate the plaintext option to the manager doing the connect */ result = smpd_add_command_arg(cmd, "plaintext", "yes"); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the plaintext parameter to the connect command for host %s\n", context->connect_to->host); goto spawn_failed; } } smpd_dbg_printf("sending connect command to add new hosts for the spawn command.\n"); /*printf("sending first connect command to add new hosts for the spawn command.\n");fflush(stdout);*/ /* post a write of the command */ result = smpd_post_write_command(context, cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the connect command.\n"); goto spawn_failed; } if (first) { context->spawn_context->result_cmd = temp_cmd; first = SMPD_FALSE; } } } host_iter = host_iter->next; } if (!first) { /* At least one connect command was issued so return here */ smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } } context->spawn_context->result_cmd = temp_cmd; if (launch_list == NULL) { smpd_process.spawning = SMPD_FALSE; /* spawn command received for zero processes, return a success result immediately */ result = smpd_add_command_arg(context->spawn_context->result_cmd, "result", SMPD_SUCCESS_STR); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the result string to the result command.\n"); goto spawn_failed; } /* send the spawn result command */ result = smpd_post_write_command(context, context->spawn_context->result_cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the spawn result command.\n"); goto spawn_failed; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } /* create the new kvs space */ smpd_dbg_printf("all hosts needed for the spawn command are available, sending start_dbs command.\n"); /*printf("all hosts needed for the spawn command are available, sending start_dbs command.\n");fflush(stdout);*/ /* create the start_dbs command to be sent to the first host */ result = smpd_create_command("start_dbs", 0, 1, SMPD_TRUE, &cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to create a start_dbs command.\n"); goto spawn_failed; } result = smpd_add_command_int_arg(cmd, "npreput", context->spawn_context->npreput); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the npreput value to the start_dbs command for a spawn command.\n"); goto spawn_failed; } result = smpd_add_command_arg(cmd, "preput", context->spawn_context->preput); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the preput keyvals to the start_dbs command for a spawn command.\n"); goto spawn_failed; } /* post a write of the command */ result = smpd_post_write_command(context, cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the start_dbs command.\n"); goto spawn_failed; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS; /* send the launch commands *//* printf("host tree:\n"); host_iter = smpd_process.host_list; if (!host_iter) printf("<none>\n"); while (host_iter) { printf(" host: %s, parent: %d, id: %d, connected: %s\n", host_iter->host, host_iter->parent, host_iter->id, host_iter->connected ? "yes" : "no"); host_iter = host_iter->next; } printf("launch nodes:\n"); launch_iter = launch_list; if (!launch_iter) printf("<none>\n"); while (launch_iter) { printf(" launch_node:\n"); printf(" id : %d\n", launch_iter->host_id); printf(" rank: %d\n", launch_iter->iproc); printf(" size: %d\n", launch_iter->nproc); printf(" clique: %s\n", launch_iter->clique); printf(" exe : %s\n", launch_iter->exe); if (launch_iter->args[0] != '\0') printf(" args: %s\n", launch_iter->args); if (launch_iter->path[0] != '\0') printf(" path: %s\n", launch_iter->path); launch_temp = launch_iter; launch_iter = launch_iter->next; free(launch_temp); } fflush(stdout);*/spawn_failed: smpd_process.spawning = SMPD_FALSE; /* add the result */ result = smpd_add_command_arg(temp_cmd, "result", SMPD_FAIL_STR); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to add the result string to the result command for a spawn command.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } /* send result back */ smpd_dbg_printf("replying to spawn command: \"%s\"\n", temp_cmd->cmd); result = smpd_post_write_command(context, temp_cmd); if (result != SMPD_SUCCESS) { smpd_err_printf("unable to post a write of the result command to the context.\n"); smpd_exit_fn(FCNAME); return SMPD_FAIL; } smpd_exit_fn(FCNAME); return result;}#undef FCNAME#define FCNAME "smpd_delayed_spawn_enqueue"int smpd_delayed_spawn_enqueue(smpd_context_t *context){ smpd_delayed_spawn_node_t *iter, *node; smpd_enter_fn(FCNAME); node = (smpd_delayed_spawn_node_t*)MPIU_Malloc(sizeof(smpd_delayed_spawn_node_t)); if (node == NULL) { smpd_exit_fn(FCNAME); return SMPD_FAIL; } node->next = NULL; node->context = context; node->cmd = context->read_cmd; iter = smpd_process.delayed_spawn_queue; if (iter == NULL) { smpd_process.delayed_spawn_queue = node; smpd_exit_fn(FCNAME); return SMPD_SUCCESS; } while (iter->next != NULL) { iter = iter->next; } iter->next = node; smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_delayed_spawn_dequeue"int smpd_delayed_spawn_dequeue(smpd_context_t **context_pptr){ smpd_delayed_spawn_node_t *node; smpd_enter_fn(FCNAME); if (smpd_process.delayed_spawn_queue != NULL) { *context_pptr = smpd_process.delayed_spawn_queue->context; /* Copy the command in the queue to the context read command * restoring the context to the state it was when it was enqueued. */ (*context_pptr)->read_cmd = smpd_process.delayed_spawn_queue->cmd; node = smpd_process.delayed_spawn_queue; smpd_process.delayed_spawn_queue = smpd_process.delayed_spawn_queue->next; MPIU_Free(node); } else { *context_pptr = NULL; } smpd_exit_fn(FCNAME); return SMPD_SUCCESS;}#undef FCNAME#define FCNAME "smpd_handle_delayed_spawn_command"int smpd_handle_delayed_spawn_command(void){ smpd_context_t *context = NULL; int result = SMPD_SUCCESS; smpd_enter_fn(FCNAME); /* Handle delayed spawn commands until a spawn is in progress or the queue is empty */ while (smpd_process.spawning == SMPD_FALSE && smpd_process.delayed_spawn_queue != NULL) { result = smpd_delayed_spawn_dequeue(&context); if (result != SMPD_SUCCESS) { smpd_exit_fn(FCNAME); return result; } if (context != NULL) { result = smpd_handle_spawn_command(context); if (result != SMPD_SUCCESS) { smpd_exit_fn(FCNAME); return result; } } } smpd_exit_fn(FCNAME); return result;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -