📄 iof_base_endpoint.c
字号:
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Cisco, Inc.  All rights reserved.
 * Copyright (c) 2007      Sun Microsystems, Inc.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "orte_config.h"

#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#else
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif  /* HAVE_SIGNAL_H */

#include "opal/util/output.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/iof/base/iof_base_endpoint.h"
#include "orte/mca/iof/base/iof_base_fragment.h"


/*
 * Globals
 */

/* Set once the SIGPIPE event below has been registered, so that the
   registration happens only one time. */
static bool sigpipe_event_initialized = false;
static struct opal_event sigpipe_event;

/*
 * SIGPIPE handler.  All three arguments are unused.
 */
static void sigpipe_signal_callback(int fd, short event, void *arg)
{
    /* Do nothing -- the purpose of this handler is so that we don't
       die due to SIGPIPE, but we don't need to *do* anything in this
       handler. */
}


/**
 * Endpoint constructor: puts the endpoint into a known empty state
 * (source mode, sequence and ack counters at zero, no file
 * descriptor, zeroed event) and constructs its three lists.
 *
 * ep_origin and ep_tag are not set here.
 */
static void orte_iof_base_endpoint_construct(orte_iof_base_endpoint_t* endpoint)
{
    endpoint->ep_mode = ORTE_IOF_SOURCE;  /* default value */
    endpoint->ep_seq = 0;
    endpoint->ep_ack = 0;
    endpoint->ep_fd = -1;
    memset(&endpoint->ep_event,0,sizeof(endpoint->ep_event));
    OBJ_CONSTRUCT(&endpoint->ep_source_frags, opal_list_t);
    OBJ_CONSTRUCT(&endpoint->ep_sink_frags, opal_list_t);
    OBJ_CONSTRUCT(&endpoint->ep_callbacks, opal_list_t);
}

/**
 * Endpoint destructor: removes the endpoint's event if a file
 * descriptor is still attached (ep_fd >= 0), then destructs the
 * three lists built by the constructor.
 */
static void orte_iof_base_endpoint_destruct(orte_iof_base_endpoint_t* endpoint)
{
    if(endpoint->ep_fd >= 0) {
        opal_event_del(&endpoint->ep_event);
    }
    OBJ_DESTRUCT(&endpoint->ep_source_frags);
    OBJ_DESTRUCT(&endpoint->ep_sink_frags);
    OBJ_DESTRUCT(&endpoint->ep_callbacks);
}

OBJ_CLASS_INSTANCE(
    orte_iof_base_endpoint_t,
    opal_list_item_t,
    orte_iof_base_endpoint_construct,
    orte_iof_base_endpoint_destruct);


/**
 * Callback-record constructor: clears the function pointer and the
 * user data pointer.  No destructor is registered for this class.
 */
static void orte_iof_base_callback_construct(orte_iof_base_callback_t* cb)
{
    cb->cb_func = 0;
    cb->cb_data = NULL;
}

OBJ_CLASS_INSTANCE(
    orte_iof_base_callback_t,
    opal_list_item_t,
    orte_iof_base_callback_construct,
    NULL);


/*
 * Callback when non-blocking RML send completes.
 *
 * cbdata is the fragment that was handed to the send; the remaining
 * arguments (status, peer, msg, count, tag) are not examined.  The
 * fragment is taken off its owning endpoint's source list and
 * returned, and the endpoint reference taken for the send is dropped.
 */
static void orte_iof_base_endpoint_send_cb(
    int status,
    orte_process_name_t* peer,
    struct iovec* msg,
    int count,
    orte_rml_tag_t tag,
    void* cbdata)
{
    orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
    /* Fetch the owner before the fragment is returned below */
    orte_iof_base_endpoint_t* endpoint = frag->frag_owner;

    opal_list_remove_item(&endpoint->ep_source_frags, &frag->super.super);
    opal_output(orte_iof_base.iof_output, "iof_base_endpoint: send cb, source_frags list len: %d",
                (int) opal_list_get_size(&endpoint->ep_source_frags));
    ORTE_IOF_BASE_FRAG_RETURN(frag);

    /* Decrement the refcount on the endpoint; matches the RETAIN for
       when this frag's send was initiated in
       orte_iof_base_endpoint_read_handler() */
    OBJ_RELEASE(endpoint);
}

/*
 * Callback when data is available on the endpoint to read.
*/
/*
 * Reads at most one fragment's worth of data from fd and forwards it
 * to the IOF service with a non-blocking RML send.  A read of zero
 * bytes, or a read error other than EAGAIN/EINTR, marks the endpoint
 * closed and still forwards a zero-length message.
 *
 * fd     descriptor to read from
 * flags  unused
 * cbdata the orte_iof_base_endpoint_t this event belongs to
 */
static void orte_iof_base_endpoint_read_handler(int fd, short flags, void *cbdata)
{
    orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)cbdata;
    orte_iof_base_frag_t* frag;
    orte_iof_base_header_t* hdr;
    int rc;

    /* allocate a fragment */
    ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
    if(NULL == frag) {
        /* JMS shouldn't we do something here? */
        return;
    }

    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);

    /* read up to the fragment size */
#if !defined(__WINDOWS__)
    rc = read(fd, frag->frag_data, sizeof(frag->frag_data));
#else
    {
        /* Start from zero so that a failed ReadFile() can never leave
           an indeterminate byte count; rc then becomes 0 and the
           endpoint is treated as closed below. */
        DWORD readed = 0;
        HANDLE handle = (HANDLE)_get_osfhandle(fd);
        ReadFile(handle, frag->frag_data, sizeof(frag->frag_data), &readed, NULL);
        rc = (int)readed;
    }
#endif  /* !defined(__WINDOWS__) */

    if (rc < 0) {
        /* non-blocking, retry */
        if (EAGAIN == errno || EINTR == errno) {
            ORTE_IOF_BASE_FRAG_RETURN(frag);
            OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
            return;
        }

        /* Error on the connection */
        orte_iof_base_endpoint_closed(endpoint);
        /* Fall through to send 0 byte message to other side
           indicating that the endpoint is now closed. */
        rc = 0;
    } else if (rc == 0) {
        /* peer has closed connection (will fall through to send a 0
           byte message, therefore telling the RML side that the fd
           side has closed its connection) */
        orte_iof_base_endpoint_closed(endpoint);
    }

    /* Do not append the fragment before we know that we have some
       data (even a 0 byte message is OK -- that indicates that the
       file descriptor has closed) */
    frag->frag_owner = endpoint;
    opal_list_append(&endpoint->ep_source_frags, &frag->super.super);
    opal_output(orte_iof_base.iof_output, "iof_base_endpoint: read handler, source_frags list len: %d",
                (int) opal_list_get_size(&endpoint->ep_source_frags));
    frag->frag_iov[1].iov_len = frag->frag_len = rc;

    /* fill in the header */
    hdr = &frag->frag_hdr;
    hdr->hdr_common.hdr_type = ORTE_IOF_BASE_HDR_MSG;
    hdr->hdr_msg.msg_origin = endpoint->ep_origin;
    hdr->hdr_msg.msg_proxy = *ORTE_PROC_MY_NAME;
    hdr->hdr_msg.msg_tag = endpoint->ep_tag;
    hdr->hdr_msg.msg_seq = endpoint->ep_seq;
    hdr->hdr_msg.msg_len = frag->frag_len;
    ORTE_IOF_BASE_HDR_MSG_HTON(hdr->hdr_msg);

    /* if window size has been exceeded - disable forwarding */
    endpoint->ep_seq += frag->frag_len;
    if(ORTE_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) > orte_iof_base.iof_window_size) {
        opal_output(orte_iof_base.iof_output, "iof_base_endpoint read handler: window exceeded -- reading disabled");
        opal_event_del(&endpoint->ep_event);
    }
    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);

    /* Increment the refcount on the endpoint so that it doesn't get
       deleted before the frag */
    OBJ_RETAIN(endpoint);

    /* start non-blocking RML call to forward received data */
    rc = orte_rml.send_nb(
        orte_iof_base.iof_service,
        frag->frag_iov,
        2,
        ORTE_RML_TAG_IOF_SVC,
        0,
        orte_iof_base_endpoint_send_cb,
        frag);
    if (rc < 0) {
        /* Do not drop the failure silently.
           NOTE(review): assumes send_nb reports failure with a
           negative status -- confirm against the RML interface.  The
           fragment and the endpoint reference are deliberately left
           alone here because it is not visible from this file whether
           the completion callback still runs after a failed send. */
        opal_output(orte_iof_base.iof_output,
                    "iof_base_endpoint: read handler, send_nb failed with status %d", rc);
    }
}

/**
 * Callback when the endpoint is available for write.
*/static void orte_iof_base_endpoint_write_handler(int sd, short flags, void *user){ int errno_save; orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)user; /* * step through the list of queued fragments and attempt to write * until the output descriptor would block */ OPAL_THREAD_LOCK(&orte_iof_base.iof_lock); while(opal_list_get_size(&endpoint->ep_sink_frags)) { orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)opal_list_get_first(&endpoint->ep_sink_frags); int rc; /* close connection on zero byte message */ if(frag->frag_len == 0) { orte_iof_base_endpoint_closed(endpoint); OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); return; } /* progress pending messages */ rc = write(endpoint->ep_fd, frag->frag_ptr, frag->frag_len); errno_save = errno; if (rc < 0) { if (EAGAIN == errno_save) { break; } if (EINTR == errno_save) { continue; } /* All other errors -- to include sigpipe -- mean that Something Bad happened and we should abort in despair. */ orte_iof_base_endpoint_closed(endpoint); /* Send a ACK-AND-CLOSE back to the service so that it knows not to wait for any further ACKs */ orte_iof_base_frag_ack(frag, true); OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); return; } frag->frag_len -= rc; frag->frag_ptr += rc; if(frag->frag_len > 0) { break; } opal_list_remove_item(&endpoint->ep_sink_frags, &frag->super.super); OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); orte_iof_base_frag_ack(frag, false); OPAL_THREAD_LOCK(&orte_iof_base.iof_lock); } /* is there anything left to write? 
*/ if(opal_list_get_size(&endpoint->ep_sink_frags) == 0) { opal_event_del(&endpoint->ep_event); if(orte_iof_base.iof_waiting) { opal_condition_signal(&orte_iof_base.iof_condition); } } OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);}/* return true if we should read stdin from fd, false otherwise */static bool orte_iof_base_endpoint_stdin_check(int fd){#if !defined(__WINDOWS__) && defined(HAVE_TCGETPGRP) if( isatty(fd) && (getpgrp() != tcgetpgrp(fd)) ) { return false; }#endif /* !defined(__WINDOWS__) */ return true;}static void orte_iof_base_endpoint_stdin_cb(int sd, short flags, void *user){ orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)user; bool should_process = orte_iof_base_endpoint_stdin_check(endpoint->ep_fd); if (should_process) { opal_event_add(&endpoint->ep_event, 0); } else { opal_event_del(&endpoint->ep_event); }}/* * Lookup existing endpoint matching parameters * supplied to create. */ static orte_iof_base_endpoint_t* orte_iof_base_endpoint_lookup( const orte_process_name_t* proc, orte_iof_base_mode_t mode, int tag){ opal_list_item_t* item; for(item = opal_list_get_first(&orte_iof_base.iof_endpoints); item != opal_list_get_end(&orte_iof_base.iof_endpoints); item = opal_list_get_next(item)) { orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item; if(orte_ns.compare_fields(ORTE_NS_CMP_ALL,proc,&endpoint->ep_origin) == 0 && endpoint->ep_tag == tag && endpoint->ep_mode == mode) { OBJ_RETAIN(endpoint); return endpoint; } } return NULL;}/* * Create a local endpoint. 
*/int orte_iof_base_endpoint_create( const orte_process_name_t* proc, orte_iof_base_mode_t mode, int tag, int fd){ orte_iof_base_endpoint_t* endpoint; int flags; int rc; OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);#if !defined(__WINDOWS__) /* If we haven't initialized the event yet, do so now */ if (!sigpipe_event_initialized) { opal_signal_set(&sigpipe_event, SIGPIPE, sigpipe_signal_callback, &sigpipe_event); opal_signal_add(&sigpipe_event, NULL); sigpipe_event_initialized = true; }#endif /* !defined(__WINDOWS__) */ if((endpoint = orte_iof_base_endpoint_lookup(proc,mode,tag)) != NULL) { OBJ_RETAIN(endpoint); OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); return ORTE_SUCCESS; } endpoint = OBJ_NEW(orte_iof_base_endpoint_t); if(NULL == endpoint) { OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); return ORTE_ERR_OUT_OF_RESOURCE; } endpoint->ep_origin = *proc; endpoint->ep_mode = mode; endpoint->ep_tag = tag; endpoint->ep_fd = fd; /* If it looks like we're on the mpirun side of a standard IO stream (like we're a SOURCE and tag is STDIN and we're mucking with fd 0), we don't want to set nonblocking. If we do so, we set the file descriptor to non-blocking for everyone that has that file descriptor, which includes everyone else in our shell pipeline chain. (See http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html). This causes things like "mpirun -np 1 big_app | cat" to lose output, because cat's stdout is then ALSO non-blocking and cat isn't built to deal with that case (same with almost all other unix text utils).
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -