⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oob_tcp.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 4 页
字号:
    orte_byte_object_t bo;    mca_oob_tcp_subscription_t *subscription;    int rc;    opal_list_item_t* item;    char *tmp, *tmp2, *tmp3;    orte_std_cntr_t i, num_tokens;    int randval = orte_process_info.num_procs;    if (0 == randval) randval = 10;     /* random delay to stagger connections back to seed */#if defined(__WINDOWS__)    if(1 == mca_oob_tcp_component.connect_sleep) {        Sleep((orte_process_info.my_name->vpid % randval % 1000) * 100);    }#else    if(1 == mca_oob_tcp_component.connect_sleep) {        usleep((orte_process_info.my_name->vpid % randval % 1000) * 1000);    }#endif    /* get my jobid */    jobid = ORTE_PROC_MY_NAME->jobid;        /* create a listen socket */    if ((OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) &&         orte_process_info.seed) {          if (mca_oob_tcp_create_listen_thread() != ORTE_SUCCESS) {            opal_output(0, "mca_oob_tcp_init: unable to create listen thread");            return ORTE_ERROR;        }        opal_free_list_init(&mca_oob_tcp_component.tcp_pending_connections_fl,                            sizeof(mca_oob_tcp_pending_connection_t),                            OBJ_CLASS(mca_oob_tcp_pending_connection_t),                            16,  /* initial number */                            -1,  /* maximum number */                            16);  /* increment to grow by */        opal_progress_register(mca_oob_tcp_listen_progress);        if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {            opal_output(0, "[%lu,%lu,%lu] accepting connections via listen thread",                        ORTE_NAME_ARGS(orte_process_info.my_name));        }    } else {        /* fix up the listen_type, since we might have been in thread,           but can't do that since we weren't the HNP. */        mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT;        if(mca_oob_tcp_create_listen() != ORTE_SUCCESS) {            opal_output(0, "mca_oob_tcp_init: unable to create listen socket");            return ORTE_ERROR;        }        if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {            opal_output(0, "[%lu,%lu,%lu] accepting connections via event library",                        ORTE_NAME_ARGS(orte_process_info.my_name));        }    }    /* iterate through the open connections and send an ident message to all peers -     * note that we initially come up w/out knowing our process name - and are assigned     * a temporary name by our peer. once we have determined our real name - we send it     * to the peer.    */    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);    for(item =  opal_list_get_first(&mca_oob_tcp_component.tcp_peer_list);        item != opal_list_get_end(&mca_oob_tcp_component.tcp_peer_list);        item =  opal_list_get_next(item)) {        mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;        mca_oob_tcp_peer_send_ident(peer);    }    /* register subscribe callback to receive notification when all processes have registered */    subscription = OBJ_NEW(mca_oob_tcp_subscription_t);    subscription->jobid = jobid;    opal_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);    if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_ALL) {        opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_init: calling orte_gpr.subscribe\n",            ORTE_NAME_ARGS(orte_process_info.my_name));    }    if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,                                ORTE_OOB_SUBSCRIPTION, jobid))) {        ORTE_ERROR_LOG(rc);        return rc;    }    /* attach to the stage-1 standard trigger */    if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,                                    ORTE_STG1_TRIGGER, jobid))) {        ORTE_ERROR_LOG(rc);        free(sub_name);        return rc;    }    /* define the segment */    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {        ORTE_ERROR_LOG(rc);        free(sub_name);        free(trig_name);        return rc;    }    if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name,                                         ORTE_GPR_NOTIFY_ADD_ENTRY |                                         ORTE_GPR_NOTIFY_VALUE_CHG |                                         ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,                                         ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,                                         segment,                                         NULL,  /* look at all containers on this segment */                                         keys[0],                                         mca_oob_tcp_registry_callback, NULL))) {        ORTE_ERROR_LOG(rc);        free(sub_name);        free(trig_name);        free(segment);        return rc;    }    /* the id of each subscription is recorded     * here so we can (if desired) cancel that subscription later     */    subscription->subid = sub_id;    /* done with these, so release any memory */    free(trig_name);    free(sub_name);    /* now setup to put our contact info on registry */    buffer = OBJ_NEW(orte_buffer_t);    if(buffer == NULL) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    if (ORTE_SUCCESS != (rc = mca_oob_tcp_addr_pack(buffer))) {        ORTE_ERROR_LOG(rc);        OBJ_RELEASE(buffer);        return rc;    }    /* extract payload for storage */    if (ORTE_SUCCESS != (rc = orte_dss.unload(buffer, (void**)&(bo.bytes), &(bo.size)))) {        ORTE_ERROR_LOG(rc);        free(segment);        OBJ_RELEASE(buffer);        return rc;    }    OBJ_RELEASE(buffer);    values[0] = OBJ_NEW(orte_data_value_t);    if (NULL == values[0]) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    values[0]->type = ORTE_BYTE_OBJECT;    if (ORTE_SUCCESS != (rc = orte_dss.copy(&(values[0]->data), &bo, ORTE_BYTE_OBJECT))) {        ORTE_ERROR_LOG(rc);        free(segment);        return rc;    }    /* setup the IP address for storage */    tmp = mca_oob.oob_get_addr();    tmp2 = strrchr(tmp, '/') + 1;    tmp3 = strrchr(tmp, ':');    if(NULL == tmp2 || NULL == tmp3) {        opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_init: invalid address \'%s\' "                    "returned for selected oob interfaces.\n",                    ORTE_NAME_ARGS(orte_process_info.my_name), tmp);        ORTE_ERROR_LOG(ORTE_ERROR);        free(segment);        free(tmp);        free(bo.bytes);        return ORTE_ERROR;    }    *tmp3 = '\0';    values[1] = OBJ_NEW(orte_data_value_t);    if (NULL == values[1]) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    values[1]->type = ORTE_STRING;    values[1]->data = strdup(tmp2);    free(tmp);    /* get the process tokens */    if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, &num_tokens,                                    orte_process_info.my_name))) {        ORTE_ERROR_LOG(rc);        free(segment);        OBJ_RELEASE(values[0]);        OBJ_RELEASE(values[1]);        return rc;    }    /* put our contact info in registry */    if (ORTE_SUCCESS != (rc = orte_gpr.put_N(ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_XAND,                                        segment, tokens, 2, keys, values))) {        ORTE_ERROR_LOG(rc);    }    free(segment);    for(i=0; i < num_tokens; i++) {        free(tokens[i]);        tokens[i] = NULL;    }    if (NULL != tokens) free(tokens);    OBJ_RELEASE(values[0]);    OBJ_RELEASE(values[1]);    return rc;}/* * Module cleanup. */int mca_oob_tcp_fini(void){    opal_list_item_t *item;    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);    opal_event_disable(); /* disable event processing */    /* close listen socket */    if (mca_oob_tcp_component.tcp_listen_sd >= 0) {        if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) {            opal_event_del(&mca_oob_tcp_component.tcp_recv_event);            close(mca_oob_tcp_component.tcp_listen_sd);        } else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {            void *data;            mca_oob_tcp_component.tcp_shutdown = true;            close(mca_oob_tcp_component.tcp_listen_sd);            opal_thread_join(&mca_oob_tcp_component.tcp_listen_thread, &data);            opal_progress_unregister(mca_oob_tcp_listen_progress);        }        mca_oob_tcp_component.tcp_listen_sd = -1;    }    /* cleanup all peers */    for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list);        item != NULL;        item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list)) {        mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;        MCA_OOB_TCP_PEER_RETURN(peer);    }    /* delete any pending events */    for(item =  opal_list_remove_first(&mca_oob_tcp_component.tcp_events);        item != NULL;        item =  opal_list_remove_first(&mca_oob_tcp_component.tcp_events)) {        mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t*)item;        opal_event_del(&event->event);        OBJ_RELEASE(event);    }    opal_event_enable();    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);    return ORTE_SUCCESS;}/** Compare two process names for equality.** @param  n1  Process name 1.* @param  n2  Process name 2.* @return     (-1 for n1<n2 0 for equality, 1 for n1>n2)** Note that the definition of < or > is somewhat arbitrary -* just needs to be consistently applied to maintain an ordering* when process names are used as indices.** Currently, this function is ONLY used in one place - in oob_tcp_send.c to* determine if the recipient of the message-to-be-sent is ourselves. Hence,* this comparison is okay to be LITERAL and can/should use the ns.compare_fields* function*/int mca_oob_tcp_process_name_compare(const orte_process_name_t* n1, const orte_process_name_t* n2){    return orte_ns.compare_fields(ORTE_NS_CMP_ALL, n1, n2);}/** Return local process address as a URI string.*/char* mca_oob_tcp_get_addr(void){    char *contact_info = malloc(opal_list_get_size(&mca_oob_tcp_component.tcp_available_devices) * 32);    char *ptr = contact_info;    opal_list_item_t *item;    *ptr = 0;    for (item = opal_list_get_first(&mca_oob_tcp_component.tcp_available_devices) ;         item != opal_list_get_end(&mca_oob_tcp_component.tcp_available_devices) ;         item = opal_list_get_next(item)) {        mca_oob_tcp_device_t *dev = (mca_oob_tcp_device_t*) item;        if (ptr != contact_info) {            ptr += sprintf(ptr, ";");        }        ptr += sprintf(ptr, "tcp://%s:%d", inet_ntoa(dev->if_addr.sin_addr),                    ntohs(mca_oob_tcp_component.tcp_listen_port));    }    return contact_info;}/** Parse a URI string into an IP address and port number.*/int mca_oob_tcp_parse_uri(const char* uri, struct sockaddr_in* inaddr){    char* tmp = strdup(uri);    char* ptr = tmp + 6;    char* addr = ptr;    char* port;    if(strncmp(tmp, "tcp://", 6) != 0) {        free(tmp);        return ORTE_ERR_BAD_PARAM;    }    ptr = strchr(addr, ':');    if(NULL == ptr) {        free(tmp);        return ORTE_ERR_BAD_PARAM;    }    *ptr = '\0';    ptr++;    port = ptr;    memset(inaddr, 0, sizeof(inaddr));    inaddr->sin_family = AF_INET;    inaddr->sin_addr.s_addr = inet_addr(addr);    if(inaddr->sin_addr.s_addr == INADDR_ANY) {        free(tmp);        return ORTE_ERR_BAD_PARAM;    }    inaddr->sin_port = htons(atoi(port));    free(tmp);    return ORTE_SUCCESS;}/* * Setup address in the cache. Note that this could be called multiple * times if a given destination exports multiple addresses. */int mca_oob_tcp_set_addr(const orte_process_name_t* name, const char* uri){    struct sockaddr_in inaddr;    mca_oob_tcp_addr_t* addr;    mca_oob_tcp_peer_t* peer;    int rc;    if((rc = mca_oob_tcp_parse_uri(uri,&inaddr)) != ORTE_SUCCESS)        return rc;    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);    addr = (mca_oob_tcp_addr_t*)orte_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names, name);    if(NULL == addr) {        addr = OBJ_NEW(mca_oob_tcp_addr_t);        addr->addr_name = *name;        orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);    }    rc = mca_oob_tcp_addr_insert(addr, &inaddr);    peer = (mca_oob_tcp_peer_t *)orte_hash_table_get_proc(        &mca_oob_tcp_component.tcp_peers, &addr->addr_name);    if(NULL != peer) {        mca_oob_tcp_peer_resolved(peer, addr);    }    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);    return rc;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -