📄 cmslib_client.c
字号:
if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(client_message_t) + message->size, cmg); ha_free(cmg); return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}static intrequest_for_message(__cms_handle_t * hd, const SaNameT * name){ client_header_t request_msg; request_msg.type = CMS_MSG_REQUEST; request_msg.name = *name; return cmsclient_message_send(hd, sizeof(request_msg), &request_msg);}SaErrorTsaMsgMessageGet(const SaMsgQueueHandleT *queueHandle, SaMsgMessageT *message, SaMsgMessageInfoT *messageInfo, SaTimeT timeout){ int ret; SaErrorT error = SA_OK; client_message_t * cmg; client_header_t *rcmg; client_header_t request_msg; __cms_handle_t *hd = GET_MQ_HANDLE(queueHandle); __cms_queue_handle_t *qhd; SaNameT * qname; int freecmg = 0; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by queueHandle [%d]" , __FUNCTION__, queueHandle ? *queueHandle : -1); return SA_ERR_BAD_HANDLE; } qhd = g_hash_table_lookup(hd->queue_handle_hash, queueHandle); assert(qhd != NULL); qname = &(qhd->queue_name); /* * request a message from daemon */ request_msg.type = CMS_MSG_REQUEST; request_msg.name = *qname; cmsclient_message_send(hd, sizeof(request_msg), &request_msg);wait_again: ret = wait_for_msg(hd, CMS_MSG_NOTIFY |CMS_MSG_GET, qname, &rcmg, timeout); if (rcmg->type == CMS_MSG_NOTIFY) { dprintf("Received CMS_MSG_NOTIFY\n"); request_for_message(hd, qname); goto wait_again; } if (ret != SA_OK) { cl_log(LOG_ERR, "wait_for_msg error [%d]", ret); return ret; } cmg = (client_message_t *)rcmg; cmg->data = (void *)((char *)cmg + sizeof(client_message_t)); dprintf("message.data is [%s]\n", (char *)cmg->data); if (message->size < cmg->msg.size) error = SA_ERR_NO_SPACE; message->size = (cmg->msg).size; if (message->data) { memcpy(message->data, cmg->data, (cmg->msg.size > message->size ? message->size : cmg->msg.size)); freecmg = 1; } else { message->data = cmg->data; } message->type = cmg->msg.type; message->version = cmg->msg.version; message->priority = cmg->msg.priority; if (freecmg) ha_free(cmg); /* TODO: message info */ return error;}SaErrorT saMsgMessageReceivedGet(const SaMsgQueueHandleT *queueHandle, const SaMsgMessageHandleT *messageHandle, SaMsgMessageT *message, SaMsgMessageInfoT *messageInfo){ return SA_ERR_NOT_SUPPORTED;}SaErrorT saMsgMessageCancel(const SaMsgQueueHandleT *queueHandle){ return SA_ERR_NOT_SUPPORTED;}SaErrorTsaMsgSelectionObjectGet(const SaMsgHandleT *msgHandle, SaSelectionObjectT *selectionObject){ int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } dprintf("hd->backup_fd is [%d]\n", hd->backup_fd); ret = hd->backup_fd >= 0 ? hd->active_fd : hd->ch->ops->get_recv_select_fd(hd->ch); if (ret < 0) return SA_ERR_LIBRARY; *selectionObject = ret; return SA_OK;}SaErrorTsaMsgDispatch(const SaMsgHandleT *msgHandle, SaDispatchFlagsT dispatchFlags){ int ret; client_header_t *msg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } switch (dispatchFlags) { case SA_DISPATCH_ONE: read_and_queue_ipc_msg(hd); if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) == NULL) { cl_log(LOG_ERR, "%s: dequeue_dispatch_msg got NULL" , __FUNCTION__); return SA_OK; } ret = dispatch_msg(hd, msg); if (g_list_length(hd->dispatch_queue)) active_poll(hd); if (ret != HA_OK) return SA_ERR_LIBRARY; break; case SA_DISPATCH_ALL: read_and_queue_ipc_msg(hd); do { if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) != NULL) { dispatch_msg(hd, msg); } } while (g_list_length(hd->dispatch_queue)); break; case SA_DISPATCH_BLOCKING: break; default: cl_log(LOG_ERR, "%s: wrong dispatchFlags [%d]", __FUNCTION__, dispatchFlags); return SA_ERR_INVALID_PARAM; } return SA_OK;}SaErrorTsaMsgQueueOpenAsync(const SaMsgHandleT *msgHandle, SaInvocationT invocation, const SaNameT *queueName, const SaMsgQueueCreationAttributesT *creationAttributes, SaMsgQueueOpenFlagsT openFlags){ client_mqueue_open_t cmg; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (!openFlags) return SA_ERR_BAD_FLAGS; if (bad_saname(queueName) || !msgHandle || (!creationAttributes && !(openFlags & SA_MSG_QUEUE_OPEN_ONLY)) || (creationAttributes && (openFlags & SA_MSG_QUEUE_OPEN_ONLY))) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_OPEN_ASYNC; cmg.header.name = *queueName; if (creationAttributes) { if ((creationAttributes->creationFlags != 0) && creationAttributes->creationFlags != SA_MSG_QUEUE_PERSISTENT) return SA_ERR_BAD_FLAGS; cmg.attr = *creationAttributes; } cmg.openflag = openFlags; cmg.invocation = invocation; cmg.policy = 0; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (openFlags & SA_MSG_QUEUE_RECEIVE_CALLBACK && !(hd->callbacks).saMsgQueueOpenCallback && !(hd->callbacks).saMsgMessageReceivedCallback) return SA_ERR_INIT; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) cl_log(LOG_ERR, "%s: cmsclient_message_send failed, rc = %d" , __FUNCTION__, ret); return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}SaErrorTsaMsgQueueGroupCreate(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, SaMsgQueueGroupPolicyT queueGroupPolicy){ int ret; client_mqueue_open_t cmg; client_mqueue_open_t *rcmg; client_header_t * reply; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); cmg.header.type = CMS_QUEUEGROUP_CREATE; cmg.header.name = *queueGroupName; cmg.invocation = 0; if (queueGroupPolicy != SA_MSG_QUEUE_GROUP_ROUND_ROBIN) return SA_ERR_INVALID_PARAM; cmg.policy = queueGroupPolicy; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__,ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_CREATE, queueGroupName, &reply, SA_TIME_END); if (ret != SA_OK) return ret; rcmg = (client_mqueue_open_t *) reply; if ((rcmg->header).flag == SA_OK) { SaMsgQueueHandleT *key; key = (SaMsgQueueHandleT *)ha_malloc(sizeof(SaMsgQueueHandleT)); *key = rcmg->handle; g_hash_table_insert(__mqhandle_hash, key, hd); } ret = (rcmg->header).flag; ha_free(rcmg); return ret;}SaErrorT saMsgQueueGroupDelete(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName){ /* TODO: we should remove the key that's in the __mqhandle_hash * as well */ return saMsgQueueUnlink(msgHandle, queueGroupName);}SaErrorT saMsgQueueGroupInsert(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, const SaNameT *queueName){ int ret; client_header_t * rcmg; client_mqgroup_ops_t cmg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUEGROUP_INSERT; cmg.header.name = *queueName; cmg.qgname = *queueGroupName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_INSERT, queueName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; ret = rcmg->flag; ha_free(rcmg); return ret;}SaErrorT saMsgQueueGroupRemove(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, const SaNameT *queueName){ int ret; client_header_t * rcmg; client_mqgroup_ops_t cmg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUEGROUP_REMOVE; cmg.header.name = *queueName; cmg.qgname = *queueGroupName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_REMOVE, queueName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; ret = rcmg->flag; ha_free((client_mqueue_unlink_t *) rcmg); return ret;}SaErrorT saMsgQueueGroupTrackStart(const SaMsgHandleT *msgHandle, const SaNameT *queueGroupName, SaUint8T trackFlags, SaMsgQueueGroupNotificationT *notificationBuffer, SaUint32T numberOfItems){ int ret; client_mqgroup_mem_t cmg; client_header_t * rcmg; client_mqgroup_notify_t * rmsg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (!(hd->callbacks).saMsgQueueGroupTrackCallback) return SA_ERR_INIT; if ((trackFlags & SA_TRACK_CHANGES) && (trackFlags & SA_TRACK_CHANGES_ONLY)) return SA_ERR_BAD_FLAGS; /* tell server we care about the membership information */ cmg.header.type = CMS_QUEUEGROUP_TRACK_START; cmg.header.name = *queueGroupName; cmg.group_name = *queueGroupName; cmg.flag = trackFlags; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); if (ret != IPC_OK) { cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } ret = wait_for_msg(hd, CMS_QUEUEGROUP_TRACK_START, queueGroupName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; if ((ret = rcmg->flag) != SA_OK) return ret; rmsg = (client_mqgroup_notify_t *)rcmg; if ((trackFlags & SA_TRACK_CHANGES) || (trackFlags & SA_TRACK_CHANGES_ONLY)) { /* * Track membership changes with callbacks. */ __mqgroup_track_t * track; char * name; name = (char *) ha_malloc(queueGroupName->length + 1); strncpy(name, queueGroupName->value, queueGroupName->length); name[queueGroupName->length] = '\0'; track = (__mqgroup_track_t *) ha_malloc(sizeof(__mqgroup_track_t)); track->name = queueGroupName; track->flag = trackFlags & ~SA_TRACK_CURRENT; g_hash_table_insert(__group_tracking_hash, name, track); } if (trackFlags & SA_TRACK_CURRENT) { rmsg->data = (char *)rmsg + sizeof(client_mqgroup_notify_t); if (!notificationBuffer) { /* * Client wants saMsgQueueGroupTrackCallback. */ goto exit; } }exit: ha_free(rcmg); return ret;}SaErrorT saMsgQueueGroupTrackStop(const SaMsgHandleT *msgHandle, const SaNameT *queueGroupName){ char * name; gpointer key, track; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (bad_saname(queueGroupName)) return SA_ERR_INVALID_PARAM; name = (char *) ha_malloc(queueGroupName->length + 1); strncpy(name, queueGroupName->value, queueGroupName->length); name[queueGroupName->length] = '\0'; if (g_hash_table_lookup_extended(__group_tracking_hash, name, &key , &track) == TRUE) { g_hash_table_remove(__group_tracking_hash, key); ha_free(key); } else return SA_ERR_NOT_EXIST; ha_free(name); return SA_OK;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -