📄 gpr_replica_trig_ops_fn.c
字号:
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
/** @file:
 *
 * The Open MPI general purpose registry - support functions.
 *
 */

/*
 * includes
 */

#include "orte_config.h"

#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ns/ns.h"

#include "opal/util/output.h"
#include "opal/util/trace.h"

#include "orte/mca/gpr/replica/api_layer/gpr_replica_api.h"
#include "orte/mca/gpr/replica/transition_layer/gpr_replica_tl.h"

#include "orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h"

/*
 * GENERAL REGISTRY TRIGGER FUNCTIONS
 */

/**
 * Register a subscription on the replica and attach a requestor to it.
 *
 * If the subscription is named and a subscription with the same name is
 * already on the replica's list, no new subscription is created; the
 * requestor is simply attached to the existing one. Otherwise a new
 * replica subscription is built from the provided data specifications
 * and added to the replica's list before the requestor is attached.
 *
 * @param subptr       (OUT) Receives the replica subscription the requestor
 *                     ended up on. Only written on success.
 * @param requestor    Name of the requesting process; may be NULL.
 * @param subscription Description of the subscription (name, action, id and
 *                     the array of value specifications). NOTE: default
 *                     token/key addressing modes are written back into
 *                     subscription->values[i]->addr_mode when unset.
 *
 * @retval ORTE_SUCCESS              Requestor attached (or already attached).
 * @retval ORTE_ERR_OUT_OF_RESOURCE  No subscription ids left, or an
 *                                   allocation / array insertion failed.
 * @retval other                     Error code propagated from a helper call.
 */
int
orte_gpr_replica_register_subscription(orte_gpr_replica_subscription_t **subptr,
                                       orte_process_name_t *requestor,
                                       orte_gpr_subscription_t *subscription)
{
    int rc;
    orte_std_cntr_t i, j, k, num_tokens, num_keys;
    orte_gpr_replica_subscription_t *sub, **subs;
    orte_gpr_replica_requestor_t *req, **reqs;
    orte_gpr_replica_addr_mode_t tok_mode, key_mode;
    orte_gpr_replica_itag_t itag, *tokentags=NULL;
    orte_gpr_replica_ivalue_t *ival;

    OPAL_TRACE(3);

    /* if this is a named subscription, see if that name has
     * already been entered on the replica. If it has, then we
     * simply attach this recipient to that subscription -
     * this indicates that this recipient would also like a
     * copy of the data generated by that subscription
     */
    if (NULL != subscription->name) {
        /* look for this name on current list; j counts the non-NULL
         * entries seen so the scan can stop once all have been visited
         */
        subs = (orte_gpr_replica_subscription_t**)(orte_gpr_replica.subscriptions)->addr;
        for (k=0, j=0; j < orte_gpr_replica.num_subs &&
                       k < (orte_gpr_replica.subscriptions)->size; k++) {
            if (NULL != subs[k]) {
                j++;
                if (NULL != subs[k]->name &&
                    0 == strcmp(subs[k]->name, subscription->name)) {
                    /* found name on list - add another recipient to that
                     * subscription
                     */
                    sub = subs[k];
                    goto ADDREQ;
                }
            }
        }
    }

    /* Either this is NOT a named subscription, or it is named
     * but that name is NOT on the current list of subscriptions.
     * Either way, we add this subscription to the replica's list.
     *
     * NOTE that you CANNOT add yourself as a recipient to a non-named
     * subscription - even if all the subscription specifications are
     * identical. This is done in the interest of speed as checking
     * all the specifications would take some time. Subscriptions are
     * "named" because they are intended to be used by multiple processes.
     * Un-named subscriptions are, therefore, assumed to be specialty
     * subscriptions that do not merit such consideration.
     */

    /* see if another subscription is available on the system */
    if (ORTE_GPR_SUBSCRIPTION_ID_MAX-1 < orte_gpr_replica.num_subs) { /* none left! */
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    sub = OBJ_NEW(orte_gpr_replica_subscription_t);
    if (NULL == sub) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    sub->idtag = orte_gpr_replica.num_subs;

    if (NULL != subscription->name) {
        sub->name = strdup(subscription->name);
        if (NULL == sub->name) {
            /* without the name, later requestors could never find and
             * share this subscription - treat as a hard failure
             */
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            OBJ_RELEASE(sub);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
    }
    sub->action = subscription->action;
    if (ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG & sub->action) {
        sub->active = false;
    } else {
        sub->active = true;
    }

    /* store all the data specifications for this subscription */
    for (i=0; i < subscription->cnt; i++) {
        ival = OBJ_NEW(orte_gpr_replica_ivalue_t);
        if (NULL == ival) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            OBJ_RELEASE(sub);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }

        /* find and store the segment */
        if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_seg(&(ival->seg), true,
                                          subscription->values[i]->segment))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(sub);
            OBJ_RELEASE(ival);
            return rc;
        }

        tok_mode = ORTE_GPR_REPLICA_TOKMODE((subscription->values[i])->addr_mode);
        if (0x00 == tok_mode) {  /* default token address mode to AND */
            subscription->values[i]->addr_mode =
                subscription->values[i]->addr_mode | ORTE_GPR_TOKENS_AND;
        }
        key_mode = ORTE_GPR_REPLICA_KEYMODE((subscription->values[i])->addr_mode);
        if (0x00 == key_mode) {  /* default key address mode to OR */
            subscription->values[i]->addr_mode =
                subscription->values[i]->addr_mode | ORTE_GPR_KEYS_OR;
        }
        ival->addr_mode = ORTE_GPR_REPLICA_REMOVE_OVERWRITE(subscription->values[i]->addr_mode);

        if (NULL != subscription->values[i]->tokens &&
            0 < subscription->values[i]->num_tokens) {
            num_tokens = subscription->values[i]->num_tokens; /* indicates non-NULL terminated list */
            if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_itag_list(&tokentags, ival->seg,
                                      subscription->values[i]->tokens, &num_tokens))) {
                ORTE_ERROR_LOG(rc);
                OBJ_RELEASE(sub);
                OBJ_RELEASE(ival);
                return rc;
            }
            if (ORTE_SUCCESS != (rc = orte_value_array_set_size(&(ival->tokentags),
                                                     (orte_std_cntr_t)num_tokens))) {
                ORTE_ERROR_LOG(rc);
                /* the itag list was handed to us above and has not been
                 * copied anywhere yet - release it before bailing out
                 */
                free(tokentags);
                tokentags = NULL;
                OBJ_RELEASE(sub);
                OBJ_RELEASE(ival);
                return rc;
            }
            for (j=0; j < num_tokens; j++) {
                ORTE_VALUE_ARRAY_SET_ITEM(&(ival->tokentags), orte_gpr_replica_itag_t,
                                          j, tokentags[j]);
            }
            free(tokentags);
            tokentags = NULL;
        }

        if (NULL != subscription->values[i]->keyvals &&
            0 < subscription->values[i]->cnt) {
            num_keys = subscription->values[i]->cnt;
            if (ORTE_SUCCESS != (rc = orte_value_array_set_size(&(ival->keytags), num_keys))) {
                ORTE_ERROR_LOG(rc);
                OBJ_RELEASE(sub);
                OBJ_RELEASE(ival);
                return rc;
            }
            for (j=0; j < num_keys; j++) {
                if (ORTE_SUCCESS != (rc = orte_gpr_replica_create_itag(&itag, ival->seg,
                                            subscription->values[i]->keyvals[j]->key))) {
                    ORTE_ERROR_LOG(rc);
                    OBJ_RELEASE(sub);
                    OBJ_RELEASE(ival);
                    return rc;
                }
                ORTE_VALUE_ARRAY_SET_ITEM(&(ival->keytags), orte_gpr_replica_itag_t,
                                          j, itag);
            }
        }

        /* add the object to the subscription's value pointer array */
        if (0 > (rc = orte_pointer_array_add(&(ival->index), sub->values, ival))) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            OBJ_RELEASE(sub);
            OBJ_RELEASE(ival);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        (sub->num_values)++;
    }

    /* add the object to the replica's subscriptions pointer array */
    if (0 > (rc = orte_pointer_array_add(&(sub->index), orte_gpr_replica.subscriptions, sub))) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        OBJ_RELEASE(sub);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    (orte_gpr_replica.num_subs)++;

ADDREQ:
    /* see if this requestor and subscription id is already attached to
     * this subscription - if so, ignore it to avoid duplicates
     */
    reqs = (orte_gpr_replica_requestor_t**)(sub->requestors)->addr;
    for (i=0, j=0; j < sub->num_requestors &&
                   i < (sub->requestors)->size; i++) {
        if (NULL != reqs[i]) {
            j++;
            /* exactly one of the two names is NULL - cannot be a match */
            if ((NULL == reqs[i]->requestor && NULL != requestor) ||
                (NULL != reqs[i]->requestor && NULL == requestor)) {
                continue;
            }
            if (reqs[i]->idtag == subscription->id &&
                ((NULL == reqs[i]->requestor && NULL == requestor) ||
                 (ORTE_EQUAL == orte_dss.compare(reqs[i]->requestor, requestor, ORTE_NAME)))) {
                /* found this requestor - do not add it again */
                goto DONESUB;
            }
        }
    }

    /* get here if requestor is not already on this subscription
     * add this requestor to the subscription
     *
     * NOTE(review): if any step below fails for a subscription that was
     * newly created above, that subscription stays on the replica's list
     * without this requestor - confirm whether callers rely on that.
     */
    req = OBJ_NEW(orte_gpr_replica_requestor_t);
    if (NULL == req) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    if (NULL != requestor) {
        if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(req->requestor), requestor, ORTE_NAME))) {
            ORTE_ERROR_LOG(rc);
            /* req is not yet referenced by anyone else - do not leak it */
            OBJ_RELEASE(req);
            return rc;
        }
    } else {
        req->requestor = NULL;
    }

    /* store the requestor's subscription id so they can ask
     * us to cancel their subscription at a later time,
     * if they choose to do so, and so that we can tell
     * them which callback function to use when we send
     * them a datagram
     */
    req->idtag = subscription->id;

    if (0 > (rc = orte_pointer_array_add(&(req->index), sub->requestors, req))) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        /* insertion failed, so the array holds no reference to req */
        OBJ_RELEASE(req);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    (sub->num_requestors)++;

    /*
     * New subscription, send initial values?
     */
    if(sub->active && subscription->action & ORTE_GPR_NOTIFY_PRE_EXISTING) {
        if(ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(sub, NULL))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
    }

DONESUB:
    /* record where the subscription went */
    *subptr = sub;

    return ORTE_SUCCESS;
}


int
orte_gpr_replica_register_trigger(orte_gpr_replica_trigger_t **trigptr,
                                  orte_process_name_t *requestor,
                                  orte_gpr_trigger_t *trigger)
{
    orte_gpr_replica_trigger_t *trig, **trigs;
    int rc;
    orte_std_cntr_t i, j, k, m, num_tokens, index;
    orte_gpr_replica_addr_mode_t tok_mode, key_mode;
    orte_gpr_replica_segment_t *seg;
    orte_gpr_replica_container_t **cptr, *cptr2;
    orte_gpr_replica_itag_t itag, *tokentags=NULL;
    orte_gpr_replica_itagval_t *iptr;
    orte_gpr_replica_counter_t *cntr;
    orte_gpr_replica_trigger_requestor_t *req, **reqs;
    bool found;

    OPAL_TRACE(3);

    /* set a default response value */
    *trigptr = NULL;

    /* if this is a named trigger, see if that name has
     * already been entered on the replica.
If it has, then we * can simply return the pointer to the existing trigger. */ if (NULL != trigger->name) { /* look for this name on current list */ trigs = (orte_gpr_replica_trigger_t**)(orte_gpr_replica.triggers)->addr; for (k=0, j=0; j < orte_gpr_replica.num_trigs && k < (orte_gpr_replica.triggers)->size; k++) { if (NULL != trigs[k]) { j++; if (trigs[k]->name && NULL != trigger->name && 0 == strcmp(trigs[k]->name, trigger->name)) { /* found name on list - add recipient's * trigger info to that trigger */ trig = trigs[k]; goto ADDREQ; } } } } /* Either this is NOT a named trigger, or it is named * but that name is NOT on the current list of triggers. * Either way, we add this trigger to the replica's list. * * NOTE that you CANNOT add a subscription to a pre-entered non-named * trigger - even if all the trigger specifications are * identical. This is done in the interest of speed as checking * all the specifications would take some time. Triggers are * "named" because they are intended to be used by multiple processes. * Un-named triggers are, therefore, assumed to be specialty * triggers that do not merit such consideration.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -