⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gpr_replica_messaging_fn.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- C -*-
 *
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
/** @file:
 *
 * The Open MPI General Purpose Registry - Replica component
 *
 */

/*
 * includes
 */
#include "orte_config.h"

#include "orte/orte_constants.h"

#include "opal/util/output.h"

#include "orte/dss/dss.h"
#include "orte/util/proc_info.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/schema/schema.h"

#include "orte/mca/gpr/base/base.h"
#include "orte/mca/gpr/replica/api_layer/gpr_replica_api.h"
#include "orte/mca/gpr/replica/communications/gpr_replica_comm.h"
#include "gpr_replica_fn.h"

static int orte_gpr_replica_get_callback_data(orte_gpr_value_t ***values, orte_std_cntr_t *num_vals,
                                              orte_gpr_replica_subscription_t *sub);

static int orte_gpr_replica_store_value_in_trigger_msg(orte_gpr_replica_subscription_t *sub,
                                                       orte_gpr_notify_message_t *msg,
                                                       orte_std_cntr_t cnt,
                                                       orte_gpr_value_t **values);

/**
 * Drain the replica's pending callback list, then tidy up the trigger and
 * subscription arrays.
 *
 * The function is guarded by orte_gpr_replica.processing_callbacks: if the
 * flag is already set on entry it returns ORTE_SUCCESS immediately without
 * doing anything. Once this function has set the flag, it is cleared again
 * on every exit path.
 *
 * For each callback removed from orte_gpr_replica.callbacks:
 *   - a NULL requestor means a local callback; the message is delivered via
 *     orte_gpr_replica_deliver_notify_msg() with
 *     orte_gpr_replica_globals.mutex released for the duration of the call
 *     (a delivery failure is logged but does not stop processing);
 *   - otherwise the message is handed to orte_gpr_replica_remote_notify().
 *
 * NOTE(review): the unlock/lock pair around local delivery implies the
 * caller holds orte_gpr_replica_globals.mutex on entry - confirm.
 *
 * @return ORTE_SUCCESS, or the error code returned by
 *         orte_gpr_replica_remove_subscription() if removing a subscription
 *         marked for cleanup fails.
 */
int orte_gpr_replica_process_callbacks(void)
{
    orte_gpr_replica_callbacks_t *cb;
    orte_gpr_replica_trigger_t **trigs;
    orte_gpr_replica_subscription_t **subs;
    orte_gpr_replica_requestor_t **reqs;
    orte_std_cntr_t i, j, k, m;
    int rc;

    /* check and set flag indicating callbacks being processed */
    if (orte_gpr_replica.processing_callbacks) {
        return ORTE_SUCCESS;
    }
    orte_gpr_replica.processing_callbacks = true;

    while (NULL != (cb = (orte_gpr_replica_callbacks_t*)opal_list_remove_last(&orte_gpr_replica.callbacks))) {
        /* each callback corresponds to a specific requestor
         * The message in the callback consists of at least one (and can
         * be more) "datagrams" intended for that requestor, each of which
         * is slated to be returned to a specific function on the requestor.
         */
        if (NULL == cb->requestor) {  /* local callback */
            /* Since this requestor is "local", we simply execute
             * the callbacks ourself.
             */
            OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
            if (ORTE_SUCCESS != (rc = orte_gpr_replica_deliver_notify_msg(cb->message))) {
                ORTE_ERROR_LOG(rc);
            }
            OPAL_THREAD_LOCK(&orte_gpr_replica_globals.mutex);
        } else {  /* remote request - send messages back */
            orte_gpr_replica_remote_notify(cb->requestor, cb->message);
        }
        OBJ_RELEASE(cb);
    }

    /* cleanup any one-shot triggers that fired and set processing to
     * false on all others
     */
    trigs = (orte_gpr_replica_trigger_t**)((orte_gpr_replica.triggers)->addr);
    for (i=0, k=0, m=0; k < orte_gpr_replica.num_trigs &&
                   i < (orte_gpr_replica.triggers)->size; i++) {
        if (NULL != trigs[i]) {
            k++;
            if (trigs[i]->one_shot_fired) {
                OBJ_RELEASE(trigs[i]);
                orte_pointer_array_set_item(orte_gpr_replica.triggers, i, NULL);
                m++;
            } else {
                trigs[i]->processing = false;
            }
        }
    }
    /* m counted the triggers removed above */
    orte_gpr_replica.num_trigs -= m;

    /* cleanup any subscriptions that are supposed to be
     * removed based on a trigger having fired - set processing to false
     * on all others
     */
    subs = (orte_gpr_replica_subscription_t**)(orte_gpr_replica.subscriptions)->addr;
    for (i=0, k=0; k < orte_gpr_replica.num_subs &&
                   i < (orte_gpr_replica.subscriptions)->size; i++) {
        if (NULL != subs[i]) {
            k++;
            if (subs[i]->cleanup) {
                reqs = (orte_gpr_replica_requestor_t**)(subs[i]->requestors)->addr;
                /* subs[i] is re-checked on every pass because the slot is
                 * tested for NULL before each dereference inside the loop
                 * condition
                 */
                for (j=0, m=0; NULL != subs[i] &&
                               m < subs[i]->num_requestors &&
                               j < (subs[i]->requestors)->size; j++) {
                    if (NULL != reqs[j]) {
                        m++;
                        if (ORTE_SUCCESS != (rc =
                                orte_gpr_replica_remove_subscription(reqs[j]->requestor, reqs[j]->idtag))) {
                            ORTE_ERROR_LOG(rc);
                            /* reopen the list before bailing out - leaving
                             * the flag set would make every later call
                             * return at the guard above without processing
                             * any callbacks
                             */
                            orte_gpr_replica.processing_callbacks = false;
                            return rc;
                        }
                    }
                }
            } else {
                subs[i]->processing = false;
            }
        }
    }

    /* all callbacks processed - indicate list is open */
    orte_gpr_replica.processing_callbacks = false;

    return ORTE_SUCCESS;
}


/**
 * Queue subscription data for delivery to every requestor attached to a
 * subscription.
 *
 * @param sub    Subscription whose requestors are to be notified.
 * @param value  Data to deliver. If NULL, the data is obtained by calling
 *               orte_gpr_replica_get_callback_data() for @p sub.
 *
 * @return ORTE_SUCCESS, or the first error code returned by
 *         orte_gpr_replica_get_callback_data(),
 *         orte_gpr_replica_define_callback() or
 *         orte_gpr_replica_store_value_in_msg().
 *
 * Note that once the data has been obtained, each of the cnt value objects
 * is released (OBJ_RELEASE) before returning - including a value passed in
 * by the caller.
 */
int orte_gpr_replica_register_callback(orte_gpr_replica_subscription_t *sub,
                                       orte_gpr_value_t *value)
{
    orte_gpr_replica_callbacks_t *cb;
    orte_gpr_replica_requestor_t **reqs;
    orte_gpr_value_t **values;
    orte_std_cntr_t cnt;
    orte_std_cntr_t i, j;
    bool cleanup_reqd;
    int rc=ORTE_SUCCESS;

    /* The data to be returned will be the same for all requestors
     * on this subscription. First, let's get the data (if it hasn't
     * already been provided) so we have it ready to be added to
     * the callback
     */
    if (NULL != value) { /* no need to get data - already provided */
        values = &value;
        cnt = 1;
        cleanup_reqd = false;
    } else {
        if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_callback_data(&values, &cnt, sub))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
        cleanup_reqd = true;
    }

    /* We now have the data to be sent to each requestor attached
     * to this subscription.
     * Each subscription that was placed on the system has an associated
     * structure containing the process name and array of callback info where
     * data is to be returned. For remote processes, the callback
     * info is omitted and a subscription id is recorded - this tells
     * the remote process which callback function to use when it receives
     * a message from us.
     * Each subscription can have multiple "requestors" attached to it,
     * each "requestor" consisting of the process name and
     * subscription id (for remote processes), and callback info (for local
     * processes).
     * For each requestor, we need to check to see if a callback has
     * already been scheduled to that destination - if so, we piggyback
     * another datagram onto it to minimize communication costs.
     */

    /* this data is intended to be sent to the individual
     * subscribers themselves. Cycle through the subscription's
     * requestors, define callbacks to them appropriately,
     * and set the id to indicate that it does NOT go
     * to a trigger
     */
    reqs = (orte_gpr_replica_requestor_t**)(sub->requestors)->addr;
    for (i=0, j=0; j < sub->num_requestors &&
                   i < (sub->requestors)->size; i++) {
        if (NULL != reqs[i]) {
            j++;
            /* define the callback */
            if (ORTE_SUCCESS != (rc = orte_gpr_replica_define_callback(ORTE_GPR_SUBSCRIPTION_MSG,
                                            &cb, reqs[i]->requestor))) {
                ORTE_ERROR_LOG(rc);
                goto CLEANUP;
            }

            /* set the callback id to indicate not a trigger callback */
            (cb->message)->id = ORTE_GPR_TRIGGER_ID_MAX;

            /* okay, now we have a message going to the requestor. We need to
             * store the values in the notify_data structure corresponding to this
             * subscription id, combining data where the id's match
             */
            if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_msg(reqs[i],
                                            cb->message, sub->name, cnt, values))) {
                ORTE_ERROR_LOG(rc);
                goto CLEANUP;
            }
        }
    }  /* for i */

CLEANUP:
    /* release the values here - the value objects have been "retained" in
     * the store_value function, so this just ensures that they will be
     * released after the last datagram lets go of them
     */
    for (i=0; i < cnt; i++) {
        OBJ_RELEASE(values[i]);
    }

    /* release the values array IF and only IF it was malloc'd here.
     * otherwise, the value is coming in from the outside - when that happens,
     * only a single value is passed in, so there is no array to free
     */
    if (cleanup_reqd && NULL != values) {
        free(values);
    }

    return rc;
}


int orte_gpr_replica_register_trigger_callback(orte_gpr_replica_trigger_t *trig)
{
    orte_gpr_replica_callbacks_t *cb;
    orte_gpr_replica_counter_t **cntr;
    orte_gpr_replica_subscription_t **subs;
    orte_gpr_value_t **values, *value;
    orte_std_cntr_t i, j, k, cnt;
    int rc;

    /* set the callback's message
     * to point at the correct trigger id for that requestor
     * so the message goes to the correct place, and go ahead
     * and store the data in the message
     */

    /* define the callback */
    if (ORTE_SUCCESS != (rc = orte_gpr_replica_define_callback(ORTE_GPR_TRIGGER_MSG,
                                    &cb, (trig->master)->requestor))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* transfer the trigger name, if available */
    if (NULL != trig->name) {
        (cb->message)->target = strdup(trig->name);
    }

    /* set the callback id to point to the trigger callback function */
    (cb->message)->id = (trig->master)->idtag;

    /* if the trigger counters are to be included, do so */
    if (ORTE_GPR_TRIG_INCLUDE_TRIG_CNTRS & trig->action) {
        cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr);
        /* NOTE(review): several early returns in this loop (the three
         * out-of-resource checks after value is created, and the
         * store_value_in_trigger_msg failure) leave without OBJ_RELEASE(value),
         * unlike the reverse-lookup and copy failure paths - looks like a
         * leak on those paths; confirm and fix once the whole function is
         * in view.
         */
        for (i=0, j=0; j < trig->num_counters &&
                       i < (trig->counters)->size; i++) {
            if (NULL != cntr[i]) {
                j++;
                value = OBJ_NEW(orte_gpr_value_t);
                if (NULL == value) {
                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                    return ORTE_ERR_OUT_OF_RESOURCE;
                }
                value->segment = strdup(cntr[i]->seg->name);
                value->cnt = 1;
                value->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
                if (NULL == value->keyvals) {
                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                    return ORTE_ERR_OUT_OF_RESOURCE;
                }
                value->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
                if (NULL == value->keyvals[0]) {
                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                    return ORTE_ERR_OUT_OF_RESOURCE;
                }
                if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup(
                                        &(value->keyvals[0]->key), cntr[i]->seg,
                                        cntr[i]->iptr->itag))) {
                    ORTE_ERROR_LOG(rc);
                    OBJ_RELEASE(value);
                    return rc;
                }
                value->keyvals[0]->value = OBJ_NEW(orte_data_value_t);
                if (NULL == value->keyvals[0]->value) {
                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                    return ORTE_ERR_OUT_OF_RESOURCE;
                }
                value->keyvals[0]->value->type = cntr[i]->iptr->value->type;
                if (ORTE_SUCCESS != (rc = orte_dss.copy(&((value->keyvals[0]->value)->data), cntr[i]->iptr->value->data, cntr[i]->iptr->value->type))) {
                    ORTE_ERROR_LOG(rc);
                    OBJ_RELEASE(value);
                    return rc;
                }
               /*
                * store the data in the message
                */
                if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_trigger_msg(NULL,
                                                cb->message, 1, &value))) {
                    ORTE_ERROR_LOG(rc);
                    return rc;
                }
                /* release the storage */
                OBJ_RELEASE(value);
            }
        }
    }

    /* cycle through all the trigger's subscriptions and place
     * that data on the message
     */
    subs = (orte_gpr_replica_subscription_t**)(trig->subscriptions)->addr;
    for (i=0, j=0; j < trig->num_subscriptions &&
                   i < (trig->subscriptions)->size; i++) {
        if (NULL != subs[i]) {
            j++;
            if (NULL != subs[i]->name) {
                /* if it's a named subscription, we will deliver it via the
                 * trigger callback function. The data to be returned will
                 * be the same for all requestors.
                 */
                if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_callback_data(&values, &cnt, subs[i]))) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -