⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 smpd_handle_command.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "smpd.h"
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef HAVE_WINDOWS_H
#include "smpd_service.h"
#include <Ntdsapi.h>
#endif

/* prototype local functions */
static int write_to_stdout(const char *buffer, size_t num_bytes);
static int write_to_stderr(const char *buffer, size_t num_bytes);
static int smpd_do_abort_job(char *name, int rank, char *error_str, int exit_code);
static int create_process_group(int nproc, char *kvs_name, smpd_process_group_t **pg_pptr);
static int get_name_key_value(char *str, char *name, char *key, char *value);

/* FIXME: smpd_get_hostname() does the same thing */
#undef FCNAME
#define FCNAME "get_hostname"
/*
 * Copy the local host name into `host`, a buffer of `hostlen` bytes.
 * Windows: asks for the DNS fully-qualified name (GetComputerNameEx).
 * Elsewhere: uses gethostname(); the result is always NUL-terminated
 * because POSIX leaves termination unspecified when the name is truncated.
 * Returns SMPD_SUCCESS, or SMPD_FAIL (after logging) when `host` is NULL,
 * `hostlen` is not positive, or the system lookup fails.
 */
static int get_hostname(char *host, int hostlen)
{
#ifdef HAVE_WINDOWS_H
    DWORD len = hostlen;
#endif

    smpd_enter_fn(FCNAME);
    if (host == NULL)
    {
        smpd_err_printf("host == NULL\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    if (hostlen <= 0)
    {
        smpd_err_printf("invalid host buffer length %d\n", hostlen);
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
#ifdef HAVE_WINDOWS_H
    if (!GetComputerNameEx(ComputerNameDnsFullyQualified, host, &len))
    {
        smpd_err_printf("GetComputerNameEx failed, error = %d\n", GetLastError());
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
#else
    if (gethostname(host, hostlen))
    {
        smpd_err_printf("gethostname failed, error = %d\n", errno);
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    /* gethostname() need not terminate a truncated name */
    host[hostlen - 1] = '\0';
#endif
    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_do_abort_job"
/*
 * Abort the job whose process group kvs name is `name`, performed in this smpd.
 *
 * The `error_str` message is recorded for `rank`. On the first abort of the
 * group, the exit code is recorded in smpd_process, the group is marked
 * aborted, and a "suspend" command (name, rank, exit_code[, ctx_key]) is built
 * for every process that has not exited:
 *   - if the process already has a ctx_key, the command is posted to
 *     smpd_process.left_context immediately;
 *   - otherwise it is parked in processes[i].suspend_cmd for later sending.
 * pg->num_pending_suspends counts the suspend commands created.
 *
 * Returns SMPD_SUCCESS (also when the job was already aborted), or SMPD_FAIL
 * if no group matches, `rank` is out of range, or building/posting a command fails.
 */
static int smpd_do_abort_job(char *name, int rank, char *error_str, int exit_code)
{
    int result;
    smpd_command_t *cmd_ptr;
    smpd_process_group_t *pg;
    int i;

    smpd_enter_fn(FCNAME);

    /* locate the process group by kvs name and validate the rank against it */
    pg = smpd_process.pg_list;
    while (pg)
    {
        if (strcmp(pg->kvs, name) == 0)
        {
            /* a negative rank would index processes[] out of bounds below */
            if (rank < 0 || rank >= pg->num_procs)
            {
                smpd_err_printf("invalid abort_job command - rank %d out of range, number of processes = %d\n", rank, pg->num_procs);
                smpd_exit_fn(FCNAME);
                return SMPD_FAIL;
            }
            break;
        }
        pg = pg->next;
    }
    if (pg == NULL)
    {
        smpd_err_printf("no process group structure found to match the abort_job command: pg <%s>\n", name);
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }

    /* save the abort message */
    /* NOTE(review): a previous errmsg for this rank is overwritten without being
       freed; freeing it is only safe if create_process_group initializes the
       field to NULL - confirm before adding an MPIU_Free here. */
    pg->processes[rank].errmsg = MPIU_Strdup(error_str);

    if (pg->aborted == SMPD_TRUE)
    {
        smpd_dbg_printf("job already aborted.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_SUCCESS;
    }

    smpd_process.use_abort_exit_code = SMPD_TRUE;
    smpd_process.abort_exit_code = exit_code;
    pg->aborted = SMPD_TRUE;
    pg->num_pending_suspends = 0;

    /* NOTE(review): on the failure paths below the partially built cmd_ptr is
       not released. */
    for (i = 0; i < pg->num_procs; i++)
    {
        if (!pg->processes[i].exited)
        {
            /* create the suspend command */
            result = smpd_create_command("suspend", 0, pg->processes[i].node_id, SMPD_TRUE, &cmd_ptr);
            if (result != SMPD_SUCCESS)
            {
                smpd_err_printf("unable to create a suspend command.\n");
                goto do_abort_failure;
            }
            result = smpd_add_command_arg(cmd_ptr, "name", name);
            if (result != SMPD_SUCCESS)
            {
                smpd_err_printf("unable to add the kvs name to the suspend command: '%s'\n", name);
                goto do_abort_failure;
            }
            result = smpd_add_command_int_arg(cmd_ptr, "rank", i);
            if (result != SMPD_SUCCESS)
            {
                smpd_err_printf("unable to add the rank %d to the suspend command\n", i);
                goto do_abort_failure;
            }
            result = smpd_add_command_int_arg(cmd_ptr, "exit_code", exit_code);
            if (result != SMPD_SUCCESS)
            {
                smpd_err_printf("unable to add the exit_code %d to the suspend command\n", exit_code);
                goto do_abort_failure;
            }
            if (pg->processes[i].ctx_key[0] != '\0')
            {
                result = smpd_add_command_arg(cmd_ptr, "ctx_key", pg->processes[i].ctx_key);
                if (result != SMPD_SUCCESS)
                {
                    smpd_err_printf("unable to add the ctx_key to the suspend command: '%s'\n", pg->processes[i].ctx_key);
                    goto do_abort_failure;
                }
                /* send the suspend command */
                result = smpd_post_write_command(smpd_process.left_context, cmd_ptr);
                if (result != SMPD_SUCCESS)
                {
                    /* report the rank whose suspend failed, not the aborting rank */
                    smpd_err_printf("unable to post a write for the suspend command: rank %d\n", i);
                    goto do_abort_failure;
                }
            }
            else
            {
                /* no ctx_key yet: keep the command on the process record to be sent later */
                pg->processes[i].suspend_cmd = cmd_ptr;
            }
            pg->num_pending_suspends++;
        }
    }

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;

do_abort_failure:
    smpd_exit_fn(FCNAME);
    return SMPD_FAIL;
}

#undef FCNAME
#define FCNAME "smpd_abort_job"
/*
 * Abort job `name` on behalf of `rank`, with a printf-style error message
 * built from `fmt` and the variadic arguments. The message is truncated to
 * fit a 2048-byte buffer.
 *
 * If smpd_command_destination(0, ...) yields no context, the abort is handled
 * here via smpd_do_abort_job, with a fixed exit code of 123. Otherwise an
 * "abort_job" command carrying name, rank and error is posted to that context.
 *
 * Returns SMPD_SUCCESS or SMPD_FAIL.
 */
int smpd_abort_job(char *name, int rank, char *fmt, ...)
{
    int result;
    char error_str[2048] = "";
    smpd_command_t *cmd_ptr;
    /* initialized so an unset destination falls through to the local abort path */
    smpd_context_t *context = NULL;
    va_list list;

    smpd_enter_fn(FCNAME);

    va_start(list, fmt);
    vsnprintf(error_str, sizeof(error_str), fmt, list);
    va_end(list);

    smpd_command_destination(0, &context);
    if (context == NULL)
    {
        result = smpd_do_abort_job(name, rank, error_str, 123);
        smpd_exit_fn(FCNAME);
        return result;
    }

    result = smpd_create_command("abort_job", smpd_process.id, 0, SMPD_FALSE, &cmd_ptr);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("unable to create an abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    result = smpd_add_command_arg(cmd_ptr, "name", name);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("Unable to add the job name to the abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    result = smpd_add_command_int_arg(cmd_ptr, "rank", rank);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("Unable to add the rank to the abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    result = smpd_add_command_arg(cmd_ptr, "error", error_str);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("Unable to add the error string to the abort_job command.\n");
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    smpd_dbg_printf("sending abort_job command to %s context: \"%s\"\n", smpd_get_context_str(context), cmd_ptr->cmd);
    result = smpd_post_write_command(context, cmd_ptr);
    if (result != SMPD_SUCCESS)
    {
        smpd_err_printf("unable to post a write of the abort_job command to the %s context.\n", smpd_get_context_str(context));
        smpd_exit_fn(FCNAME);
        return SMPD_FAIL;
    }
    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_handle_stdin_command"
/*
 * Handle a "stdin" command read on `context`.
 *
 * The "data" argument is decoded in place. The resulting bytes are forwarded
 * to the stdin context of rank 0, or of every local process when
 * smpd_process.stdin_toall is set. Processes whose stdin context has already
 * been closed (piter->in == NULL) are skipped.
 *
 * Per target process:
 *   - if writes are already queued (stdin_write_list), the data is appended
 *     to the queue;
 *   - otherwise a direct MPIDU_Sock_write is tried, and any unwritten
 *     remainder is queued and a post_write is issued for it.
 *
 * Errors are logged; the function always returns SMPD_SUCCESS.
 */
int smpd_handle_stdin_command(smpd_context_t *context)
{
    char data[SMPD_MAX_STDOUT_LENGTH];
    smpd_command_t *cmd;
    smpd_process_t *piter;
    smpd_stdin_write_node_t *node, *iter;
    int result;
    MPIU_Size_t num_written, num_decoded;
    int nd;

    smpd_enter_fn(FCNAME);

    cmd = &context->read_cmd;

    if (MPIU_Str_get_string_arg(cmd->cmd, "data", data, SMPD_MAX_STDOUT_LENGTH) != MPIU_STR_SUCCESS)
    {
        smpd_err_printf("unable to get the data from the stdin command: '%s'\n", cmd->cmd);
        smpd_exit_fn(FCNAME);
        return SMPD_SUCCESS;
    }

    smpd_decode_buffer(data, data, SMPD_MAX_STDOUT_LENGTH, &nd);
    num_decoded = nd;

    for (piter = smpd_process.process_list; piter != NULL; piter = piter->next)
    {
        /* piter->in is NULLed by smpd_handle_close_stdin_command once its close is posted */
        if (!(piter->rank == 0 || smpd_process.stdin_toall) || piter->in == NULL)
        {
            continue;
        }

        if (piter->stdin_write_list != NULL)
        {
            /* writes are pending: append a copy of the data to the end of the queue */
            node = (smpd_stdin_write_node_t*)MPIU_Malloc(sizeof(smpd_stdin_write_node_t));
            if (node == NULL)
            {
                /* NOTE(review): this fallback writes ahead of data still queued in
                   stdin_write_list, so byte ordering may not be preserved. */
                smpd_write(piter->in->sock, data, num_decoded);
                continue;
            }
            node->buffer = (char*)MPIU_Malloc(num_decoded);
            if (node->buffer == NULL)
            {
                MPIU_Free(node);
                smpd_write(piter->in->sock, data, num_decoded);
                continue;
            }
            node->length = num_decoded;
            memcpy(node->buffer, data, num_decoded);
            node->next = NULL;
            iter = piter->stdin_write_list;
            while (iter->next != NULL)
            {
                iter = iter->next;
            }
            iter->next = node;
            continue;
        }

        /* attempt to write the data immediately */
        num_written = 0;
        result = MPIDU_Sock_write(piter->in->sock, data, num_decoded, &num_written);
        if (result != MPI_SUCCESS)
        {
            smpd_err_printf("unable to write data to the stdin context of process %d\n", piter->rank);
            continue;
        }
        if (num_written == num_decoded)
        {
            continue;
        }

        /* If not all the data is written, copy it to a temp buffer and post a write for the remaining data */
        node = (smpd_stdin_write_node_t*)MPIU_Malloc(sizeof(smpd_stdin_write_node_t));
        if (node == NULL)
        {
            smpd_write(piter->in->sock, &data[num_written], num_decoded-num_written);
            continue;
        }
        node->buffer = (char*)MPIU_Malloc(num_decoded-num_written);
        if (node->buffer == NULL)
        {
            MPIU_Free(node);
            smpd_write(piter->in->sock, &data[num_written], num_decoded-num_written);
            continue;
        }
        /* add the node to write list */
        node->length = num_decoded - num_written;
        memcpy(node->buffer, &data[num_written], num_decoded-num_written);
        node->next = NULL;
        piter->stdin_write_list = node;
        piter->in->write_state = SMPD_WRITING_DATA_TO_STDIN;
        result = MPIDU_Sock_post_write(piter->in->sock, node->buffer, node->length, node->length, NULL);
        if (result != MPI_SUCCESS)
        {
            smpd_err_printf("unable to post a write of %d bytes to stdin for rank %d\n",
                node->length, piter->rank);
        }
    }

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_handle_close_stdin_command"
/*
 * Handle a "close_stdin" command.
 *
 * For rank 0, or every local process when smpd_process.stdin_toall is set,
 * that still has a stdin context: mark the context SMPD_CLOSING, detach it
 * from the process, and post a socket close. piter->in is cleared only when
 * the close was posted successfully.
 *
 * `context` is unused. Always returns SMPD_SUCCESS.
 */
int smpd_handle_close_stdin_command(smpd_context_t *context)
{
    smpd_process_t *piter;
    int result;

    smpd_enter_fn(FCNAME);

    SMPD_UNREFERENCED_ARG(context);

    piter = smpd_process.process_list;
    while (piter)
    {
        if ((piter->rank == 0 || smpd_process.stdin_toall) && piter->in != NULL)
        {
            piter->in->state = SMPD_CLOSING;
            piter->in->process = NULL; /* NULL this out so the closing of the stdin context won't access it after it has been freed */
            result = MPIDU_Sock_post_close(piter->in->sock);
            if (result == MPI_SUCCESS)
            {
                piter->in = NULL;
            }
        }
        piter = piter->next;
    }

    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "write_to_stdout"
/*
 * Write `num_bytes` bytes of `buffer` to this process's standard output.
 * Windows: WriteFile on the STD_OUTPUT_HANDLE.
 * Elsewhere: fwrite followed by fflush.
 * Write errors are not detected; the function always returns SMPD_SUCCESS.
 */
static int write_to_stdout(const char *buffer, size_t num_bytes)
{
#ifdef HAVE_WINDOWS_H
    HANDLE hStdout;
    DWORD num_written;

    smpd_enter_fn(FCNAME);

    /* MT - This should acquire the hLaunchProcessMutex so that it doesn't acquire a redirected handle. */
    /* But since the code is not multi-threaded yet this doesn't matter */
    hStdout = GetStdHandle(STD_OUTPUT_HANDLE);
    /* WriteFile takes a DWORD length; make the narrowing from size_t explicit */
    WriteFile(hStdout, buffer, (DWORD)num_bytes, &num_written, NULL);
#else
    smpd_enter_fn(FCNAME);
    fwrite(buffer, 1, num_bytes, stdout);
    fflush(stdout);
#endif
    smpd_exit_fn(FCNAME);
    return SMPD_SUCCESS;
}

#undef FCNAME
#define FCNAME "smpd_handle_stdout_command"
int smpd_handle_stdout_command(smpd_context_t *context)
{
    int rank;
    char data[SMPD_MAX_STDOUT_LENGTH];
    smpd_command_t *cmd;
    int num_decoded = 0;
    int first;
    char prefix[20];
    size_t prefix_length;
    char *token;
    SMPD_BOOL ends_in_cr = SMPD_FALSE;

    smpd_enter_fn(FCNAME);

    cmd = &context->read_cmd;

    if (MPIU_Str_get_int_arg(cmd->cmd, "rank", &rank) != MPIU_STR_SUCCESS)
    {
	rank = -1;
	smpd_err_printf("no rank in the stdout command: '%s'\n", cmd->cmd);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -