📄 cms_cluster.c
字号:
/* * cms_cluster.c: cms daemon cluster operation * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */#include <stdio.h>#include <stdlib.h>#include <string.h>#include <assert.h>#include <cl_log.h>#include <heartbeat.h>#include "cms_common.h"#define MQ_MEMBER_COUNT g_list_length(mqmember_list)#define RETENTION_TIME_EXPIRES(mq) ( \ mq->status.creationFlags != SA_MSG_QUEUE_PERSISTENT \ && mq->status.closeTime > 0 \ && get_current_satime() - mq->status.closeTime \ >= mq->status.retentionTime)GHashTable * mq_open_pending_hash;GHashTable * mq_status_pending_hash;GHashTable * mq_ack_pending_hash;GHashTable * mq_reply_pending_hash;struct reply_info { unsigned long seq; char * node;};int gReplyCount = 1; static intget_senderId_by_name(const char * node, int seq){ struct reply_info * info; int * senderId; info = (struct reply_info *) ha_malloc(sizeof(struct reply_info)); info->seq = seq; info->node = ha_strdup(node); senderId = (int *) ha_malloc(sizeof(int)); *senderId = ++gReplyCount; g_hash_table_insert(mq_reply_pending_hash, senderId, info); return *senderId;}static const char *get_name_by_senderId(int senderId, unsigned long * seq){ gboolean found; struct reply_info * info; int * orig_id; static char 
node[SA_MAX_NAME_LENGTH]; *seq = 0; memset(node, 0, SA_MAX_NAME_LENGTH); found = g_hash_table_lookup_extended(mq_reply_pending_hash, &senderId, (gpointer) &orig_id, (gpointer) &info); if (found) { strncpy(node, info->node, SA_MAX_NAME_LENGTH); *seq = info->seq; dprintf("%s: found the reply_info: node = %s, seq = %ld\n", __FUNCTION__, node, *seq); ha_free(info); ha_free(orig_id); return node; } return NULL;}/** * cluster_hash_table_init - initialize local message queue database */intcluster_hash_table_init(){ mqueue_table_init(); mq_open_pending_hash = g_hash_table_new(g_str_hash, g_str_equal); mq_status_pending_hash = g_hash_table_new(g_str_hash, g_str_equal); mq_ack_pending_hash = g_hash_table_new(g_int_hash, g_int_equal); mq_reply_pending_hash = g_hash_table_new(g_int_hash, g_int_equal); return HA_OK;}/** * request_mqname_open - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * If the request is sent out successfully, it returns TRUE; otherwise * returns FALSE. This call is non-blocking. *//* * The algorithm for selecting the master node is the following: * * index = g_str_hash(request->qname) % MQ_MEMBER_COUNT; * tonode = g_list_nth_data(mqmember_list, (guint)index); * * Every node could potentially be the mater node for a queue. * The purpose of this algorithm is for load-balancing. 
*/
int
request_mqname_open(mqueue_request_t * request, cms_data_t * cmsdata)
{
	ll_cluster_t *hb;
	struct ha_msg *msg;
	const char *tonode, *type;
	guint members, index;
	int i, n, ret;
	size_t used;
	char size_string[64];

	/* An empty member list would make the modulo below divide by zero. */
	members = MQ_MEMBER_COUNT;
	if (members == 0) {
		cl_log(LOG_ERR, "%s: no member node available", __FUNCTION__);
		return FALSE;
	}

	/* Hash the queue name onto one member node. */
	index = g_str_hash(request->qname) % members;
	dprintf("index = %u, total = %u\n", index, members);

	tonode = g_list_nth_data(mqmember_list, index);
	assert(tonode);

	type = mqname_type2string(MQNAME_TYPE_REQUEST);

	/*
	 * Encode the per-priority sizes as "s0:s1:...:", never writing
	 * past the end of size_string.
	 */
	size_string[0] = '\0';
	used = 0;
	for (i = 0; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {
		n = snprintf(size_string + used, sizeof(size_string) - used,
			     "%d:", request->size[i]);
		if (n < 0 || (size_t) n >= sizeof(size_string) - used) {
			cl_log(LOG_ERR, "%s: size string too long",
			       __FUNCTION__);
			return FALSE;
		}
		used += (size_t) n;
	}

	dprintf("%s: tonode is [%s]\n", __FUNCTION__, tonode);

	if ((msg = ha_msg_new(0)) == NULL) {
		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);
		return FALSE;
	}

	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL
	||  ha_msg_add(msg, F_MQREQUEST,
		       cmsrequest_type2string(request->request_type)) == HA_FAIL
	||  ha_msg_add(msg, F_MQNAME, request->qname) == HA_FAIL
	||  ha_msg_addbin(msg, F_MQPOLICY,
			  &(request->policy), sizeof(int)) == HA_FAIL
	||  ha_msg_addbin(msg, F_MQINVOCATION,
			  &(request->invocation), sizeof(int)) == HA_FAIL
	||  ha_msg_addbin(msg, F_MQCREATEFLAG, &(request->create_flag),
			  sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL
	||  ha_msg_addbin(msg, F_MQOPENFLAG, &(request->open_flag),
			  sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL
	||  ha_msg_addbin(msg, F_MQRETENTION,
			  &(request->retention), sizeof(SaTimeT)) == HA_FAIL
	||  ha_msg_add(msg, F_MQSIZE, size_string) == HA_FAIL) {
		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);
		ret = FALSE;
	} else {
		hb = cmsdata->hb_handle;

		/* Report a failed send, as the function contract promises. */
		if (hb->llc_ops->sendnodemsg(hb, msg, tonode) == HA_OK) {
			ret = TRUE;
		} else {
			cl_log(LOG_ERR, "%s: sendnodemsg failed: %s",
			       __FUNCTION__, hb->llc_ops->errmsg(hb));
			ret = FALSE;
		}
	}

	ha_msg_del(msg);
	return ret;
}

/**
 * request_mqname_close - send a close message for a message queue
 *			  to the cluster
 * @name: message queue name
 * @cmsdata: pointer to cms daemon private data struct
 *
 * Returns FALSE if the message cannot be allocated or filled in,
 * SA_OK otherwise. This call is non-blocking.
*/intrequest_mqname_close(const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; CMS_TRACE(); type = mqname_type2string(MQNAME_TYPE_CLOSE); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(msg); return FALSE; } hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); ha_msg_del(msg); return SA_OK;}/** * request_mqname_unlink - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * Always returns TRUE. This call is non-blocking. */intrequest_mqname_unlink(const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; type = mqname_type2string(MQNAME_TYPE_UNLINK); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } else { hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); } ha_msg_del(msg); return SA_OK;}/** * request_mqname_send - send a message to a message queue in the cluster * * Always returns TRUE. This call is non-blocking. 
*/intrequest_mqname_send(mqueue_request_t * request, const char *node, const char * client, SaMsgMessageT *msg, cms_data_t * cmsdata){ int ret; ll_cluster_t *hb; struct ha_msg *m; const char *type; type = mqname_type2string(MQNAME_TYPE_SEND); if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQREQUEST, cmsrequest_type2string(request->request_type)) == HA_FAIL || ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL || ha_msg_addbin(m, F_SENDRECEIVE, (char *) &(request->sendreceive), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGTYPE, (char *) &msg->type, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGVER, (char *) &msg->version, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSIZE, (char *) &msg->size, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGPRI, (char *) &msg->priority, sizeof(SaUint8T)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGDATA, (char *) msg->data, msg->size) == HA_FAIL || ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq), sizeof(unsigned long)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGACK, &(request->ack), sizeof(SaMsgAckFlagsT)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } if (request->gname != NULL) { if (ha_msg_add(m, F_MQGROUPNAME, request->gname) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } } hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(m);#endif ret = hb->llc_ops->sendnodemsg(hb, m, node); dprintf("%s: node = %s, ops->sendnodemsg: ret = %d\n" , __FUNCTION__, node, ret); if (ret != HA_OK) { dprintf("%s: err = [%s]\n", __FUNCTION__, hb->llc_ops->errmsg(hb)); } ha_msg_del(m); return TRUE;}intsend_mq_reply(mqueue_request_t * request, SaMsgSenderIdT senderId, SaMsgMessageT *msg, cms_data_t * cmsdata){ int ret; 
ll_cluster_t *hb; struct ha_msg *m; const char *type; const char * node; unsigned long seq; /* we need the seq number in the original sendreceive msg */ node = get_name_by_senderId(senderId, &seq); if (!node) { return FALSE; } type = mqname_type2string(MQNAME_TYPE_REPLY); if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQREQUEST, cmsrequest_type2string(request->request_type)) == HA_FAIL || ha_msg_addbin(m, F_SENDRECEIVE, (char *) &(request->sendreceive), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGTYPE, (char *) &msg->type, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGVER, (char *) &msg->version, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSIZE, (char *) &msg->size, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGPRI, (char *) &msg->priority, sizeof(SaUint8T)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGDATA, (char *) msg->data, msg->size) == HA_FAIL || ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSEQ, &seq, sizeof(unsigned long)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGREPLYSEQ, &request->seq, sizeof(unsigned long)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGACK, &(request->ack), sizeof(SaMsgAckFlagsT)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(m);#endif ret = hb->llc_ops->sendnodemsg(hb, m, node); dprintf("%s: node = %s, ops->sendnodemsg: ret = %d\n" , __FUNCTION__, node, ret); ha_msg_del(m); return TRUE;}/* * ack the mqname_send back to the sender's client. 
*/static intmqname_send_ack(mqueue_request_t * request, const char *node, const char *client, const SaErrorT ret, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *m; const char *type; type = mqname_type2string(MQNAME_TYPE_ACK); if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL || ha_msg_add(m, F_MQREQUEST, cmsrequest_type2string(request->request_type)) == HA_FAIL || ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq), sizeof(unsigned long)) == HA_FAIL || ha_msg_add(m, F_MQERROR, saerror_type2string(ret)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } dprintf("mqname_send_ack, request->qname = %s\n", request->qname); hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(m);#endif hb->llc_ops->sendnodemsg(hb, m, node); ha_msg_del(m); return TRUE;}intrequest_mqgroup_insert(const char *gname, const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; type = mqname_type2string(MQNAME_TYPE_INSERT); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } else { hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); } ha_msg_del(msg); return SA_OK;}intrequest_mqgroup_remove(const char *gname, const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; type = mqname_type2string(MQNAME_TYPE_REMOVE); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if 
(ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } else { hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); } ha_msg_del(msg); return SA_OK;}/** * reply_mqname_open - process the request message as the master node * for this message queue name * * @hb: heartbeat IPC Channel handle * @msg: received message from heartbeat IPC Channel */intreply_mqname_open(ll_cluster_t *hb, struct ha_msg *msg){ const char *name, *type, *request, *mqhost = NULL; size_t invocation_size, cflag_size, oflag_size, retention_size; const SaInvocationT * invocation = NULL; const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL; const SaTimeT * retention = NULL; const int * policy = NULL; const char * size_string; SaMsgQueueSendingStateT sending_state;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -