📄 gpr_replica_messaging_fn.c
字号:
ORTE_ERROR_LOG(rc); return rc; } /* * store the data in the message */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_trigger_msg(subs[i], cb->message, cnt, values))) { ORTE_ERROR_LOG(rc); return rc; } /* release the storage */ for (k=0; k < cnt; k++) OBJ_RELEASE(values[k]); if (NULL != values) free(values); } else { /* in the case of a non-named subscription, we know that someone * has attached a subscription to this trigger, and that the * requestor needs the data to be returned directly to them. This * occurs in the case of orterun, which attaches subscriptions to * the standard triggers so it can monitor the progress of a job * it has launched. To facilitate this, we register a separate * callback for this subscription */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(subs[i], NULL))) { ORTE_ERROR_LOG(rc); return rc; } } } } return ORTE_SUCCESS;}int orte_gpr_replica_define_callback(orte_gpr_notify_msg_type_t msg_type, orte_gpr_replica_callbacks_t **cbptr, orte_process_name_t *recipient){ orte_gpr_replica_callbacks_t *cb; int rc; /* see if a callback has already been registered for this recipient */ for (cb = (orte_gpr_replica_callbacks_t*)opal_list_get_first(&(orte_gpr_replica.callbacks)); cb != (orte_gpr_replica_callbacks_t*)opal_list_get_end(&(orte_gpr_replica.callbacks)); cb = (orte_gpr_replica_callbacks_t*)opal_list_get_next(cb)) { /* must check to see if both the recipient is the same AND that the * message type being sent is identical (i.e., that messages going back * to trigger callbacks do NOT get mixed with messages going back to * subscription callbacks). This is critical as the deliver_notify_msg * functions handle these message types in different ways */ if (((NULL == recipient && NULL == cb->requestor) && (msg_type == cb->message->msg_type)) || (((NULL != recipient && NULL != cb->requestor) && (ORTE_EQUAL == orte_dss.compare(recipient, cb->requestor, ORTE_NAME))) && (msg_type == cb->message->msg_type))) { /* okay, a callback has been registered to send data to this * recipient - return this location */ *cbptr = cb; return ORTE_SUCCESS; } } /* this is going to somebody new - create a new callback * for this recipient */ cb = OBJ_NEW(orte_gpr_replica_callbacks_t); if (NULL == cb) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } opal_list_append(&orte_gpr_replica.callbacks, &cb->item); /* construct the message */ cb->message = OBJ_NEW(orte_gpr_notify_message_t); if (NULL == cb->message) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } cb->message->msg_type = msg_type; if (NULL == recipient) { cb->requestor = NULL; } else { if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(cb->requestor), recipient, ORTE_NAME))) { ORTE_ERROR_LOG(rc); return rc; } } /* return the pointer to the new callback */ *cbptr = cb; return ORTE_SUCCESS;}int orte_gpr_replica_store_value_in_msg(orte_gpr_replica_requestor_t *req, orte_gpr_notify_message_t *msg, char *sub_name, orte_std_cntr_t cnt, orte_gpr_value_t **values){ orte_std_cntr_t i, j, k, index; orte_gpr_notify_data_t **data, *dptr; /* check to see if this data is going to the same place as * any prior data on the message. if so, then we add the values * to that existing data structure. if not, then we realloc to * establish a new data structure and store the data there */ data = (orte_gpr_notify_data_t**)(msg->data)->addr; for (i=0, k=0; k < msg->cnt && i < (msg->data)->size; i++) { if (NULL != data[i]) { k++; if (data[i]->id == req->idtag) { /* going to the same place */ for (j=0; j < cnt; j++) { if (0 > orte_pointer_array_add(&index, data[i]->values, values[j])) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } /* must "retain" the value object to ensure that it is * there for this datagram. Since we are only storing * pointers to the object (and not actually copying it), * datagrams may wind up sharing the object. Hence, when * a datagram is released, it will release the object. Without * the retain, the next datagram that shares that object * will see trash */ OBJ_RETAIN(values[j]); } data[i]->cnt += cnt; return ORTE_SUCCESS; } } } /* no prior matching data found, so add another data location to * the message and store the values there */ dptr = OBJ_NEW(orte_gpr_notify_data_t); if (NULL == dptr) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } /* set the name of the subscription, if provided */ if (NULL != sub_name) { dptr->target = strdup(sub_name); } dptr->id = req->idtag; if (0 > orte_pointer_array_add(&index, msg->data, dptr)) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } (msg->cnt)++; for (j=0; j < cnt; j++) { if (0 > orte_pointer_array_add(&index, dptr->values, values[j])) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } /* must "retain" the value object to ensure that it is * there for this datagram. Since we are only storing * pointers to the object (and not actually copying it), * datagrams may wind up sharing the object. Hence, when * a datagram is released, it will release the object. Without * the retain, the next datagram that shares that object * will see trash */ OBJ_RETAIN(values[j]); } dptr->cnt = cnt; return ORTE_SUCCESS;}static int orte_gpr_replica_store_value_in_trigger_msg(orte_gpr_replica_subscription_t *sub, orte_gpr_notify_message_t *msg, orte_std_cntr_t cnt, orte_gpr_value_t **values){ orte_std_cntr_t i, j, k, index; orte_gpr_notify_data_t **data, *dptr; /* check to see if this data is going to the same place as * any prior data on the message. if so, then we add the values * to that existing data structure. if not, then we realloc to * establish a new data structure and store the data there */ data = (orte_gpr_notify_data_t**)(msg->data)->addr; for (i=0, k=0; k < msg->cnt && i < (msg->data)->size; i++) { if (NULL != data[i]) { k++; if ((NULL == data[i]->target && NULL == sub) || (NULL != data[i]->target && NULL != sub->name && 0 == strcmp(data[i]->target, sub->name))) { /* going to the same place */ for (j=0; j < cnt; j++) { if (0 > orte_pointer_array_add(&index, data[i]->values, values[j])) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } /* must "retain" the value object to ensure that it is * there for this datagram. Since we are only storing * pointers to the object (and not actually copying it), * datagrams may wind up sharing the object. Hence, when * a datagram is released, it will release the object. Without * the retain, the next datagram that shares that object * will see trash */ OBJ_RETAIN(values[j]); } data[i]->cnt += cnt; return ORTE_SUCCESS; } } } /* no prior matching data found, so add another data location to * the message and store the values there */ dptr = OBJ_NEW(orte_gpr_notify_data_t); if (NULL == dptr) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } if (NULL != sub && NULL != sub->name) { dptr->target = strdup(sub->name); } if (0 > orte_pointer_array_add(&index, msg->data, dptr)) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } (msg->cnt)++; for (j=0; j < cnt; j++) { if (0 > orte_pointer_array_add(&index, dptr->values, values[j])) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } /* must "retain" the value object to ensure that it is * there for this datagram. Since we are only storing * pointers to the object (and not actually copying it), * datagrams may wind up sharing the object. Hence, when * a datagram is released, it will release the object. Without * the retain, the next datagram that shares that object * will see trash */ OBJ_RETAIN(values[j]); } dptr->cnt = cnt; return ORTE_SUCCESS;}static int orte_gpr_replica_get_callback_data(orte_gpr_value_t ***ret_values, orte_std_cntr_t *cnt, orte_gpr_replica_subscription_t *sub){ orte_gpr_value_t **vals, **values; orte_gpr_replica_ivalue_t **ivals; orte_std_cntr_t i, j, k, num_tokens, num_keys, interim, count; int rc; /* setup default error returns */ *ret_values = NULL; *cnt = 0; /* get the data off the registry. since a * subscription can have multiple data sources specified, we * have to loop through those sources, constructing an aggregated * array of data values that we can work with in composing the * final message */ ivals = (orte_gpr_replica_ivalue_t**)(sub->values)->addr; count = 0; values = NULL; for (i=0, j=0; j < sub->num_values && i < (sub->values)->size; i++) { if (NULL != ivals[i]) { j++; num_tokens = orte_value_array_get_size(&(ivals[i]->tokentags)); num_keys = orte_value_array_get_size(&(ivals[i]->keytags)); /* get the data for this description off the registry */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_fn(ivals[i]->addr_mode, ivals[i]->seg, ORTE_VALUE_ARRAY_GET_BASE(&(ivals[i]->tokentags), orte_gpr_replica_itag_t), num_tokens, ORTE_VALUE_ARRAY_GET_BASE(&(ivals[i]->keytags), orte_gpr_replica_itag_t), num_keys, &interim, &vals))) { ORTE_ERROR_LOG(rc); return rc; } /* if we don't get any data back, just continue - don't * try to add it to the values since that would cause a * zero-byte malloc */ if (0 == interim) { continue; } /* add these results to those we have already obtained */ if (0 == count) { /* first time through */ values = (orte_gpr_value_t**)malloc(interim * sizeof(orte_gpr_value_t*)); if (NULL == values) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } } else { /* reallocate values array */ values = (orte_gpr_value_t**)realloc(values, (count+interim)*sizeof(orte_gpr_value_t*)); if (NULL == values) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } } /* add data to end of array */ for (k=0; k < interim; k++) { values[k+count] = vals[k]; } /* release the array of pointers - the pointers themselves * will remain "alive" in the values array to be released * later */ free(vals); /* update the count */ count += interim; } } *ret_values = values; *cnt = count; return ORTE_SUCCESS;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -