⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gpr_proxy_component.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
字号:
/* -*- C -*- * * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana *                         University Research and Technology *                         Corporation.  All rights reserved. * Copyright (c) 2004-2005 The University of Tennessee and The University *                         of Tennessee Research Foundation.  All rights *                         reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, *                         University of Stuttgart.  All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. *                         All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ *//** @file: * * The Open MPI General Purpose Registry - Proxy component * *//* * includes */#include "orte_config.h"#include "orte/orte_constants.h"#include "orte/orte_types.h"#include "orte/dss/dss.h"#include "opal/util/output.h"#include "orte/util/proc_info.h"#include "orte/mca/ns/ns.h"#include "orte/mca/errmgr/errmgr.h"#include "orte/mca/oob/oob_types.h"#include "orte/mca/rml/rml.h"#include "orte/mca/errmgr/errmgr.h"#include "gpr_proxy.h"/* * Struct of function pointers that need to be initialized */mca_gpr_base_component_t mca_gpr_proxy_component = {    {    MCA_GPR_BASE_VERSION_1_0_0,    "proxy", /* MCA module name */    ORTE_MAJOR_VERSION,  /* MCA module major version */    ORTE_MINOR_VERSION,  /* MCA module minor version */    ORTE_RELEASE_VERSION,  /* MCA module release version */    orte_gpr_proxy_open,  /* module open */    orte_gpr_proxy_close /* module close */    },    {    false /* checkpoint / restart */    },    orte_gpr_proxy_component_init,    /* module init */    orte_gpr_proxy_finalize /* module shutdown */};/* * setup the function pointers for the module */static orte_gpr_base_module_t orte_gpr_proxy = {   /* INIT */    orte_gpr_proxy_module_init,   /* BLOCKING OPERATIONS */    orte_gpr_proxy_get,    orte_gpr_proxy_get_conditional,    orte_gpr_proxy_put,    orte_gpr_base_put_1,    orte_gpr_base_put_N,    orte_gpr_proxy_delete_entries,    orte_gpr_proxy_delete_segment,    orte_gpr_proxy_index,    /* NON-BLOCKING OPERATIONS */    orte_gpr_proxy_get_nb,    orte_gpr_proxy_put_nb,    orte_gpr_proxy_delete_entries_nb,    orte_gpr_proxy_delete_segment_nb,    orte_gpr_proxy_index_nb,    /* GENERAL OPERATIONS */    orte_gpr_base_create_value,    orte_gpr_base_create_keyval,    orte_gpr_proxy_preallocate_segment,    orte_gpr_proxy_deliver_notify_msg,    /* ARITHMETIC OPERATIONS */    orte_gpr_proxy_increment_value,    orte_gpr_proxy_decrement_value,    /* SUBSCRIBE OPERATIONS */    orte_gpr_proxy_subscribe,    orte_gpr_base_subscribe_1,    orte_gpr_base_subscribe_N,    orte_gpr_base_define_trigger,    orte_gpr_base_define_trigger_level,    orte_gpr_proxy_unsubscribe,    orte_gpr_proxy_cancel_trigger,    /* COMPOUND COMMANDS */    orte_gpr_proxy_begin_compound_cmd,    orte_gpr_proxy_stop_compound_cmd,    orte_gpr_proxy_exec_compound_cmd,    /* DIAGNOSTIC OPERATIONS */    orte_gpr_proxy_dump_all,    orte_gpr_proxy_dump_segments,    orte_gpr_proxy_dump_triggers,    orte_gpr_proxy_dump_subscriptions,    orte_gpr_proxy_dump_a_trigger,    orte_gpr_proxy_dump_a_subscription,    orte_gpr_proxy_dump_local_triggers,    orte_gpr_proxy_dump_local_subscriptions,    orte_gpr_proxy_dump_callbacks,    orte_gpr_proxy_dump_notify_msg,    orte_gpr_proxy_dump_notify_data,    orte_gpr_proxy_dump_value,    orte_gpr_proxy_dump_segment_size,    /* CLEANUP OPERATIONS */    orte_gpr_proxy_cleanup_job,    orte_gpr_proxy_cleanup_proc};/* * Whether or not we allowed this component to be selected */static bool initialized = false;/* * globals needed within proxy component */orte_gpr_proxy_globals_t orte_gpr_proxy_globals;/* SUBSCRIBER *//* constructor - used to initialize subscriber instance */static void orte_gpr_proxy_subscriber_construct(orte_gpr_proxy_subscriber_t* req){    req->callback = NULL;    req->user_tag = NULL;    req->id = 0;    req->name = NULL;}/* destructor - used to free any resources held by instance */static void orte_gpr_proxy_subscriber_destructor(orte_gpr_proxy_subscriber_t* req){    if (NULL != req->name) free(req->name);}/* define instance of opal_class_t */OBJ_CLASS_INSTANCE(            orte_gpr_proxy_subscriber_t,            /* type name */            opal_object_t,                          /* parent "class" name */            orte_gpr_proxy_subscriber_construct,    /* constructor */            orte_gpr_proxy_subscriber_destructor);  /* destructor *//* TRIGGER *//* constructor - used to initialize trigger instance */static void orte_gpr_proxy_trigger_construct(orte_gpr_proxy_trigger_t* req){    req->callback = NULL;    req->user_tag = NULL;    req->id = 0;    req->name = NULL;}/* destructor - used to free any resources held by instance */static void orte_gpr_proxy_trigger_destructor(orte_gpr_proxy_trigger_t* req){    if (NULL != req->name) free(req->name);}/* define instance of opal_class_t */OBJ_CLASS_INSTANCE(            orte_gpr_proxy_trigger_t,            /* type name */            opal_object_t,                          /* parent "class" name */            orte_gpr_proxy_trigger_construct,    /* constructor */            orte_gpr_proxy_trigger_destructor);  /* destructor *//* * Open the component */int orte_gpr_proxy_open(void){    int id, tmp;    id = mca_base_param_register_int("gpr", "proxy", "debug", NULL, 0);    mca_base_param_lookup_int(id, &tmp);    if (tmp) {        orte_gpr_proxy_globals.debug = true;    } else {        orte_gpr_proxy_globals.debug = false;    }    return ORTE_SUCCESS;}/* * Close the component */int orte_gpr_proxy_close(void){    return ORTE_SUCCESS;}orte_gpr_base_module_t*orte_gpr_proxy_component_init(bool *allow_multi_user_threads, bool *have_hidden_threads,                            int *priority){    orte_process_name_t name;    int ret, value;    if (orte_gpr_proxy_globals.debug) {        opal_output(0, "gpr_proxy_init called");    }    /* If we are NOT to host a replica, then we want to be selected, so do all       the setup and return the module */    if (NULL != orte_process_info.gpr_replica_uri) {        if (orte_gpr_proxy_globals.debug) {            opal_output(0, "[%lu,%lu,%lu] gpr_proxy_init: proxy selected",                        ORTE_NAME_ARGS(orte_process_info.my_name));        }        /* setup the replica location */       if(ORTE_SUCCESS != (ret = orte_rml.parse_uris(orte_process_info.gpr_replica_uri, &name, NULL))) {           ORTE_ERROR_LOG(ret);           return NULL;       }       if(ORTE_SUCCESS != (ret = orte_dss.copy((void**)&orte_process_info.gpr_replica, &name, ORTE_NAME))) {           ORTE_ERROR_LOG(ret);           return NULL;        }        /* Return a module (choose an arbitrary, positive priority --           it's only relevant compared to other ns components).  If           we're not the seed, then we don't want to be selected, so           return NULL. */        *priority = 10;        /* We allow multi user threads but don't have any hidden threads */        *allow_multi_user_threads = true;        *have_hidden_threads = false;        /* setup thread locks and condition variable */        OBJ_CONSTRUCT(&orte_gpr_proxy_globals.mutex, opal_mutex_t);        OBJ_CONSTRUCT(&orte_gpr_proxy_globals.wait_for_compound_mutex, opal_mutex_t);        OBJ_CONSTRUCT(&orte_gpr_proxy_globals.compound_cmd_condition, opal_condition_t);        /* initialize the registry compound mode */        orte_gpr_proxy_globals.compound_cmd_mode = false;        orte_gpr_proxy_globals.compound_cmd_waiting = 0;        orte_gpr_proxy_globals.compound_cmd = NULL;        /* initialize the subscription tracker */        if (ORTE_SUCCESS != (ret = orte_pointer_array_init(&(orte_gpr_proxy_globals.subscriptions),                                (orte_std_cntr_t)orte_gpr_array_block_size,                                (orte_std_cntr_t)orte_gpr_array_max_size,                                (orte_std_cntr_t)orte_gpr_array_block_size))) {            ORTE_ERROR_LOG(ret);            return NULL;        }        orte_gpr_proxy_globals.num_subs = 0;        /* initialize the trigger counter */        if (ORTE_SUCCESS != (ret = orte_pointer_array_init(&(orte_gpr_proxy_globals.triggers),                                (orte_std_cntr_t)orte_gpr_array_block_size,                                (orte_std_cntr_t)orte_gpr_array_max_size,                                (orte_std_cntr_t)orte_gpr_array_block_size))) {            ORTE_ERROR_LOG(ret);            return NULL;        }        orte_gpr_proxy_globals.num_trigs = 0;        /* check to see if we want timing information */        mca_base_param_reg_int_name("orte", "timing",                                    "Request that critical timing loops be measured",                                    false, false, 0, &value);        if (value != 0) {            orte_gpr_proxy_globals.timing = true;        }        initialized = true;        return &orte_gpr_proxy;    } else {        return NULL;    }}int orte_gpr_proxy_module_init(void){    /* issue the non-blocking receive */    int rc;    rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_GPR_NOTIFY, ORTE_RML_PERSISTENT, orte_gpr_proxy_notify_recv, NULL);    if(rc < 0) {        ORTE_ERROR_LOG(rc);        return rc;    }    return ORTE_SUCCESS;}/* * finalize routine */int orte_gpr_proxy_finalize(void){    orte_std_cntr_t i;    orte_gpr_subscription_id_t j;    orte_gpr_trigger_id_t k;    orte_gpr_proxy_subscriber_t **lsubs;    orte_gpr_proxy_trigger_t **ltrigs;        if (orte_gpr_proxy_globals.debug) {       opal_output(0, "[%lu,%lu,%lu] gpr_proxy_finalize called",                        ORTE_NAME_ARGS(orte_process_info.my_name));    }    if (initialized) {        /* destruct the mutex and condition variables */        OBJ_DESTRUCT(&orte_gpr_proxy_globals.mutex);        OBJ_DESTRUCT(&orte_gpr_proxy_globals.wait_for_compound_mutex);        OBJ_DESTRUCT(&orte_gpr_proxy_globals.compound_cmd_condition);        /* clear the local subscriptions and triggers */        if (NULL != orte_gpr_proxy_globals.subscriptions) {            lsubs = (orte_gpr_proxy_subscriber_t**)(orte_gpr_proxy_globals.subscriptions)->addr;            for (i=0, j=0; j < orte_gpr_proxy_globals.num_subs &&                        i < (orte_gpr_proxy_globals.subscriptions)->size; i++) {                if (NULL != lsubs[i]) {                    j++;                    OBJ_RELEASE(lsubs[i]);                }            }            OBJ_RELEASE(orte_gpr_proxy_globals.subscriptions);        }            ltrigs = (orte_gpr_proxy_trigger_t**)(orte_gpr_proxy_globals.triggers)->addr;        if (NULL != orte_gpr_proxy_globals.triggers) {            for (i=0, k=0; k < orte_gpr_proxy_globals.num_trigs &&                        i < (orte_gpr_proxy_globals.triggers)->size; i++) {                if (NULL != ltrigs[i]) {                    k++;                    OBJ_RELEASE(ltrigs[i]);                }            }            OBJ_RELEASE(orte_gpr_proxy_globals.triggers);        }       initialized = false;    }    /* All done */    orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_GPR_NOTIFY);    return ORTE_SUCCESS;}/* * handle notify messages from replicas */void orte_gpr_proxy_notify_recv(int status, orte_process_name_t* sender,                   orte_buffer_t *buffer, orte_rml_tag_t tag,                   void* cbdata){    orte_gpr_cmd_flag_t command;    orte_gpr_notify_message_t *msg;    orte_std_cntr_t n;    int rc;    n = 1;    if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &command, &n, ORTE_GPR_CMD))) {        ORTE_ERROR_LOG(rc);        goto RETURN_ERROR;    }    if (ORTE_GPR_NOTIFY_CMD != command) {        ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);        goto RETURN_ERROR;    }    msg = OBJ_NEW(orte_gpr_notify_message_t);    if (NULL == msg) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        goto RETURN_ERROR;    }    n = 1;    if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &msg, &n, ORTE_GPR_NOTIFY_MSG))) {        ORTE_ERROR_LOG(rc);        OBJ_RELEASE(msg);        goto RETURN_ERROR;    }    /* process the message */    if (ORTE_SUCCESS != (rc = orte_gpr_proxy_deliver_notify_msg(msg))) {        ORTE_ERROR_LOG(rc);        OBJ_RELEASE(msg);        goto RETURN_ERROR;     }    /* release data */    OBJ_RELEASE(msg);RETURN_ERROR:    return;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -