📄 cmslib_client.c
字号:
ha_free(name); ha_free(nmsg); break; default: return HA_FAIL; } return HA_OK;}SaErrorTsaMsgInitialize(SaMsgHandleT *msgHandle, const SaMsgCallbacksT *msgCallbacks, const SaVersionT *version){ IPC_Channel *ch; __cms_handle_t *hd; SaMsgHandleT * key; int pipefd[2]; cl_log_set_entity("libcms"); cl_log_set_facility(LOG_USER);#ifdef DEBUG_LIBRARY cl_log_enable_stderr(TRUE);#endif if ((!version) || version->releaseCode < 'A' || version->releaseCode > 'Z' || (version->releaseCode << 8) + (version->major << 4) + version->minor > (AIS_VERSION_RELEASE_CODE << 8) + (AIS_VERSION_MAJOR << 4) + AIS_VERSION_MINOR) { cl_log(LOG_ERR, "AIS library version is lower then required"); return SA_ERR_VERSION; } if (!msgHandle) return SA_ERR_INVALID_PARAM; if (!(ch = cms_channel_conn())) { cl_log(LOG_ERR, "cms_channel_conn failed."); return SA_ERR_LIBRARY; } if (pipe(pipefd) == -1) { cl_log(LOG_ERR, "create pipe failed"); return SA_ERR_LIBRARY; } /* * Write something to the pipe but we never read so that * select to this fd will always return immediately. */ if (write(pipefd[1], "ACTIVE", 6) < 0) { cl_log(LOG_ERR, "write pipe failed\n"); return SA_ERR_LIBRARY; } cmsclient_hash_init(); dprintf("ch_status = %d\n", ch->ch_status); dprintf("farside_pid = %d\n", ch->farside_pid); hd = (__cms_handle_t *)ha_malloc(sizeof(__cms_handle_t)); memset(hd, 0, sizeof(__cms_handle_t)); hd->queue_handle_hash = g_hash_table_new(g_int_hash, g_int_equal); hd->ch = ch; if (msgCallbacks) { memcpy(&(hd->callbacks), msgCallbacks, sizeof(SaMsgCallbacksT)); } else { memset(&(hd->callbacks), 0, sizeof(SaMsgCallbacksT)); } *msgHandle = __cmshandle_counter++; hd->service_handle = *msgHandle; hd->active_fd = pipefd[0]; hd->backup_fd = -1; key = (SaMsgHandleT *) ha_malloc(sizeof(SaMsgHandleT)); key = msgHandle; g_hash_table_insert(__cmshandle_hash, key, hd); return SA_OK;}SaErrorTsaMsgFinalize(SaMsgHandleT *msgHandle){ __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } g_hash_table_foreach_remove(hd->queue_handle_hash, msgqueue_remove, hd); g_hash_table_remove(__cmshandle_hash, msgHandle); if (hd->backup_fd >= 0) restore_poll(hd); hd->ch->ops->destroy(hd->ch); close(hd->active_fd); /* TODO: need to free the glist on the dispatch queue */ ha_free(hd); return SA_OK;}SaErrorTsaMsgQueueOpen(const SaMsgHandleT *msgHandle, const SaNameT *queueName, const SaMsgQueueCreationAttributesT *creationAttributes, SaMsgQueueOpenFlagsT openFlags, SaTimeT timeout, SaMsgQueueHandleT *queueHandle){ int ret; client_mqueue_open_t cmg; client_mqueue_open_t *rcmg; client_header_t * reply; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (!openFlags) return SA_ERR_BAD_FLAGS; if (bad_saname(queueName) || !queueHandle || (!creationAttributes && (openFlags & SA_MSG_QUEUE_CREATE)) || (creationAttributes && !(openFlags & SA_MSG_QUEUE_CREATE))) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_OPEN; cmg.header.name = *queueName; if (creationAttributes) { if ((creationAttributes->creationFlags != 0) && creationAttributes->creationFlags != SA_MSG_QUEUE_PERSISTENT) return SA_ERR_BAD_FLAGS; cmg.attr = *creationAttributes; } else { /* * else set to -1, so that daemon knows client didn't * provide a creationAttributes */ cmg.attr.creationFlags = -1; } cmg.openflag = openFlags; cmg.invocation = 0; cmg.policy = 0; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (openFlags & SA_MSG_QUEUE_RECEIVE_CALLBACK && !(hd->callbacks).saMsgMessageReceivedCallback) return SA_ERR_INIT; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); dprintf("%s: cmsclient_message_send returns %d\n", __FUNCTION__, ret); /* * We should only have one client blocking for it. */ ret = wait_for_msg(hd, CMS_QUEUE_OPEN, queueName, &reply, timeout); if (ret != SA_OK) { return ret; } rcmg = (client_mqueue_open_t *) reply; if ((ret = (rcmg->header).flag) == SA_OK) { SaMsgQueueHandleT *key; __cms_queue_handle_t *qhd; key = (SaMsgQueueHandleT *) ha_malloc(sizeof(SaMsgQueueHandleT)); qhd = (__cms_queue_handle_t *) ha_malloc(sizeof(__cms_queue_handle_t)); memset(qhd, 0, sizeof(__cms_queue_handle_t)); qhd->queue_handle = rcmg->handle; qhd->queue_name = *queueName; qhd->cms_handle = hd; *key = qhd->queue_handle; g_hash_table_insert(hd->queue_handle_hash, key, qhd); g_hash_table_insert(__mqhandle_hash, key, hd); *queueHandle = *key; } ha_free(rcmg); return ret;}SaErrorTsaMsgQueueClose(SaMsgQueueHandleT *queueHandle){ int ret; client_mqueue_close_t cmg; client_header_t *rcmg; __cms_handle_t *hd = NULL; __cms_queue_handle_t *qhd; SaNameT * qname; gpointer origkey, orighd; if (g_hash_table_lookup_extended(__mqhandle_hash, queueHandle, &origkey, &orighd)) { hd = (__cms_handle_t *) orighd; }; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, queueHandle ? *queueHandle : -1); return SA_ERR_BAD_HANDLE; } qhd = g_hash_table_lookup(hd->queue_handle_hash, queueHandle); if (!qhd) { cl_log(LOG_ERR, "%s: Cannot find handlle [%d]" , __FUNCTION__, queueHandle ? *queueHandle : -1); return SA_ERR_BAD_HANDLE; } qname = &(qhd->queue_name); cmg.header.type = CMS_QUEUE_CLOSE; cmg.header.name = *qname; cmg.handle = *queueHandle; cmg.silent = FALSE; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); ret = wait_for_msg(hd, CMS_QUEUE_CLOSE, qname, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; if ((ret = rcmg->flag) == SA_OK) { g_hash_table_remove(hd->queue_handle_hash, queueHandle); g_hash_table_remove(__mqhandle_hash, queueHandle); ha_free(origkey); ha_free(qhd); /* TODO: free the queue msgs. */ } ha_free((client_mqueue_close_t *) rcmg); return ret;}static voidlookup_queuehandle(gpointer key, gpointer value, gpointer user_data){ char * qname; __cms_queue_handle_t *qhd = (__cms_queue_handle_t *)value; char * name = (char *)user_data; SaMsgQueueHandleT *queueHandle = (SaMsgQueueHandleT *)key; qname = saname2str(qhd->queue_name); if (!strcmp(qname, name)) { g_hash_table_remove(qhd->cms_handle->queue_handle_hash, key); } g_hash_table_remove(__mqhandle_hash, queueHandle);}SaErrorT saMsgQueueUnlink(SaMsgHandleT *msgHandle, const SaNameT *queueName){ int ret; char * name; client_mqueue_unlink_t cmg; client_header_t *rcmg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_UNLINK; cmg.header.name = *queueName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); ret = wait_for_msg(hd, CMS_QUEUE_UNLINK, queueName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; /* * remove from the mq from queue_handle_hash if possible */ name = saname2str(*queueName); g_hash_table_foreach(hd->queue_handle_hash, lookup_queuehandle, name); ret = rcmg->flag; ha_free((client_mqueue_unlink_t *) rcmg); ha_free(name); return ret;}SaErrorT saMsgQueueStatusGet(SaMsgHandleT *msgHandle, const SaNameT *queueName, SaMsgQueueStatusT *queueStatus){ int ret; client_mqueue_status_t cmg; client_mqueue_status_t *rcmg; client_header_t * reply; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_STATUS; cmg.header.name = *queueName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); ret = wait_for_msg(hd, CMS_QUEUE_STATUS, queueName, &reply, SA_TIME_END); if (ret != SA_OK) return ret; rcmg = (client_mqueue_status_t *) reply; ret = reply->flag; if (ret == SA_OK) *queueStatus = rcmg->qstatus; ha_free((client_mqueue_status_t *) reply); return ret;}SaErrorTsaMsgMessageSend(const SaMsgHandleT *msgHandle, const SaNameT *destination, const SaMsgMessageT *message, SaMsgAckFlagsT ackFlags, SaTimeT timeout){ client_message_t *cmg; client_header_t *rcmg; client_message_ack_t * ack; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (message->priority > SA_MSG_MESSAGE_LOWEST_PRIORITY || !(ackFlags & SA_MSG_MESSAGE_DELIVERED_ACK)) return SA_ERR_INVALID_PARAM; if (ackFlags & ~SA_MSG_MESSAGE_DELIVERED_ACK) return SA_ERR_BAD_FLAGS; cmg = (client_message_t *) ha_malloc(sizeof(client_message_t) + message->size); cmg->header.type = CMS_MSG_SEND; cmg->header.name = *destination; cmg->msg = *message; cmg->invocation = 0; cmg->data = cmg + 1; memcpy(cmg->data, message->data, message->size); cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */ if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (ackFlags & SA_MSG_MESSAGE_DELIVERED_ACK && !(hd->callbacks).saMsgMessageDeliveredCallback) return SA_ERR_INIT; ret = cmsclient_message_send(hd, sizeof(client_message_t) + message->size, cmg); ha_free(cmg); while (1) { ret = wait_for_msg(hd, CMS_MSG_ACK, destination, &rcmg, timeout); if (ret != SA_OK) return ret; ret = rcmg->flag; ack = (client_message_ack_t *) rcmg; /* * CMS_MSG_SEND is a blocking call, so we can only * have one client waiting for it. Thus when we get * an ACK that is for the request type CMS_MSG_SEND, * we know this is the ACK we are waiting for. */ dprintf("type is %d\n", ack->send_type); if (ack->send_type == CMS_MSG_SEND) { ha_free((client_message_t *) rcmg); return ret; } else { enqueue_dispatch_msg(hd, rcmg); } }}SaErrorTsaMsgMessageSendAsync(const SaMsgHandleT *msgHandle, SaInvocationT invocation, const SaNameT *destination, const SaMsgMessageT *message, SaMsgAckFlagsT ackFlags){ client_message_t *cmg; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); cmg = (client_message_t *) ha_malloc(sizeof(client_message_t) + message->size); cmg->header.type = CMS_MSG_SEND_ASYNC; cmg->header.name = *destination; cmg->msg = *message; cmg->invocation = invocation; cmg->data = (char *)cmg + sizeof(client_message_t); memcpy(cmg->data, message->data, message->size); cmg->ack = ackFlags; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(client_message_t) + message->size, cmg); ha_free(cmg); return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}static intrequest_for_message(__cms_handle_t * hd, const SaNameT * name){ client_header_t request_msg; request_msg.type = CMS_MSG_REQUEST; request_msg.name = *name; return cmsclient_message_send(hd, sizeof(request_msg), &request_msg);}SaErrorTsaMsgMessageGet(const SaMsgQueueHandleT *queueHandle, SaMsgMessageT *message, SaMsgMessageInfoT *messageInfo, SaTimeT timeout){ int ret; SaErrorT error = SA_OK; client_message_t * cmg; client_header_t *rcmg; __cms_handle_t *hd = GET_MQ_HANDLE(queueHandle); __cms_queue_handle_t *qhd; SaNameT * qname; int freecmg = 0; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by queueHandle [%d]" , __FUNCTION__, queueHandle ? *queueHandle : -1); return SA_ERR_BAD_HANDLE; } if (!messageInfo || !message) return SA_ERR_INVALID_PARAM; memset(messageInfo, 0, sizeof(SaMsgMessageInfoT)); qhd = g_hash_table_lookup(hd->queue_handle_hash, queueHandle); assert(qhd != NULL); qname = &(qhd->queue_name); /* * request a message from daemon */ while (1) { request_for_message(hd, qname); ret = wait_for_msg(hd, CMS_MSG_GET | CMS_MSG_NOTIFY, qname, &rcmg, timeout); if (ret != SA_OK) { cl_log(LOG_ERR, "wait_for_msg error [%d]", ret); return ret; } if (rcmg->type == CMS_MSG_NOTIFY) { dprintf("Received CMS_MSG_NOTIFY\n"); continue; } else break; } cmg = (client_message_t *)rcmg; cmg->data = (void *)((char *)cmg + sizeof(client_message_t)); dprintf("message.data is [%s]\n", (char *)cmg->data); if (cmg->senderId) { messageInfo->senderId = cmg->senderId; } if (message->size < cmg->msg.size) error = SA_ERR_NO_SPACE; message->size = (cmg->msg).size; if (message->data) { memcpy(message->data, cmg->data, (cmg->msg.size > message->size ? message->size : cmg->msg.size)); freecmg = 1; } else { message->data = cmg->data; } message->type = cmg->msg.type; message->version = cmg->msg.version; message->priority = cmg->msg.priority; if (freecmg) ha_free(cmg); /* TODO: message info */ return error;}SaErrorT saMsgMessageReceivedGet(const SaMsgQueueHandleT *queueHandle, const SaMsgMessageHandleT *messageHandle, SaMsgMessageT *message, SaMsgMessageInfoT *messageInfo){ return SA_ERR_NOT_SUPPORTED;}SaErrorT saMsgMessageCancel(const SaMsgQueueHandleT *queueHandle){ return SA_ERR_NOT_SUPPORTED;}SaErrorTsaMsgSelectionObjectGet(const SaMsgHandleT *msgHandle, SaSelectionObjectT *selectionObject){ int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } dprintf("hd->backup_fd is [%d]\n", hd->backup_fd); ret = hd->backup_fd >= 0 ? hd->active_fd : hd->ch->ops->get_recv_select_fd(hd->ch);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -