⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oob_tcp.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 4 页
字号:
           return;        }        hdr->msg_src.cellid = ORTE_PROC_MY_NAME->cellid;    }    /* lookup the corresponding process */    peer = mca_oob_tcp_peer_lookup(&hdr->msg_src);    if(NULL == peer) {        opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: unable to locate peer",                ORTE_NAME_ARGS(orte_process_info.my_name));        CLOSE_THE_SOCKET(sd);        return;    }    /* is the peer instance willing to accept this connection */    if(mca_oob_tcp_peer_accept(peer, sd) == false) {        if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) {            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_recv_handler: "                    "rejected connection from [%lu,%lu,%lu] connection state %d",                    ORTE_NAME_ARGS(orte_process_info.my_name),                    ORTE_NAME_ARGS(&(peer->peer_name)),                    ORTE_NAME_ARGS(&(hdr->msg_src)),                    peer->peer_state);        }        CLOSE_THE_SOCKET(sd);        return;    }}/* * Event callback when there is data available on the registered * socket to recv. */static void mca_oob_tcp_recv_handler(int sd, short flags, void* user){    mca_oob_tcp_hdr_t hdr;    mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t *)user;    int rc;    /* accept new connections on the listen socket */    if(mca_oob_tcp_component.tcp_listen_sd == sd) {        mca_oob_tcp_accept();        return;    }    OBJ_RELEASE(event);    /* Some mem checkers don't realize that hdr will guarantee to be       fully filled in during the read(), below :-( */    OMPI_DEBUG_ZERO(hdr);    /* recv the process identifier */    while((rc = recv(sd, (char *)&hdr, sizeof(hdr), 0)) != sizeof(hdr)) {        if(rc >= 0) {            if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) {                opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: peer closed connection",                    ORTE_NAME_ARGS(orte_process_info.my_name));            }            CLOSE_THE_SOCKET(sd);            return;        }        if(opal_socket_errno != EINTR) {            opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: recv() failed: %s (%d)\n",                ORTE_NAME_ARGS(orte_process_info.my_name), strerror(opal_socket_errno), opal_socket_errno);            CLOSE_THE_SOCKET(sd);            return;        }    }    MCA_OOB_TCP_HDR_NTOH(&hdr);    /* dispatch based on message type */    switch(hdr.msg_type) {        case MCA_OOB_TCP_PROBE:            mca_oob_tcp_recv_probe(sd, &hdr);            break;        case MCA_OOB_TCP_CONNECT:            mca_oob_tcp_recv_connect(sd, &hdr);            break;        default:            opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: invalid message type: %d\n",                ORTE_NAME_ARGS(orte_process_info.my_name), hdr.msg_type);            CLOSE_THE_SOCKET(sd);            break;    }}/* * Component initialization - create a module. * (1) initialize static resources * (2) create listen socket */mca_oob_t* mca_oob_tcp_component_init(int* priority){    int i;    bool found_local = false;    bool found_nonlocal = false;    *priority = 1;    /* are there any interfaces? */    if(opal_ifcount() <= 0)        return NULL;    for (i = opal_ifbegin() ; i > 0 ; i = opal_ifnext(i)) {        char name[32];        mca_oob_tcp_device_t *dev;        opal_ifindextoname(i, name, sizeof(name));        if (mca_oob_tcp_component.tcp_include != NULL &&            strstr(mca_oob_tcp_component.tcp_include,name) == NULL) {            continue;        }        if (mca_oob_tcp_component.tcp_exclude != NULL &&            strstr(mca_oob_tcp_component.tcp_exclude,name) != NULL) {            continue;        }        dev = OBJ_NEW(mca_oob_tcp_device_t);        dev->if_index = i;        opal_ifindextoaddr(i, (struct sockaddr*) &dev->if_addr, sizeof(struct sockaddr_in));        if(opal_ifislocalhost((struct sockaddr*) &dev->if_addr)) {            dev->if_local = true;            found_local = true;        } else {            dev->if_local = false;            found_nonlocal = true;        }        opal_list_append(&mca_oob_tcp_component.tcp_available_devices,                         &dev->super);    }    if (found_local && found_nonlocal) {        opal_list_item_t *item;        for (item = opal_list_get_first(&mca_oob_tcp_component.tcp_available_devices) ;             item != opal_list_get_end(&mca_oob_tcp_component.tcp_available_devices) ;             item = opal_list_get_next(item)) {            mca_oob_tcp_device_t *dev = (mca_oob_tcp_device_t*) item;            if (dev->if_local) {                item = opal_list_remove_item(&mca_oob_tcp_component.tcp_available_devices,                                             item);            }        }    }    if (opal_list_get_size(&mca_oob_tcp_component.tcp_available_devices) == 0) {        if (NULL != mca_oob_tcp_component.tcp_include) {            opal_output(0, "None of the specified TCP interfaces found (%s)",                        mca_oob_tcp_component.tcp_include);        }        return NULL;    }    /* initialize data structures */    opal_hash_table_init(&mca_oob_tcp_component.tcp_peers, 128);    opal_hash_table_init(&mca_oob_tcp_component.tcp_peer_names, 128);    opal_free_list_init(&mca_oob_tcp_component.tcp_peer_free,        sizeof(mca_oob_tcp_peer_t),        OBJ_CLASS(mca_oob_tcp_peer_t),        8,  /* initial number */        mca_oob_tcp_component.tcp_peer_limit, /* maximum number */        8);  /* increment to grow by */    opal_free_list_init(&mca_oob_tcp_component.tcp_msgs,        sizeof(mca_oob_tcp_msg_t),        OBJ_CLASS(mca_oob_tcp_msg_t),        8,  /* initial number */       -1,  /* maximum number */        8);  /* increment to grow by */    /* intialize event library */    memset(&mca_oob_tcp_component.tcp_recv_event, 0, sizeof(opal_event_t));    memset(&mca_oob_tcp_component.tcp_send_event, 0, sizeof(opal_event_t));    return &mca_oob_tcp;}/* * Callback from registry on change to subscribed segments. */void mca_oob_tcp_registry_callback(    orte_gpr_notify_data_t* data,    void* cbdata){    orte_std_cntr_t i, j, k;    int rc;    orte_gpr_value_t **values, *value;    orte_gpr_keyval_t *keyval;    orte_byte_object_t *bo;    orte_buffer_t buffer;    mca_oob_tcp_addr_t* addr, *existing;    mca_oob_tcp_peer_t* peer;    if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {        opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback\n",            ORTE_NAME_ARGS(orte_process_info.my_name));    }    /* process the callback */    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);    values = (orte_gpr_value_t**)(data->values)->addr;    for(i = 0, k=0; k < data->cnt &&                    i < (data->values)->size; i++) {        if (NULL != values[i]) {            k++;            value = values[i];            for(j = 0; j < value->cnt; j++) {                /* check to make sure this is the requested key */                keyval = value->keyvals[j];                if(strcmp(keyval->key,"oob-tcp") != 0)                    continue;                /* transfer ownership of registry object to buffer and unpack */                OBJ_CONSTRUCT(&buffer, orte_buffer_t);                if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bo, keyval->value, ORTE_BYTE_OBJECT))) {                    ORTE_ERROR_LOG(rc);                    continue;                }                if(orte_dss.load(&buffer, bo->bytes, bo->size) != ORTE_SUCCESS) {                    /* TSW - throw ERROR */                    continue;                }                /* protect the values from the release */                keyval->value->type = ORTE_NULL;                keyval->value->data = NULL;                /* unpack the buffer */                addr = mca_oob_tcp_addr_unpack(&buffer);                OBJ_DESTRUCT(&buffer);                if(NULL == addr) {                    opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback: unable to unpack peer address\n",                        ORTE_NAME_ARGS(orte_process_info.my_name));                    continue;                }                if(mca_oob_tcp_component.tcp_debug > OOB_TCP_DEBUG_INFO) {                    opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback: received peer [%lu,%lu,%lu]\n",                        ORTE_NAME_ARGS(orte_process_info.my_name),                        ORTE_NAME_ARGS(&(addr->addr_name)));                }                /* check for existing cache entry */                existing = (mca_oob_tcp_addr_t *)orte_hash_table_get_proc(                    &mca_oob_tcp_component.tcp_peer_names, &addr->addr_name);                if(NULL != existing) {                    /* TSW - need to update existing entry */                    OBJ_RELEASE(addr);                    continue;                }                /* insert into cache and notify peer */                orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);                peer = (mca_oob_tcp_peer_t *)orte_hash_table_get_proc(                    &mca_oob_tcp_component.tcp_peers, &addr->addr_name);                if(NULL != peer)                    mca_oob_tcp_peer_resolved(peer, addr);            }        }    }    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);}/* * Attempt to resolve peer name. */int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer){    mca_oob_tcp_addr_t* addr;    mca_oob_tcp_subscription_t* subscription;    char *segment, *sub_name, *trig_name;    char *key="oob-tcp";    orte_gpr_subscription_id_t sub_id;    opal_list_item_t* item;    int rc;    /* if the address is already cached - simply return it */    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);    addr = (mca_oob_tcp_addr_t *)orte_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names,         &peer->peer_name);    if(NULL != addr) {         OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);         mca_oob_tcp_peer_resolved(peer, addr);         return ORTE_SUCCESS;    }    /* check to see if we have subscribed to this registry segment */    for( item =  opal_list_get_first(&mca_oob_tcp_component.tcp_subscriptions);         item != opal_list_get_end(&mca_oob_tcp_component.tcp_subscriptions);         item =  opal_list_get_next(item)) {        subscription = (mca_oob_tcp_subscription_t*)item;        if(subscription->jobid == peer->peer_name.jobid) {            OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);            return ORTE_SUCCESS;        }    }    if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,                                ORTE_OOB_SUBSCRIPTION, peer->peer_name.jobid))) {        ORTE_ERROR_LOG(rc);        return rc;    }    /* attach to the stage-1 standard trigger */    if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,                                    ORTE_STG1_TRIGGER, peer->peer_name.jobid))) {        ORTE_ERROR_LOG(rc);        free(sub_name);        return rc;    }    /* define the segment */    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment,                                peer->peer_name.jobid))) {        ORTE_ERROR_LOG(rc);        free(sub_name);        free(trig_name);        return rc;    }    /* If we do not release the mutex before the subscrition we will deadlock     * as the suscription involve oob_tcp_send who involve again a lookup     * call. Before unlocking the mutex (just to be protected by it) we can     * create the subscription and add it to the list.     */    subscription = OBJ_NEW(mca_oob_tcp_subscription_t);    subscription->jobid = peer->peer_name.jobid;    opal_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);    if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL,                                         ORTE_GPR_NOTIFY_ADD_ENTRY |                                         ORTE_GPR_NOTIFY_VALUE_CHG |                                         ORTE_GPR_NOTIFY_PRE_EXISTING,                                         ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,                                         segment,                                         NULL,  /* look at all containers on this segment */                                         key,                                         mca_oob_tcp_registry_callback, NULL))) {        ORTE_ERROR_LOG(rc);        free(sub_name);        free(trig_name);        free(segment);        /* Subscription registration failed, we should activate the cleaning logic:         * remove the subscription from the list (protected by the mutex).         */        OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);        opal_list_remove_item( &mca_oob_tcp_component.tcp_subscriptions,                               &subscription->item );        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);        return rc;    }    /* the id of each subscription is recorded     * here so we can (if desired) cancel that subscription later     */    subscription->subid = sub_id;    /* done with these, so release any memory */    free(trig_name);    free(sub_name);    free(segment);    return rc;}/* * Setup contact information in the registry. */int mca_oob_tcp_init(void){    orte_jobid_t jobid;    orte_buffer_t *buffer;    orte_gpr_subscription_id_t sub_id;    char *sub_name, *segment, *trig_name, **tokens;    char *keys[] = {"oob-tcp", ORTE_PROC_RML_IP_ADDRESS_KEY};    orte_data_value_t *values[2];

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -