pml_base_module_exchange.c

来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 762 行 · 第 1/2 页

C
762
字号
                        bytes = NULL;                    }                    /*                     * Lookup the corresponding modex structure                     */                    if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) {                        opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n");                        OBJ_RELEASE(data);                        OPAL_THREAD_UNLOCK(&modex->modex_lock);                        OBJ_RELEASE(modex);                        return;                    }                    modex_module->module_data = bytes;                    modex_module->module_data_size = num_bytes;                    modex_module->module_data_avail = true;                    opal_condition_signal(&modex_module->module_data_cond);                    if (opal_list_get_size(&modex_module->module_cbs)) {                        if (NULL == proc) {                            proc = ompi_proc_find(proc_name);                        }                        if (NULL != proc) {                            OPAL_THREAD_LOCK(&proc->proc_lock);                            /* call any registered callbacks */                            for (item = opal_list_get_first(&modex_module->module_cbs);                                 item != opal_list_get_end(&modex_module->module_cbs);                                 item = opal_list_get_next(item)) {                                mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item;                                cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);                            }                            OPAL_THREAD_UNLOCK(&proc->proc_lock);                        }                    }                }                OPAL_THREAD_UNLOCK(&modex->modex_lock);            } /* if value[i]->cnt > 0 */        }  /* if value[i] != NULL */    }}/** * Make sure we have subscribed to this segment. 
*/
/*
 * Only name->jobid is consulted.  If a subscription record for that jobid
 * is already on mca_pml_base_modex_subscriptions the function returns
 * immediately; otherwise it subscribes to the job's segment for the keys
 * ORTE_PROC_NAME_KEY and OMPI_MODEX_KEY, with
 * mca_pml_base_modex_registry_callback as the notification callback, and
 * then records the jobid.
 *
 * @param name  Process name whose jobid selects the segment.
 *
 * @return OMPI_SUCCESS when the jobid is already recorded or was subscribed
 *         and recorded; the failing ORTE status if building one of the
 *         subscription/trigger/segment names or the subscribe call fails;
 *         OMPI_ERR_OUT_OF_RESOURCE if the bookkeeping record cannot be
 *         allocated.
 */
static int
mca_pml_base_modex_subscribe(orte_process_name_t * name)
{
    char *segment, *sub_name, *trig_name;
    orte_gpr_subscription_id_t sub_id;
    orte_jobid_t jobid;
    opal_list_item_t *item;
    mca_pml_base_modex_subscription_t *subscription;
    int rc;
    char *keys[] = {
        ORTE_PROC_NAME_KEY,
        OMPI_MODEX_KEY,
        NULL
    };

    /* check for an existing subscription */
    OPAL_LOCK(&mca_pml_base_modex_lock);
    if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) {
        for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions);
             item != opal_list_get_end(&mca_pml_base_modex_subscriptions);
             item = opal_list_get_next(item)) {
            subscription = (mca_pml_base_modex_subscription_t *) item;
            if (subscription->jobid == name->jobid) {
                OPAL_UNLOCK(&mca_pml_base_modex_lock);
                return OMPI_SUCCESS;
            }
        }
    }
    OPAL_UNLOCK(&mca_pml_base_modex_lock);

    /* otherwise - subscribe to get this jobid's contact info */
    jobid = name->jobid;

    rc = orte_schema.get_std_subscription_name(&sub_name,
                                               OMPI_MODEX_SUBSCRIPTION, jobid);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* attach to the stage-1 standard trigger */
    rc = orte_schema.get_std_trigger_name(&trig_name,
                                          ORTE_STG1_TRIGGER, jobid);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto free_sub_name;
    }

    /* define the segment */
    rc = orte_schema.get_job_segment_name(&segment, jobid);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto free_trig_name;
    }

    if (jobid != orte_process_info.my_name->jobid) {
        /* Another job: no trigger or subscription name is passed, and
         * ORTE_GPR_NOTIFY_PRE_EXISTING is requested. */
        rc = orte_gpr.subscribe_N(&sub_id, NULL, NULL,
                                  ORTE_GPR_NOTIFY_ADD_ENTRY |
                                  ORTE_GPR_NOTIFY_VALUE_CHG |
                                  ORTE_GPR_NOTIFY_PRE_EXISTING,
                                  ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,
                                  segment,
                                  NULL,     /* look at all containers on this segment */
                                  2, keys,
                                  mca_pml_base_modex_registry_callback, NULL);
    } else {
        /* Our own job: use the standard trigger/subscription names and
         * ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG. */
        rc = orte_gpr.subscribe_N(&sub_id, trig_name, sub_name,
                                  ORTE_GPR_NOTIFY_ADD_ENTRY |
                                  ORTE_GPR_NOTIFY_VALUE_CHG |
                                  ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,
                                  ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,
                                  segment,
                                  NULL,     /* look at all containers on this segment */
                                  2, keys,
                                  mca_pml_base_modex_registry_callback, NULL);
    }
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto free_segment;
    }

    /* add this jobid to our list of subscriptions */
    subscription = OBJ_NEW(mca_pml_base_modex_subscription_t);
    if (NULL == subscription) {
        /* Previously dereferenced unchecked.  The registry subscription
         * made above is left in place; only the local record is missing. */
        rc = OMPI_ERR_OUT_OF_RESOURCE;
        goto free_segment;
    }
    subscription->jobid = jobid;

    OPAL_LOCK(&mca_pml_base_modex_lock);
    opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item);
    OPAL_UNLOCK(&mca_pml_base_modex_lock);

    rc = OMPI_SUCCESS;

    /* Staged cleanup: each label frees only what was obtained before the
     * jump, matching the per-path frees of the original code. */
free_segment:
    free(segment);
free_trig_name:
    free(trig_name);
free_sub_name:
    free(sub_name);
    return rc;
}

/**
 *  Store the data associated with the specified module in the
 *  gpr. Note that the gpr is in a mode where it caches
 *  individual puts during startup and sends them as an aggregate
 *  command.
*/intmca_pml_base_modex_send(mca_base_component_t * source_component,			const void *data,			size_t size){    orte_jobid_t jobid;    int rc;    orte_buffer_t buffer;    orte_std_cntr_t i, num_tokens;    char *ptr, *segment, **tokens;    orte_byte_object_t bo;    orte_data_value_t value = ORTE_DATA_VALUE_EMPTY;    jobid = ORTE_PROC_MY_NAME->jobid;    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {	ORTE_ERROR_LOG(rc);	return rc;    }    if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,			      &num_tokens, orte_process_info.my_name))) {	ORTE_ERROR_LOG(rc);	free(segment);	return rc;    }    OBJ_CONSTRUCT(&buffer, orte_buffer_t);    ptr = source_component->mca_type_name;    if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {	ORTE_ERROR_LOG(rc);	goto cleanup;    }    ptr = source_component->mca_component_name;    if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {	ORTE_ERROR_LOG(rc);	goto cleanup;    }    if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) {	ORTE_ERROR_LOG(rc);	goto cleanup;    }    if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) {	ORTE_ERROR_LOG(rc);	goto cleanup;    }    if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) {	ORTE_ERROR_LOG(rc);	goto cleanup;    }    if (0 != size) {	if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) {	    ORTE_ERROR_LOG(rc);	    goto cleanup;	}    }    if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) {	ORTE_ERROR_LOG(rc);	goto cleanup;    }    OBJ_DESTRUCT(&buffer);    /* setup the data_value structure to hold the byte object */    if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) {	ORTE_ERROR_LOG(rc);	goto cleanup;    }    rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,			
segment, tokens, OMPI_MODEX_KEY, &value);cleanup:    free(segment);    for (i = 0; i < num_tokens; i++) {	free(tokens[i]);	tokens[i] = NULL;    }    if (NULL != tokens)	free(tokens);    return rc;}/** *  Retreive the data for the specified module from the source process. */intmca_pml_base_modex_recv(mca_base_component_t * component,			ompi_proc_t * proc,			void **buffer,			size_t * size){    mca_pml_base_modex_t *modex;    mca_pml_base_modex_module_t *modex_module;    /* make sure we could possibly have modex data */    if (0 == strcmp(orte_gpr_base_selected_component.gpr_version.mca_component_name,                    "null")) {        return OMPI_ERR_NOT_IMPLEMENTED;    }    /* check the proc for cached data */    if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {        /* see if we already have data for this proc... */        OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);        modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name);        if (NULL == modex) {            /* create an empty modex data... 
*/            modex = OBJ_NEW(mca_pml_base_modex_t);            if (NULL == modex) {                opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n");                OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);                return OMPI_ERR_OUT_OF_RESOURCE;            }            orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex);            OBJ_RETAIN(modex);            proc->proc_modex = &modex->super.super;            OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);            /* verify that we have subscribed to this segment */            mca_pml_base_modex_subscribe(&proc->proc_name);        } else {            /* create a backpointer from the proc to the modex data */            OBJ_RETAIN(modex);            proc->proc_modex = &modex->super.super;            OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);        }    }    OPAL_THREAD_LOCK(&modex->modex_lock);    /* lookup/create the module */    if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) {	OPAL_THREAD_UNLOCK(&modex->modex_lock);	return OMPI_ERR_OUT_OF_RESOURCE;    }    /* wait until data is available */    while (modex_module->module_data_avail == false) {	opal_condition_wait(&modex_module->module_data_cond, &modex->modex_lock);    }    /* copy the data out to the user */    if (modex_module->module_data_size == 0) {	*buffer = NULL;	*size = 0;    } else {	void *copy = malloc(modex_module->module_data_size);	if (copy == NULL) {	    return OMPI_ERR_OUT_OF_RESOURCE;	}	memcpy(copy, modex_module->module_data, modex_module->module_data_size);	*buffer = copy;	*size = modex_module->module_data_size;    }    OPAL_THREAD_UNLOCK(&modex->modex_lock);    return OMPI_SUCCESS;}/** * */intmca_pml_base_modex_recv_nb(mca_base_component_t * component,			   ompi_proc_t * proc,			   mca_pml_base_modex_cb_fn_t cbfunc,			   void *cbdata){    mca_pml_base_modex_t *modex;    mca_pml_base_modex_module_t *module;    mca_pml_base_modex_cb_t 
*cb;    /* check the proc for cached data */    if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {        /* see if we already have data for this proc... */        OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);        modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name);        if (NULL == modex) {            /* create an empty modex data... */            modex = OBJ_NEW(mca_pml_base_modex_t);            if (NULL == modex) {                opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n");                OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);                return OMPI_ERR_OUT_OF_RESOURCE;            }            orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex);            OBJ_RETAIN(modex);            proc->proc_modex = &modex->super.super;            OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);            /* verify that we have subscribed to this segment */            mca_pml_base_modex_subscribe(&proc->proc_name);        } else {            /* create a backpointer from the proc to the modex data */            OBJ_RETAIN(modex);            proc->proc_modex = &modex->super.super;            OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);        }    }    OPAL_THREAD_LOCK(&modex->modex_lock);    /* lookup/create the module */    if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) {	OPAL_THREAD_UNLOCK(&modex->modex_lock);	return OMPI_ERR_OUT_OF_RESOURCE;    }    /* register the callback */    cb = OBJ_NEW(mca_pml_base_modex_cb_t);    cb->component = component;    cb->cbfunc = cbfunc;    cb->cbdata = cbdata;    opal_list_append(&module->module_cbs, (opal_list_item_t *) cb);    OPAL_THREAD_UNLOCK(&modex->modex_lock);    return OMPI_SUCCESS;}/** * Subscribe to the segment corresponding * to this job. */intmca_pml_base_modex_exchange(void){    return mca_pml_base_modex_subscribe(orte_process_info.my_name);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?