⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gpr_replica_messaging_fn.c

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 C
📖 第 1 页 / 共 2 页
字号:
                    ORTE_ERROR_LOG(rc);                    return rc;                }                /*                 * store the data in the message                 */                if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_trigger_msg(subs[i],                                                cb->message, cnt, values))) {                    ORTE_ERROR_LOG(rc);                    return rc;                }                /* release the storage */                for (k=0; k < cnt; k++) OBJ_RELEASE(values[k]);                if (NULL != values) free(values);            } else {                /* in the case of a non-named subscription, we know that someone                 * has attached a subscription to this trigger, and that the                 * requestor needs the data to be returned directly to them. This                 * occurs in the case of orterun, which attaches subscriptions to                 * the standard triggers so it can monitor the progress of a job                 * it has launched. To facilitate this, we register a separate                 * callback for this subscription                 */                if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(subs[i], NULL))) {                    ORTE_ERROR_LOG(rc);                    return rc;                }            }        }    }    return ORTE_SUCCESS;}int orte_gpr_replica_define_callback(orte_gpr_notify_msg_type_t msg_type,                                     orte_gpr_replica_callbacks_t **cbptr,                                     orte_process_name_t *recipient){    orte_gpr_replica_callbacks_t *cb;    int rc;    /* see if a callback has already been registered for this recipient */    for (cb = (orte_gpr_replica_callbacks_t*)opal_list_get_first(&(orte_gpr_replica.callbacks));         cb != (orte_gpr_replica_callbacks_t*)opal_list_get_end(&(orte_gpr_replica.callbacks));         cb = (orte_gpr_replica_callbacks_t*)opal_list_get_next(cb)) {        /* must check to see if both the recipient is the same AND that the         * message type being sent is identical (i.e., that messages going back         * to trigger callbacks do NOT get mixed with messages going back to         * subscription callbacks). This is critical as the deliver_notify_msg         * functions handle these message types in different ways         */         if (((NULL == recipient && NULL == cb->requestor) &&              (msg_type == cb->message->msg_type)) ||             (((NULL != recipient && NULL != cb->requestor) &&              (ORTE_EQUAL == orte_dss.compare(recipient, cb->requestor, ORTE_NAME))) &&               (msg_type == cb->message->msg_type))) {             /* okay, a callback has been registered to send data to this              * recipient - return this location              */             *cbptr = cb;             return ORTE_SUCCESS;         }    }    /* this is going to somebody new - create a new callback     * for this recipient     */    cb = OBJ_NEW(orte_gpr_replica_callbacks_t);    if (NULL == cb) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    opal_list_append(&orte_gpr_replica.callbacks, &cb->item);    /* construct the message */    cb->message = OBJ_NEW(orte_gpr_notify_message_t);    if (NULL == cb->message) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    cb->message->msg_type = msg_type;    if (NULL == recipient) {        cb->requestor = NULL;    } else {        if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(cb->requestor), recipient, ORTE_NAME))) {            ORTE_ERROR_LOG(rc);            return rc;        }    }    /* return the pointer to the new callback */    *cbptr = cb;    return ORTE_SUCCESS;}int orte_gpr_replica_store_value_in_msg(orte_gpr_replica_requestor_t *req,                                        orte_gpr_notify_message_t *msg,                                        char *sub_name,                                        orte_std_cntr_t cnt,                                        orte_gpr_value_t **values){    orte_std_cntr_t i, j, k, index;    orte_gpr_notify_data_t **data, *dptr;    /* check to see if this data is going to the same place as     * any prior data on the message. if so, then we add the values     * to that existing data structure. if not, then we realloc to     * establish a new data structure and store the data there     */    data = (orte_gpr_notify_data_t**)(msg->data)->addr;    for (i=0, k=0; k < msg->cnt &&                   i < (msg->data)->size; i++) {        if (NULL != data[i]) {            k++;            if (data[i]->id == req->idtag) { /* going to the same place */                for (j=0; j < cnt; j++) {                    if (0 > orte_pointer_array_add(&index, data[i]->values, values[j])) {                        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);                        return ORTE_ERR_OUT_OF_RESOURCE;                    }                    /* must "retain" the value object to ensure that it is                    * there for this datagram. Since we are only storing                    * pointers to the object (and not actually copying it),                    * datagrams may wind up sharing the object. Hence, when                    * a datagram is released, it will release the object. Without                    * the retain, the next datagram that shares that object                    * will see trash                    */                    OBJ_RETAIN(values[j]);                }                data[i]->cnt += cnt;                return ORTE_SUCCESS;            }        }    }    /* no prior matching data found, so add another data location to     * the message and store the values there     */    dptr = OBJ_NEW(orte_gpr_notify_data_t);    if (NULL == dptr) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    /* set the name of the subscription, if provided */    if (NULL != sub_name) {        dptr->target = strdup(sub_name);    }    dptr->id = req->idtag;    if (0 > orte_pointer_array_add(&index, msg->data, dptr)) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    (msg->cnt)++;    for (j=0; j < cnt; j++) {        if (0 > orte_pointer_array_add(&index, dptr->values, values[j])) {            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);            return ORTE_ERR_OUT_OF_RESOURCE;        }        /* must "retain" the value object to ensure that it is         * there for this datagram. Since we are only storing         * pointers to the object (and not actually copying it),         * datagrams may wind up sharing the object. Hence, when         * a datagram is released, it will release the object. Without         * the retain, the next datagram that shares that object         * will see trash         */        OBJ_RETAIN(values[j]);    }    dptr->cnt = cnt;    return ORTE_SUCCESS;}static int orte_gpr_replica_store_value_in_trigger_msg(orte_gpr_replica_subscription_t *sub,                                                       orte_gpr_notify_message_t *msg,                                                       orte_std_cntr_t cnt,                                                       orte_gpr_value_t **values){    orte_std_cntr_t i, j, k, index;    orte_gpr_notify_data_t **data, *dptr;    /* check to see if this data is going to the same place as     * any prior data on the message. if so, then we add the values     * to that existing data structure. if not, then we realloc to     * establish a new data structure and store the data there     */    data = (orte_gpr_notify_data_t**)(msg->data)->addr;    for (i=0, k=0; k < msg->cnt &&                   i < (msg->data)->size; i++) {        if (NULL != data[i]) {            k++;            if ((NULL == data[i]->target && NULL == sub) ||                 (NULL != data[i]->target && NULL != sub->name &&                 0 == strcmp(data[i]->target, sub->name))) { /* going to the same place */                for (j=0; j < cnt; j++) {                    if (0 > orte_pointer_array_add(&index, data[i]->values, values[j])) {                        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);                        return ORTE_ERR_OUT_OF_RESOURCE;                    }                    /* must "retain" the value object to ensure that it is                    * there for this datagram. Since we are only storing                    * pointers to the object (and not actually copying it),                    * datagrams may wind up sharing the object. Hence, when                    * a datagram is released, it will release the object. Without                    * the retain, the next datagram that shares that object                    * will see trash                    */                    OBJ_RETAIN(values[j]);                }                data[i]->cnt += cnt;                return ORTE_SUCCESS;            }        }    }    /* no prior matching data found, so add another data location to     * the message and store the values there     */    dptr = OBJ_NEW(orte_gpr_notify_data_t);    if (NULL == dptr) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    if (NULL != sub && NULL != sub->name) {        dptr->target = strdup(sub->name);    }    if (0 > orte_pointer_array_add(&index, msg->data, dptr)) {        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);        return ORTE_ERR_OUT_OF_RESOURCE;    }    (msg->cnt)++;    for (j=0; j < cnt; j++) {        if (0 > orte_pointer_array_add(&index, dptr->values, values[j])) {            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);            return ORTE_ERR_OUT_OF_RESOURCE;        }        /* must "retain" the value object to ensure that it is         * there for this datagram. Since we are only storing         * pointers to the object (and not actually copying it),         * datagrams may wind up sharing the object. Hence, when         * a datagram is released, it will release the object. Without         * the retain, the next datagram that shares that object         * will see trash         */        OBJ_RETAIN(values[j]);    }    dptr->cnt = cnt;    return ORTE_SUCCESS;}static int orte_gpr_replica_get_callback_data(orte_gpr_value_t ***ret_values, orte_std_cntr_t *cnt,                                              orte_gpr_replica_subscription_t *sub){    orte_gpr_value_t **vals, **values;    orte_gpr_replica_ivalue_t **ivals;    orte_std_cntr_t i, j, k, num_tokens, num_keys, interim, count;    int rc;    /* setup default error returns */    *ret_values = NULL;    *cnt = 0;    /* get the data off the registry. since a     * subscription can have multiple data sources specified, we     * have to loop through those sources, constructing an aggregated     * array of data values that we can work with in composing the     * final message     */    ivals = (orte_gpr_replica_ivalue_t**)(sub->values)->addr;    count = 0;    values = NULL;    for (i=0, j=0; j < sub->num_values &&                   i < (sub->values)->size; i++) {        if (NULL != ivals[i]) {            j++;            num_tokens = orte_value_array_get_size(&(ivals[i]->tokentags));            num_keys = orte_value_array_get_size(&(ivals[i]->keytags));            /* get the data for this description off the registry */            if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_fn(ivals[i]->addr_mode,                    ivals[i]->seg,                    ORTE_VALUE_ARRAY_GET_BASE(&(ivals[i]->tokentags), orte_gpr_replica_itag_t),                    num_tokens,                    ORTE_VALUE_ARRAY_GET_BASE(&(ivals[i]->keytags), orte_gpr_replica_itag_t),                    num_keys,                    &interim, &vals))) {                ORTE_ERROR_LOG(rc);                return rc;            }            /* if we don't get any data back, just continue - don't             * try to add it to the values since that would cause a             * zero-byte malloc             */            if (0 == interim) {                continue;            }            /* add these results to those we have already obtained */            if (0 == count) { /* first time through */                values = (orte_gpr_value_t**)malloc(interim *                                                sizeof(orte_gpr_value_t*));                if (NULL == values) {                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);                    return ORTE_ERR_OUT_OF_RESOURCE;                }            } else {                /* reallocate values array */                values = (orte_gpr_value_t**)realloc(values,                                (count+interim)*sizeof(orte_gpr_value_t*));                if (NULL == values) {                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);                    return ORTE_ERR_OUT_OF_RESOURCE;                }            }            /* add data to end of array */            for (k=0; k < interim; k++) {                values[k+count] = vals[k];            }            /* release the array of pointers - the pointers themselves             * will remain "alive" in the values array to be released             * later             */            free(vals);            /* update the count */            count += interim;        }    }    *ret_values = values;    *cnt = count;    return ORTE_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -