⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iof_base_endpoint.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 2 页
字号:
       Otherwise, we're probably on the non-mpirun end of things, and       should be non-blocking.    */    if ( ! ((ORTE_IOF_SOURCE == mode && ORTE_IOF_STDIN == tag && 0 == fd) ||            (ORTE_IOF_SINK == mode && ORTE_IOF_STDOUT == tag && 1 == fd) ||            (ORTE_IOF_SINK == mode && ORTE_IOF_STDERR == tag && 2 == fd))) {        if((flags = fcntl(fd, F_GETFL, 0)) < 0) {            opal_output(orte_iof_base.iof_output, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",                         __FILE__, __LINE__, errno);        } else {            flags |= O_NONBLOCK;            fcntl(fd, F_SETFL, flags);        }    }    /* setup event handler */    switch(mode) {        case ORTE_IOF_SOURCE:            if (tag == ORTE_IOF_STDIN && isatty(endpoint->ep_fd)) {                /* We should avoid trying to read from stdin if we                   have a terminal, but are backgrounded.  Catch the                   signals that are commonly used when we switch                   between being backgrounded and not.  If the                   filedescriptor is not a tty, don't worry about it                   and always stay connected. */#if !defined(__WINDOWS__)                opal_signal_set(&(endpoint->ep_stdin_event),                                SIGCONT,                                orte_iof_base_endpoint_stdin_cb,                                endpoint);                opal_signal_add(&(endpoint->ep_stdin_event), NULL);#endif  /* !defined(__WINDOWS__) */            }            /* always setup the event, but only add it if we should be               reading from stdin right now (per rules above) */            opal_event_set(                           &endpoint->ep_event,                           endpoint->ep_fd,                           OPAL_EV_READ|OPAL_EV_PERSIST,                           orte_iof_base_endpoint_read_handler,                           endpoint);            if (tag != ORTE_IOF_STDIN ||                 orte_iof_base_endpoint_stdin_check(endpoint->ep_fd)) {                rc = opal_event_add(&endpoint->ep_event, 0);                if (ORTE_SUCCESS != rc) return rc;            }            break;        case ORTE_IOF_SINK:            /* Create the event for use later; don't add it now */            opal_event_set(                &endpoint->ep_event,                endpoint->ep_fd,                OPAL_EV_WRITE|OPAL_EV_PERSIST,                orte_iof_base_endpoint_write_handler,                endpoint);            break;        default:            opal_output(orte_iof_base.iof_output, "orte_iof_base_endpoint_create: invalid mode %d\n", mode);            return ORTE_ERR_BAD_PARAM;    }    opal_list_append(&orte_iof_base.iof_endpoints, &endpoint->super);    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);    return ORTE_SUCCESS;}/* * Close one or more matching endpoints. */int orte_iof_base_endpoint_delete(    const orte_process_name_t* proc,    orte_ns_cmp_bitmask_t mask,    int tag){    opal_list_item_t* item;    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);    item =  opal_list_get_first(&orte_iof_base.iof_endpoints);    while(item != opal_list_get_end(&orte_iof_base.iof_endpoints)) {        opal_list_item_t* next =  opal_list_get_next(item);        orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item;        if (orte_ns.compare_fields(mask,proc,&endpoint->ep_origin) == 0) {            if (endpoint->ep_tag == tag ||                 ORTE_IOF_ANY == endpoint->ep_tag ||                 ORTE_IOF_ANY == tag) {                opal_list_remove_item(&orte_iof_base.iof_endpoints,&endpoint->super);                OBJ_RELEASE(endpoint);                OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);                return ORTE_SUCCESS;            }        }        item = next;    }    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);    return ORTE_ERR_NOT_FOUND;}/* *  Connection has gone away - cleanup and signal SOH monitor. */void orte_iof_base_endpoint_closed(orte_iof_base_endpoint_t* endpoint){    /* For sinks: discard any fragments that were waiting to be       written down the fd (because the process on the other side of       the fd is no longer there -- we're just about to close the       fd). */    if (ORTE_IOF_SINK == endpoint->ep_mode) {        while (NULL != opal_list_remove_first(&(endpoint->ep_sink_frags))){            continue;        }        /* Upper layer will take care of signaling any waiting           condition variable -- no need to do it here */    }    /* Special case: if we're a sink and one of the special streams       (stdout or stderr), don't close anything because we don't want       to *actually* close stdout or stderr just because a remote       process closes theirs (but we do if a remote source/stdin       closes theirs, for example). */    if (ORTE_IOF_SINK == endpoint->ep_mode &&         (ORTE_IOF_STDOUT == endpoint->ep_tag ||         ORTE_IOF_STDERR == endpoint->ep_tag)) {        return;    }    /* remove any event handlers */    opal_event_del(&endpoint->ep_event);    /* close associated file descriptor */    close(endpoint->ep_fd);    endpoint->ep_fd = -1;}/* *  Lookup endpoint based on destination process name/mask/tag. */orte_iof_base_endpoint_t* orte_iof_base_endpoint_match(    const orte_process_name_t* target_name,     orte_ns_cmp_bitmask_t target_mask,    int target_tag){    opal_list_item_t* item;    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);    for(item =  opal_list_get_first(&orte_iof_base.iof_endpoints);        item != opal_list_get_end(&orte_iof_base.iof_endpoints);        item =  opal_list_get_next(item)) {        orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item;        if(orte_ns.compare_fields(target_mask,target_name,&endpoint->ep_origin) == 0) {            if(endpoint->ep_tag == target_tag ||                endpoint->ep_tag == ORTE_IOF_ANY ||                target_tag == ORTE_IOF_ANY) {                OBJ_RETAIN(endpoint);                OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);                return endpoint;            }        }    }    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);    return NULL;}/* * Forward data out the endpoint as the destination  * is available. Queue incomplete fragments in order * received and process as the destination becomes available. */int orte_iof_base_endpoint_forward(    orte_iof_base_endpoint_t* endpoint,     const orte_process_name_t* origin,    orte_iof_base_msg_header_t* hdr,    const unsigned char* data){    opal_list_item_t* item;    orte_iof_base_frag_t* frag;    size_t len = hdr->msg_len;    int rc = 0;    if(endpoint->ep_mode != ORTE_IOF_SINK) {        return ORTE_ERR_BAD_PARAM;    }    /* allocate and initialize a fragment */    ORTE_IOF_BASE_FRAG_ALLOC(frag, rc);    if(NULL == frag) {        return ORTE_ERR_OUT_OF_RESOURCE;    }    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);    frag->frag_owner = endpoint;    frag->frag_src = *origin;    frag->frag_hdr.hdr_msg = *hdr;    frag->frag_len = len;    /* call any registered callbacks */     for(item =  opal_list_get_first(&endpoint->ep_callbacks);        item != opal_list_get_end(&endpoint->ep_callbacks);        item =  opal_list_get_next(item)) {        orte_iof_base_callback_t* cb = (orte_iof_base_callback_t*)item;        cb->cb_func(           &hdr->msg_origin,             hdr->msg_tag,             cb->cb_data,             data,             hdr->msg_len);    }    if(endpoint->ep_fd >= 0) {        /* try to write w/out copying data */               if(opal_list_get_size(&endpoint->ep_sink_frags) == 0) {            if(len == 0) {                /* No ACK required because the frag is of 0 length                   (ACKs are based on fragment length; an ACK of 0                   bytes would do nothing) */                ORTE_IOF_BASE_FRAG_RETURN(frag);                orte_iof_base_endpoint_closed(endpoint);                OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);                return ORTE_SUCCESS;            }            rc = write(endpoint->ep_fd,data,len);            if(rc < 0) {                if (errno != EAGAIN && errno != EINTR) {                    orte_iof_base_endpoint_closed(endpoint);                    /* Send a ACK-AND-CLOSE back to the service so                       that it knows not to wait for any further                       ACKs */                    orte_iof_base_frag_ack(frag, true);                    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);                    return ORTE_SUCCESS;                }                rc = 0;  /* don't affect the remaining length of the data */            }            frag->frag_len -= rc;        }        /* Ensure to handle both cases:           1. When ep_sink_frags was not empty (regardless of frag_len)           2. When ep_sink_frags was empty, but we fell through from above */        if(frag->frag_len > 0 || 0 == len) {            /* handle incomplete write - also queue up 0 byte message              * and recognize this as a request to close the descriptor             * when all pending operations complete             */            frag->frag_ptr = frag->frag_data;            memcpy(frag->frag_ptr, data+rc, frag->frag_len);            opal_list_append(&endpoint->ep_sink_frags, &frag->super.super);            /* If we're the first frag to be put on the sink_frags               list, then enable the event that will tell us when the               fd becomes writeable */            if(opal_list_get_size(&endpoint->ep_sink_frags) == 1) {                opal_output(orte_iof_base.iof_output, "iof_base_endpoint forwarding frag; re-enabled reading for endpoint");                opal_event_add(&endpoint->ep_event,0);            }            OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);        } else {            OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);            /* acknowledge fragment */            orte_iof_base_frag_ack(frag, false);        }    } else {        OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);        /* acknowledge fragment */        orte_iof_base_frag_ack(frag, false);    }    return ORTE_SUCCESS;}/** * Register a callback */int orte_iof_base_callback_create(    const orte_process_name_t* proc,    int tag,    orte_iof_base_callback_fn_t cbfunc,    void *cbdata){    orte_iof_base_callback_t* cb = OBJ_NEW(orte_iof_base_callback_t);    orte_iof_base_endpoint_t* endpoint;    if(NULL == cb)        return ORTE_ERR_OUT_OF_RESOURCE;    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);    if((endpoint = orte_iof_base_endpoint_lookup(proc,ORTE_IOF_SINK,tag)) == NULL) {        endpoint = OBJ_NEW(orte_iof_base_endpoint_t);        if(NULL == endpoint) {            OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);            return ORTE_ERR_OUT_OF_RESOURCE;        }        endpoint->ep_origin = *proc;        endpoint->ep_mode = ORTE_IOF_SINK;        endpoint->ep_tag = tag;        endpoint->ep_fd = -1;        opal_list_append(&orte_iof_base.iof_endpoints, &endpoint->super);    } else {        OBJ_RETAIN(endpoint);    }    cb->cb_func = cbfunc;    cb->cb_data = cbdata;    opal_list_append(&endpoint->ep_callbacks, (opal_list_item_t*)cb);    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);    return ORTE_SUCCESS;}/** * Remove a callback */int orte_iof_base_callback_delete(    const orte_process_name_t* proc,    int tag){    orte_iof_base_endpoint_t* endpoint;    opal_list_item_t* item;        OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);     if(NULL == (endpoint = orte_iof_base_endpoint_lookup(proc,ORTE_IOF_SINK, tag))) {        OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);        return ORTE_ERR_NOT_FOUND;    }    while(NULL != (item = opal_list_remove_first(&endpoint->ep_callbacks))) {        OBJ_RELEASE(item);    }    OBJ_RELEASE(endpoint);    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);    return ORTE_SUCCESS;}/** * Update the acknowledged sequence number. If forwarding had * previously been disabled as the window closed, and the window * is now open, re-enable forwarding. */int orte_iof_base_endpoint_ack(    orte_iof_base_endpoint_t* endpoint,    uint32_t seq){    bool window_closed, window_open;    OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);    window_closed =            ORTE_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) >= orte_iof_base.iof_window_size;    endpoint->ep_ack = seq;    window_open =            ORTE_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) < orte_iof_base.iof_window_size;    /* someone is waiting on all output to be flushed */    if(orte_iof_base.iof_waiting && endpoint->ep_seq == endpoint->ep_ack) {        opal_condition_signal(&orte_iof_base.iof_condition);    }    /* check to see if we need to reenable forwarding */    if(window_closed && window_open) {        opal_output(orte_iof_base.iof_output, "iof_base_endpoint ack; re-enabled reading for endpoint");        opal_event_add(&endpoint->ep_event, 0);    }    OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);    return ORTE_SUCCESS;}/* * See description in iof_base_endpoint.h */bool orte_iof_base_endpoint_have_pending_frags(    orte_iof_base_endpoint_t* endpoint){    if (ORTE_IOF_SOURCE == endpoint->ep_mode) {        return !opal_list_is_empty(&endpoint->ep_source_frags);    } else {        return !opal_list_is_empty(&endpoint->ep_sink_frags);    }}/* * See description in iof_base_endpoint.h */bool orte_iof_base_endpoint_have_pending_acks(    orte_iof_base_endpoint_t* endpoint){    if (ORTE_IOF_SOURCE == endpoint->ep_mode) {        return (endpoint->ep_seq == endpoint->ep_ack);    } else {        return true;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -