📄 oob_tcp.c
字号:
orte_byte_object_t bo; mca_oob_tcp_subscription_t *subscription; int rc; opal_list_item_t* item; char *tmp, *tmp2, *tmp3; orte_std_cntr_t i, num_tokens; int randval = orte_process_info.num_procs; if (0 == randval) randval = 10; /* random delay to stagger connections back to seed */#if defined(__WINDOWS__) if(1 == mca_oob_tcp_component.connect_sleep) { Sleep((orte_process_info.my_name->vpid % randval % 1000) * 100); }#else if(1 == mca_oob_tcp_component.connect_sleep) { usleep((orte_process_info.my_name->vpid % randval % 1000) * 1000); }#endif /* get my jobid */ jobid = ORTE_PROC_MY_NAME->jobid; /* create a listen socket */ if ((OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) && orte_process_info.seed) { if (mca_oob_tcp_create_listen_thread() != ORTE_SUCCESS) { opal_output(0, "mca_oob_tcp_init: unable to create listen thread"); return ORTE_ERROR; } opal_free_list_init(&mca_oob_tcp_component.tcp_pending_connections_fl, sizeof(mca_oob_tcp_pending_connection_t), OBJ_CLASS(mca_oob_tcp_pending_connection_t), 16, /* initial number */ -1, /* maximum number */ 16); /* increment to grow by */ opal_progress_register(mca_oob_tcp_listen_progress); if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) { opal_output(0, "[%lu,%lu,%lu] accepting connections via listen thread", ORTE_NAME_ARGS(orte_process_info.my_name)); } } else { /* fix up the listen_type, since we might have been in thread, but can't do that since we weren't the HNP. */ mca_oob_tcp_component.tcp_listen_type = OOB_TCP_EVENT; if(mca_oob_tcp_create_listen() != ORTE_SUCCESS) { opal_output(0, "mca_oob_tcp_init: unable to create listen socket"); return ORTE_ERROR; } if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) { opal_output(0, "[%lu,%lu,%lu] accepting connections via event library", ORTE_NAME_ARGS(orte_process_info.my_name)); } } /* iterate through the open connections and send an ident message to all peers - * note that we initially come up w/out knowing our process name - and are assigned * a temporary name by our peer. once we have determined our real name - we send it * to the peer. */ OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); for(item = opal_list_get_first(&mca_oob_tcp_component.tcp_peer_list); item != opal_list_get_end(&mca_oob_tcp_component.tcp_peer_list); item = opal_list_get_next(item)) { mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item; mca_oob_tcp_peer_send_ident(peer); } /* register subscribe callback to receive notification when all processes have registered */ subscription = OBJ_NEW(mca_oob_tcp_subscription_t); subscription->jobid = jobid; opal_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item); OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_ALL) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_init: calling orte_gpr.subscribe\n", ORTE_NAME_ARGS(orte_process_info.my_name)); } if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name, ORTE_OOB_SUBSCRIPTION, jobid))) { ORTE_ERROR_LOG(rc); return rc; } /* attach to the stage-1 standard trigger */ if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name, ORTE_STG1_TRIGGER, jobid))) { ORTE_ERROR_LOG(rc); free(sub_name); return rc; } /* define the segment */ if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { ORTE_ERROR_LOG(rc); free(sub_name); free(trig_name); return rc; } if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name, ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, segment, NULL, /* look at all containers on this segment */ keys[0], mca_oob_tcp_registry_callback, NULL))) { ORTE_ERROR_LOG(rc); free(sub_name); free(trig_name); free(segment); return rc; } /* the id of each subscription is recorded * here so we can (if desired) cancel that subscription later */ subscription->subid = sub_id; /* done with these, so release any memory */ free(trig_name); free(sub_name); /* now setup to put our contact info on registry */ buffer = OBJ_NEW(orte_buffer_t); if(buffer == NULL) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } if (ORTE_SUCCESS != (rc = mca_oob_tcp_addr_pack(buffer))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(buffer); return rc; } /* extract payload for storage */ if (ORTE_SUCCESS != (rc = orte_dss.unload(buffer, (void**)&(bo.bytes), &(bo.size)))) { ORTE_ERROR_LOG(rc); free(segment); OBJ_RELEASE(buffer); return rc; } OBJ_RELEASE(buffer); values[0] = OBJ_NEW(orte_data_value_t); if (NULL == values[0]) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } values[0]->type = ORTE_BYTE_OBJECT; if (ORTE_SUCCESS != (rc = orte_dss.copy(&(values[0]->data), &bo, ORTE_BYTE_OBJECT))) { ORTE_ERROR_LOG(rc); free(segment); return rc; } /* setup the IP address for storage */ tmp = mca_oob.oob_get_addr(); tmp2 = strrchr(tmp, '/') + 1; tmp3 = strrchr(tmp, ':'); if(NULL == tmp2 || NULL == tmp3) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_init: invalid address \'%s\' " "returned for selected oob interfaces.\n", ORTE_NAME_ARGS(orte_process_info.my_name), tmp); ORTE_ERROR_LOG(ORTE_ERROR); free(segment); free(tmp); free(bo.bytes); return ORTE_ERROR; } *tmp3 = '\0'; values[1] = OBJ_NEW(orte_data_value_t); if (NULL == values[1]) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } values[1]->type = ORTE_STRING; values[1]->data = strdup(tmp2); free(tmp); /* get the process tokens */ if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, &num_tokens, orte_process_info.my_name))) { ORTE_ERROR_LOG(rc); free(segment); OBJ_RELEASE(values[0]); OBJ_RELEASE(values[1]); return rc; } /* put our contact info in registry */ if (ORTE_SUCCESS != (rc = orte_gpr.put_N(ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_XAND, segment, tokens, 2, keys, values))) { ORTE_ERROR_LOG(rc); } free(segment); for(i=0; i < num_tokens; i++) { free(tokens[i]); tokens[i] = NULL; } if (NULL != tokens) free(tokens); OBJ_RELEASE(values[0]); OBJ_RELEASE(values[1]); return rc;}/* * Module cleanup. */int mca_oob_tcp_fini(void){ opal_list_item_t *item; OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); opal_event_disable(); /* disable event processing */ /* close listen socket */ if (mca_oob_tcp_component.tcp_listen_sd >= 0) { if (OOB_TCP_EVENT == mca_oob_tcp_component.tcp_listen_type) { opal_event_del(&mca_oob_tcp_component.tcp_recv_event); close(mca_oob_tcp_component.tcp_listen_sd); } else if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) { void *data; mca_oob_tcp_component.tcp_shutdown = true; close(mca_oob_tcp_component.tcp_listen_sd); opal_thread_join(&mca_oob_tcp_component.tcp_listen_thread, &data); opal_progress_unregister(mca_oob_tcp_listen_progress); } mca_oob_tcp_component.tcp_listen_sd = -1; } /* cleanup all peers */ for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list); item != NULL; item = opal_list_remove_first(&mca_oob_tcp_component.tcp_peer_list)) { mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item; MCA_OOB_TCP_PEER_RETURN(peer); } /* delete any pending events */ for(item = opal_list_remove_first(&mca_oob_tcp_component.tcp_events); item != NULL; item = opal_list_remove_first(&mca_oob_tcp_component.tcp_events)) { mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t*)item; opal_event_del(&event->event); OBJ_RELEASE(event); } opal_event_enable(); OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); return ORTE_SUCCESS;}/** Compare two process names for equality.** @param n1 Process name 1.* @param n2 Process name 2.* @return (-1 for n1<n2 0 for equality, 1 for n1>n2)** Note that the definition of < or > is somewhat arbitrary -* just needs to be consistently applied to maintain an ordering* when process names are used as indices.** Currently, this function is ONLY used in one place - in oob_tcp_send.c to* determine if the recipient of the message-to-be-sent is ourselves. Hence,* this comparison is okay to be LITERAL and can/should use the ns.compare_fields* function*/int mca_oob_tcp_process_name_compare(const orte_process_name_t* n1, const orte_process_name_t* n2){ return orte_ns.compare_fields(ORTE_NS_CMP_ALL, n1, n2);}/** Return local process address as a URI string.*/char* mca_oob_tcp_get_addr(void){ char *contact_info = malloc(opal_list_get_size(&mca_oob_tcp_component.tcp_available_devices) * 32); char *ptr = contact_info; opal_list_item_t *item; *ptr = 0; for (item = opal_list_get_first(&mca_oob_tcp_component.tcp_available_devices) ; item != opal_list_get_end(&mca_oob_tcp_component.tcp_available_devices) ; item = opal_list_get_next(item)) { mca_oob_tcp_device_t *dev = (mca_oob_tcp_device_t*) item; if (ptr != contact_info) { ptr += sprintf(ptr, ";"); } ptr += sprintf(ptr, "tcp://%s:%d", inet_ntoa(dev->if_addr.sin_addr), ntohs(mca_oob_tcp_component.tcp_listen_port)); } return contact_info;}/** Parse a URI string into an IP address and port number.*/int mca_oob_tcp_parse_uri(const char* uri, struct sockaddr_in* inaddr){ char* tmp = strdup(uri); char* ptr = tmp + 6; char* addr = ptr; char* port; if(strncmp(tmp, "tcp://", 6) != 0) { free(tmp); return ORTE_ERR_BAD_PARAM; } ptr = strchr(addr, ':'); if(NULL == ptr) { free(tmp); return ORTE_ERR_BAD_PARAM; } *ptr = '\0'; ptr++; port = ptr; memset(inaddr, 0, sizeof(inaddr)); inaddr->sin_family = AF_INET; inaddr->sin_addr.s_addr = inet_addr(addr); if(inaddr->sin_addr.s_addr == INADDR_ANY) { free(tmp); return ORTE_ERR_BAD_PARAM; } inaddr->sin_port = htons(atoi(port)); free(tmp); return ORTE_SUCCESS;}/* * Setup address in the cache. Note that this could be called multiple * times if a given destination exports multiple addresses. */int mca_oob_tcp_set_addr(const orte_process_name_t* name, const char* uri){ struct sockaddr_in inaddr; mca_oob_tcp_addr_t* addr; mca_oob_tcp_peer_t* peer; int rc; if((rc = mca_oob_tcp_parse_uri(uri,&inaddr)) != ORTE_SUCCESS) return rc; OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); addr = (mca_oob_tcp_addr_t*)orte_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names, name); if(NULL == addr) { addr = OBJ_NEW(mca_oob_tcp_addr_t); addr->addr_name = *name; orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr); } rc = mca_oob_tcp_addr_insert(addr, &inaddr); peer = (mca_oob_tcp_peer_t *)orte_hash_table_get_proc( &mca_oob_tcp_component.tcp_peers, &addr->addr_name); if(NULL != peer) { mca_oob_tcp_peer_resolved(peer, addr); } OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); return rc;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -