
📄 iof_base_endpoint.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising a cross-section of industry and research representatives), MPI is a standardized API typically used for parallel and/or distributed computing.
💻 C
📖 Page 1 of 2
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Cisco, Inc.  All rights reserved.
 * Copyright (c) 2007      Sun Microsystems, Inc.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "orte_config.h"

#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#else
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif  /* HAVE_SIGNAL_H */

#include "opal/util/output.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/iof/base/iof_base_endpoint.h"
#include "orte/mca/iof/base/iof_base_fragment.h"

/*
 * Globals
 */

static bool sigpipe_event_initialized = false;
static struct opal_event sigpipe_event;

static void sigpipe_signal_callback(int fd, short event, void *arg)
{
    /* Do nothing -- the purpose of this handler is so that we don't
       die due to SIGPIPE, but we don't need to *do* anything in this
       handler. */
}

/**
 *  Constructor/Destructor
 */

static void orte_iof_base_endpoint_construct(orte_iof_base_endpoint_t* endpoint)
{
    endpoint->ep_mode = ORTE_IOF_SOURCE;  /* default value */
    endpoint->ep_seq = 0;
    endpoint->ep_ack = 0;
    endpoint->ep_fd = -1;
    memset(&endpoint->ep_event, 0, sizeof(endpoint->ep_event));
    OBJ_CONSTRUCT(&endpoint->ep_source_frags, opal_list_t);
    OBJ_CONSTRUCT(&endpoint->ep_sink_frags, opal_list_t);
    OBJ_CONSTRUCT(&endpoint->ep_callbacks, opal_list_t);
}

static void orte_iof_base_endpoint_destruct(orte_iof_base_endpoint_t* endpoint)
{
    if(endpoint->ep_fd >= 0) {
        opal_event_del(&endpoint->ep_event);
    }
    OBJ_DESTRUCT(&endpoint->ep_source_frags);
    OBJ_DESTRUCT(&endpoint->ep_sink_frags);
    OBJ_DESTRUCT(&endpoint->ep_callbacks);
}

OBJ_CLASS_INSTANCE(
    orte_iof_base_endpoint_t,
    opal_list_item_t,
    orte_iof_base_endpoint_construct,
    orte_iof_base_endpoint_destruct);

/**
 *  Constructor/Destructor
 */

static void orte_iof_base_callback_construct(orte_iof_base_callback_t* cb)
{
    cb->cb_func = 0;
    cb->cb_data = NULL;
}

OBJ_CLASS_INSTANCE(
    orte_iof_base_callback_t,
    opal_list_item_t,
    orte_iof_base_callback_construct,
    NULL);

/*
 * Callback when non-blocking RML send completes.
 */

static void orte_iof_base_endpoint_send_cb(
    int status,
    orte_process_name_t* peer,
    struct iovec* msg,
    int count,
    orte_rml_tag_t tag,
    void* cbdata)
{
    orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
    orte_iof_base_endpoint_t* endpoint = frag->frag_owner;
    opal_list_remove_item(&endpoint->ep_source_frags, &frag->super.super);
    opal_output(orte_iof_base.iof_output,
                "iof_base_endpoint: send cb, source_frags list len: %d",
                (int) opal_list_get_size(&endpoint->ep_source_frags));
    ORTE_IOF_BASE_FRAG_RETURN(frag);
    /* Decrement the refcount on the endpoint; matches the RETAIN for
       when this frag's send was initiated in
       orte_iof_base_endpoint_read_handler() */
    OBJ_RELEASE(endpoint);
}

/*
 *  Callback when data is available on the endpoint to read.
 */

static void orte_iof_base_endpoint_read_handler(int fd, short flags, void *cbdata)
{
    orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)cbdata;
    orte_iof_base_frag_t* frag;
    orte_iof_base_header_t* hdr;
    int rc;

    /* allocate a fragment */
    ORTE_IOF_BASE_FRAG_ALLOC(frag, rc);
    if(NULL == frag) {
        /* JMS shouldn't we do something here? */
        return;
    }

    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);

    /* read up to the fragment size */
#if !defined(__WINDOWS__)
    rc = read(fd, frag->frag_data, sizeof(frag->frag_data));
#else
    {
        DWORD readed;
        HANDLE handle = (HANDLE)_get_osfhandle(fd);
        ReadFile(handle, frag->frag_data, sizeof(frag->frag_data), &readed, NULL);
        rc = (int)readed;
    }
#endif  /* !defined(__WINDOWS__) */
    if (rc < 0) {
        /* non-blocking, retry */
        if (EAGAIN == errno || EINTR == errno) {
            ORTE_IOF_BASE_FRAG_RETURN(frag);
            OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
            return;
        }

        /* Error on the connection */
        orte_iof_base_endpoint_closed(endpoint);
        /* Fall through to send 0 byte message to other side
           indicating that the endpoint is now closed.
         */
        rc = 0;
    } else if (rc == 0) {
        /* peer has closed connection (will fall through to send a 0
           byte message, therefore telling the RML side that the fd
           side has closed its connection) */
        orte_iof_base_endpoint_closed(endpoint);
    }

    /* Do not append the fragment before we know that we have some
       data (even a 0 byte message is OK -- that indicates that the
       file descriptor has closed) */
    frag->frag_owner = endpoint;
    opal_list_append(&endpoint->ep_source_frags, &frag->super.super);
    opal_output(orte_iof_base.iof_output,
                "iof_base_endpoint: read handler, source_frags list len: %d",
                (int) opal_list_get_size(&endpoint->ep_source_frags));
    frag->frag_iov[1].iov_len = frag->frag_len = rc;

    /* fill in the header */
    hdr = &frag->frag_hdr;
    hdr->hdr_common.hdr_type = ORTE_IOF_BASE_HDR_MSG;
    hdr->hdr_msg.msg_origin = endpoint->ep_origin;
    hdr->hdr_msg.msg_proxy = *ORTE_PROC_MY_NAME;
    hdr->hdr_msg.msg_tag = endpoint->ep_tag;
    hdr->hdr_msg.msg_seq = endpoint->ep_seq;
    hdr->hdr_msg.msg_len = frag->frag_len;
    ORTE_IOF_BASE_HDR_MSG_HTON(hdr->hdr_msg);

    /* if window size has been exceeded - disable forwarding */
    endpoint->ep_seq += frag->frag_len;
    if(ORTE_IOF_BASE_SEQDIFF(endpoint->ep_seq, endpoint->ep_ack) > orte_iof_base.iof_window_size) {
        opal_output(orte_iof_base.iof_output,
                    "iof_base_endpoint read handler: window exceeded -- reading disabled");
        opal_event_del(&endpoint->ep_event);
    }
    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);

    /* Increment the refcount on the endpoint so that it doesn't get
       deleted before the frag */
    OBJ_RETAIN(endpoint);

    /* start non-blocking RML call to forward received data */
    rc = orte_rml.send_nb(
        orte_iof_base.iof_service,
        frag->frag_iov,
        2,
        ORTE_RML_TAG_IOF_SVC,
        0,
        orte_iof_base_endpoint_send_cb,
        frag);
}

/**
 * Callback when the endpoint is available for write.
 */

static void orte_iof_base_endpoint_write_handler(int sd, short flags, void *user)
{
    int errno_save;
    orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)user;

    /*
     * step through the list of queued fragments and attempt to write
     * until the output descriptor would block
     */
    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);
    while(opal_list_get_size(&endpoint->ep_sink_frags)) {
        orte_iof_base_frag_t* frag =
            (orte_iof_base_frag_t*)opal_list_get_first(&endpoint->ep_sink_frags);
        int rc;

        /* close connection on zero byte message */
        if(frag->frag_len == 0) {
            orte_iof_base_endpoint_closed(endpoint);
            OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
            return;
        }

        /* progress pending messages */
        rc = write(endpoint->ep_fd, frag->frag_ptr, frag->frag_len);
        errno_save = errno;
        if (rc < 0) {
            if (EAGAIN == errno_save) {
               break;
            }
            if (EINTR == errno_save) {
                continue;
            }
            /* All other errors -- to include sigpipe -- mean that
               Something Bad happened and we should abort in
               despair.
             */
            orte_iof_base_endpoint_closed(endpoint);

            /* Send an ACK-AND-CLOSE back to the service so that it
               knows not to wait for any further ACKs */
            orte_iof_base_frag_ack(frag, true);
            OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
            return;
        }
        frag->frag_len -= rc;
        frag->frag_ptr += rc;
        if(frag->frag_len > 0) {
            break;
        }
        opal_list_remove_item(&endpoint->ep_sink_frags, &frag->super.super);
        OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
        orte_iof_base_frag_ack(frag, false);
        OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);
    }

    /* is there anything left to write? */
    if(opal_list_get_size(&endpoint->ep_sink_frags) == 0) {
        opal_event_del(&endpoint->ep_event);
        if(orte_iof_base.iof_waiting) {
            opal_condition_signal(&orte_iof_base.iof_condition);
        }
    }
    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
}

/* return true if we should read stdin from fd, false otherwise */
static bool orte_iof_base_endpoint_stdin_check(int fd)
{
#if !defined(__WINDOWS__) && defined(HAVE_TCGETPGRP)
    if( isatty(fd) && (getpgrp() != tcgetpgrp(fd)) ) {
        return false;
    }
#endif  /* !defined(__WINDOWS__) */
    return true;
}

static void orte_iof_base_endpoint_stdin_cb(int sd, short flags, void *user)
{
    orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)user;
    bool should_process = orte_iof_base_endpoint_stdin_check(endpoint->ep_fd);

    if (should_process) {
        opal_event_add(&endpoint->ep_event, 0);
    } else {
        opal_event_del(&endpoint->ep_event);
    }
}

/*
 * Lookup existing endpoint matching parameters
 * supplied to create.
 */

static orte_iof_base_endpoint_t* orte_iof_base_endpoint_lookup(
    const orte_process_name_t* proc,
    orte_iof_base_mode_t mode,
    int tag)
{
    opal_list_item_t* item;
    for(item =  opal_list_get_first(&orte_iof_base.iof_endpoints);
        item != opal_list_get_end(&orte_iof_base.iof_endpoints);
        item =  opal_list_get_next(item)) {
        orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item;
        if(orte_ns.compare_fields(ORTE_NS_CMP_ALL, proc, &endpoint->ep_origin) == 0 &&
           endpoint->ep_tag == tag && endpoint->ep_mode == mode) {
            OBJ_RETAIN(endpoint);
            return endpoint;
        }
    }
    return NULL;
}

/*
 *  Create a local endpoint.
 */

int orte_iof_base_endpoint_create(
    const orte_process_name_t* proc,
    orte_iof_base_mode_t mode,
    int tag,
    int fd)
{
    orte_iof_base_endpoint_t* endpoint;
    int flags;
    int rc;

    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);

#if !defined(__WINDOWS__)
    /* If we haven't initialized the event yet, do so now */
    if (!sigpipe_event_initialized) {
        opal_signal_set(&sigpipe_event, SIGPIPE,
                        sigpipe_signal_callback, &sigpipe_event);
        opal_signal_add(&sigpipe_event, NULL);
        sigpipe_event_initialized = true;
    }
#endif  /* !defined(__WINDOWS__) */

    if((endpoint = orte_iof_base_endpoint_lookup(proc, mode, tag)) != NULL) {
        OBJ_RETAIN(endpoint);
        OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
        return ORTE_SUCCESS;
    }
    endpoint = OBJ_NEW(orte_iof_base_endpoint_t);
    if(NULL == endpoint) {
        OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    endpoint->ep_origin = *proc;
    endpoint->ep_mode = mode;
    endpoint->ep_tag = tag;
    endpoint->ep_fd = fd;

    /* If it looks like we're on the mpirun side of a standard IO
       stream (like we're a SOURCE and tag is STDIN and we're mucking
       with fd 0), we don't want to set nonblocking.  If we do so, we
       set the file descriptor to non-blocking for everyone that has
       that file descriptor, which includes everyone else in our shell
       pipeline chain.  (See
       http://lists.freebsd.org/pipermail/freebsd-hackers/2005-January/009742.html).
       This causes things like "mpirun -np 1 big_app | cat" to lose
       output, because cat's stdout is then ALSO non-blocking and cat
       isn't built to deal with that case (same with almost all other
       unix text utils).
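
A note on the comment just above: O_NONBLOCK is a status flag stored on the shared open file description, not on the per-process file descriptor, which is exactly why setting it on fd 0 would leak into every other process in the shell pipeline. A minimal standalone demonstration of that sharing, using a pipe and dup() in place of the pipeline (hypothetical example code, not part of ORTE):

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Demonstrates why the listing avoids fcntl() on stdin: status flags
   such as O_NONBLOCK live on the shared open file description, so
   setting the flag through one descriptor changes it for every
   process (or dup) that shares the description. */
int main(void)
{
    int fds[2];
    if (pipe(fds) != 0) {
        perror("pipe");
        return 1;
    }

    int dup_fd = dup(fds[0]);  /* second descriptor, same description */

    int flags = fcntl(fds[0], F_GETFL);
    fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);  /* set via the original fd */

    /* The duplicate now reports O_NONBLOCK too, just as a shell
       pipeline peer would if this were done to fd 0. */
    printf("dup sees O_NONBLOCK: %s\n",
           (fcntl(dup_fd, F_GETFL) & O_NONBLOCK) ? "yes" : "no");

    close(fds[0]); close(fds[1]); close(dup_fd);
    return 0;
}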
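Earlier in the listing, the read handler implements simple credit-style flow control: ep_seq counts bytes read from the fd, ep_ack counts bytes the far side has acknowledged, and once their difference exceeds orte_iof_base.iof_window_size the read event is deleted until ACKs catch up. A self-contained sketch of that accounting follows; seqdiff() and WINDOW_SIZE here are illustrative stand-ins, not the real ORTE_IOF_BASE_SEQDIFF macro or MCA parameter:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for a sequence-difference macro: distance from ack to seq,
   assuming 32-bit counters; unsigned subtraction handles wraparound. */
static uint32_t seqdiff(uint32_t seq, uint32_t ack)
{
    return seq - ack;
}

#define WINDOW_SIZE 4096u  /* illustrative; ORTE takes this from a parameter */

int main(void)
{
    uint32_t seq = 0, ack = 0;
    bool reading_enabled = true;

    /* Producer side: each read of n bytes advances seq; if the peer
       falls more than WINDOW_SIZE bytes behind, stop reading. */
    for (int i = 0; i < 5; ++i) {
        uint32_t n = 1500;
        seq += n;
        if (seqdiff(seq, ack) > WINDOW_SIZE) {
            reading_enabled = false;  /* analogous to opal_event_del() */
        }
        printf("read %u bytes, in flight %u, reading %s\n",
               n, seqdiff(seq, ack), reading_enabled ? "on" : "off");
    }

    /* Consumer side: an ACK advances ack and may re-enable reading. */
    ack = seq;
    if (seqdiff(seq, ack) <= WINDOW_SIZE) {
        reading_enabled = true;  /* analogous to re-adding the read event */
    }
    printf("after ack, reading %s\n", reading_enabled ? "on" : "off");
    return 0;
}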
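Similarly, orte_iof_base_endpoint_stdin_check() in the listing only allows reading stdin when it is not a tty, or when this process group is the terminal's foreground process group; reading a tty from a background job would raise SIGTTIN. The same test in runnable form (behavior under a job-control shell is an assumption of this sketch):

#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/* Same test as orte_iof_base_endpoint_stdin_check(): only read stdin
   when fd is not a tty (e.g. a pipe or file), or when our process
   group owns the terminal's foreground. */
static bool should_read_stdin(int fd)
{
    if (isatty(fd) && getpgrp() != tcgetpgrp(fd)) {
        return false;  /* tty currently owned by another job */
    }
    return true;
}

int main(void)
{
    /* Run as "./a.out" vs "./a.out &" to see both answers. */
    printf("read stdin now? %s\n",
           should_read_stdin(STDIN_FILENO) ? "yes" : "no");
    return 0;
}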
