📄 cms_client.c
字号:
if ((mq = mqueue_table_lookup(mqname, NULL)) == NULL) { cl_log(LOG_ERR, "%s: cannot find mq [%s]", __FUNCTION__,mqname); ha_free(mqname); return FALSE; } if ((message = dequeue_message(mq)) == NULL) { cl_log(LOG_ERR, "%s: No message found for mq [%s], block" , __FUNCTION__, mqname); ha_free(mqname); return TRUE; } dprintf("%s: dequeue_message [%s]\n", __FUNCTION__ , (char *)message->msg.data); m = (client_message_t *) ha_malloc(sizeof(client_message_t) + message->msg.size); m->header.type = CMS_MSG_GET; m->header.len = sizeof(client_message_t) + message->msg.size; m->header.flag = SA_OK; m->header.name = msg->name; m->handle = mq->handle; m->senderId = message->msgInfo.senderId; m->msg = message->msg; m->msg.data = NULL; m->data = m + 1; memcpy(m->data, message->msg.data, message->msg.size); client_send_msg(client, m->header.len, m); /* TODO: needs fix. Can only be called after the msg_done */ ha_free(message); ha_free(mqname); ha_free(m); return TRUE;}intclient_process_mqgroup_insert(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){ mqueue_t *mq, *mqg; char *mqname, *mqgname; client_mqgroup_ops_t * m = (client_mqgroup_ops_t *)msg; client_header_t reply; mqname = saname2str(m->header.name); mqgname = saname2str(m->qgname); reply.type = msg->type; reply.len = sizeof(client_header_t); reply.flag = SA_OK; reply.name = msg->name; mqg = mqueue_table_lookup(mqgname, NULL); if (mqg == NULL) goto bad_queue; if (mqg->policy == 0) /* not a queue group */ goto bad_queue; mq = mqueue_table_lookup(mqname, NULL); if (mq == NULL) goto bad_queue; /* * check if the queue is already in the queue group */ if (g_list_find(mqg->list, mq) == NULL) { mqg->list = g_list_append(mqg->list, mq); cl_log(LOG_INFO, "Adding mq <%p> to [%s] list", mq, mqgname); if (mqg->current == NULL) mqg->current = g_list_first(mqg->list); /* * Notify other nodes the change */ request_mqgroup_insert(mqgname, mqname, cmsdata); } else { reply.flag = SA_ERR_EXIST; client_send_msg(client, reply.len, &reply); goto exit; } client_send_msg(client, reply.len, &reply); goto exit;bad_queue: reply.flag = SA_ERR_NOT_EXIST; client_send_msg(client, reply.len, &reply);exit: ha_free(mqname); ha_free(mqgname); return TRUE;}intclient_process_mqgroup_remove(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){ mqueue_t *mq, *mqg; char *mqname, *mqgname; client_mqgroup_ops_t * m = (client_mqgroup_ops_t *)msg; client_header_t reply; mqname = saname2str(m->header.name); mqgname = saname2str(m->qgname); reply.type = msg->type; reply.len = sizeof(client_header_t); reply.flag = SA_OK; reply.name = msg->name; mqg = mqueue_table_lookup(mqgname, NULL); if (mqg == NULL) goto error; if (!IS_MQGROUP(mqg)) /* not a queue group */ goto error; mq = mqname_lookup(mqname, NULL); if (mq == NULL) goto error; /* * check if the queue is already removed from the queue group */ if (g_list_find(mqg->list, mq) != NULL) { mqgroup_unref_mqueue(mqg, mq); /* * Notify other nodes the change */ request_mqgroup_remove(mqgname, mqname, cmsdata); } else goto error; client_send_msg(client, reply.len, &reply); goto exit;error: reply.flag = SA_ERR_NOT_EXIST; client_send_msg(client, reply.len, &reply);exit: ha_free(mqname); ha_free(mqgname); return TRUE;}intclient_process_mqgroup_track(IPC_Channel * client, client_header_t * msg){ mqueue_t *mqg; char * gname; int group; client_mqgroup_mem_t * m = (client_mqgroup_mem_t *)msg; client_mqgroup_notify_t * rmsg; rmsg = (client_mqgroup_notify_t *) ha_malloc(sizeof(client_mqgroup_notify_t)); rmsg->header.type = msg->type; rmsg->header.len = sizeof(client_mqgroup_notify_t); rmsg->header.flag = SA_OK; rmsg->header.name = msg->name; gname = saname2str(m->group_name); mqg = mqueue_table_lookup(gname, &group); if (mqg == NULL || group == FALSE) goto noexist; if ((m->flag & SA_TRACK_CHANGES) || (m->flag & SA_TRACK_CHANGES_ONLY)) { client_mqgroup_track_t * track; track = (client_mqgroup_track_t *) ha_malloc(sizeof(client_mqgroup_track_t)); if (track == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); rmsg->header.flag = SA_ERR_NO_MEMORY; client_send_msg(client, rmsg->header.len , (client_header_t *)rmsg); ha_free(rmsg); return TRUE; } track->ch = client; track->flag = m->flag; cl_log(LOG_INFO, "%s: append ch to [%s] notify_list,flag = <%d>" , __FUNCTION__, gname, track->flag); mqg->notify_list = g_list_append(mqg->notify_list, track); } if (m->flag & SA_TRACK_CURRENT) { notify_buffer_t buf; size_t length; buf.number = 0; buf.change_buff = NULL; g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf); length = buf.number * sizeof(SaMsgQueueGroupNotificationT); rmsg = realloc(rmsg, sizeof(client_mqgroup_notify_t) + length); rmsg->data = (char *)rmsg + sizeof(client_mqgroup_notify_t); memcpy(rmsg->data, buf.change_buff, length); ha_free(buf.change_buff); rmsg->number = buf.number; rmsg->policy = buf.policy; rmsg->group_name = buf.name; rmsg->header.len += length; } client_send_msg(client, rmsg->header.len, (client_header_t *)rmsg); goto exit;noexist: rmsg->header.flag = SA_ERR_NOT_EXIST; client_send_msg(client, rmsg->header.len, (client_header_t *)rmsg);exit: ha_free(gname); ha_free(rmsg); return TRUE;}static gintcompare_client(gconstpointer a, gconstpointer b){ const client_mqgroup_track_t * c; const IPC_Channel * d; c = (const client_mqgroup_track_t *)a; d = (const IPC_Channel *)b; if (c->ch == d) return TRUE; else return FALSE;}intclient_process_mqgroup_track_stop(IPC_Channel * client, client_header_t * msg){ mqueue_t *mqg; char * gname; int group; client_mqgroup_track_t * track; client_mqgroup_mem_t * m = (client_mqgroup_mem_t *)msg; client_header_t reply; reply.type = msg->type; reply.len = sizeof(client_header_t); reply.flag = SA_OK; reply.name = msg->name; gname = saname2str(m->group_name); mqg = mqueue_table_lookup(gname, &group); if (mqg == NULL || group == FALSE) goto noexist; track = (client_mqgroup_track_t *)g_list_find_custom(mqg->notify_list, client, compare_client); if (track != NULL) { mqg->notify_list = g_list_remove(mqg->notify_list, track); ha_free(track); } client_send_msg(client, reply.len, &reply); goto exit;noexist: reply.flag = SA_ERR_NOT_EXIST; client_send_msg(client, reply.len, &reply);exit: ha_free(gname); return TRUE;}static voidcms_client_msg_done(IPC_Message * msg){ client_header_t * message; size_t msg_type; //mqueue_t * mq; //client_message_t * m = (client_message_t *) message; message = msg->msg_body; msg_type = message->type; dprintf("cms_client_msg_done, type = %d\n", (int)msg_type);#if 0 /* update the buffer size */ if (msg_type == CMS_MSG_SEND || msg_type == CMS_MSG_SEND_ASYNC) if ((mq = mqname_lookup(mqname, NULL))) mqueue_update_usage(mq, m->msg.priority, -m->msg.size);#endif ha_free(msg->msg_private); return;}static voidcms_client_msg_done_freeclient(IPC_Message * msg){ client_header_t * message; size_t msg_type; message = msg->msg_body; msg_type = message->type; dprintf("cms_client_msg_done, type = %d\n", (int)msg_type); ha_free(msg->msg_private); ha_free(msg->msg_ch); return;}intclient_send_msg(IPC_Channel * client, size_t len, gpointer data){ int ret; IPC_Message * msg; CMS_TRACE(); if ((msg = ha_malloc(sizeof(IPC_Message) + len)) == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return FALSE; }#if DEBUG_MEMORY dprintf("%s (%p) ha_malloc %p, size 0x%x\n", __FUNCTION__ , &client_send_msg, msg, sizeof(IPC_Message) + len);#endif msg->msg_body = msg + 1; memcpy(msg->msg_body, data, len); msg->msg_len = len; msg->msg_done = cms_client_msg_done; msg->msg_private = msg; msg->msg_ch = client; msg->msg_buf = NULL; ret = client->ops->send(client, msg); if (ret == IPC_OK) return TRUE; else return FALSE;}/* This function send the message thru the client and free the client * memory afterward. This has to be done here since it is async. --YZ */intclient_send_msg_freeclient(IPC_Channel * client, size_t len, gpointer data){ int ret; IPC_Message * msg; CMS_TRACE(); if ((msg = ha_malloc(sizeof(IPC_Message) + len)) == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return FALSE; }#if DEBUG_MEMORY dprintf("%s (%p) ha_malloc %p, size 0x%x\n", __FUNCTION__ , &client_send_msg, msg, sizeof(IPC_Message) + len);#endif msg->msg_body = msg + 1; memcpy(msg->msg_body, data, len); msg->msg_len = len; msg->msg_done = cms_client_msg_done_freeclient; msg->msg_private = msg; msg->msg_ch = client; msg->msg_buf = NULL; ret = client->ops->send(client, msg); if (ret == IPC_OK) return TRUE; else return FALSE;}intclient_send_error_msg(IPC_Channel * client, const char * name, size_t type, SaErrorT error){ client_header_t msg; msg.type = type; msg.len = sizeof(client_header_t); msg.flag = error; str2saname(&msg.name, name); return client_send_msg(client, msg.len, &msg);}intclient_send_qstatus(IPC_Channel * client, mqueue_t * queue, int flag){ client_mqueue_status_t qstatus; memset(&qstatus, 0, sizeof(client_mqueue_status_t)); qstatus.header.type = CMS_QUEUE_STATUS; qstatus.header.len = sizeof(client_mqueue_status_t); qstatus.header.flag = flag; str2saname(&qstatus.header.name, queue->name); qstatus.qstatus = queue->status; return client_send_msg(client, qstatus.header.len, &qstatus);}intclient_send_client_qopen(IPC_Channel * client, mqueue_request_t * request, guint handle, int flag){ client_mqueue_open_t qopen; CMS_TRACE(); if (!request) return HA_FAIL; memset(&qopen, 0, sizeof(client_mqueue_open_t)); qopen.header.type = request->request_type; qopen.header.len = sizeof(client_mqueue_open_t); qopen.header.flag = flag; str2saname(&qopen.header.name, request->qname); qopen.handle = handle; qopen.invocation = request->invocation; return client_send_msg(client, qopen.header.len, &qopen);}intclient_send_ack_msg(IPC_Channel * client, mqueue_request_t * request, guint handle, int flag){ client_message_ack_t ack; if (!request) return HA_FAIL; memset(&ack, 0, sizeof(client_message_ack_t)); ack.header.type = CMS_MSG_ACK; ack.header.len = sizeof(client_message_ack_t); ack.header.flag = flag; str2saname(&ack.header.name, request->qname); ack.handle = handle; ack.invocation = request->invocation; ack.send_type = request->request_type; return client_send_msg(client, ack.header.len, &ack);}int client_send_notready_msg(IPC_Channel * client, client_header_t * msg){ client_header_t reply; reply.type = msg->type; reply.len = sizeof(client_header_t); reply.name = msg->name; reply.flag = SA_ERR_TRY_AGAIN; cl_log(LOG_ERR, "CMS is still waiting to receive message queue " "updates. Please try again later."); return client_send_msg(client, reply.len, &reply);}int client_process_mqsend_reply(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){ SaErrorT error; mqueue_request_t request; IPC_Channel *cli = NULL; unsigned long * seq = NULL; client_message_t * m = (client_message_t *) msg; client_message_ack_t reply; /* no queue name for this msg */ request.qname = NULL; request.gname = NULL; request.request_type = m->header.type; dprintf("request.request_type is %d\n", request.request_type); request.invocation = m->invocation; request.ack = m->ack; request.sendreceive = 0; request.seq = gSendSeqNo++; m->data = (void *)((char *)msg + sizeof(client_message_t)); m->msg.data = m->data; if (request.ack) { dprintf("%s: insert ack packet: ", __FUNCTION__); if ((cli = (IPC_Channel *) ha_malloc(sizeof(IPC_Channel))) == NULL || (seq = (unsigned long *) ha_malloc(sizeof(unsigned long))) == NULL ) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); if (cli) ha_free(cli); error = SA_ERR_NO_MEMORY; goto error; } *cli = *client; *seq = request.seq; dprintf("seq = %ld, client = %p\n", *seq, cli); g_hash_table_insert(mq_ack_pending_hash, seq, cli); } if (send_mq_reply(&request, m->senderId, &(m->msg), cmsdata) != TRUE) { cl_log(LOG_ERR, "%s: mqname_send failed", __FUNCTION__); error = SA_ERR_LIBRARY; goto error; } return TRUE;error: /* This is actually a error respond instead of a ACK. */ memset(&reply, 0, sizeof(client_message_ack_t)); reply.header.type = CMS_MSG_ACK; reply.header.len = sizeof(client_message_ack_t); reply.header.flag = error; reply.header.name = msg->name; reply.send_type = msg->type; reply.invocation = m->invocation; client_send_msg(client, reply.header.len, &reply); return TRUE;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -