📄 gpr_replica_trig_ops_fn.c
字号:
*/ /* see if another trigger is available */ if (ORTE_GPR_TRIGGER_ID_MAX-1 < orte_gpr_replica.num_trigs) { /* none left! */ ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } trig = OBJ_NEW(orte_gpr_replica_trigger_t); if (NULL == trig) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } trig->idtag = orte_gpr_replica.num_trigs; /* if a name for this trigger has been provided, copy it over */ if (NULL != trigger->name) { trig->name = strdup(trigger->name); } /* copy the action field */ trig->action = trigger->action; /* put this trigger on the replica's list */ if (0 > (rc = orte_pointer_array_add(&(trig->index), orte_gpr_replica.triggers, trig))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } (orte_gpr_replica.num_trigs)++; /* locate and setup the trigger's counters */ for (i=0; i < trigger->cnt; i++) { /* get this counter's addressing modes */ tok_mode = ORTE_GPR_REPLICA_TOKMODE((trigger->values[i])->addr_mode); if (0x00 == tok_mode) { /* default token address mode to AND */ tok_mode = ORTE_GPR_REPLICA_AND; } key_mode = ORTE_GPR_REPLICA_KEYMODE((trigger->values[i])->addr_mode); if (0x00 == key_mode) { /* default key address mode to OR */ key_mode = ORTE_GPR_REPLICA_OR; } /* locate this counter's segment - this is where the counter will be */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_seg(&seg, true, trigger->values[i]->segment))) { ORTE_ERROR_LOG(rc); OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); return rc; } /* convert the counter's tokens to an itaglist */ if (NULL != (trigger->values[i])->tokens && 0 < (trigger->values[i])->num_tokens) { num_tokens = (trigger->values[i])->num_tokens; /* indicates non-NULL terminated list */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_itag_list(&tokentags, seg, (trigger->values[i])->tokens, &num_tokens))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } } /* find the specified container(s) */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_containers(seg, tok_mode, tokentags, num_tokens))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } if (0 == orte_gpr_replica_globals.num_srch_cptr) { /* no existing container found - create one using all the tokens */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_create_container(&cptr2, seg, num_tokens, tokentags))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } /* ok, store all of this counter's values in the new container, adding a pointer to each * one in the trigger's counter array */ for (j=0; j < (trigger->values[i])->cnt; j++) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr2, (trigger->values[i])->keyvals[j]))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } cntr = OBJ_NEW(orte_gpr_replica_counter_t); if (NULL == cntr) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } cntr->seg = seg; cntr->cptr = cptr2; cntr->iptr = iptr; /* if the trigger is at a level, then the requestor MUST specify the * level in the provided keyval. Otherwise, we only need to store * the iptr since we will be comparing levels between multiple * counters */ if (trigger->action & ORTE_GPR_TRIG_AT_LEVEL) { if (NULL == trigger->values[i]->keyvals) { ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); rc = ORTE_ERR_BAD_PARAM; goto CLEANUP; } cntr->trigger_level.value = OBJ_NEW(orte_data_value_t); if (NULL == cntr->trigger_level.value) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto CLEANUP; } cntr->trigger_level.value->type = ((trigger->values[i])->keyvals[j])->value->type; if (ORTE_SUCCESS != (rc = orte_dss.copy(&((cntr->trigger_level.value)->data), ((trigger->values[i])->keyvals[j])->value->data, ((trigger->values[i])->keyvals[j])->value->type))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } } if (0 > orte_pointer_array_add(&index, trig->counters, cntr)) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto CLEANUP; } } trig->num_counters += (trigger->values[i])->cnt; } else { /* For each counter, go through the list of containers and see if it already exists in container. Only allow each counter to be identified once - error if either a counter is never found or already existing in more than one place. */ cptr = (orte_gpr_replica_container_t**)(orte_gpr_replica_globals.srch_cptr)->addr; for (j=0; j < (trigger->values[i])->cnt; j++) { found = false; if (ORTE_SUCCESS != orte_gpr_replica_dict_lookup(&itag, seg, ((trigger->values[i])->keyvals[j])->key)) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); return ORTE_ERR_NOT_FOUND; } for (k=0, m=0; m < orte_gpr_replica_globals.num_srch_cptr && k < (orte_gpr_replica_globals.srch_cptr)->size; k++) { if (NULL != cptr[k]) { m++; if (ORTE_SUCCESS == orte_gpr_replica_search_container( ORTE_GPR_REPLICA_OR, &itag, 1, cptr[k]) && 0 < orte_gpr_replica_globals.num_srch_ival) { /* this key already exists - make sure it's unique */ if (1 < orte_gpr_replica_globals.num_srch_ival || found) { /* not unique - error out */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); rc = ORTE_ERR_BAD_PARAM; goto CLEANUP; } /* okay, add to trigger's counter array */ found = true; iptr = (orte_gpr_replica_itagval_t*)((orte_gpr_replica_globals.srch_ival)->addr[0]); cntr = OBJ_NEW(orte_gpr_replica_counter_t); if (NULL == cntr) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } cntr->seg = seg; cntr->cptr = cptr[k]; cntr->iptr = iptr; /* if the trigger is at a level, then the requestor MUST specify the * level in the provided keyval. Otherwise, we only need to store * the iptr since we will be comparing levels between multiple * counters */ if (trigger->action & ORTE_GPR_TRIG_AT_LEVEL) { if (NULL == trigger->values[i]->keyvals) { ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); rc = ORTE_ERR_BAD_PARAM; goto CLEANUP; } cntr->trigger_level.value = OBJ_NEW(orte_data_value_t); if (NULL == cntr->trigger_level.value) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto CLEANUP; } cntr->trigger_level.value->type = ((trigger->values[i])->keyvals[j])->value->type; if (ORTE_SUCCESS != (rc = orte_dss.copy(&((cntr->trigger_level.value)->data), ((trigger->values[i])->keyvals[j])->value->data, ((trigger->values[i])->keyvals[j])->value->type))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } } if (0 > orte_pointer_array_add(&index, trig->counters, cntr)) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto CLEANUP; } (trig->num_counters)++; } /* end if found */ } /* end if cptr NULL */ } /* end for k */ if (!found) { /* specified counter never found - error */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); rc = ORTE_ERR_BAD_PARAM; goto CLEANUP; } /* end if found */ } /* end for j */ } /* end if/else container found */ } /* end for i */ADDREQ: /* see if this requestor and trigger id is already attached to * this trigger - if so, ignore it to avoid duplicates */ reqs = (orte_gpr_replica_trigger_requestor_t**)(trig->attached)->addr; for (i=0, j=0; j < trig->num_attached && i < (trig->attached)->size; i++) { if (NULL != reqs[i]) { j++; /* if one is NULL and the other isn't, then they can't possibly match */ if ((NULL == reqs[i]->requestor && NULL != requestor) || (NULL != reqs[i]->requestor && NULL == requestor)) { continue; } if (reqs[i]->idtag == trigger->id && ((NULL == reqs[i]->requestor && NULL == requestor) || (ORTE_EQUAL == orte_dss.compare(reqs[i]->requestor, requestor, ORTE_NAME)))) { /* found this requestor - do not add it again */ goto DONETRIG; } } } /* add this requestor to the trigger's list of "attached" callers */ req = OBJ_NEW(orte_gpr_replica_trigger_requestor_t); if (NULL == req) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } if (NULL != requestor) { if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(req->requestor), requestor, ORTE_NAME))) { ORTE_ERROR_LOG(rc); return rc; } } else { req->requestor = NULL; } if (0 > (rc = orte_pointer_array_add(&(req->index), trig->attached, req))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } (trig->num_attached)++; /* store the requestor's trigger id so they can ask * us to cancel their subscription at a later time, * if they choose to do so. */ req->idtag = trigger->id; /* see if the ROUTE_DATA_TO_ME flag is set. This indicates * that the requestor wants all data sent to them and * is assuming all responsibility for properly routing * the data */ if (ORTE_GPR_TRIG_ROUTE_DATA_THRU_ME & trig->action) { if (NULL == trig->master) { /* someone already requested this responsibility. * if I'm a singleton, this is NOT an error - the * initial "launch" has recorded the stage gate * triggers using the [-1,-1,-1] name, so we need to * overwrite that with my name so I get the notifications. */#if 0 if (orte_process_info.singleton || orte_process_info.seed) {opal_output(0, "Trigger master being redefined"); trig->master = req; } else { /* if i'm not a singleton, then this is an error - report it */ ORTE_ERROR_LOG(ORTE_ERR_NOT_AVAILABLE); } } else {#endif trig->master = req; } }DONETRIG: /* report the location of this trigger */ *trigptr = trig; /* record that we had success */ rc = ORTE_SUCCESS;CLEANUP: if (NULL != tokentags) { free(tokentags); } return rc;}/* * Remove a subscription from the system. Note that the requestor only * knows their local subscription id, so that is what has been provided. * We need to find the specified combination of requestor and * subscription id, and then delete it */intorte_gpr_replica_remove_subscription(orte_process_name_t *requestor, orte_gpr_subscription_id_t id){ orte_gpr_replica_subscription_t **subs, *sub; orte_gpr_replica_requestor_t **reqs, *req; orte_gpr_replica_trigger_t **trigs; orte_std_cntr_t i, j, k, m; bool found; OPAL_TRACE(3); /* find this subscription on the list */ subs = (orte_gpr_replica_subscription_t**)(orte_gpr_replica.subscriptions)->addr; for (i=0, j=0; j < orte_gpr_replica.num_subs && i < (orte_gpr_replica.subscriptions)->size; i++) { if (NULL != subs[i]) { j++; reqs = (orte_gpr_replica_requestor_t**)(subs[i]->requestors)->addr; for (k=0, m=0; m < subs[i]->num_requestors && k < (subs[i]->requestors)->size; k++) { if (NULL != reqs[k]) { m++; if (id == reqs[k]->idtag && ((NULL == requestor && NULL == reqs[k]->requestor) || (NULL != requestor && NULL != reqs[k]->requestor && ORTE_EQUAL == orte_dss.compare(reqs[k]->requestor, requestor, ORTE_NAME)))) { /* this is the subscription */ sub = subs[i]; req = reqs[k];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -