⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iof_svc_sub.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 2 页
字号:
    }    /* If everyone has ACK'ed, then push the ACK up to the original       message's proxy */    if(seq_min == hdr->msg_seq+hdr->msg_len) {        /* If the original message was initiated from this process,           then the ACK delivery is local. */        if(orte_ns.compare_fields(ORTE_NS_CMP_ALL,orte_process_info.my_name,&hdr->msg_origin) == 0) {            orte_iof_base_endpoint_t* endpoint;            endpoint = orte_iof_base_endpoint_match(&hdr->msg_origin, ORTE_NS_CMP_ALL, hdr->msg_tag);            if(endpoint != NULL) {                opal_output(orte_iof_base.iof_output,                            "ack: forwarding ack locally: %u", seq_min);                orte_iof_base_endpoint_ack(endpoint, seq_min);                OBJ_RELEASE(endpoint);            }        }        /* Otherwise, the original message was initiated in another           process, and we need to forward the ACK to it. */        else {            orte_iof_base_frag_t* frag;            int rc;                ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);            if(NULL == frag) {                ORTE_ERROR_LOG(rc);                return;            }            frag->frag_hdr.hdr_msg = *hdr;            frag->frag_iov[0].iov_base = (IOVBASE_TYPE*)&frag->frag_hdr;            frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);            ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);            opal_output(orte_iof_base.iof_output,                        "ack: forwarding ack remotely: %u", seq_min);            rc = orte_rml.send_nb(                &hdr->msg_proxy,                frag->frag_iov,                1,                ORTE_RML_TAG_IOF_SVC,                0,                ack_send_cb,                frag);            if(rc < 0) {                ORTE_ERROR_LOG(rc);            }        }    }}/** *  Delete all matching subscriptions. */int orte_iof_svc_sub_delete(    const orte_process_name_t *origin_name,    orte_ns_cmp_bitmask_t origin_mask,    orte_iof_base_tag_t origin_tag,    const orte_process_name_t *target_name,    orte_ns_cmp_bitmask_t target_mask,    orte_iof_base_tag_t target_tag){    opal_list_item_t *item;    OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);    item =  opal_list_get_first(&mca_iof_svc_component.svc_subscribed);    while(item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed)) {        opal_list_item_t* next =  opal_list_get_next(item);        orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;        if (sub->origin_mask == origin_mask &&            orte_ns.compare_fields(sub->origin_mask,&sub->origin_name,origin_name) == 0 &&            sub->origin_tag == origin_tag &&            sub->target_mask == target_mask &&            orte_ns.compare_fields(sub->target_mask,&sub->target_name,target_name) == 0 &&            sub->target_tag == target_tag) {            opal_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);            OBJ_RELEASE(item);        }        item = next;    }    OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);    return ORTE_SUCCESS;}int orte_iof_svc_sub_delete_all(    const orte_process_name_t *name){    opal_list_item_t *item;    OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);    item =  opal_list_get_first(&mca_iof_svc_component.svc_subscribed);    while(item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed)) {        opal_list_item_t* next =  opal_list_get_next(item);        orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;        if ((sub->origin_mask == ORTE_NS_CMP_ALL &&             orte_ns.compare_fields(ORTE_NS_CMP_ALL,&sub->origin_name,name) == 0) ||            (sub->target_mask == ORTE_NS_CMP_ALL &&             orte_ns.compare_fields(ORTE_NS_CMP_ALL,&sub->target_name,name) == 0)) {            opal_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);            OBJ_RELEASE(item);        }        item = next;    }    OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);    return ORTE_SUCCESS;}/* * Callback on send completion. Release send resources (fragment). */static void orte_iof_svc_sub_send_cb(    int status,    orte_process_name_t* peer,    struct iovec* msg,    int count,    orte_rml_tag_t tag,    void* cbdata){    orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;    ORTE_IOF_BASE_FRAG_RETURN(frag);    if(status < 0) {        ORTE_ERROR_LOG(status);    }}/** *  Check for matching endpoints that have been published to the *  server. Forward data out each matching endpoint. */int orte_iof_svc_sub_forward(    orte_iof_svc_sub_t* sub,    const orte_process_name_t* src,    orte_iof_base_msg_header_t* hdr,    const unsigned char* data,    bool *forward){    opal_list_item_t* item;    for(item  = opal_list_get_first(&sub->sub_forward);        item != opal_list_get_end(&sub->sub_forward);        item =  opal_list_get_next(item)) {        orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item;        orte_iof_svc_pub_t* pub = fwd->fwd_pub;        int rc;        if(pub->pub_endpoint != NULL) {            rc = orte_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data);        } else {            /* forward */            orte_iof_base_frag_t* frag;            ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);            frag->frag_hdr.hdr_msg = *hdr;            frag->frag_len = frag->frag_hdr.hdr_msg.msg_len;            frag->frag_iov[0].iov_base = (IOVBASE_TYPE*)&frag->frag_hdr;            frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);            frag->frag_iov[1].iov_base = (IOVBASE_TYPE*)frag->frag_data;            frag->frag_iov[1].iov_len = frag->frag_len;            memcpy(frag->frag_data, data, frag->frag_len);            ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);            rc = orte_rml.send_nb(                &pub->pub_proxy,                frag->frag_iov,                2,                ORTE_RML_TAG_IOF_SVC,                0,                orte_iof_svc_sub_send_cb,                frag);        }        if(rc != ORTE_SUCCESS) {            return rc;        }        *forward = true;    }    if(sub->sub_endpoint != NULL) {        *forward = true;        return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);     }    return ORTE_SUCCESS;}/** * I/O Forwarding entry - relates a published endpoint to * a subscription. */static void orte_iof_svc_fwd_construct(orte_iof_svc_fwd_t* fwd){    fwd->fwd_pub = NULL;    OBJ_CONSTRUCT(&fwd->fwd_seq_hash, opal_hash_table_t);    opal_hash_table_init(&fwd->fwd_seq_hash, 256);}static void orte_iof_svc_fwd_destruct(orte_iof_svc_fwd_t* fwd){    if(NULL != fwd->fwd_pub) {        OBJ_RELEASE(fwd->fwd_pub);    }    OBJ_DESTRUCT(&fwd->fwd_seq_hash);}OBJ_CLASS_INSTANCE(    orte_iof_svc_fwd_t,    opal_list_item_t,    orte_iof_svc_fwd_construct,    orte_iof_svc_fwd_destruct);/** *  Does the published endpoint match the destination specified *  in the subscription? */bool orte_iof_svc_fwd_match(    orte_iof_svc_sub_t* sub,    orte_iof_svc_pub_t* pub){    if (orte_ns.compare_fields(sub->target_mask,&sub->target_name,&pub->pub_name) == 0 &&        sub->origin_tag == pub->pub_tag) {        return true;    } else {        return false;    }}/** *  Create a forwarding entry */int orte_iof_svc_fwd_create(    orte_iof_svc_sub_t* sub,    orte_iof_svc_pub_t* pub){    orte_iof_svc_fwd_t* fwd = OBJ_NEW(orte_iof_svc_fwd_t);    if(NULL == fwd) {        return ORTE_ERR_OUT_OF_RESOURCE;    }    OBJ_RETAIN(pub);    fwd->fwd_pub = pub;    opal_output(orte_iof_base.iof_output, "created svc forward, sub origin [%lu,%lu,%lu], tag %d / mask %x, sub target [%lu,%lu,%lu], tag %d / mask %x :::: pub name [%lu,%lu,%lu], tag %d / mask %x\n",                ORTE_NAME_ARGS(&sub->origin_name), sub->origin_tag,                sub->origin_mask,                ORTE_NAME_ARGS(&sub->target_name), sub->target_tag,                sub->target_mask,                ORTE_NAME_ARGS(&pub->pub_name), pub->pub_tag, pub->pub_mask);    opal_list_append(&sub->sub_forward, &fwd->super);    return ORTE_SUCCESS;}/** *  Remove any forwarding entries that match the *  published endpoint. */int orte_iof_svc_fwd_delete(    orte_iof_svc_sub_t* sub,    orte_iof_svc_pub_t* pub){    opal_list_item_t* item;    for(item =  opal_list_get_first(&sub->sub_forward);        item != opal_list_get_end(&sub->sub_forward);        item =  opal_list_get_next(item)) {        orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item;        if(fwd->fwd_pub == pub) {            opal_list_remove_item(&sub->sub_forward,item);            OBJ_RELEASE(fwd);            return ORTE_SUCCESS;        }    }    return ORTE_ERR_NOT_FOUND;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -