📄 oob_tcp.c
字号:
return; } hdr->msg_src.cellid = ORTE_PROC_MY_NAME->cellid; } /* lookup the corresponding process */ peer = mca_oob_tcp_peer_lookup(&hdr->msg_src); if(NULL == peer) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: unable to locate peer", ORTE_NAME_ARGS(orte_process_info.my_name)); CLOSE_THE_SOCKET(sd); return; } /* is the peer instance willing to accept this connection */ if(mca_oob_tcp_peer_accept(peer, sd) == false) { if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) { opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_recv_handler: " "rejected connection from [%lu,%lu,%lu] connection state %d", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(peer->peer_name)), ORTE_NAME_ARGS(&(hdr->msg_src)), peer->peer_state); } CLOSE_THE_SOCKET(sd); return; }}/* * Event callback when there is data available on the registered * socket to recv. */static void mca_oob_tcp_recv_handler(int sd, short flags, void* user){ mca_oob_tcp_hdr_t hdr; mca_oob_tcp_event_t* event = (mca_oob_tcp_event_t *)user; int rc; /* accept new connections on the listen socket */ if(mca_oob_tcp_component.tcp_listen_sd == sd) { mca_oob_tcp_accept(); return; } OBJ_RELEASE(event); /* Some mem checkers don't realize that hdr will guarantee to be fully filled in during the read(), below :-( */ OMPI_DEBUG_ZERO(hdr); /* recv the process identifier */ while((rc = recv(sd, (char *)&hdr, sizeof(hdr), 0)) != sizeof(hdr)) { if(rc >= 0) { if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: peer closed connection", ORTE_NAME_ARGS(orte_process_info.my_name)); } CLOSE_THE_SOCKET(sd); return; } if(opal_socket_errno != EINTR) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: recv() failed: %s (%d)\n", ORTE_NAME_ARGS(orte_process_info.my_name), strerror(opal_socket_errno), opal_socket_errno); CLOSE_THE_SOCKET(sd); return; } } MCA_OOB_TCP_HDR_NTOH(&hdr); /* dispatch based on message type */ switch(hdr.msg_type) { case MCA_OOB_TCP_PROBE: mca_oob_tcp_recv_probe(sd, &hdr); break; case MCA_OOB_TCP_CONNECT: mca_oob_tcp_recv_connect(sd, &hdr); break; default: opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: invalid message type: %d\n", ORTE_NAME_ARGS(orte_process_info.my_name), hdr.msg_type); CLOSE_THE_SOCKET(sd); break; }}/* * Component initialization - create a module. * (1) initialize static resources * (2) create listen socket */mca_oob_t* mca_oob_tcp_component_init(int* priority){ int i; bool found_local = false; bool found_nonlocal = false; *priority = 1; /* are there any interfaces? */ if(opal_ifcount() <= 0) return NULL; for (i = opal_ifbegin() ; i > 0 ; i = opal_ifnext(i)) { char name[32]; mca_oob_tcp_device_t *dev; opal_ifindextoname(i, name, sizeof(name)); if (mca_oob_tcp_component.tcp_include != NULL && strstr(mca_oob_tcp_component.tcp_include,name) == NULL) { continue; } if (mca_oob_tcp_component.tcp_exclude != NULL && strstr(mca_oob_tcp_component.tcp_exclude,name) != NULL) { continue; } dev = OBJ_NEW(mca_oob_tcp_device_t); dev->if_index = i; opal_ifindextoaddr(i, (struct sockaddr*) &dev->if_addr, sizeof(struct sockaddr_in)); if(opal_ifislocalhost((struct sockaddr*) &dev->if_addr)) { dev->if_local = true; found_local = true; } else { dev->if_local = false; found_nonlocal = true; } opal_list_append(&mca_oob_tcp_component.tcp_available_devices, &dev->super); } if (found_local && found_nonlocal) { opal_list_item_t *item; for (item = opal_list_get_first(&mca_oob_tcp_component.tcp_available_devices) ; item != opal_list_get_end(&mca_oob_tcp_component.tcp_available_devices) ; item = opal_list_get_next(item)) { mca_oob_tcp_device_t *dev = (mca_oob_tcp_device_t*) item; if (dev->if_local) { item = opal_list_remove_item(&mca_oob_tcp_component.tcp_available_devices, item); } } } if (opal_list_get_size(&mca_oob_tcp_component.tcp_available_devices) == 0) { if (NULL != mca_oob_tcp_component.tcp_include) { opal_output(0, "None of the specified TCP interfaces found (%s)", mca_oob_tcp_component.tcp_include); } return NULL; } /* initialize data structures */ opal_hash_table_init(&mca_oob_tcp_component.tcp_peers, 128); opal_hash_table_init(&mca_oob_tcp_component.tcp_peer_names, 128); opal_free_list_init(&mca_oob_tcp_component.tcp_peer_free, sizeof(mca_oob_tcp_peer_t), OBJ_CLASS(mca_oob_tcp_peer_t), 8, /* initial number */ mca_oob_tcp_component.tcp_peer_limit, /* maximum number */ 8); /* increment to grow by */ opal_free_list_init(&mca_oob_tcp_component.tcp_msgs, sizeof(mca_oob_tcp_msg_t), OBJ_CLASS(mca_oob_tcp_msg_t), 8, /* initial number */ -1, /* maximum number */ 8); /* increment to grow by */ /* intialize event library */ memset(&mca_oob_tcp_component.tcp_recv_event, 0, sizeof(opal_event_t)); memset(&mca_oob_tcp_component.tcp_send_event, 0, sizeof(opal_event_t)); return &mca_oob_tcp;}/* * Callback from registry on change to subscribed segments. */void mca_oob_tcp_registry_callback( orte_gpr_notify_data_t* data, void* cbdata){ orte_std_cntr_t i, j, k; int rc; orte_gpr_value_t **values, *value; orte_gpr_keyval_t *keyval; orte_byte_object_t *bo; orte_buffer_t buffer; mca_oob_tcp_addr_t* addr, *existing; mca_oob_tcp_peer_t* peer; if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback\n", ORTE_NAME_ARGS(orte_process_info.my_name)); } /* process the callback */ OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); values = (orte_gpr_value_t**)(data->values)->addr; for(i = 0, k=0; k < data->cnt && i < (data->values)->size; i++) { if (NULL != values[i]) { k++; value = values[i]; for(j = 0; j < value->cnt; j++) { /* check to make sure this is the requested key */ keyval = value->keyvals[j]; if(strcmp(keyval->key,"oob-tcp") != 0) continue; /* transfer ownership of registry object to buffer and unpack */ OBJ_CONSTRUCT(&buffer, orte_buffer_t); if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bo, keyval->value, ORTE_BYTE_OBJECT))) { ORTE_ERROR_LOG(rc); continue; } if(orte_dss.load(&buffer, bo->bytes, bo->size) != ORTE_SUCCESS) { /* TSW - throw ERROR */ continue; } /* protect the values from the release */ keyval->value->type = ORTE_NULL; keyval->value->data = NULL; /* unpack the buffer */ addr = mca_oob_tcp_addr_unpack(&buffer); OBJ_DESTRUCT(&buffer); if(NULL == addr) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback: unable to unpack peer address\n", ORTE_NAME_ARGS(orte_process_info.my_name)); continue; } if(mca_oob_tcp_component.tcp_debug > OOB_TCP_DEBUG_INFO) { opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback: received peer [%lu,%lu,%lu]\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(&(addr->addr_name))); } /* check for existing cache entry */ existing = (mca_oob_tcp_addr_t *)orte_hash_table_get_proc( &mca_oob_tcp_component.tcp_peer_names, &addr->addr_name); if(NULL != existing) { /* TSW - need to update existing entry */ OBJ_RELEASE(addr); continue; } /* insert into cache and notify peer */ orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr); peer = (mca_oob_tcp_peer_t *)orte_hash_table_get_proc( &mca_oob_tcp_component.tcp_peers, &addr->addr_name); if(NULL != peer) mca_oob_tcp_peer_resolved(peer, addr); } } } OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);}/* * Attempt to resolve peer name. */int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer){ mca_oob_tcp_addr_t* addr; mca_oob_tcp_subscription_t* subscription; char *segment, *sub_name, *trig_name; char *key="oob-tcp"; orte_gpr_subscription_id_t sub_id; opal_list_item_t* item; int rc; /* if the address is already cached - simply return it */ OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); addr = (mca_oob_tcp_addr_t *)orte_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names, &peer->peer_name); if(NULL != addr) { OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); mca_oob_tcp_peer_resolved(peer, addr); return ORTE_SUCCESS; } /* check to see if we have subscribed to this registry segment */ for( item = opal_list_get_first(&mca_oob_tcp_component.tcp_subscriptions); item != opal_list_get_end(&mca_oob_tcp_component.tcp_subscriptions); item = opal_list_get_next(item)) { subscription = (mca_oob_tcp_subscription_t*)item; if(subscription->jobid == peer->peer_name.jobid) { OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); return ORTE_SUCCESS; } } if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name, ORTE_OOB_SUBSCRIPTION, peer->peer_name.jobid))) { ORTE_ERROR_LOG(rc); return rc; } /* attach to the stage-1 standard trigger */ if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name, ORTE_STG1_TRIGGER, peer->peer_name.jobid))) { ORTE_ERROR_LOG(rc); free(sub_name); return rc; } /* define the segment */ if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, peer->peer_name.jobid))) { ORTE_ERROR_LOG(rc); free(sub_name); free(trig_name); return rc; } /* If we do not release the mutex before the subscrition we will deadlock * as the suscription involve oob_tcp_send who involve again a lookup * call. Before unlocking the mutex (just to be protected by it) we can * create the subscription and add it to the list. */ subscription = OBJ_NEW(mca_oob_tcp_subscription_t); subscription->jobid = peer->peer_name.jobid; opal_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item); OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL, ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_PRE_EXISTING, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, segment, NULL, /* look at all containers on this segment */ key, mca_oob_tcp_registry_callback, NULL))) { ORTE_ERROR_LOG(rc); free(sub_name); free(trig_name); free(segment); /* Subscription registration failed, we should activate the cleaning logic: * remove the subscription from the list (protected by the mutex). */ OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); opal_list_remove_item( &mca_oob_tcp_component.tcp_subscriptions, &subscription->item ); OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); return rc; } /* the id of each subscription is recorded * here so we can (if desired) cancel that subscription later */ subscription->subid = sub_id; /* done with these, so release any memory */ free(trig_name); free(sub_name); free(segment); return rc;}/* * Setup contact information in the registry. */int mca_oob_tcp_init(void){ orte_jobid_t jobid; orte_buffer_t *buffer; orte_gpr_subscription_id_t sub_id; char *sub_name, *segment, *trig_name, **tokens; char *keys[] = {"oob-tcp", ORTE_PROC_RML_IP_ADDRESS_KEY}; orte_data_value_t *values[2];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -