📄 gpr_replica_trig_ops_fn.c
字号:
goto PROCESS; } } } } } /* if we arrive here, then we were * unable to find a matching subscription. report that fact * and exit */ ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); return ORTE_ERR_NOT_FOUND;PROCESS: /* remove the specified requestor. if this was the last * requestor on this subscription, remove the subscription * as well */ /* must release the requestor object PRIOR to setting * the indexed location to NULL or we lose the pointer */ i = req->index; OBJ_RELEASE(req); orte_pointer_array_set_item(sub->requestors, i, NULL); (sub->num_requestors)--; if (0 == sub->num_requestors) { /* nobody left */ /* NOTE: cannot release sub here as we still need the * object so we can check for it in the list of triggers */ orte_pointer_array_set_item(orte_gpr_replica.subscriptions, sub->index, NULL); (orte_gpr_replica.num_subs)--; } /* check for this subscription throughout the list of triggers * and remove it wherever found */ trigs = (orte_gpr_replica_trigger_t**)(orte_gpr_replica.triggers)->addr; for (i=0, j=0; j < orte_gpr_replica.num_trigs && i < (orte_gpr_replica.triggers)->size; i++) { if (NULL != trigs[i]) { j++; found = false; subs = (orte_gpr_replica_subscription_t**)(trigs[i]->subscriptions)->addr; for (k=0, m=0; !found && m < trigs[i]->num_subscriptions && k < (trigs[i]->subscriptions)->size; k++) { if (NULL != subs[k]) { m++; if (sub == subs[k]) { /* match found */ orte_pointer_array_set_item(trigs[i]->subscriptions, k, NULL); (trigs[i]->num_subscriptions)--; /* if that was the last subscription on this trigger, then * remove the trigger - not needed any more */ if (0 == trigs[i]->num_subscriptions) { OBJ_RELEASE(trigs[i]); orte_pointer_array_set_item(orte_gpr_replica.triggers, i, NULL); } found = true; } } } } } /* done with sub, so now can release it if we need to do so */ if (0 == sub->num_requestors) OBJ_RELEASE(sub); /* ALL DONE! */ return ORTE_SUCCESS;}/* * Remove a trigger from the system. Note that the requestor only * knows their local trigger id, so that is what has been provided. * We need to find the specified combination of requestor and * trigger id, and then delete it */intorte_gpr_replica_remove_trigger(orte_process_name_t *requestor, orte_gpr_trigger_id_t id){ orte_gpr_replica_subscription_t **subs; orte_gpr_replica_trigger_requestor_t **reqs, *req; orte_gpr_replica_trigger_t **trigs, *trig; orte_std_cntr_t i, j, k, m; OPAL_TRACE(3); /* find this trigger on the list */ trigs = (orte_gpr_replica_trigger_t**)(orte_gpr_replica.triggers)->addr; for (i=0, j=0; j < orte_gpr_replica.num_trigs && i < (orte_gpr_replica.triggers)->size; i++) { if (NULL != trigs[i]) { j++; reqs = (orte_gpr_replica_trigger_requestor_t**)(trigs[i]->attached)->addr; for (k=0, m=0; m < trigs[i]->num_attached && k < (trigs[i]->attached)->size; k++) { if (NULL != reqs[k]) { m++; if (id == reqs[k]->idtag && ((NULL == requestor && NULL == reqs[k]->requestor) || (NULL != requestor && NULL != reqs[k]->requestor && ORTE_EQUAL == orte_dss.compare(reqs[k]->requestor, requestor, ORTE_NAME)))) { /* this is the trigger */ trig = trigs[i]; req = reqs[k]; goto PROCESS; } } } } } /* if we arrive here, then we had a remote requestor but were * unable to find a matching trigger. report that fact * and exit */ ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); return ORTE_ERR_NOT_FOUND;PROCESS: /* remove the specified requestor. if this was the last * requestor on this trigger, remove the trigger * as well */ /* must release the requestor object PRIOR to setting * the indexed location to NULL or we lose the pointer */ i = req->index; OBJ_RELEASE(req); orte_pointer_array_set_item(trig->attached, i, NULL); (trig->num_attached)--; if (0 == trig->num_attached) { /* nobody left */ /* NOTE: cannot release trig here as we still need the * object so we can clear any attached subscriptions */ orte_pointer_array_set_item(orte_gpr_replica.triggers, trig->index, NULL); (orte_gpr_replica.num_trigs)--; } /* now need to check any attached subscriptions. if the subscription * was flagged to be deleted after the trigger fired, or was flagged * to only start once the trigger had fired, then we need * to delete it here. otherwise, we leave the subscription alone. */ subs = (orte_gpr_replica_subscription_t**)(trig->subscriptions)->addr; for (i=0, j=0; j < trig->num_subscriptions && i < (trig->subscriptions)->size; i++) { if (NULL != subs[i]) { j++; if (ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG & subs[i]->action || ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG & subs[i]->action) { OBJ_RELEASE(subs[i]); } } } /* done processing trigger - can release it now, if we need to do so */ if (0 == trig->num_attached) OBJ_RELEASE(trig); /* ALL DONE! */ return ORTE_SUCCESS;}int orte_gpr_replica_record_action(orte_gpr_replica_segment_t *seg, orte_gpr_replica_container_t *cptr, orte_gpr_replica_itagval_t *iptr, orte_gpr_replica_action_t action){ orte_gpr_replica_action_taken_t *new_action; orte_std_cntr_t index; int rc; OPAL_TRACE(3); new_action = OBJ_NEW(orte_gpr_replica_action_taken_t); if (NULL == new_action) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } new_action->action = action; /* store pointers to the affected itagval */ new_action->seg = seg; new_action->cptr = cptr; new_action->iptr = iptr; /* "retain" ALL of the respective objects so they can't disappear until * after we process the actions */ OBJ_RETAIN(seg); OBJ_RETAIN(cptr); OBJ_RETAIN(iptr); /* add the new action record to the array */ if (0 > (rc = orte_pointer_array_add(&index, orte_gpr_replica_globals.acted_upon, new_action))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } /* increment the number acted upon */ (orte_gpr_replica_globals.num_acted_upon)++; return ORTE_SUCCESS;}int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_iptr){ orte_gpr_replica_trigger_t **trig; orte_gpr_replica_counter_t **cntrs; orte_gpr_replica_itagval_t **old_iptrs; orte_std_cntr_t i, j, k, m, n, p; bool replaced; OPAL_TRACE(3); trig = (orte_gpr_replica_trigger_t**)((orte_gpr_replica.triggers)->addr); for (i=0, m=0; m < orte_gpr_replica.num_trigs && i < (orte_gpr_replica.triggers)->size; i++) { if (NULL != trig[i]) { m++; cntrs = (orte_gpr_replica_counter_t**)((trig[i]->counters)->addr); for (j=0, n=0; n < trig[i]->num_counters && j < (trig[i]->counters)->size; j++) { if (NULL != cntrs[j]) { n++; old_iptrs = (orte_gpr_replica_itagval_t**)((orte_gpr_replica_globals.srch_ival)->addr); for (k=0, p=0; p < orte_gpr_replica_globals.num_srch_ival && k < (orte_gpr_replica_globals.srch_ival)->size; k++) { replaced = false; if (NULL != old_iptrs[k]) { p++; if (old_iptrs[k] == cntrs[j]->iptr) { if (NULL == new_iptr || replaced) { orte_pointer_array_set_item(trig[i]->counters, j, NULL); (trig[i]->num_counters)--; } else if (!replaced) { cntrs[j]->iptr = new_iptr; replaced = true; } } } } } } } } return ORTE_SUCCESS;}int orte_gpr_replica_check_events(void){ orte_gpr_replica_trigger_t **trigs; orte_gpr_replica_subscription_t **subs; orte_gpr_replica_action_taken_t **ptr; orte_std_cntr_t i, j; int rc; OPAL_TRACE(3); /* we first check all the subscriptions to see if any are "active". * this needs to be done BEFORE we check triggers to ensure that * triggers that turn "on" a subscription don't cause duplicate * messages to their requestor */ subs = (orte_gpr_replica_subscription_t**)((orte_gpr_replica.subscriptions)->addr); for (i=0, j=0; j < orte_gpr_replica.num_subs && i < (orte_gpr_replica.subscriptions)->size; i++) { if (NULL != subs[i]) { j++; if (subs[i]->active) { /* this is an active subscription - check to see if * any of the recorded actions match its specified * conditions and process it if so */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscription(subs[i]))) { ORTE_ERROR_LOG(rc); return rc; } } /* if notify */ } } /* check for triggers that might have fired. * NOTE: MUST DO THIS *AFTER* THE NOTIFY CHECK. If the trigger was * set to start notifies after firing, then checking notifies * AFTER the triggers were processed causes the notification to * be sent twice. */ trigs = (orte_gpr_replica_trigger_t**)((orte_gpr_replica.triggers)->addr); for (i=0, j=0; j < orte_gpr_replica.num_trigs && i < (orte_gpr_replica.triggers)->size; i++) { if (NULL != trigs[i] && !trigs[i]->processing) { j++; /* check the trigger */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_trig(trigs[i]))) { ORTE_ERROR_LOG(rc); return rc; } } /* if trig not NULL */ } /* clean up the action record. The recorded actions from a given * call into the registry are only needed through the "check_events" * function call. */ ptr = (orte_gpr_replica_action_taken_t**)((orte_gpr_replica_globals.acted_upon)->addr); for (i=0, j=0; j < orte_gpr_replica_globals.num_acted_upon && i < (orte_gpr_replica_globals.acted_upon)->size; i++) { if (NULL != ptr[i]) { j++; OBJ_RELEASE(ptr[i]); } } orte_gpr_replica_globals.num_acted_upon = 0; return ORTE_SUCCESS;}/* * Check a trigger to see if it has fired based on the current * state of its counters */int orte_gpr_replica_check_trig(orte_gpr_replica_trigger_t *trig){ orte_gpr_replica_subscription_t **subs; orte_gpr_replica_counter_t **cntr; orte_gpr_replica_itagval_t *base_value=NULL; orte_data_type_t base_type = ORTE_UNDEF; bool first, fire; orte_std_cntr_t i, j; int rc; OPAL_TRACE(3);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -