pml_base_module_exchange.c
来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 762 行 · 第 1/2 页
C
762 行
bytes = NULL; } /* * Lookup the corresponding modex structure */ if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) { opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n"); OBJ_RELEASE(data); OPAL_THREAD_UNLOCK(&modex->modex_lock); OBJ_RELEASE(modex); return; } modex_module->module_data = bytes; modex_module->module_data_size = num_bytes; modex_module->module_data_avail = true; opal_condition_signal(&modex_module->module_data_cond); if (opal_list_get_size(&modex_module->module_cbs)) { if (NULL == proc) { proc = ompi_proc_find(proc_name); } if (NULL != proc) { OPAL_THREAD_LOCK(&proc->proc_lock); /* call any registered callbacks */ for (item = opal_list_get_first(&modex_module->module_cbs); item != opal_list_get_end(&modex_module->module_cbs); item = opal_list_get_next(item)) { mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item; cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata); } OPAL_THREAD_UNLOCK(&proc->proc_lock); } } } OPAL_THREAD_UNLOCK(&modex->modex_lock); } /* if value[i]->cnt > 0 */ } /* if value[i] != NULL */ }}/** * Make sure we have subscribed to this segment. */static intmca_pml_base_modex_subscribe(orte_process_name_t * name){ char *segment, *sub_name, *trig_name; orte_gpr_subscription_id_t sub_id; orte_jobid_t jobid; opal_list_item_t *item; mca_pml_base_modex_subscription_t *subscription; int rc; char *keys[] = { ORTE_PROC_NAME_KEY, OMPI_MODEX_KEY, NULL }; /* check for an existing subscription */ OPAL_LOCK(&mca_pml_base_modex_lock); if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) { for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions); item != opal_list_get_end(&mca_pml_base_modex_subscriptions); item = opal_list_get_next(item)) { subscription = (mca_pml_base_modex_subscription_t *) item; if (subscription->jobid == name->jobid) { OPAL_UNLOCK(&mca_pml_base_modex_lock); return OMPI_SUCCESS; } } } OPAL_UNLOCK(&mca_pml_base_modex_lock); /* otherwise - subscribe to get this jobid's contact info */ jobid = name->jobid; if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name, OMPI_MODEX_SUBSCRIPTION, jobid))) { ORTE_ERROR_LOG(rc); return rc; } /* attach to the stage-1 standard trigger */ if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name, ORTE_STG1_TRIGGER, jobid))) { ORTE_ERROR_LOG(rc); free(sub_name); return rc; } /* define the segment */ if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { ORTE_ERROR_LOG(rc); free(sub_name); free(trig_name); return rc; } if (jobid != orte_process_info.my_name->jobid) { if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, NULL, NULL, ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_PRE_EXISTING, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, segment, NULL, /* look at all * containers on this * segment */ 2, keys, mca_pml_base_modex_registry_callback, NULL))) { ORTE_ERROR_LOG(rc); free(sub_name); free(trig_name); free(segment); return rc; } } else { if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, trig_name, sub_name, ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED, segment, NULL, /* look at all * containers on this * segment */ 2, keys, mca_pml_base_modex_registry_callback, NULL))) { ORTE_ERROR_LOG(rc); free(sub_name); free(trig_name); free(segment); return rc; } } free(sub_name); free(trig_name); free(segment); /* add this jobid to our list of subscriptions */ OPAL_LOCK(&mca_pml_base_modex_lock); subscription = OBJ_NEW(mca_pml_base_modex_subscription_t); subscription->jobid = name->jobid; opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item); OPAL_UNLOCK(&mca_pml_base_modex_lock); return OMPI_SUCCESS;}/** * Store the data associated with the specified module in the * gpr. Note that the gpr is in a mode where it caches * individual puts during startup and sends them as an aggregate * command. */intmca_pml_base_modex_send(mca_base_component_t * source_component, const void *data, size_t size){ orte_jobid_t jobid; int rc; orte_buffer_t buffer; orte_std_cntr_t i, num_tokens; char *ptr, *segment, **tokens; orte_byte_object_t bo; orte_data_value_t value = ORTE_DATA_VALUE_EMPTY; jobid = ORTE_PROC_MY_NAME->jobid; if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { ORTE_ERROR_LOG(rc); return rc; } if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, &num_tokens, orte_process_info.my_name))) { ORTE_ERROR_LOG(rc); free(segment); return rc; } OBJ_CONSTRUCT(&buffer, orte_buffer_t); ptr = source_component->mca_type_name; if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) { ORTE_ERROR_LOG(rc); goto cleanup; } ptr = source_component->mca_component_name; if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) { ORTE_ERROR_LOG(rc); goto cleanup; } if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) { ORTE_ERROR_LOG(rc); goto cleanup; } if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) { ORTE_ERROR_LOG(rc); goto cleanup; } if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) { ORTE_ERROR_LOG(rc); goto cleanup; } if (0 != size) { if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) { ORTE_ERROR_LOG(rc); goto cleanup; } } if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) { ORTE_ERROR_LOG(rc); goto cleanup; } OBJ_DESTRUCT(&buffer); /* setup the data_value structure to hold the byte object */ if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) { ORTE_ERROR_LOG(rc); goto cleanup; } rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR, segment, tokens, OMPI_MODEX_KEY, &value);cleanup: free(segment); for (i = 0; i < num_tokens; i++) { free(tokens[i]); tokens[i] = NULL; } if (NULL != tokens) free(tokens); return rc;}/** * Retreive the data for the specified module from the source process. */intmca_pml_base_modex_recv(mca_base_component_t * component, ompi_proc_t * proc, void **buffer, size_t * size){ mca_pml_base_modex_t *modex; mca_pml_base_modex_module_t *modex_module; /* make sure we could possibly have modex data */ if (0 == strcmp(orte_gpr_base_selected_component.gpr_version.mca_component_name, "null")) { return OMPI_ERR_NOT_IMPLEMENTED; } /* check the proc for cached data */ if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { /* see if we already have data for this proc... */ OPAL_THREAD_LOCK(&mca_pml_base_modex_lock); modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name); if (NULL == modex) { /* create an empty modex data... */ modex = OBJ_NEW(mca_pml_base_modex_t); if (NULL == modex) { opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n"); OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); return OMPI_ERR_OUT_OF_RESOURCE; } orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex); OBJ_RETAIN(modex); proc->proc_modex = &modex->super.super; OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); /* verify that we have subscribed to this segment */ mca_pml_base_modex_subscribe(&proc->proc_name); } else { /* create a backpointer from the proc to the modex data */ OBJ_RETAIN(modex); proc->proc_modex = &modex->super.super; OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); } } OPAL_THREAD_LOCK(&modex->modex_lock); /* lookup/create the module */ if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) { OPAL_THREAD_UNLOCK(&modex->modex_lock); return OMPI_ERR_OUT_OF_RESOURCE; } /* wait until data is available */ while (modex_module->module_data_avail == false) { opal_condition_wait(&modex_module->module_data_cond, &modex->modex_lock); } /* copy the data out to the user */ if (modex_module->module_data_size == 0) { *buffer = NULL; *size = 0; } else { void *copy = malloc(modex_module->module_data_size); if (copy == NULL) { return OMPI_ERR_OUT_OF_RESOURCE; } memcpy(copy, modex_module->module_data, modex_module->module_data_size); *buffer = copy; *size = modex_module->module_data_size; } OPAL_THREAD_UNLOCK(&modex->modex_lock); return OMPI_SUCCESS;}/** * */intmca_pml_base_modex_recv_nb(mca_base_component_t * component, ompi_proc_t * proc, mca_pml_base_modex_cb_fn_t cbfunc, void *cbdata){ mca_pml_base_modex_t *modex; mca_pml_base_modex_module_t *module; mca_pml_base_modex_cb_t *cb; /* check the proc for cached data */ if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { /* see if we already have data for this proc... */ OPAL_THREAD_LOCK(&mca_pml_base_modex_lock); modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name); if (NULL == modex) { /* create an empty modex data... */ modex = OBJ_NEW(mca_pml_base_modex_t); if (NULL == modex) { opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n"); OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); return OMPI_ERR_OUT_OF_RESOURCE; } orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex); OBJ_RETAIN(modex); proc->proc_modex = &modex->super.super; OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); /* verify that we have subscribed to this segment */ mca_pml_base_modex_subscribe(&proc->proc_name); } else { /* create a backpointer from the proc to the modex data */ OBJ_RETAIN(modex); proc->proc_modex = &modex->super.super; OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); } } OPAL_THREAD_LOCK(&modex->modex_lock); /* lookup/create the module */ if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) { OPAL_THREAD_UNLOCK(&modex->modex_lock); return OMPI_ERR_OUT_OF_RESOURCE; } /* register the callback */ cb = OBJ_NEW(mca_pml_base_modex_cb_t); cb->component = component; cb->cbfunc = cbfunc; cb->cbdata = cbdata; opal_list_append(&module->module_cbs, (opal_list_item_t *) cb); OPAL_THREAD_UNLOCK(&modex->modex_lock); return OMPI_SUCCESS;}/** * Subscribe to the segment corresponding * to this job. */intmca_pml_base_modex_exchange(void){ return mca_pml_base_modex_subscribe(orte_process_info.my_name);}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?