pml_base_module_exchange.c
来自「MPI stands for the Message Passing Inter」· C语言 代码 · 共 762 行 · 第 1/2 页
C
762 行
/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2005 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */#include "ompi_config.h"#include "ompi/proc/proc.h"#include "opal/threads/condition.h"#include "opal/util/output.h"#include "orte/util/proc_info.h"#include "orte/class/orte_proc_table.h"#include "orte/dss/dss.h"#include "opal/mca/mca.h"#include "opal/mca/base/base.h"#include "orte/mca/errmgr/errmgr.h"#include "orte/mca/rml/rml.h"#include "orte/mca/schema/schema.h"#include "orte/mca/gpr/gpr.h"#include "orte/mca/gpr/base/base.h"#include "orte/mca/ns/ns.h"#include "ompi/constants.h"#include "ompi/mca/pml/pml.h"#include "ompi/mca/pml/base/pml_base_module_exchange.h"/* MODEX DESIGN * * Modex data is always associated with a given ompi_proc_t. However, * because modex data is received from the GPR for entire jobids, it * is possible that the modex callback will receive data for a process * not yet in the ompi_proc_all() list of processes. This information * must be kept for later use, because if accept/connect causes the * proc to be added to the ompi_proc_all() list, the subscription to * the mdoex information can not be reliably fired without causing a * potential connection storm. Therefore, we use an orte_proc_table * backing store to contain all modex information. Backpointers are * provided from the ompi_proc_t structure to improve lookup * performance in the common case. * * While we could add the now discovered proc into the ompi_proc_all() * list, this has some problems, in that we don't have the * architecture and hostname information needed to properly fill in * the ompi_proc_t structure and we don't want to cause GPR * communication to get it when we dont' really need to know anything * about the remote proc. * *//** * callback data for modex */struct mca_pml_base_modex_cb_t { opal_list_item_t super; mca_base_component_t *component; mca_pml_base_modex_cb_fn_t cbfunc; void *cbdata;};typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t;OBJ_CLASS_INSTANCE(mca_pml_base_modex_cb_t, opal_list_item_t, NULL, NULL);/** * mca_pml_base_modex_module_t * * Data for a specic proc and module. */struct mca_pml_base_modex_module_t { opal_list_item_t super; mca_base_component_t component; void *module_data; size_t module_data_size; bool module_data_avail; opal_list_t module_cbs; opal_condition_t module_data_cond;};typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t;static voidmca_pml_base_modex_module_construct(mca_pml_base_modex_module_t * module){ OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t); OBJ_CONSTRUCT(&module->module_cbs, opal_list_t); memset(&module->component, 0, sizeof(module->component)); module->module_data = NULL; module->module_data_size = 0; module->module_data_avail = false;}static voidmca_pml_base_modex_module_destruct(mca_pml_base_modex_module_t * module){ OBJ_DESTRUCT(&module->module_data_cond);}OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t, opal_list_item_t, mca_pml_base_modex_module_construct, mca_pml_base_modex_module_destruct);/** * mca_pml_base_modex_t * * List of modules (mca_pml_base_modex_module_t) for which data has been * received from peers. */struct mca_pml_base_modex_t { opal_list_item_t super; opal_mutex_t modex_lock; opal_list_t modex_modules;};typedef struct mca_pml_base_modex_t mca_pml_base_modex_t;static voidmca_pml_base_modex_construct(mca_pml_base_modex_t * modex){ OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t); OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t);}static voidmca_pml_base_modex_destruct(mca_pml_base_modex_t * modex){ OBJ_DESTRUCT(&modex->modex_modules); OBJ_DESTRUCT(&modex->modex_lock);}OBJ_CLASS_INSTANCE(mca_pml_base_modex_t, opal_object_t, mca_pml_base_modex_construct, mca_pml_base_modex_destruct);/** * mca_pml_base_modex_subscription_t * * Track segments we have subscribed to. */struct mca_pml_base_modex_subscription_t { opal_list_item_t item; orte_jobid_t jobid;};typedef struct mca_pml_base_modex_subscription_t mca_pml_base_modex_subscription_t;OBJ_CLASS_INSTANCE(mca_pml_base_modex_subscription_t, opal_list_item_t, NULL, NULL);/** * Globals to track the list of subscriptions. */static opal_list_t mca_pml_base_modex_subscriptions;static opal_hash_table_t mca_pml_base_modex_data;static opal_mutex_t mca_pml_base_modex_lock;/** * Initialize global state. */intmca_pml_base_modex_init(void){ OBJ_CONSTRUCT(&mca_pml_base_modex_data, opal_hash_table_t); OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t); OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t); opal_hash_table_init(&mca_pml_base_modex_data, 256); return OMPI_SUCCESS;}/** * Cleanup global state. */intmca_pml_base_modex_finalize(void){ opal_list_item_t *item; opal_hash_table_remove_all(&mca_pml_base_modex_data); OBJ_DESTRUCT(&mca_pml_base_modex_data); while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions))) OBJ_RELEASE(item); OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions); return OMPI_SUCCESS;}/** * Look to see if there is any data associated with a specified module. */static mca_pml_base_modex_module_t *mca_pml_base_modex_lookup_module(mca_pml_base_modex_t * modex, mca_base_component_t * component){ mca_pml_base_modex_module_t *modex_module; for (modex_module = (mca_pml_base_modex_module_t *) opal_list_get_first(&modex->modex_modules); modex_module != (mca_pml_base_modex_module_t *) opal_list_get_end(&modex->modex_modules); modex_module = (mca_pml_base_modex_module_t *) opal_list_get_next(modex_module)) { if (mca_base_component_compatible(&modex_module->component, component) == 0) { return modex_module; } } return NULL;}/** * Create a placeholder for data associated with the specified module. */static mca_pml_base_modex_module_t *mca_pml_base_modex_create_module(mca_pml_base_modex_t * modex, mca_base_component_t * component){ mca_pml_base_modex_module_t *modex_module; if (NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) { modex_module = OBJ_NEW(mca_pml_base_modex_module_t); if (NULL != modex_module) { modex_module->component = *component; opal_list_append(&modex->modex_modules, (opal_list_item_t *) modex_module); } } return modex_module;}/** * Callback for registry notifications. */static voidmca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data, void *cbdata){ orte_std_cntr_t i, j, k; orte_gpr_value_t **values, *value; orte_gpr_keyval_t **keyval; orte_process_name_t *proc_name; mca_pml_base_modex_t *modex; mca_pml_base_modex_module_t *modex_module; mca_base_component_t component; int rc; ompi_proc_t *proc; /* process the callback */ values = (orte_gpr_value_t **) (data->values)->addr; for (i = 0, k = 0; k < data->cnt && i < (data->values)->size; i++) { if (NULL != values[i]) { k++; value = values[i]; if (0 < value->cnt) { /* needs to be at least one keyval */ /* Find the process name in the keyvals */ keyval = value->keyvals; for (j = 0; j < value->cnt; j++) { if (0 != strcmp(keyval[j]->key, ORTE_PROC_NAME_KEY)) continue; /* this is the process name - extract it */ if (ORTE_SUCCESS != orte_dss.get((void**)&proc_name, keyval[j]->value, ORTE_NAME)) { opal_output(0, "mca_pml_base_modex_registry_callback: unable to extract process name\n"); return; /* nothing we can do */ } goto GOTNAME; } opal_output(0, "mca_pml_base_modex_registry_callback: unable to find process name in notify message\n"); return; /* if the name wasn't here, there is nothing we can do */ GOTNAME: /* look up the modex data structure */ OPAL_THREAD_LOCK(&mca_pml_base_modex_lock); modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, proc_name); if (modex == NULL) { /* create a modex data structure for this proc */ modex = OBJ_NEW(mca_pml_base_modex_t); if (NULL == modex) { opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n"); OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); return; } orte_hash_table_set_proc(&mca_pml_base_modex_data, proc_name, modex); } OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock); OPAL_THREAD_LOCK(&modex->modex_lock); proc = NULL; /* * Extract the component name and version from the keyval object's key * Could be multiple keyvals returned since there is one for each * component type/name/version - process them all */ keyval = value->keyvals; for (j = 0; j < value->cnt; j++) { orte_buffer_t buffer; opal_list_item_t *item; char *ptr; void *bytes = NULL; orte_std_cntr_t cnt; size_t num_bytes; orte_byte_object_t *bo; if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0) continue; OBJ_CONSTRUCT(&buffer, orte_buffer_t); if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) { ORTE_ERROR_LOG(rc); continue; } if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) { ORTE_ERROR_LOG(rc); continue; } cnt = 1; if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { ORTE_ERROR_LOG(rc); continue; } strcpy(component.mca_type_name, ptr); free(ptr); cnt = 1; if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { ORTE_ERROR_LOG(rc); continue; } strcpy(component.mca_component_name, ptr); free(ptr); cnt = 1; if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &component.mca_component_major_version, &cnt, ORTE_INT32))) { ORTE_ERROR_LOG(rc); continue; } cnt = 1; if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &component.mca_component_minor_version, &cnt, ORTE_INT32))) { ORTE_ERROR_LOG(rc); continue; } cnt = 1; if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &num_bytes, &cnt, ORTE_SIZE))) { ORTE_ERROR_LOG(rc); continue; } if (num_bytes != 0) { if (NULL == (bytes = malloc(num_bytes))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); continue; } cnt = (orte_std_cntr_t) num_bytes; if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) { ORTE_ERROR_LOG(rc); continue; } num_bytes = cnt; } else {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?