📄 cmslib_client.c
字号:
return HA_FAIL; switch (msg->type) { case CMS_QUEUE_OPEN_ASYNC: omsg = (client_mqueue_open_t *) msg; if ((handle->callbacks).saMsgQueueOpenCallback) { if (omsg->header.flag != SA_OK) { omsg->handle = 0; } (handle->callbacks).saMsgQueueOpenCallback( &(omsg->handle), omsg->invocation, omsg->header.flag); } ha_free(omsg); break; case CMS_MSG_NOTIFY: gmsg = (client_message_t *) msg; qhd = g_hash_table_lookup(handle->queue_handle_hash, &(gmsg->handle)); if (handle->callbacks.saMsgMessageReceivedCallback) handle->callbacks.saMsgMessageReceivedCallback( &(qhd->queue_handle), NULL, 0); ha_free(gmsg); break; case CMS_MSG_ACK: amsg = (client_message_ack_t *) msg; if ((handle->callbacks).saMsgMessageDeliveredCallback) { (handle->callbacks).saMsgMessageDeliveredCallback( amsg->invocation, msg->flag); } ha_free(amsg); break; case CMS_QUEUEGROUP_NOTIFY: nmsg = (client_mqgroup_notify_t *)msg; name = (char *) ha_malloc(nmsg->group_name.length + 1); if (name == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return FALSE; } dprintf("group name [%s], length [%d]\n" , nmsg->group_name.value, nmsg->group_name.length); strncpy(name, nmsg->group_name.value, nmsg->group_name.length); name[nmsg->group_name.length] = '\0'; dprintf("name = [%s]\n", name); track = g_hash_table_lookup(__group_tracking_hash, name); if (track == NULL) { cl_log(LOG_ERR, "Cannot find track buffer"); return FALSE; } if ((handle->callbacks).saMsgQueueGroupTrackCallback == NULL) return FALSE; ha_free(name); ha_free(nmsg); break; default: return HA_FAIL; } return HA_OK;}SaErrorTsaMsgInitialize(SaMsgHandleT *msgHandle, const SaMsgCallbacksT *msgCallbacks, const SaVersionT *version){ IPC_Channel *ch; __cms_handle_t *hd; SaMsgHandleT * key; int pipefd[2]; cl_log_set_entity("libcms"); cl_log_set_facility(LOG_USER);#ifdef DEBUG_LIBRARY cl_log_enable_stderr(TRUE);#endif if ((!version) || version->releaseCode < 'A' || version->releaseCode > 'Z' || (version->releaseCode << 8) + (version->major << 4) + version->minor > (AIS_VERSION_RELEASE_CODE << 8) + (AIS_VERSION_MAJOR << 4) + AIS_VERSION_MINOR) { cl_log(LOG_ERR, "AIS library version is lower then required"); return SA_ERR_VERSION; } if (!msgHandle) return SA_ERR_INVALID_PARAM; if (!(ch = cms_channel_conn())) { cl_log(LOG_ERR, "cms_channel_conn failed."); return SA_ERR_LIBRARY; } if (pipe(pipefd) == -1) { cl_log(LOG_ERR, "create pipe failed"); return SA_ERR_LIBRARY; } /* * Write something to the pipe but we never read so that * select to this fd will always return immediately. */ if (write(pipefd[1], "ACTIVE", 6) < 0) { cl_log(LOG_ERR, "write pipe failed\n"); return SA_ERR_LIBRARY; } cmsclient_hash_init(); dprintf("ch_status = %d\n", ch->ch_status); dprintf("farside_pid = %d\n", ch->farside_pid); hd = (__cms_handle_t *)ha_malloc(sizeof(__cms_handle_t)); if (!hd) return SA_ERR_NO_MEMORY; memset(hd, 0, sizeof(__cms_handle_t)); hd->queue_handle_hash = g_hash_table_new(g_int_hash, g_int_equal); hd->ch = ch; if (msgCallbacks) { memcpy(&(hd->callbacks), msgCallbacks, sizeof(SaMsgCallbacksT)); } else { memset(&(hd->callbacks), 0, sizeof(SaMsgCallbacksT)); } *msgHandle = __cmshandle_counter++; hd->service_handle = *msgHandle; hd->active_fd = pipefd[0]; hd->backup_fd = -1; key = (SaMsgHandleT *) ha_malloc(sizeof(SaMsgHandleT)); key = msgHandle; g_hash_table_insert(__cmshandle_hash, key, hd); return SA_OK;}SaErrorTsaMsgFinalize(SaMsgHandleT *msgHandle){ __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } g_hash_table_foreach_remove(hd->queue_handle_hash, msgqueue_remove, hd); g_hash_table_remove(__cmshandle_hash, msgHandle); if (hd->backup_fd >= 0) restore_poll(hd); hd->ch->ops->destroy(hd->ch); close(hd->active_fd); /* TODO: need to free the glist on the dispatch queue */ ha_free(hd); return SA_OK;}SaErrorTsaMsgQueueOpen(const SaMsgHandleT *msgHandle, const SaNameT *queueName, const SaMsgQueueCreationAttributesT *creationAttributes, SaMsgQueueOpenFlagsT openFlags, SaMsgQueueHandleT *queueHandle, SaTimeT timeout){ int ret; client_mqueue_open_t cmg; client_mqueue_open_t *rcmg; client_header_t * reply; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (!openFlags) return SA_ERR_BAD_FLAGS; if (bad_saname(queueName) || !queueHandle || (!creationAttributes && !(openFlags & SA_MSG_QUEUE_OPEN_ONLY)) || (creationAttributes && (openFlags & SA_MSG_QUEUE_OPEN_ONLY))) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_OPEN; cmg.header.name = *queueName; if (creationAttributes) { if ((creationAttributes->creationFlags != 0) && creationAttributes->creationFlags != SA_MSG_QUEUE_PERSISTENT) return SA_ERR_BAD_FLAGS; cmg.attr = *creationAttributes; } else { /* * else set to -1, so that daemon knows client didn't * provide a creationAttributes */ cmg.attr.creationFlags = -1; } cmg.openflag = openFlags; cmg.invocation = 0; cmg.policy = 0; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (openFlags & SA_MSG_QUEUE_RECEIVE_CALLBACK && !(hd->callbacks).saMsgQueueOpenCallback && !(hd->callbacks).saMsgMessageReceivedCallback) return SA_ERR_INIT; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); dprintf("%s: cmsclient_message_send returns %d\n", __FUNCTION__, ret); /* * We should only have one client blocking for it. */ ret = wait_for_msg(hd, CMS_QUEUE_OPEN, queueName, &reply, timeout); if (ret != SA_OK) { return ret; } rcmg = (client_mqueue_open_t *) reply; if ((ret = (rcmg->header).flag) == SA_OK) { SaMsgQueueHandleT *key; __cms_queue_handle_t *qhd; key = (SaMsgQueueHandleT *) ha_malloc(sizeof(SaMsgQueueHandleT)); qhd = (__cms_queue_handle_t *) ha_malloc(sizeof(__cms_queue_handle_t)); memset(qhd, 0, sizeof(__cms_queue_handle_t)); qhd->queue_handle = rcmg->handle; qhd->queue_name = *queueName; qhd->cms_handle = hd; *key = qhd->queue_handle; g_hash_table_insert(hd->queue_handle_hash, key, qhd); g_hash_table_insert(__mqhandle_hash, key, hd); *queueHandle = *key; } ha_free(rcmg); return ret;}SaErrorTsaMsgQueueClose(SaMsgQueueHandleT *queueHandle){ int ret; client_mqueue_close_t cmg; client_header_t *rcmg; __cms_handle_t *hd = NULL; __cms_queue_handle_t *qhd; SaNameT * qname; gpointer origkey, orighd; if (g_hash_table_lookup_extended(__mqhandle_hash, queueHandle, &origkey, &orighd)) { hd = (__cms_handle_t *) orighd; }; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, queueHandle ? *queueHandle : -1); return SA_ERR_BAD_HANDLE; } qhd = g_hash_table_lookup(hd->queue_handle_hash, queueHandle); if (!qhd) { cl_log(LOG_ERR, "%s: Cannot find handlle [%d]" , __FUNCTION__, queueHandle ? *queueHandle : -1); return SA_ERR_BAD_HANDLE; } qname = &(qhd->queue_name); cmg.header.type = CMS_QUEUE_CLOSE; cmg.header.name = *qname; cmg.handle = *queueHandle; cmg.silent = FALSE; ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); ret = wait_for_msg(hd, CMS_QUEUE_CLOSE, qname, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; if ((ret = rcmg->flag) == SA_OK) { g_hash_table_remove(hd->queue_handle_hash, queueHandle); g_hash_table_remove(__mqhandle_hash, queueHandle); ha_free(origkey); ha_free(qhd); /* TODO: free the queue msgs. */ } ha_free((client_mqueue_close_t *) rcmg); return ret;}static voidlookup_queuehandle(gpointer key, gpointer value, gpointer user_data){ char * qname; __cms_queue_handle_t *qhd = (__cms_queue_handle_t *)value; char * name = (char *)user_data; SaMsgQueueHandleT *queueHandle = (SaMsgQueueHandleT *)key; qname = saname2str(qhd->queue_name); if (!strcmp(qname, name)) { g_hash_table_remove(qhd->cms_handle->queue_handle_hash, key); } g_hash_table_remove(__mqhandle_hash, queueHandle);}SaErrorT saMsgQueueUnlink(SaMsgHandleT *msgHandle, const SaNameT *queueName){ int ret; char * name; client_mqueue_unlink_t cmg; client_header_t *rcmg; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_UNLINK; cmg.header.name = *queueName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); ret = wait_for_msg(hd, CMS_QUEUE_UNLINK, queueName, &rcmg, SA_TIME_END); if (ret != SA_OK) return ret; /* * remove from the mq from queue_handle_hash if possible */ name = saname2str(*queueName); g_hash_table_foreach(hd->queue_handle_hash, lookup_queuehandle, name); ret = rcmg->flag; ha_free((client_mqueue_unlink_t *) rcmg); ha_free(name); return ret;}SaErrorT saMsgQueueStatusGet(SaMsgHandleT *msgHandle, const SaNameT *queueName, SaMsgQueueStatusT *queueStatus){ int ret; client_mqueue_status_t cmg; client_mqueue_status_t *rcmg; client_header_t * reply; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (bad_saname(queueName)) return SA_ERR_INVALID_PARAM; cmg.header.type = CMS_QUEUE_STATUS; cmg.header.name = *queueName; if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } ret = cmsclient_message_send(hd, sizeof(cmg), &cmg); ret = wait_for_msg(hd, CMS_QUEUE_STATUS, queueName, &reply, SA_TIME_END); if (ret != SA_OK) return ret; rcmg = (client_mqueue_status_t *) reply; ret = reply->flag; if (ret == SA_OK) *queueStatus = rcmg->qstatus; ha_free((client_mqueue_status_t *) reply); return ret;}SaErrorTsaMsgMessageSend(const SaMsgHandleT *msgHandle, const SaNameT *destination, const SaMsgMessageT *message, SaMsgAckFlagsT ackFlags, SaTimeT timeout){ client_message_t *cmg; client_header_t *rcmg; client_message_ack_t * ack; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); if (message->priority > SA_MSG_MESSAGE_LOWEST_PRIORITY || !(ackFlags & SA_MSG_MESSAGE_DELIVERED_ACK)) return SA_ERR_INVALID_PARAM; if (ackFlags & ~SA_MSG_MESSAGE_DELIVERED_ACK) return SA_ERR_BAD_FLAGS; cmg = (client_message_t *) ha_malloc(sizeof(client_message_t) + message->size); if (!cmg) return SA_ERR_NO_MEMORY; cmg->header.type = CMS_MSG_SEND; cmg->header.name = *destination; cmg->msg = *message; cmg->invocation = 0; cmg->data = cmg + 1; memcpy(cmg->data, message->data, message->size); cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */ if (hd == NULL) { cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]" , __FUNCTION__, msgHandle ? *msgHandle : -1); return SA_ERR_BAD_HANDLE; } if (ackFlags & SA_MSG_MESSAGE_DELIVERED_ACK && !(hd->callbacks).saMsgMessageDeliveredCallback) return SA_ERR_INIT; ret = cmsclient_message_send(hd, sizeof(client_message_t) + message->size, cmg); ha_free(cmg); while (1) { ret = wait_for_msg(hd, CMS_MSG_ACK | CMS_MSG_SEND, destination, &rcmg, timeout); if (ret != SA_OK) return ret; ret = rcmg->flag; ack = (client_message_ack_t *) rcmg; if (rcmg->type == CMS_MSG_SEND) { ha_free((client_message_t *)rcmg); return ret; } /* * CMS_MSG_SEND is a blocking call, so we can only * have one client waiting for it. Thus when we get * an ACK that is for the request type CMS_MSG_SEND, * we know this is the ACK we are waiting for. */ dprintf("type is %d\n", ack->send_type); if (ack->send_type == CMS_MSG_SEND) { ha_free((client_message_t *) rcmg); return ret; } else { enqueue_dispatch_msg(hd, rcmg); } }}SaErrorTsaMsgMessageSendAsync(const SaMsgHandleT *msgHandle, SaInvocationT invocation, const SaNameT *destination, const SaMsgMessageT *message, SaMsgAckFlagsT ackFlags){ client_message_t *cmg; int ret; __cms_handle_t *hd = GET_CMS_HANDLE(msgHandle); cmg = (client_message_t *) ha_malloc(sizeof(client_message_t) + message->size); if (!cmg) return SA_ERR_NO_MEMORY; cmg->header.type = CMS_MSG_SEND_ASYNC; cmg->header.name = *destination; cmg->msg = *message; cmg->invocation = invocation; cmg->data = (char *)cmg + sizeof(client_message_t); memcpy(cmg->data, message->data, message->size); cmg->ack = ackFlags;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -