⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 4 页
字号:
/* * cms_cluster.c: cms daemon cluster operation * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */#include <stdio.h>#include <stdlib.h>#include <string.h>#include <assert.h>#include <cl_log.h>#include <heartbeat.h>#include "cms_common.h"#define MQ_MEMBER_COUNT	g_list_length(mqmember_list)#define RETENTION_TIME_EXPIRES(mq) (				    \		mq->status.creationFlags != SA_MSG_QUEUE_PERSISTENT \		&&	mq->status.closeTime > 0		    \		&&	get_current_satime() - mq->status.closeTime \			>= mq->status.retentionTime)GHashTable * mq_open_pending_hash;GHashTable * mq_status_pending_hash;GHashTable * mq_ack_pending_hash;GHashTable * mq_reply_pending_hash;struct reply_info {	unsigned long seq;	char * node;};int gReplyCount = 1; static intget_senderId_by_name(const char * node, int seq){	struct reply_info * info;	int * senderId;	info = (struct reply_info *) ha_malloc(sizeof(struct reply_info));	info->seq = seq;	info->node = ha_strdup(node);	senderId = (int *) ha_malloc(sizeof(int));	*senderId = ++gReplyCount;	g_hash_table_insert(mq_reply_pending_hash, senderId, info);	return *senderId;}static const char *get_name_by_senderId(int senderId, unsigned long * seq){	gboolean found;	struct reply_info * info;	int * orig_id;	static 
char node[SA_MAX_NAME_LENGTH];	*seq = 0;	memset(node, 0, SA_MAX_NAME_LENGTH);	found = g_hash_table_lookup_extended(mq_reply_pending_hash,			&senderId, (gpointer) &orig_id, (gpointer) &info);	if (found) {		strncpy(node, info->node, SA_MAX_NAME_LENGTH);		*seq = info->seq;		dprintf("%s: found the reply_info: node = %s, seq = %ld\n", __FUNCTION__, node, *seq);		ha_free(info);		ha_free(orig_id);		return node;	}	return NULL;}/** * cluster_hash_table_init - initialize local message queue database */intcluster_hash_table_init(){	mqueue_table_init();	mq_open_pending_hash = g_hash_table_new(g_str_hash, g_str_equal);	mq_status_pending_hash = g_hash_table_new(g_str_hash, g_str_equal);	mq_ack_pending_hash = g_hash_table_new(g_int_hash, g_int_equal);	mq_reply_pending_hash = g_hash_table_new(g_int_hash, g_int_equal);	return HA_OK;}/** * request_mqname_open - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * If the request is sent out successfully, it returns TRUE; otherwise * returns FALSE. This call is non-blocking. *//* * The algorithm for selecting the master node is the following: *  * index = g_str_hash(request->qname) % MQ_MEMBER_COUNT; * tonode = g_list_nth_data(mqmember_list, (guint)index); * * Every node could potentially be the mater node for a queue.  * The purpose of this algorithm is for load-balancing.  
*/
int
request_mqname_open(mqueue_request_t * request, cms_data_t * cmsdata)
{
	int ret;
	ll_cluster_t *hb;
	struct ha_msg *msg;
	const char *tonode, *type;
	guint index, member_count;
	int i;
	char *p;
	size_t left;
	char size_string[64];

	/* An empty member list would make the modulo below divide by zero. */
	member_count = MQ_MEMBER_COUNT;
	if (member_count == 0) {
		cl_log(LOG_ERR, "%s: no member node available", __FUNCTION__);
		return FALSE;
	}

	index = g_str_hash(request->qname) % member_count;
	dprintf("index = %d, total = %d\n", (int) index, (int) member_count);

	tonode = g_list_nth_data(mqmember_list, index);
	assert(tonode);

	type = mqname_type2string(MQNAME_TYPE_REQUEST);

	/*
	 * Encode the per-priority sizes as "s0:s1:...:" and refuse to
	 * continue rather than overrun size_string if they do not fit.
	 */
	p = size_string;
	left = sizeof(size_string);
	for (i = 0; i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++) {
		int n = snprintf(p, left, "%d:", request->size[i]);

		if (n < 0 || (size_t) n >= left) {
			cl_log(LOG_ERR, "%s: size string too long"
			,	__FUNCTION__);
			return FALSE;
		}
		p += n;
		left -= (size_t) n;
	}

	dprintf("%s: tonode is [%s]\n", __FUNCTION__, tonode);

	if ((msg = ha_msg_new(0)) == NULL) {
		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);
		return FALSE;
	}

	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL
	||	ha_msg_add(msg, F_MQREQUEST,
			cmsrequest_type2string(request->request_type))
			== HA_FAIL
	||	ha_msg_add(msg, F_MQNAME, request->qname) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQPOLICY, &(request->policy), sizeof(int))
			== HA_FAIL
	||	ha_msg_addbin(msg, F_MQINVOCATION, &(request->invocation),
			sizeof(int)) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQCREATEFLAG, &(request->create_flag),
			sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQOPENFLAG, &(request->open_flag),
			sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL
	||	ha_msg_addbin(msg, F_MQRETENTION, &(request->retention),
			sizeof(SaTimeT)) == HA_FAIL
	||	ha_msg_add(msg, F_MQSIZE, size_string) == HA_FAIL) {
		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);
		ret = FALSE;
	} else {
		hb = cmsdata->hb_handle;
		/* Report a failed send instead of claiming success. */
		if (hb->llc_ops->sendnodemsg(hb, msg, tonode) != HA_OK) {
			cl_log(LOG_ERR, "%s: sendnodemsg failed: %s"
			,	__FUNCTION__, hb->llc_ops->errmsg(hb));
			ret = FALSE;
		} else {
			ret = TRUE;
		}
	}

	ha_msg_del(msg);
	return ret;
}

/**
 * request_mqname_close - broadcast a queue close message to the cluster
 * @name: message queue name
 * @cmsdata: pointer to cms daemon private data struct
 *
 * Returns SA_OK once the message has been handed to heartbeat, or FALSE
 * if the message could not be allocated or populated.
 * This call is non-blocking.
*/intrequest_mqname_close(const char *name, cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	CMS_TRACE();	type = mqname_type2string(MQNAME_TYPE_CLOSE);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(msg);		return FALSE;	}	hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(msg);#endif	hb->llc_ops->sendclustermsg(hb, msg);	ha_msg_del(msg);	return SA_OK;}/** * request_mqname_unlink - apply for a message queue in the cluster * @name: message queue name * @cmsdata: pointer to cms daemon private data struct * * Always returns TRUE. This call is non-blocking. */intrequest_mqname_unlink(const char *name, cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	type = mqname_type2string(MQNAME_TYPE_UNLINK);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} else {		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER		cl_log_message(msg);#endif		hb->llc_ops->sendclustermsg(hb, msg);	}	ha_msg_del(msg);	return SA_OK;}/** * request_mqname_send - send a message to a message queue in the cluster * * Always returns TRUE. This call is non-blocking. 
*/intrequest_mqname_send(mqueue_request_t * request, const char *node,		    const char * client, SaMsgMessageT *msg,		    cms_data_t * cmsdata){	int ret;	ll_cluster_t *hb;	struct ha_msg *m;	const char *type;	type = mqname_type2string(MQNAME_TYPE_SEND);	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL 	|| 	ha_msg_add(m, F_MQREQUEST, 		    cmsrequest_type2string(request->request_type)) == HA_FAIL	||	ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL	||	ha_msg_addbin(m, F_SENDRECEIVE, (char *) &(request->sendreceive),			sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGTYPE, (char *) &msg->type, 			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGVER, (char *) &msg->version,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSIZE, (char *) &msg->size,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGPRI, (char *) &msg->priority,			sizeof(SaUint8T)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGDATA, (char *) msg->data, 			msg->size) == HA_FAIL	||	ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation),		    	sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq),		    	sizeof(unsigned long)) == HA_FAIL	|| 	ha_msg_addbin(m, F_MQMSGACK, &(request->ack), 		    	sizeof(SaMsgAckFlagsT)) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(m);		return FALSE;	} 	if (request->gname != NULL) {		if (ha_msg_add(m, F_MQGROUPNAME, request->gname) == HA_FAIL) {			cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);			ha_msg_del(m);			return FALSE;		}	}		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(m);#endif	ret = hb->llc_ops->sendnodemsg(hb, m, node);	dprintf("%s: node = %s, ops->sendnodemsg: ret = %d\n"	,	__FUNCTION__, node, ret);	if (ret != HA_OK) {		dprintf("%s: err = [%s]\n", __FUNCTION__, hb->llc_ops->errmsg(hb));	}	ha_msg_del(m);	return TRUE;}intsend_mq_reply(mqueue_request_t * request, 		  SaMsgSenderIdT 
senderId,		  SaMsgMessageT *msg, 		  cms_data_t * cmsdata){	int ret;	ll_cluster_t *hb;	struct ha_msg *m;	const char *type;	const char * node;	unsigned long seq;	/* we need the seq number in the original sendreceive msg */	node = get_name_by_senderId(senderId, &seq);	if (!node) {		return FALSE;	}	type = mqname_type2string(MQNAME_TYPE_REPLY);	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL 	|| 	ha_msg_add(m, F_MQREQUEST, 		    cmsrequest_type2string(request->request_type)) == HA_FAIL	||	ha_msg_addbin(m, F_SENDRECEIVE, (char *) &(request->sendreceive),			sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGTYPE, (char *) &msg->type, 			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGVER, (char *) &msg->version,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSIZE, (char *) &msg->size,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGPRI, (char *) &msg->priority,			sizeof(SaUint8T)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGDATA, (char *) msg->data, 			msg->size) == HA_FAIL	||	ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation),		    	sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSEQ, &seq,		    	sizeof(unsigned long)) == HA_FAIL	|| 	ha_msg_addbin(m, F_MQMSGREPLYSEQ, &request->seq,			sizeof(unsigned long)) == HA_FAIL	|| 	ha_msg_addbin(m, F_MQMSGACK, &(request->ack), 		    	sizeof(SaMsgAckFlagsT)) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(m);		return FALSE;	} 	hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(m);#endif	ret = hb->llc_ops->sendnodemsg(hb, m, node);	dprintf("%s: node = %s, ops->sendnodemsg: ret = %d\n"	,	__FUNCTION__, node, ret);	ha_msg_del(m);	return TRUE;}/* * ack the mqname_send back to the sender's client. 
*/static intmqname_send_ack(mqueue_request_t * request, const char *node,		const char *client, const SaErrorT ret, cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *m;	const char *type;	type = mqname_type2string(MQNAME_TYPE_ACK);	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(m, F_MQNAME, request->qname) == HA_FAIL	|| 	ha_msg_add(m, F_MQREQUEST, 		    cmsrequest_type2string(request->request_type)) == HA_FAIL	||	ha_msg_addbin(m, F_MQINVOCATION, &(request->invocation),		    sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSEQ, &(request->seq),		    sizeof(unsigned long)) == HA_FAIL	||	ha_msg_add(m, F_MQERROR, saerror_type2string(ret)) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} 	dprintf("mqname_send_ack, request->qname = %s\n", request->qname);	hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(m);#endif	hb->llc_ops->sendnodemsg(hb, m, node);	ha_msg_del(m);	return TRUE;}intrequest_mqgroup_insert(const char *gname, const char *name,		       cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	type = mqname_type2string(MQNAME_TYPE_INSERT);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL	||	ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} else {		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER		cl_log_message(msg);#endif		hb->llc_ops->sendclustermsg(hb, msg);	}	ha_msg_del(msg);	return SA_OK;}intrequest_mqgroup_remove(const char *gname, const char *name,		       cms_data_t * cmsdata){	ll_cluster_t *hb;	struct ha_msg *msg;	const char *type;	type = mqname_type2string(MQNAME_TYPE_REMOVE);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of 
memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL	||	ha_msg_add(msg, F_MQGROUPNAME, gname) == HA_FAIL	||	ha_msg_add(msg, F_MQNAME, name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	} else {		hb = cmsdata->hb_handle;#if DEBUG_CLUSTER		cl_log_message(msg);#endif		hb->llc_ops->sendclustermsg(hb, msg);	}	ha_msg_del(msg);	return SA_OK;}/** * reply_mqname_open - process the request message as the master node *		       for this message queue name * * @hb:		heartbeat IPC Channel handle * @msg:	received message from heartbeat IPC Channel */intreply_mqname_open(ll_cluster_t *hb, struct ha_msg *msg){	const char *name, *type, *request, *mqhost = NULL;	size_t  invocation_size, cflag_size, oflag_size, retention_size;	const SaInvocationT * invocation = NULL;	const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL;	const SaTimeT * retention = NULL;	const int * policy = NULL;	const char * size_string;	SaMsgQueueSendingStateT sending_state;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -