⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
/* * cms_cluster.c: cms daemon cluster operation * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */#include <stdio.h>#include <stdlib.h>#include <string.h>#include <assert.h>#include <cl_log.h>#include <heartbeat.h>#include "cms_common.h"#define MQ_MEMBER_COUNT	g_list_length(mqmember_list)#define RETENTION_TIME_EXPIRES(mq) (				    \		mq->status.creationFlags != SA_MSG_QUEUE_PERSISTENT \		&&	mq->status.closeTime > 0		    \		&&	get_current_satime() - mq->status.closeTime \			>= mq->status.retentionTime)GHashTable * mq_open_pending_hash;GHashTable * mq_status_pending_hash;GHashTable * mq_ack_pending_hash;char mqname_type_str[MQNAME_TYPE_LAST][TYPESTRSIZE] = {	"",	"mqinit",	"mqrequest",	"mqgranted",	"mqreopen",	"mqdenied",	"mqclose",	"mqunlink",	"mqsend",	"mqinsert",	"mqremove",	"mqmsgack",	"mqinfoupdate",	"mqreopenmsgfeed",	"mqmsgfeedend",	"mqstatusrequest",	"mqstatusreply",	""};char cmsrequest_type_str[CMS_TYPE_TOTAL][TYPESTRSIZE] = {	"cms_qstatus",	"cms_qopen",	"cms_qopenasync",	"cms_qclose",	"cms_qunlink",	"cms_msend",	"cms_msendasync",	"cms_mack",	"cms_mget",	"cms_mreceivedget",	"cms_qg_creat",	"cms_qg_delete",	"cms_qg_insert",	"cms_qg_remove",	"cms_qg_track_start",	"cms_qg_track_stop",	
"cms_qg_notify",	"cms_msg_notify",	"cms_msg_request"};char sa_errortype_str[SA_ERR_BAD_FLAGS + 2][TYPESTRSIZE] = {	"",	"sa_ok",	"sa_err_library",	"sa_err_version",	"sa_err_init",	"sa_err_timeout",	"sa_err_try_again",	"sa_err_invalid_param",	"sa_err_no_memory",	"sa_err_bad_handle",	"sa_err_busy",	"sa_err_access",	"sa_err_not_exist",	"sa_err_name_too_long",	"sa_err_exist",	"sa_err_no_space",	"sa_err_interrupt",	"sa_err_system",	"sa_err_name_not_found",	"sa_err_no_resources",	"sa_err_not_supported",	"sa_err_bad_operation",	"sa_err_failed_operation",	"sa_err_message_error",	"sa_err_no_message",	"sa_err_queue_full",	"sa_err_queue_not_available",	"sa_err_bad_checkpoint",	"sa_err_bad_flags",	""};/** * cluster_hash_table_init - initialize local message queue database */intcluster_hash_table_init(){	mqueue_table_init();	mq_open_pending_hash = g_hash_table_new(g_str_hash, g_str_equal);	mq_status_pending_hash = g_hash_table_new(g_str_hash, g_str_equal);	mq_ack_pending_hash = g_hash_table_new(g_int_hash, g_int_equal);	return HA_OK;}/** * request_mqname_open - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * If the request is sent out successfully, it returns TRUE; otherwise * returns FALSE. This call is non-blocking. 
*/
int
request_mqname_open(mqueue_request_t * request, cms_data_t * cmsdata)
{
	int ret;
	ll_cluster_t *hb;
	struct ha_msg *msg;
	const char *tonode, *type;
	guint member_count, index;
	int i, n;
	size_t used;
	char size_string[64];

	/*
	 * Take the member count once: g_list_length() walks the list, and
	 * an empty list would make the modulo below divide by zero.
	 */
	member_count = MQ_MEMBER_COUNT;
	if (member_count == 0) {
		cl_log(LOG_ERR, "%s: no message queue member available"
		,	__FUNCTION__);
		return FALSE;
	}

	/* The hash of the queue name selects the node that gets the request. */
	index = g_str_hash(request->qname) % member_count;
	dprintf("index = %d, total = %d\n", (int)index, (int)member_count);

	tonode = g_list_nth_data(mqmember_list, index);
	assert(tonode);

	type = mqname_type2string(MQNAME_TYPE_REQUEST);

	/*
	 * Encode the per-priority sizes as "s0:s1:...:". The write is bounded
	 * so an unexpectedly long encoding cannot overrun size_string.
	 */
	size_string[0] = '\0';
	for (used = 0, i = 0; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {
		n = snprintf(size_string + used, sizeof(size_string) - used,
			     "%d:", request->size[i]);
		if (n < 0 || (size_t)n >= sizeof(size_string) - used) {
			cl_log(LOG_ERR, "%s: queue size string too long"
			,	__FUNCTION__);
			return FALSE;
		}
		used += (size_t)n;
	}

	dprintf("%s: tonode is [%s]\n", __FUNCTION__, tonode);

	if ((msg = ha_msg_new(0)) == NULL) {
		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);
		return FALSE;
	}

	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL
	||	ha_msg_add(msg, F_MQREQUEST,
			cmsrequest_type2string(request->request_type))
			== HA_FAIL
	||	ha_msg_add(msg, F_MQNAME, request->qname) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQPOLICY, &(request->policy), sizeof(int))
			== HA_FAIL
	||	ha_msg_addbin(msg, F_MQINVOCATION, &(request->invocation),
			sizeof(int)) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQCREATEFLAG, &(request->create_flag),
			sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQOPENFLAG, &(request->open_flag),
			sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQRETENTION, &(request->retention),
			sizeof(SaTimeT)) == HA_FAIL
	||	ha_msg_add(msg, F_MQSIZE, size_string) == HA_FAIL) {
		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);
		ret = FALSE;
	} else {
		hb = cmsdata->hb_handle;
		/* Report a failed send instead of claiming success. */
		if (hb->llc_ops->sendnodemsg(hb, msg, tonode) != HA_OK) {
			cl_log(LOG_ERR, "%s: sendnodemsg to [%s] failed"
			,	__FUNCTION__, tonode);
			ret = FALSE;
		} else {
			ret = TRUE;
		}
	}

	ha_msg_del(msg);
	return ret;
}

/**
 * request_mqname_close - announce the close of a message queue to the cluster
 * @name: message queue name
 * @cmsdata: pointer to cms daemon private data struct
 *
 * Returns FALSE if the cluster message cannot be allocated or built;
 * otherwise returns SA_OK. This call is non-blocking.
*/intrequest_mqname_close(const char *name, cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	CMS_TRACE();	type = mqname_type2string(MQNAME_TYPE_CLOSE);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	}	hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(msg);#endif	hb->llc_ops->sendclustermsg(hb, msg);	ha_msg_del(msg);	return SA_OK;}/** * request_mqname_unlink - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * Always returns TRUE. This call is non-blocking. */intrequest_mqname_unlink(const char *name, cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	type = mqname_type2string(MQNAME_TYPE_UNLINK);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} else {		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER		cl_log_message(msg);#endif		hb->llc_ops->sendclustermsg(hb, msg);	}	ha_msg_del(msg);	return SA_OK;}/** * request_mqname_send - send a message to a message queue in the cluster * * Always returns TRUE. This call is non-blocking. 
*/intrequest_mqname_send(mqueue_request_t * request, const char *node,		    const char * client, SaMsgMessageT *msg,		    cms_data_t * cmsdata){	int ret;	ll_cluster_t *hb;	struct ha_msg *m;	const char *type;	type = mqname_type2string(MQNAME_TYPE_SEND);	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL 	|| 	ha_msg_add(m, F_MQREQUEST, 		    cmsrequest_type2string(request->request_type)) == HA_FAIL	||	ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGTYPE, (char *) &msg->type, 			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGVER, (char *) &msg->version,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSIZE, (char *) &msg->size,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGPRI, (char *) &msg->priority,			sizeof(SaUint8T)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGDATA, (char *) msg->data, 			msg->size) == HA_FAIL	||	ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation),		    	sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq),		    	sizeof(unsigned long)) == HA_FAIL	|| 	ha_msg_addbin(m, F_MQMSGACK, &(request->ack), 		    	sizeof(SaMsgAckFlagsT)) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(m);		return FALSE;	} 	if (request->gname != NULL) {		if (ha_msg_add(m, F_MQGROUPNAME, request->gname) == HA_FAIL) {			cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);			ha_msg_del(m);			return FALSE;		}	}		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(m);#endif	ret = hb->llc_ops->sendnodemsg(hb, m, node);	dprintf("%s: node = %s, ops->sendnodemsg: ret = %d\n"	,	__FUNCTION__, node, ret);	ha_msg_del(m);	return SA_OK;}/* * ack the mqname_send back to the sender's client. 
*/static intmqname_send_ack(mqueue_request_t * request, const char *node,		const char *client, const SaErrorT ret, cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *m;	const char *type;	type = mqname_type2string(MQNAME_TYPE_ACK);	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL	|| 	ha_msg_add(m, F_MQREQUEST, 		    cmsrequest_type2string(request->request_type)) == HA_FAIL	||	ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation),		    sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq),		    sizeof(unsigned long)) == HA_FAIL	||	ha_msg_add(m, F_MQERROR, saerror_type2string(ret)) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} 	dprintf("mqname_send_ack, request->qname = %s\n", request->qname);	hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(m);#endif	hb->llc_ops->sendnodemsg(hb, m, node);	ha_msg_del(m);	return TRUE;}intrequest_mqgroup_insert(const char *gname, const char *name,		       cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	type = mqname_type2string(MQNAME_TYPE_INSERT);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL	||	ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} else {		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER		cl_log_message(msg);#endif		hb->llc_ops->sendclustermsg(hb, msg);	}	ha_msg_del(msg);	return SA_OK;}intrequest_mqgroup_remove(const char *gname, const char *name,		       cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	type = mqname_type2string(MQNAME_TYPE_REMOVE);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of 
memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL	||	ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} else {		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER		cl_log_message(msg);#endif		hb->llc_ops->sendclustermsg(hb, msg);	}	ha_msg_del(msg);	return SA_OK;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -