⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_membership.c

📁 在LINUX下实现HA的源代码
💻 C
字号:
/* * cms_membership.c: cms daemon membership event handlers * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */#include <stdio.h>#include <stdlib.h>#include <string.h>#include <assert.h>#include <clplumbing/cl_log.h>#include <heartbeat.h>#include "cms_data.h"#include "cms_common.h"#include "cms_membership.h"GList *mqmember_list = NULL;GList *clm_member_list = NULL;static cms_data_t * gcms_data = NULL;static int send_mq_update(const char * host);#define CS_TIMEOUT 1000 /* 1 second */static gintcomp_clm_member(gconstpointer data, gconstpointer user_data){	const SaClmClusterNotificationT * member = data;	const char * node = user_data;	if (member->clusterNode.nodeName.length != strlen(node))		return 1;	return strncmp(member->clusterNode.nodeName.value, node, strlen(node));}static gintcomp_mqmember(gconstpointer data, gconstpointer user_data){	const char * member = data;	const char * node = user_data;	return strcmp(member, node);	}static voiddump_cluster_member(gpointer data, gpointer user_data){	SaClmClusterNotificationT * member = data;	cl_log(LOG_INFO, "%s: nodeid = %ld nodename = %s", __FUNCTION__, 		member->clusterNode.nodeId, member->clusterNode.nodeName.value);}static voiddump_mq_member(gpointer data, gpointer user_data){	cl_log(LOG_INFO, "%s: nodename = %s", __FUNCTION__, (char *) data);}static voidmqname_dump_membership(void){	g_list_foreach(clm_member_list, dump_cluster_member, NULL);	g_list_foreach(mqmember_list, dump_mq_member, NULL);}static intis_cms_online(const char * node){	ll_cluster_t * hb = gcms_data->hb_handle;	const char * status;	assert(hb);	status = hb->llc_ops->client_status(hb, node, CMSID, CS_TIMEOUT);	if (!status) {		cl_log(LOG_ERR, "llc_ops->client_status error for node %s"				"Reason: %s", node, hb->llc_ops->errmsg(hb));		return 0;	}	dprintf("client status on %s = %s\n", node, status);	if (strcmp(status, ONLINESTATUS) == 0) {		return 1;	}	return 0;}int set_cms_status(const char * node, const char * status, void * private){	GList * element;	char * host = ha_strdup(node);	dprintf("set_cms_status: node = %s, status = %s\n", node, status);	if (!g_list_find_custom(clm_member_list, host, comp_clm_member)){		return HA_OK;	}	/* is_cms_online(node); */	element = g_list_find_custom(mqmember_list, host, comp_mqmember);	if (element) {		if (strcmp(status, LEAVESTATUS) == 0 ||		    strcmp(status, OFFLINESTATUS) == 0) {			/*			 * cms server on that node is off line, 			 * take it off from the mqmember list			 */			mqmember_list = g_list_remove_link(mqmember_list			,	element);			/*			 * close all mqueues belongs to this node			 */			mqueue_close_node(element->data);			ha_free(element->data);			g_list_free_1(element);			/*			 * If I am now the only one in the list and 			 * I am still waiting for the update, then set			 * myself to ready.			 */			if (g_list_length(mqmember_list) == 1) {				gcms_data->cms_ready = 1;			}		}	} else {		if (strcmp(status, JOINSTATUS) == 0 ||		    strcmp(status, ONLINESTATUS) == 0) {			mqmember_list = g_list_insert_sorted(mqmember_list			,	host, comp_mqmember);			send_mq_update(node);		}	}	mqname_dump_membership();	return HA_OK;}static intsend_mq_update(const char * node){	char * host;	/* we ourselves just joined, no update needed. */	if (g_list_length(mqmember_list) <= 1) {		return HA_OK;	}	/*	 * always the first node in the list should send out the mq 	 * update.  in case that the new node is the first node, 	 * choose the second node to send out the mq update	 */	host = g_list_nth_data(mqmember_list, 0);	if (strcmp(host, node) == 0) {		host = g_list_nth_data(mqmember_list, 1);	}	/* are we the one that should send out the mq update? */	if (strcmp(host, gcms_data->my_nodeid) != 0) {		return HA_OK;	}	cl_log(LOG_INFO, "%s: host is %s", __FUNCTION__, host);	return request_mqname_update(node, &cms_data);}static voidfree_mqmember(gpointer data, gpointer user_data){	char * node = data;	ha_free(node);}static voidfree_clm_member(gpointer data, gpointer user_data){	SaClmClusterNotificationT * nbuf = data;	ha_free(nbuf);}static intfree_member_lists(void){	if (g_list_length(mqmember_list)) {		g_list_foreach(mqmember_list, free_mqmember, NULL);		g_list_free(mqmember_list);		mqmember_list = NULL;	}	if (g_list_length(clm_member_list)) {		g_list_foreach(clm_member_list, free_clm_member, NULL);		g_list_free(clm_member_list);		clm_member_list = NULL;	}	return HA_OK;}static voiddump_nodeinfo(SaClmClusterNodeT *cn){	dprintf("Dump information from SaClmClusterNodeGet\n");	dprintf("\n");	dprintf("nodeId = %ld\n", cn->nodeId);	dprintf("nodeAddress = %s\n"	,	cn->nodeAddress.length > 0	?	(char *)cn->nodeAddress.value : "N/A");	dprintf("nodeName = %s\n"	,	cn->nodeName.length > 0	?	(char *)cn->nodeName.value : "N/A");	dprintf("clusterName = %s\n"	,	cn->clusterName.length > 0	?	(char *)cn->clusterName.value : "N/A");	dprintf("member = %d\n", cn->member);	dprintf("bootTimestamp = %lld\n", cn->bootTimestamp);	dprintf("\n");}static voidgetnode_callback(SaInvocationT invocation, SaClmClusterNodeT *clusterNode,	SaErrorT error){	if (error != SA_OK) {		cl_log(LOG_ERR, "Get Node Callback failed [%d]\n", error);		/* exit(1); */	}	dprintf("Invocation [%d]\n", invocation);	dump_nodeinfo(clusterNode);}static voidmqclm_track(SaClmClusterNotificationT *nbuf, SaUint32T nitem,	SaUint32T nmem, SaUint64T nview, SaErrorT error){	int i;	SaClmClusterNotificationT *buffer;	char * node;	free_member_lists();	for (i = 0; i < nitem; i++) {		if (nbuf[i].clusterChanges == SA_CLM_NODE_LEFT) {			if (!is_cms_online(nbuf[i].clusterNode.nodeName.value))				continue;			mqueue_close_node(nbuf[i].clusterNode.nodeName.value);		}		buffer = (SaClmClusterNotificationT *)				ha_malloc(sizeof(SaClmClusterNotificationT));		if (!buffer) {			cl_log(LOG_CRIT, "malloc failed for mqclm_track buffer.");			return;		}		*buffer = nbuf[i];		clm_member_list = g_list_append(clm_member_list, buffer);		if (!is_cms_online(nbuf[i].clusterNode.nodeName.value)) {			continue;		}		node = ha_strdup(nbuf[i].clusterNode.nodeName.value);		dprintf("insert node [%s]\n", node);		mqmember_list = g_list_insert_sorted(mqmember_list, node,					comp_mqmember);	}	mqname_dump_membership();}intcms_membership_init(cms_data_t * cmsdata){	SaErrorT ret;	SaClmHandleT handle;	SaSelectionObjectT st;	SaClmClusterNotificationT * nbuf;	SaClmCallbacksT my_callbacks = {		.saClmClusterTrackCallback		=	(SaClmClusterTrackCallbackT)mqclm_track,		.saClmClusterNodeGetCallback		=	(SaClmClusterNodeGetCallbackT)getnode_callback	};	cl_log(LOG_INFO, "initializing with clm...");	gcms_data = cmsdata;	cmsdata->clm_nbuf = NULL;	if ((ret = saClmInitialize(&handle, &my_callbacks, NULL)) != SA_OK) {		cl_log(LOG_ERR, "saClmInitialize error, errno [%d]", ret);		return HA_FAIL;	}	nbuf = (SaClmClusterNotificationT *) ha_malloc(cmsdata->node_count * 				sizeof (SaClmClusterNotificationT));	if (!nbuf) {		cl_log(LOG_CRIT, "malloc failed for notification buffer.");		return HA_FAIL;	}	if (saClmClusterTrackStart(&handle, SA_TRACK_CURRENT, nbuf, 		cmsdata->node_count) != SA_OK) {		cl_log(LOG_ERR, "SA_TRACK_CURRENT error, errno [%d]\n", ret);		ha_free(nbuf);		return HA_FAIL; 	}	if ((ret = saClmSelectionObjectGet(&handle, &st)) != SA_OK) {		cl_log(LOG_ERR, "saClmSelectionObjectGet error, errno [%d]\n", ret);		ha_free(nbuf);		return HA_FAIL;	}	/* Start to track cluster membership changes events */	if (saClmClusterTrackStart(&handle, SA_TRACK_CHANGES, nbuf, 		cmsdata->node_count) != SA_OK) {		cl_log(LOG_ERR, "SA_TRACK_CURRENT error, errno [%d]\n", ret);		ha_free(nbuf);		return HA_FAIL; 	}	cmsdata->clm_handle = handle;	cmsdata->clm_fd = st;	cmsdata->clm_nbuf = nbuf;	if (g_list_length(mqmember_list) == 1) {		cmsdata->cms_ready = 1;	} else {		cmsdata->cms_ready = 0;	}	return HA_OK;}voidcms_membership_finalize(SaClmHandleT * handle){	saClmFinalize(handle);	return;}intcms_membership_dispatch(SaClmHandleT * handle, SaDispatchFlagsT flags){	SaErrorT ret;	if ((ret = saClmDispatch(handle, flags)) != SA_OK) {		if (ret == SA_ERR_LIBRARY) {			cl_log(LOG_ERR, "cms: evicted by membership!");			return HA_FAIL;		} else if (ret == SA_ERR_BAD_HANDLE) {			cl_log(LOG_ERR, "cms: Membership Service not "				"available, errno [%d]", ret);			return HA_FAIL;		}		cl_log(LOG_ERR, "cms: saClmDispatch error, errno [%d]", ret);		return HA_FAIL;	}	return HA_OK;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -