📄 gpr_replica_trig_ops_fn.c
字号:
if (ORTE_GPR_TRIG_CMP_LEVELS & trig->action) { /* compare the levels of the counters */ cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr); first = true; fire = true; for (i=0, j=0; j < trig->num_counters && i < (trig->counters)->size && fire; i++) { if (NULL != cntr[i]) { j++; if (first) { base_value = cntr[i]->iptr; base_type = cntr[i]->iptr->value->type; first = false; } else { if (base_type != cntr[i]->iptr->value->type) { ORTE_ERROR_LOG(ORTE_ERR_COMPARE_FAILURE); return ORTE_ERR_COMPARE_FAILURE; } if (ORTE_EQUAL != orte_dss.compare(base_value->value->data, cntr[i]->iptr->value->data, base_type)) { fire = false; } } } } if (fire) { /* all levels were equal */ goto FIRED; } return ORTE_SUCCESS; } else if (ORTE_GPR_TRIG_AT_LEVEL & trig->action) { /* see if counters are at a level */ cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr); fire = true; for (i=0, j=0; j < trig->num_counters && i < (trig->counters)->size && fire; i++) { if (NULL != cntr[i]) { j++; if (cntr[i]->iptr->value->type != cntr[i]->trigger_level.value->type) { ORTE_ERROR_LOG(ORTE_ERR_COMPARE_FAILURE); return ORTE_ERR_COMPARE_FAILURE; } if (ORTE_EQUAL != orte_dss.compare(cntr[i]->iptr->value->data, cntr[i]->trigger_level.value->data, cntr[i]->iptr->value->type)) { fire = false; } } } if (fire) { /* all counters at specified trigger level */ goto FIRED; } return ORTE_SUCCESS; } return ORTE_SUCCESS; /* neither cmp nor at level set */FIRED: /* if this trigger wants everything routed through a "master", then we register * this as a trigger_callback. */ if (NULL != trig->master) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_trigger_callback(trig))) { ORTE_ERROR_LOG(rc); return rc; } /* for each subscription assocated with this trigger, check to see if * the subscription needs any special treatment */ subs = (orte_gpr_replica_subscription_t**)(trig->subscriptions)->addr; for (i=0, j=0; j < trig->num_subscriptions && i < (trig->subscriptions)->size; i++) { if (NULL != subs[i]) { j++; /* if ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG set, set the subscription * "active" to indicate that trigger fired */ if (ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG & subs[i]->action) { subs[i]->active = true; } /* if ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG set, then set the flag * so it can be cleaned up later */ if (ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG & subs[i]->action) { subs[i]->cleanup = true; } } } } else { /* for each subscription associated with this trigger, we need to * register a callback to the requestor that returns the specified * data */ subs = (orte_gpr_replica_subscription_t**)(trig->subscriptions)->addr; for (i=0, j=0; j < trig->num_subscriptions && i < (trig->subscriptions)->size; i++) { if (NULL != subs[i]) { j++; if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(subs[i], NULL))) { ORTE_ERROR_LOG(rc); return rc; } /* if ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG set, set the subscription * "active" to indicate that trigger fired */ if (ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG & subs[i]->action) { subs[i]->active = true; } /* if ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG set, then set the flag * so it can be cleaned up later */ if (ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG & subs[i]->action) { subs[i]->cleanup = true; } } } } /* set the processing flag so we don't go into infinite loop if * any callback functions modify the registry */ trig->processing = true; /* if this trigger was a one-shot, set flag to indicate it has fired * so it can be cleaned up later */ if (ORTE_GPR_TRIG_ONE_SHOT & trig->action) { trig->one_shot_fired = true; } return ORTE_SUCCESS;}/* * Check subscriptions to see if any were fired by any of the * recorded actions that have occurred on the registry. */int orte_gpr_replica_check_subscription(orte_gpr_replica_subscription_t *sub){ orte_gpr_replica_action_taken_t **ptr; orte_std_cntr_t i, j, k; orte_gpr_value_t *value; orte_gpr_addr_mode_t addr_mode; int rc=ORTE_SUCCESS; OPAL_TRACE(3); /* When entering this function, we know that the specified * subscription is active since that was tested above. What we now need * to determine is whether or not any of the data * objects pointed to by the subscription were involved in a change. The * subscription could describe a container - e.g., the subscriber might want to know * if anything gets added to a container - or could be a container plus one or * more keys when the subscriber wants to know when a specific value gets changed. */ ptr = (orte_gpr_replica_action_taken_t**)((orte_gpr_replica_globals.acted_upon)->addr); for (i=0, k=0; k < orte_gpr_replica_globals.num_acted_upon && i < (orte_gpr_replica_globals.acted_upon)->size; i++) { if (NULL != ptr[i]) { k++; if ( (((sub->action & ORTE_GPR_NOTIFY_ADD_ENTRY) && (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED)) || ((sub->action & ORTE_GPR_NOTIFY_DEL_ENTRY) && (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED)) || ((sub->action & ORTE_GPR_NOTIFY_VALUE_CHG) && (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO)) || ((sub->action & ORTE_GPR_NOTIFY_VALUE_CHG) && (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM)) || ((sub->action & ORTE_GPR_NOTIFY_VALUE_CHG) && (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED))) && orte_gpr_replica_check_notify_matches(&addr_mode, sub, ptr[i])) { /* if the notify matched one of the subscription values, * then the address mode will have * been stored for us. we now need to send back * the segment name and tokens from the container that is * being addressed! */ /* Construct the base structure for returned data so it can be * sent to the user, if required */ if (ORTE_GPR_REPLICA_STRIPPED(addr_mode)) { if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&value, addr_mode, NULL, 1, 0))) { ORTE_ERROR_LOG(rc); return rc; } } else { if (ORTE_SUCCESS != (rc = orte_gpr_base_create_value(&value, addr_mode, ptr[i]->seg->name, 1, ptr[i]->cptr->num_itags))) { ORTE_ERROR_LOG(rc); return rc; } for (j=0; j < value->num_tokens; j++) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( &(value->tokens[j]), ptr[i]->seg, ptr[i]->cptr->itags[j]))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } } } /* send back the recorded data */ value->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t); if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( &((value->keyvals[0])->key), ptr[i]->seg, ptr[i]->iptr->itag))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } (value->keyvals[0])->value = OBJ_NEW(orte_data_value_t); if (NULL == value->keyvals[0]->value) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); OBJ_RELEASE(value); return ORTE_ERR_OUT_OF_RESOURCE; } value->keyvals[0]->value->type = ptr[i]->iptr->value->type; if (ORTE_SUCCESS != (rc = orte_dss.copy(&((value->keyvals[0]->value)->data), ptr[i]->iptr->value->data, ptr[i]->iptr->value->type))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(sub, value))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } /* register that this subscription is being processed * to avoid potential infinite loops */ sub->processing = true; } } }CLEANUP: return rc;}bool orte_gpr_replica_check_notify_matches(orte_gpr_addr_mode_t *addr_mode, orte_gpr_replica_subscription_t *sub, orte_gpr_replica_action_taken_t *ptr){ orte_gpr_replica_addr_mode_t tokmod; orte_std_cntr_t i, j; orte_gpr_replica_ivalue_t **ivals; OPAL_TRACE(3); /* we need to run through all of this subscription's defined * values to see if any of them match the acted upon one. */ ivals = (orte_gpr_replica_ivalue_t**)(sub->values)->addr; for (i=0, j=0; j < sub->num_values && i < (sub->values)->size; i++) { if (NULL != ivals[i]) { j++; /* first, check to see if the segments match */ if (ivals[i]->seg != ptr->seg) { /* don't match - return false */ continue; } /* next, check to see if the containers match */ tokmod = ORTE_GPR_REPLICA_TOKMODE(ivals[i]->addr_mode); if (!orte_gpr_replica_check_itag_list(tokmod, orte_value_array_get_size(&(ivals[i]->tokentags)), ORTE_VALUE_ARRAY_GET_BASE(&(ivals[i]->tokentags), orte_gpr_replica_itag_t), (ptr->cptr)->num_itags, (ptr->cptr)->itags)) { /* not this container */ continue; } /* next, check to see if this keyval was on the list */ if (orte_gpr_replica_check_itag_list(ORTE_GPR_REPLICA_OR, orte_value_array_get_size(&(ivals[i]->keytags)), ORTE_VALUE_ARRAY_GET_BASE(&(ivals[i]->keytags), orte_gpr_replica_itag_t), 1, &(ptr->iptr->itag))) { /* keyval is on list - return the address mode */ *addr_mode = ivals[i]->addr_mode; return true; } } } /* if we get here, then the acted upon value was * nowhere on the subscription's defined values */ return false;}int orte_gpr_replica_purge_subscriptions(orte_process_name_t *proc){#if 0 orte_gpr_replica_trigger_t **trig; orte_std_cntr_t i; int rc; OPAL_TRACE(3); /* locate any notification events that have proc as the requestor * and remove them */ trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr); for (i=0; i < (orte_gpr_replica.triggers)->size; i++) { if (NULL != trig[i]) { if (NULL == proc && NULL == trig[i]->requestor) { if (ORTE_SUCCESS != (rc = orte_pointer_array_set_item(orte_gpr_replica.triggers, trig[i]->index, NULL))) { ORTE_ERROR_LOG(rc); return rc; } OBJ_RELEASE(trig); } else if (NULL != proc && NULL != trig[i]->requestor && ORTE_EQUAL == orte_dss.compare(Oproc, trig[i]->requestor, ORTE_NAME)) { if (ORTE_SUCCESS != (rc = orte_pointer_array_set_item(orte_gpr_replica.triggers, trig[i]->index, NULL))) { ORTE_ERROR_LOG(rc); return rc; } OBJ_RELEASE(trig); } } }#endif return ORTE_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -