📄 cms_cluster.c
字号:
/* * cms_cluster.c: cms daemon cluster operation * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */#include <stdio.h>#include <stdlib.h>#include <string.h>#include <assert.h>#include <cl_log.h>#include <heartbeat.h>#include "cms_common.h"#define MQ_MEMBER_COUNT g_list_length(mqmember_list)#define RETENTION_TIME_EXPIRES(mq) ( \ mq->status.creationFlags != SA_MSG_QUEUE_PERSISTENT \ && mq->status.closeTime > 0 \ && get_current_satime() - mq->status.closeTime \ >= mq->status.retentionTime)GHashTable * mq_open_pending_hash;GHashTable * mq_status_pending_hash;GHashTable * mq_ack_pending_hash;char mqname_type_str[MQNAME_TYPE_LAST][TYPESTRSIZE] = { "", "mqinit", "mqrequest", "mqgranted", "mqreopen", "mqdenied", "mqclose", "mqunlink", "mqsend", "mqinsert", "mqremove", "mqmsgack", "mqinfoupdate", "mqreopenmsgfeed", "mqmsgfeedend", "mqstatusrequest", "mqstatusreply", ""};char cmsrequest_type_str[CMS_TYPE_TOTAL][TYPESTRSIZE] = { "cms_qstatus", "cms_qopen", "cms_qopenasync", "cms_qclose", "cms_qunlink", "cms_msend", "cms_msendasync", "cms_mack", "cms_mget", "cms_mreceivedget", "cms_qg_creat", "cms_qg_delete", "cms_qg_insert", "cms_qg_remove", "cms_qg_track_start", "cms_qg_track_stop", "cms_qg_notify", "cms_msg_notify", "cms_msg_request"};char sa_errortype_str[SA_ERR_BAD_FLAGS + 2][TYPESTRSIZE] = { "", "sa_ok", "sa_err_library", "sa_err_version", "sa_err_init", "sa_err_timeout", "sa_err_try_again", "sa_err_invalid_param", "sa_err_no_memory", "sa_err_bad_handle", "sa_err_busy", "sa_err_access", "sa_err_not_exist", "sa_err_name_too_long", "sa_err_exist", "sa_err_no_space", "sa_err_interrupt", "sa_err_system", "sa_err_name_not_found", "sa_err_no_resources", "sa_err_not_supported", "sa_err_bad_operation", "sa_err_failed_operation", "sa_err_message_error", "sa_err_no_message", "sa_err_queue_full", "sa_err_queue_not_available", "sa_err_bad_checkpoint", "sa_err_bad_flags", ""};/** * cluster_hash_table_init - initialize local message queue database */intcluster_hash_table_init(){ mqueue_table_init(); mq_open_pending_hash = g_hash_table_new(g_str_hash, g_str_equal); mq_status_pending_hash = g_hash_table_new(g_str_hash, g_str_equal); mq_ack_pending_hash = g_hash_table_new(g_int_hash, g_int_equal); return HA_OK;}/** * request_mqname_open - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * If the request is sent out successfully, it returns TRUE; otherwise * returns FALSE. This call is non-blocking. */intrequest_mqname_open(mqueue_request_t * request, cms_data_t * cmsdata){ int ret; ll_cluster_t *hb; struct ha_msg *msg; const char *tonode, *type; int index, i; char *data, *p; char size_string[64]; index = g_str_hash(request->qname) % MQ_MEMBER_COUNT; dprintf("index = %d, total = %d\n", index, MQ_MEMBER_COUNT); data = g_list_nth_data(mqmember_list, (guint)index); assert(data); tonode = data; type = mqname_type2string(MQNAME_TYPE_REQUEST); for (p = size_string, i = 0; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) { p += sprintf(p, "%d:", request->size[i]); } dprintf("%s: tonode is [%s]\n", __FUNCTION__, tonode); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQREQUEST, cmsrequest_type2string(request->request_type)) == HA_FAIL || ha_msg_add(msg, F_MQNAME, request->qname) == HA_FAIL || ha_msg_addbin(msg, F_MQPOLICY, &(request->policy), sizeof(int)) == HA_FAIL || ha_msg_addbin(msg, F_MQINVOCATION, &(request->invocation), sizeof(int)) == HA_FAIL || ha_msg_addbin(msg, F_MQCREATEFLAG, &(request->create_flag), sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL || ha_msg_addbin(msg, F_MQOPENFLAG, &(request->open_flag), sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL || ha_msg_addbin(msg, F_MQRETENTION, &(request->retention), sizeof(SaTimeT)) == HA_FAIL || ha_msg_add(msg, F_MQSIZE, size_string) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ret = FALSE; } else { hb = cmsdata->hb_handle; hb->llc_ops->sendnodemsg(hb, msg, tonode); ret = TRUE; } ha_msg_del(msg); return ret;}/** * request_mqname_close - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * Always returns TRUE. This call is non-blocking. */intrequest_mqname_close(const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; CMS_TRACE(); type = mqname_type2string(MQNAME_TYPE_CLOSE); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); ha_msg_del(msg); return SA_OK;}/** * request_mqname_unlink - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * Always returns TRUE. This call is non-blocking. */intrequest_mqname_unlink(const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; type = mqname_type2string(MQNAME_TYPE_UNLINK); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } else { hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); } ha_msg_del(msg); return SA_OK;}/** * request_mqname_send - send a message to a message queue in the cluster * * Always returns TRUE. This call is non-blocking. */intrequest_mqname_send(mqueue_request_t * request, const char *node, const char * client, SaMsgMessageT *msg, cms_data_t * cmsdata){ int ret; ll_cluster_t *hb; struct ha_msg *m; const char *type; type = mqname_type2string(MQNAME_TYPE_SEND); if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQREQUEST, cmsrequest_type2string(request->request_type)) == HA_FAIL || ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL || ha_msg_addbin(m, F_MQMSGTYPE, (char *) &msg->type, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGVER, (char *) &msg->version, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSIZE, (char *) &msg->size, sizeof(SaSizeT)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGPRI, (char *) &msg->priority, sizeof(SaUint8T)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGDATA, (char *) msg->data, msg->size) == HA_FAIL || ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq), sizeof(unsigned long)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGACK, &(request->ack), sizeof(SaMsgAckFlagsT)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } if (request->gname != NULL) { if (ha_msg_add(m, F_MQGROUPNAME, request->gname) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); ha_msg_del(m); return FALSE; } } hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(m);#endif ret = hb->llc_ops->sendnodemsg(hb, m, node); dprintf("%s: node = %s, ops->sendnodemsg: ret = %d\n" , __FUNCTION__, node, ret); ha_msg_del(m); return SA_OK;}/* * ack the mqname_send back to the sender's client. */static intmqname_send_ack(mqueue_request_t * request, const char *node, const char *client, const SaErrorT ret, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *m; const char *type; type = mqname_type2string(MQNAME_TYPE_ACK); if ((m = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(m, F_TYPE, type) == HA_FAIL || ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL || ha_msg_add(m, F_MQREQUEST, cmsrequest_type2string(request->request_type)) == HA_FAIL || ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation), sizeof(int)) == HA_FAIL || ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq), sizeof(unsigned long)) == HA_FAIL || ha_msg_add(m, F_MQERROR, saerror_type2string(ret)) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } dprintf("mqname_send_ack, request->qname = %s\n", request->qname); hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(m);#endif hb->llc_ops->sendnodemsg(hb, m, node); ha_msg_del(m); return TRUE;}intrequest_mqgroup_insert(const char *gname, const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; type = mqname_type2string(MQNAME_TYPE_INSERT); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } else { hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); } ha_msg_del(msg); return SA_OK;}intrequest_mqgroup_remove(const char *gname, const char *name, cms_data_t * cmsdata){ ll_cluster_t *hb; struct ha_msg *msg; const char *type; type = mqname_type2string(MQNAME_TYPE_REMOVE); if ((msg = ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__); return FALSE; } if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL || ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL || ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) { cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__); return FALSE; } else { hb = cmsdata->hb_handle;#if DEBUG_CLUSTER cl_log_message(msg);#endif hb->llc_ops->sendclustermsg(hb, msg); } ha_msg_del(msg); return SA_OK;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -