📄 iof_svc_sub.c
字号:
} /* If everyone has ACK'ed, then push the ACK up to the original message's proxy */ if(seq_min == hdr->msg_seq+hdr->msg_len) { /* If the original message was initiated from this process, then the ACK delivery is local. */ if(orte_ns.compare_fields(ORTE_NS_CMP_ALL,orte_process_info.my_name,&hdr->msg_origin) == 0) { orte_iof_base_endpoint_t* endpoint; endpoint = orte_iof_base_endpoint_match(&hdr->msg_origin, ORTE_NS_CMP_ALL, hdr->msg_tag); if(endpoint != NULL) { opal_output(orte_iof_base.iof_output, "ack: forwarding ack locally: %u", seq_min); orte_iof_base_endpoint_ack(endpoint, seq_min); OBJ_RELEASE(endpoint); } } /* Otherwise, the original message was initiated in another process, and we need to forward the ACK to it. */ else { orte_iof_base_frag_t* frag; int rc; ORTE_IOF_BASE_FRAG_ALLOC(frag,rc); if(NULL == frag) { ORTE_ERROR_LOG(rc); return; } frag->frag_hdr.hdr_msg = *hdr; frag->frag_iov[0].iov_base = (IOVBASE_TYPE*)&frag->frag_hdr; frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr); ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg); opal_output(orte_iof_base.iof_output, "ack: forwarding ack remotely: %u", seq_min); rc = orte_rml.send_nb( &hdr->msg_proxy, frag->frag_iov, 1, ORTE_RML_TAG_IOF_SVC, 0, ack_send_cb, frag); if(rc < 0) { ORTE_ERROR_LOG(rc); } } }}/** * Delete all matching subscriptions. */int orte_iof_svc_sub_delete( const orte_process_name_t *origin_name, orte_ns_cmp_bitmask_t origin_mask, orte_iof_base_tag_t origin_tag, const orte_process_name_t *target_name, orte_ns_cmp_bitmask_t target_mask, orte_iof_base_tag_t target_tag){ opal_list_item_t *item; OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock); item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed); while(item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed)) { opal_list_item_t* next = opal_list_get_next(item); orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item; if (sub->origin_mask == origin_mask && orte_ns.compare_fields(sub->origin_mask,&sub->origin_name,origin_name) == 0 && sub->origin_tag == origin_tag && sub->target_mask == target_mask && orte_ns.compare_fields(sub->target_mask,&sub->target_name,target_name) == 0 && sub->target_tag == target_tag) { opal_list_remove_item(&mca_iof_svc_component.svc_subscribed, item); OBJ_RELEASE(item); } item = next; } OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock); return ORTE_SUCCESS;}int orte_iof_svc_sub_delete_all( const orte_process_name_t *name){ opal_list_item_t *item; OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock); item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed); while(item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed)) { opal_list_item_t* next = opal_list_get_next(item); orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item; if ((sub->origin_mask == ORTE_NS_CMP_ALL && orte_ns.compare_fields(ORTE_NS_CMP_ALL,&sub->origin_name,name) == 0) || (sub->target_mask == ORTE_NS_CMP_ALL && orte_ns.compare_fields(ORTE_NS_CMP_ALL,&sub->target_name,name) == 0)) { opal_list_remove_item(&mca_iof_svc_component.svc_subscribed, item); OBJ_RELEASE(item); } item = next; } OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock); return ORTE_SUCCESS;}/* * Callback on send completion. Release send resources (fragment). */static void orte_iof_svc_sub_send_cb( int status, orte_process_name_t* peer, struct iovec* msg, int count, orte_rml_tag_t tag, void* cbdata){ orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata; ORTE_IOF_BASE_FRAG_RETURN(frag); if(status < 0) { ORTE_ERROR_LOG(status); }}/** * Check for matching endpoints that have been published to the * server. Forward data out each matching endpoint. */int orte_iof_svc_sub_forward( orte_iof_svc_sub_t* sub, const orte_process_name_t* src, orte_iof_base_msg_header_t* hdr, const unsigned char* data, bool *forward){ opal_list_item_t* item; for(item = opal_list_get_first(&sub->sub_forward); item != opal_list_get_end(&sub->sub_forward); item = opal_list_get_next(item)) { orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item; orte_iof_svc_pub_t* pub = fwd->fwd_pub; int rc; if(pub->pub_endpoint != NULL) { rc = orte_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data); } else { /* forward */ orte_iof_base_frag_t* frag; ORTE_IOF_BASE_FRAG_ALLOC(frag,rc); frag->frag_hdr.hdr_msg = *hdr; frag->frag_len = frag->frag_hdr.hdr_msg.msg_len; frag->frag_iov[0].iov_base = (IOVBASE_TYPE*)&frag->frag_hdr; frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr); frag->frag_iov[1].iov_base = (IOVBASE_TYPE*)frag->frag_data; frag->frag_iov[1].iov_len = frag->frag_len; memcpy(frag->frag_data, data, frag->frag_len); ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg); rc = orte_rml.send_nb( &pub->pub_proxy, frag->frag_iov, 2, ORTE_RML_TAG_IOF_SVC, 0, orte_iof_svc_sub_send_cb, frag); } if(rc != ORTE_SUCCESS) { return rc; } *forward = true; } if(sub->sub_endpoint != NULL) { *forward = true; return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data); } return ORTE_SUCCESS;}/** * I/O Forwarding entry - relates a published endpoint to * a subscription. */static void orte_iof_svc_fwd_construct(orte_iof_svc_fwd_t* fwd){ fwd->fwd_pub = NULL; OBJ_CONSTRUCT(&fwd->fwd_seq_hash, opal_hash_table_t); opal_hash_table_init(&fwd->fwd_seq_hash, 256);}static void orte_iof_svc_fwd_destruct(orte_iof_svc_fwd_t* fwd){ if(NULL != fwd->fwd_pub) { OBJ_RELEASE(fwd->fwd_pub); } OBJ_DESTRUCT(&fwd->fwd_seq_hash);}OBJ_CLASS_INSTANCE( orte_iof_svc_fwd_t, opal_list_item_t, orte_iof_svc_fwd_construct, orte_iof_svc_fwd_destruct);/** * Does the published endpoint match the destination specified * in the subscription? */bool orte_iof_svc_fwd_match( orte_iof_svc_sub_t* sub, orte_iof_svc_pub_t* pub){ if (orte_ns.compare_fields(sub->target_mask,&sub->target_name,&pub->pub_name) == 0 && sub->origin_tag == pub->pub_tag) { return true; } else { return false; }}/** * Create a forwarding entry */int orte_iof_svc_fwd_create( orte_iof_svc_sub_t* sub, orte_iof_svc_pub_t* pub){ orte_iof_svc_fwd_t* fwd = OBJ_NEW(orte_iof_svc_fwd_t); if(NULL == fwd) { return ORTE_ERR_OUT_OF_RESOURCE; } OBJ_RETAIN(pub); fwd->fwd_pub = pub; opal_output(orte_iof_base.iof_output, "created svc forward, sub origin [%lu,%lu,%lu], tag %d / mask %x, sub target [%lu,%lu,%lu], tag %d / mask %x :::: pub name [%lu,%lu,%lu], tag %d / mask %x\n", ORTE_NAME_ARGS(&sub->origin_name), sub->origin_tag, sub->origin_mask, ORTE_NAME_ARGS(&sub->target_name), sub->target_tag, sub->target_mask, ORTE_NAME_ARGS(&pub->pub_name), pub->pub_tag, pub->pub_mask); opal_list_append(&sub->sub_forward, &fwd->super); return ORTE_SUCCESS;}/** * Remove any forwarding entries that match the * published endpoint. */int orte_iof_svc_fwd_delete( orte_iof_svc_sub_t* sub, orte_iof_svc_pub_t* pub){ opal_list_item_t* item; for(item = opal_list_get_first(&sub->sub_forward); item != opal_list_get_end(&sub->sub_forward); item = opal_list_get_next(item)) { orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item; if(fwd->fwd_pub == pub) { opal_list_remove_item(&sub->sub_forward,item); OBJ_RELEASE(fwd); return ORTE_SUCCESS; } } return ORTE_ERR_NOT_FOUND;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -