📄 cmslib_client.c
字号:
if (ret < 0) return SA_ERR_LIBRARY; *selectionObject = ret; return SA_OK;}SaErrorTsaMsgDispatch(const SaMsgHandleT *msgHandle, SaDispatchFlagsT dispatchFlags){ int ret; client_header_t *msg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } switch (dispatchFlags) { case SA_DISPATCH_ONE: read_and_queue_ipc_msg(hd); if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) == NULL) { cl_log(LOG_ERR, "%s: dequeue_dispatch_msg got NULL" , __FUNCTION__); return SA_OK; } ret = dispatch_msg(hd, msg); if (g_list_length(hd->dispatch_queue)) active_poll(hd); if (ret != HA_OK) return SA_ERR_LIBRARY; break; case SA_DISPATCH_ALL: read_and_queue_ipc_msg(hd); do { if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) != NULL) { dispatch_msg(hd, msg); } } while (g_list_length(hd->dispatch_queue)); break; case SA_DISPATCH_BLOCKING: break; default: cl_log(LOG_ERR, "%s: wrong dispatchFlags [%d]", __FUNCTION__, dispatchFlags); return SA_ERR_INVALID_PARAM; } return SA_OK;}SaErrorTsaMsgQueueOpenAsync(const SaMsgHandleT *msgHandle, SaInvocationT invocation, const SaNameT *queueName, const SaMsgQueueCreationAttributesT *creationAttributes, SaMsgQueueOpenFlagsT openFlags){ client_mqueue_open_t cmg; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (!openFlags) return SA_ERR_BAD_FLAGS; if (bad_saname(queueName) || !msgHandle || (!creationAttributes && openFlags & SA_MSG_QUEUE_CREATE) || (creationAttributes && !(openFlags & SA_MSG_QUEUE_CREATE))) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_OPEN_ASYNC; cmg.header.name = *queueName; if (creationAttributes) { if ((creationAttributes->creationFlags != 0) && creationAttributes->creationFlags != SA_MSG_QUEUE_PERSISTENT) return SA_ERR_BAD_FLAGS; cmg.attr = *creationAttributes; } cmg.openflag = openFlags; cmg.invocation = invocation; cmg.policy = 0; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (openFlags & SA_MSG_QUEUE_RECEIVE_CALLBACK && !(hd->callbacks).saMsgQueueOpenCallback && !(hd->callbacks).saMsgMessageReceivedCallback) return SA_ERR_INIT; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) cl_log(LOG_ERR, "%s: cmsclient_message_send failed, rc = %d" , __FUNCTION__, ret); return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}SaErrorTsaMsgQueueGroupCreate(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, SaMsgQueueGroupPolicyT queueGroupPolicy){ int ret; client_mqueue_open_t cmg; client_mqueue_open_t *rcmg; client_header_t * reply; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); cmg.header.type = CMS_QUEUEGROUP_CREATE; cmg.header.name = *queueGroupName; cmg.invocation = 0; if (queueGroupPolicy != SA_MSG_QUEUE_GROUP_ROUND_ROBIN) return SA_ERR_INVALID_PARAM; cmg.policy = queueGroupPolicy; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__,ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_CREATE, queueGroupName, &reply, SA_TIME_END); if (ret != SA_OK) return ret; rcmg = (client_mqueue_open_t *) reply; if ((rcmg->header).flag == SA_OK) { SaMsgQueueHandleT *key; key = (SaMsgQueueHandleT *)ha_malloc(sizeof(SaMsgQueueHandleT)); *key = rcmg->handle; g_hash_table_insert(__mqhandle_hash, key, hd); } ret = (rcmg->header).flag; ha_free(rcmg); return ret;}SaErrorT saMsgQueueGroupDelete(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName){ /* TODO: we should remove the key that's in the __mqhandle_hash * as well */ return saMsgQueueUnlink(msgHandle, queueGroupName);}SaErrorT saMsgQueueGroupInsert(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, const SaNameT *queueName){ int ret; client_header_t * rcmg; client_mqgroup_ops_t cmg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUEGROUP_INSERT; cmg.header.name = *queueName; cmg.qgname = *queueGroupName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_INSERT, queueName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; ret = rcmg->flag; ha_free(rcmg); return ret;}SaErrorT saMsgQueueGroupRemove(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, const SaNameT *queueName){ int ret; client_header_t * rcmg; client_mqgroup_ops_t cmg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUEGROUP_REMOVE; cmg.header.name = *queueName; cmg.qgname = *queueGroupName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_REMOVE, queueName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; ret = rcmg->flag; ha_free((client_mqueue_unlink_t *) rcmg); return ret;}SaErrorT saMsgQueueGroupTrack(const SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, SaUint8T trackFlags, SaMsgQueueGroupNotificationBufferT *notificationBuffer){ int ret; client_mqgroup_mem_t cmg; client_header_t * rcmg; client_mqgroup_notify_t * rmsg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (!(hd->callbacks).saMsgQueueGroupTrackCallback) return SA_ERR_INIT; if ((trackFlags & SA_TRACK_CHANGES) && (trackFlags & SA_TRACK_CHANGES_ONLY)) return SA_ERR_BAD_FLAGS; /* tell server we care about the membership information */ cmg.header.type = CMS_QUEUEGROUP_TRACK_START; cmg.header.name = *queueGroupName; cmg.group_name = *queueGroupName; cmg.flag = trackFlags; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_TRACK_START, queueGroupName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; if ((ret = rcmg->flag) != SA_OK) return ret; rmsg = (client_mqgroup_notify_t *)rcmg; if ((trackFlags & SA_TRACK_CHANGES) || (trackFlags & SA_TRACK_CHANGES_ONLY)) { /* * Track membership changes with callbacks. */ __mqgroup_track_t * track; char * name; name = (char *) ha_malloc(queueGroupName->length + 1); strncpy(name, queueGroupName->value, queueGroupName->length); name[queueGroupName->length] = '\0'; track = (__mqgroup_track_t *) ha_malloc(sizeof(__mqgroup_track_t)); track->name = queueGroupName; track->flag = trackFlags & ~SA_TRACK_CURRENT; g_hash_table_insert(__group_tracking_hash, name, track); } if (trackFlags & SA_TRACK_CURRENT) { rmsg->data = (char *)rmsg + sizeof(client_mqgroup_notify_t); if (!notificationBuffer) { /* * Client wants saMsgQueueGroupTrackCallback. */ goto exit; } dprintf("numberOfItems %lu, real number %lu\n" , notificationBuffer->numberOfItems, rmsg->number); if (!notificationBuffer->notification) { notificationBuffer->notification = (SaMsgQueueGroupNotificationT *) ha_malloc(rmsg->number * sizeof(SaMsgQueueGroupNotificationT)); if (!notificationBuffer->notification) { ret = SA_ERR_NO_MEMORY; goto exit; } } else if (notificationBuffer->numberOfItems < rmsg->number) { ret = SA_ERR_NO_SPACE; goto exit; } notificationBuffer->numberOfItems = rmsg->number; memcpy(notificationBuffer->notification, rmsg->data , rmsg->number * sizeof(SaMsgQueueGroupNotificationT)); }exit: ha_free(rcmg); return ret;}SaErrorT saMsgQueueGroupTrackStop(const SaMsgHandleT *msgHandle, const SaNameT *queueGroupName){ char * name; gpointer key, track; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (bad_saname(queueGroupName)) return SA_ERR_INVALID_PARAM; name = (char *) ha_malloc(queueGroupName->length + 1); strncpy(name, queueGroupName->value, queueGroupName->length); name[queueGroupName->length] = '\0'; if (g_hash_table_lookup_extended(__group_tracking_hash, name, &key , &track) == TRUE) { g_hash_table_remove(__group_tracking_hash, key); ha_free(key); } else return SA_ERR_NOT_EXIST; ha_free(name); return SA_OK;}SaErrorT saMsgMessageSendReceive(SaMsgHandleT msgHandle, const SaNameT *destination, const SaMsgMessageT *sendMessage, SaMsgMessageT *receiveMessage, SaTimeT *replySendTime, SaTimeT timeout){ SaErrorT error = SA_OK; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(&msgHandle); const SaMsgMessageT * message; client_message_t *cmg; client_header_t *rcmg; client_message_t * ack; int freeack = 0; message = sendMessage; if (message->priority > SA_MSG_MESSAGE_LOWEST_PRIORITY) return SA_ERR_INVALID_PARAM; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle); return SA_ERR_BAD_HANDLE; } cmg = (client_message_t *) ha_malloc(sizeof(client_message_t) + message->size); cmg->header.type = CMS_MSG_SEND_RECEIVE; cmg->header.name = *destination; cmg->msg = *message; cmg->invocation = 0; cmg->data = cmg + 1; cmg->sendreceive = 1; memcpy(cmg->data, message->data, message->size); cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */ ret = cmsclient_message_send(hd, sizeof(client_message_t) + message->size, cmg); /* TODO: fix needed. this can only be called after the msg_done */ ha_free(cmg); while (1) { ret = wait_for_msg(hd, CMS_MSG_RECEIVE, NULL, &rcmg, timeout); if (ret != SA_OK) return ret; else break; } ret = rcmg->flag; ack = (client_message_t *) rcmg; ack->data = (void *)((char *)cmg + sizeof(client_message_t)); if (ack->msg.size > receiveMessage->size) { error = SA_ERR_NO_SPACE; } receiveMessage->size = ack->msg.size; if (receiveMessage->data) { memcpy(receiveMessage->data, ack->data, (ack->msg.size > receiveMessage->size ? receiveMessage->size : ack->msg.size)); freeack = 1; } else { receiveMessage->data = ack->data; } receiveMessage->type = ack->msg.type; receiveMessage->version = ack->msg.version; receiveMessage->priority = 0; if (freeack) ha_free(ack); dprintf("type is %d\n", ack->send_type); return error;}SaErrorT saMsgMessageReply(SaMsgHandleT msgHandle, const SaMsgMessageT *replyMessage, const SaMsgSenderIdT *senderId, SaTimeT timeout){ client_message_t *cmg; client_header_t *rcmg; client_message_ack_t * ack; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(&msgHandle); cmg = (client_message_t *) ha_malloc(sizeof(client_message_t) + replyMessage->size); cmg->header.type = CMS_MSG_REPLY; cmg->header.name.length = 0; cmg->msg = *replyMessage; cmg->invocation = 0; cmg->data = cmg + 1; memcpy(cmg->data, replyMessage->data, replyMessage->size); cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */ if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(client_message_t) + replyMessage->size, cmg); ha_free(cmg); while (1) { ret = wait_for_msg(hd, CMS_MSG_ACK, NULL, &rcmg, timeout); if (ret != SA_OK) return ret; ret = rcmg->flag; ack = (client_message_ack_t *) rcmg; /* * CMS_MSG_SEND is a blocking call, so we can only * have one client waiting for it. Thus when we get * an ACK that is for the request type CMS_MSG_SEND, * we know this is the ACK we are waiting for. */ dprintf("type is %d\n", ack->send_type); if (ack->send_type == CMS_MSG_REPLY) { ha_free((client_message_t *) rcmg); return ret; } else { enqueue_dispatch_msg(hd, rcmg); } } return SA_ERR_NOT_SUPPORTED;}SaErrorT saMsgMessageReplyAsync(SaMsgHandleT msgHandle, SaInvocationT invocation, const SaMsgMessageT *replyMessage, const SaMsgSenderIdT *senderId, SaMsgAckFlagsT ackFlags){ client_message_t *cmg; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(&msgHandle); cmg = (client_message_t *) ha_malloc(sizeof(client_message_t) + replyMessage->size); cmg->header.type = CMS_MSG_REPLY_ASYNC; cmg->header.name.length = 0; cmg->msg = *replyMessage; cmg->invocation = 0; cmg->data = cmg + 1; memcpy(cmg->data, replyMessage->data, replyMessage->size); cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */ if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(client_message_t) + replyMessage->size, cmg); ha_free(cmg); return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -