⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ccmlib_clm.c

📁 在LINUX下实现HA的源代码
💻 C
字号:
/* 
 * libclm.c: SAForum AIS Membership Service library
 *
 * Copyright (c) 2003 Intel Corp.
 * Author: Zhu Yi (yi.zhu@intel.com)
 * 
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 * 
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <portability.h>
#include <strings.h>
#include <unistd.h>
#include <glib.h>
#include <errno.h>
#include <assert.h>
#include <ocf/oc_event.h>
#include <clplumbing/cl_log.h>
#include <saf/ais.h>
#ifdef POSIX_THREADS
#  include <pthread.h>
#endif
#include <sys/time.h>
#include <string.h>

/* Value of trackflags meaning "no tracking requested". */
#define CLM_TRACK_STOP 0
#define CLM_DEBUG 0

/*
 * Per-handle state created by saClmInitialize() and stored in __handle_hash.
 */
typedef struct __clm_handle_s {
	oc_ev_t *ev_token;			/* OCF event token */
	SaClmCallbacksT callbacks;		/* copy of the user callbacks */
	SaSelectionObjectT fd;			/* set by oc_ev_activate() */
	SaUint8T trackflags;			/* CLM_TRACK_STOP or SA_TRACK_* */
	SaUint32T itemnum;			/* capacity of nbuf, in items */
	SaClmClusterNotificationT *nbuf;	/* user notification buffer */
	SaSelectionObjectT st;			/* not used in this file */
} __clm_handle_t;

/* Maps a heap-allocated SaClmHandleT key to its __clm_handle_t. */
static GHashTable *__handle_hash = NULL;
static guint __handle_counter = 0;

/*
 * Cache of the most recent CCM event, shared by all handles and written
 * by ccm_events().
 */
static const oc_ev_membership_t *__ccm_data = NULL;
/* Holds the oc_ed_t event passed to ccm_events(), hence typed as oc_ed_t. */
static oc_ed_t __ccm_event = OC_EV_MS_INVALID;
static void *__ccm_cookie = NULL;

#ifdef POSIX_THREADS
static pthread_mutex_t __clmlib_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

static void pthread_lock(void);
static void pthread_unlock(void);
static void clm_init(void);
extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t , int );
static void retrieve_current_buffer(__clm_handle_t *hd);
static void retrieve_changes_buffer(__clm_handle_t *hd);
static void retrieve_changes_only_buffer(__clm_handle_t *hd);
static SaErrorT retrieve_node_buffer(SaClmNodeIdT nodeId,
		SaClmClusterNodeT *clusterNode);

/*
 * Look up the library state for a user handle.
 * Returns NULL when the handle pointer is NULL, when no handle table exists
 * yet, or when the handle is not present in the table.
 */
static __clm_handle_t *
lookup_clm_handle(const SaClmHandleT *clmHandle)
{
	/* g_int_hash dereferences its key, so a NULL key must be caught here */
	if (!clmHandle || !__handle_hash)
		return NULL;
	return (__clm_handle_t *)g_hash_table_lookup(__handle_hash, clmHandle);
}

#define GET_CLM_HANDLE(x) (lookup_clm_handle(x))

/* Take the library mutex; a no-op when built without POSIX_THREADS. */
static void
pthread_lock(void)
{
#ifdef POSIX_THREADS
	pthread_mutex_lock(&__clmlib_mutex);
#endif
}

/* Release the library mutex; a no-op when built without POSIX_THREADS. */
static void
pthread_unlock(void)
{
#ifdef POSIX_THREADS
	pthread_mutex_unlock(&__clmlib_mutex);
#endif
}

/* Create the handle table on first use; later calls do nothing. */
static void
clm_init(void)
{
	static gboolean clminit_flag = FALSE;

	if (clminit_flag == FALSE) {
		__handle_hash = g_hash_table_new(g_int_hash, g_int_equal);
		clminit_flag = TRUE;
	}
}

/*
 * OCF membership callback (registered in saClmInitialize()).
 * Releases the previously cached event via oc_ev_callback_done() and caches
 * the new cookie, event type and data pointer under the library mutex.
 */
static void
ccm_events(oc_ed_t event, void *cookie, size_t size, const void *data)
{
	(void)size;

	pthread_lock();
	/* dereference old cache */
	if (__ccm_cookie)
		oc_ev_callback_done(__ccm_cookie);
	__ccm_cookie = cookie;
	__ccm_event = event;
	__ccm_data = (const oc_ev_membership_t *)data;
#if CLM_DEBUG
	/* %p avoids truncating the pointer on LP64 platforms */
	cl_log(LOG_DEBUG, "__ccm_data = <%p>", data);
#endif
	pthread_unlock();

	if (event == OC_EV_MS_EVICTED || event == OC_EV_MS_NOT_PRIMARY
	||	event == OC_EV_MS_PRIMARY_RESTORED) {
		/* We do not care about this info */
		return;
	}
	if (!data) {
		cl_log(LOG_ERR, "CCM event callback return NULL data");
		return;
	}
	/*
	 * Note: No need to worry about the buffer free problem, OCF
	 * callback mechanism did this for us.
	 */
}

/*
 * Create a new membership handle.
 *
 * Registers with the OCF event service, stores a copy of *clmCallbacks,
 * activates the event token and waits up to 2 seconds for the first event
 * so that the membership cache is populated.
 *
 * clmHandle    - out: receives the new handle value.
 * clmCallbacks - callbacks to copy into the handle; must not be NULL.
 * version      - not examined by this implementation.
 *
 * Returns SA_OK on success, SA_ERR_INVALID_PARAM for NULL clmHandle or
 * clmCallbacks, SA_ERR_NO_MEMORY on allocation failure, SA_ERR_TIMEOUT if
 * no event arrived within 2 seconds, SA_ERR_LIBRARY otherwise.
 *
 * Note: when SA_ERR_TIMEOUT or SA_ERR_LIBRARY is returned after the handle
 * was inserted into the table, the handle stays registered (as in earlier
 * versions) and can be released with saClmFinalize().
 */
SaErrorT 
saClmInitialize(SaClmHandleT *clmHandle, const SaClmCallbacksT *clmCallbacks,
                const SaVersionT *version)
{
	int ret;
	oc_ev_t *ev_token = NULL;
	__clm_handle_t *hd;
	SaClmHandleT *hash_key;
	fd_set rset;
	struct timeval tv;

	(void)version;

	if (!clmHandle || !clmCallbacks)
		return SA_ERR_INVALID_PARAM;

	if ((ret = oc_ev_register(&ev_token)) != 0) {
		cl_log(LOG_ERR, "oc_ev_register error [%d]", ret);
		return (ret == ENOMEM) ? SA_ERR_NO_MEMORY : SA_ERR_LIBRARY;
	}

	if ((ret = oc_ev_set_callback(ev_token, OC_EV_MEMB_CLASS
	,	ccm_events, NULL)) != 0) {
		/* do not leak the token registered above */
		oc_ev_unregister(ev_token);
		if (ret == ENOMEM)
			return SA_ERR_NO_MEMORY;
		assert(0);	/* Never runs here */
		return SA_ERR_LIBRARY;	/* reached only when built with NDEBUG */
	}

	/* We must call it to get non-quorum partition info */
	oc_ev_special(ev_token, OC_EV_MEMB_CLASS, 0);

	clm_init();

	hash_key = g_malloc(sizeof(*hash_key));
	hd = g_malloc(sizeof(*hd));
	if (!hash_key || !hd) {
		g_free(hash_key);
		g_free(hd);
		oc_ev_unregister(ev_token);
		return SA_ERR_NO_MEMORY;
	}
	/* nbuf/itemnum must not hold garbage before tracking is started */
	memset(hd, 0, sizeof(*hd));

	*clmHandle = __handle_counter++;
	*hash_key = *clmHandle;
	hd->ev_token = ev_token;
	hd->callbacks = *clmCallbacks;
	hd->trackflags = CLM_TRACK_STOP;

	cl_log(LOG_INFO, "g_hash_table_insert hd = [%p]", (void *)hd);
	g_hash_table_insert(__handle_hash, hash_key, hd);

	if ((ret = oc_ev_activate(hd->ev_token, &hd->fd)) != 0) {
		cl_log(LOG_ERR, "oc_ev_activate error [%d]", ret);
		return SA_ERR_LIBRARY;
	}

	/* Prepare information for saClmClusterNodeGet() series calls */
	FD_ZERO(&rset);
	FD_SET(hd->fd, &rset);
	tv.tv_sec = 2;
	tv.tv_usec = 0;

	ret = select(hd->fd + 1, &rset, NULL, NULL, &tv);
	if (ret == -1) {
		/* report errno: select()'s own return value is always -1 here */
		cl_log(LOG_ERR, "%s: select error [%d]"
		,	__FUNCTION__, errno);
		return SA_ERR_LIBRARY;
	} else if (ret == 0) {
		cl_log(LOG_WARNING, "%s: select timeout", __FUNCTION__);
		return SA_ERR_TIMEOUT;
	}

	/* parenthesized so that ret holds the error code, not the comparison */
	if ((ret = oc_ev_handle_event(hd->ev_token)) != 0) {
		cl_log(LOG_ERR, "%s: oc_ev_handle_event error [%d]"
		,	__FUNCTION__, ret);
		return SA_ERR_LIBRARY;
	}
	return SA_OK;
}

/*
 * Return the selection object (stored by oc_ev_activate()) for a handle.
 *
 * Returns SA_ERR_BAD_HANDLE for an unknown handle, SA_ERR_INVALID_PARAM for
 * a NULL selectionObject, SA_OK otherwise.
 */
SaErrorT 
saClmSelectionObjectGet(const SaClmHandleT *clmHandle, 
                        SaSelectionObjectT *selectionObject)
{
	__clm_handle_t *hd = GET_CLM_HANDLE(clmHandle);

	if (!hd)
		return SA_ERR_BAD_HANDLE;
	if (!selectionObject)
		return SA_ERR_INVALID_PARAM;

	*selectionObject = hd->fd;
	return SA_OK;
}

/* Accessors into the notification buffer of the local variable `hd`. */
#define MEMCHANGE(x)	hd->nbuf[(x)].clusterChanges
#define MEMNODE(x)	hd->nbuf[(x)].clusterNode

/*
 * Fill the fields this implementation has no data for (address, cluster
 * name, boot timestamp) and derive nodeName.length from nodeName.value,
 * which must already be NUL-terminated.
 */
static void
set_misc_node_info(SaClmClusterNodeT *cn)
{
	cn->nodeAddress.length = 0;
	cn->nodeAddress.value[0] = '\0';
	cn->nodeName.length = strlen((char*)cn->nodeName.value);
	cn->clusterName.length = 0;
	cn->clusterName.value[0] = '\0';
	cn->bootTimestamp = 0;
}

/*
 * Copy a node name into cn->nodeName, truncating to SA_MAX_NAME_LENGTH - 1
 * bytes and always NUL-terminating; a NULL uname yields an empty name.
 * nodeName.length is updated to match the stored string.
 */
static void
set_node_name(SaClmClusterNodeT *cn, const char *uname)
{
	if (uname) {
		strncpy((char*)cn->nodeName.value, uname
		,	SA_MAX_NAME_LENGTH - 1);
		cn->nodeName.value[SA_MAX_NAME_LENGTH-1] = '\0';
	} else {
		cn->nodeName.value[0] = '\0';
	}
	cn->nodeName.length = strlen((char*)cn->nodeName.value);
}

/* Populate every field of *cn for one node. */
static void
fill_node(SaClmClusterNodeT *cn, SaClmNodeIdT nodeId, const char *uname
,	int member)
{
	cn->nodeId = nodeId;
	cn->member = member;
	set_node_name(cn, uname);
	set_misc_node_info(cn);
}

/*
 * Write the current members (m_n_member entries starting at m_memb_idx)
 * into hd->nbuf, each marked SA_CLM_NODE_NO_CHANGE.
 * The caller must ensure __ccm_data is non-NULL and nbuf is large enough.
 */
static void
retrieve_current_buffer(__clm_handle_t *hd)
{
	guint i;
	const oc_ev_membership_t *oc = __ccm_data;

	for (i = 0; i < oc->m_n_member; i++) {
		MEMCHANGE(i) = SA_CLM_NODE_NO_CHANGE;
		fill_node(&MEMNODE(i)
		,	oc->m_array[oc->m_memb_idx+i].node_id
		,	oc->m_array[oc->m_memb_idx+i].node_uname, 1);
	}
}

/*
 * Write the current members followed by the nodes that left into hd->nbuf
 * (m_n_member + m_n_out entries). Members that also appear in the "in" list
 * are marked SA_CLM_NODE_JOINED; departed nodes are marked SA_CLM_NODE_LEFT.
 * The caller must ensure __ccm_data is non-NULL and nbuf is large enough.
 */
static void
retrieve_changes_buffer(__clm_handle_t *hd)
{
	guint i, j, n;
	const oc_ev_membership_t *oc = __ccm_data;

	retrieve_current_buffer(hd);

	for (i = 0; i < oc->m_n_in; i++) {
		for (j = 0; j < oc->m_n_member; j++) {
			if (MEMNODE(j).nodeId
			==	oc->m_array[oc->m_in_idx+i].node_id) {
				MEMCHANGE(j) = SA_CLM_NODE_JOINED;
				set_node_name(&MEMNODE(j)
				,	oc->m_array[oc->m_in_idx+i].node_uname);
				break;
			}
		}
		assert(j < oc->m_n_member); /* must find new in all */
	}

	/* departed nodes are appended after the current members */
	for (j = 0, n = oc->m_n_member; j < oc->m_n_out; j++, n++) {
		MEMCHANGE(n) = SA_CLM_NODE_LEFT;
		fill_node(&MEMNODE(n)
		,	oc->m_array[oc->m_out_idx+j].node_id
		,	oc->m_array[oc->m_out_idx+j].node_uname, 0);
	}
}

/*
 * Write only the changed nodes into hd->nbuf: the joined nodes first
 * (SA_CLM_NODE_JOINED), then the departed ones (SA_CLM_NODE_LEFT), for a
 * total of m_n_in + m_n_out entries.
 * The caller must ensure __ccm_data is non-NULL and nbuf is large enough.
 */
static void
retrieve_changes_only_buffer(__clm_handle_t *hd)
{
	guint i, n;
	const oc_ev_membership_t *oc = __ccm_data;

	for (i = 0, n = 0; i < oc->m_n_in; i++, n++) {
		MEMCHANGE(n) = SA_CLM_NODE_JOINED;
		fill_node(&MEMNODE(n)
		,	oc->m_array[oc->m_in_idx+i].node_id
		,	oc->m_array[oc->m_in_idx+i].node_uname, 1);
	}
	for (i = 0; i < oc->m_n_out; i++, n++) {
		MEMCHANGE(n) = SA_CLM_NODE_LEFT;
		fill_node(&MEMNODE(n)
		,	oc->m_array[oc->m_out_idx+i].node_id
		,	oc->m_array[oc->m_out_idx+i].node_uname, 0);
	}
}

/*
 * Process one pending OCF event for the handle and, if tracking is active,
 * deliver the membership through saClmClusterTrackCallback.
 *
 * dispatchFlags is not examined by this implementation.
 *
 * Returns SA_ERR_BAD_HANDLE for an unknown handle or when
 * oc_ev_handle_event() reports EINVAL, SA_ERR_LIBRARY when this node has
 * been evicted, SA_ERR_INIT when no membership data is cached, and SA_OK
 * otherwise. When the user buffer is too small the callback is invoked
 * with SA_ERR_NO_SPACE and SA_OK is returned.
 */
SaErrorT
saClmDispatch(const SaClmHandleT *clmHandle, 
              SaDispatchFlagsT dispatchFlags)
{
	int ret;
	const oc_ev_membership_t *oc;
	guint itemnum;
	void (*fill)(__clm_handle_t *);
	__clm_handle_t *hd = GET_CLM_HANDLE(clmHandle);

	(void)dispatchFlags;

	if (!hd)
		return SA_ERR_BAD_HANDLE;

	if ((ret = oc_ev_handle_event(hd->ev_token)) != 0) {
		if (ret == EINVAL)
			return SA_ERR_BAD_HANDLE;
		/* else we must be evicted */
	}

	/* We did not lock for read here because other writers will set it
	 * with the same value (if there really exist some). Otherwise we
	 * need to lock here.
	 */
	if (__ccm_event == OC_EV_MS_EVICTED) {
		cl_log(LOG_WARNING
		,	"This node is evicted from the current partition!");
		return SA_ERR_LIBRARY;
	}
	if (__ccm_event == OC_EV_MS_NOT_PRIMARY
	||	__ccm_event == OC_EV_MS_PRIMARY_RESTORED) {
		cl_log(LOG_DEBUG, "Received not interested event [%d]"
		,	__ccm_event);
		return SA_OK;
	}
	if (!__ccm_data)
		return SA_ERR_INIT;
	oc = __ccm_data;

	if (CLM_TRACK_STOP == hd->trackflags)
		return SA_OK;

	/* SA_TRACK_CURRENT is cleared in saClmClusterTrackStart, hence we 
	 * needn't to deal with it now*/
	if (hd->trackflags & SA_TRACK_CHANGES) {
		itemnum = oc->m_n_member + oc->m_n_out;
		fill = retrieve_changes_buffer;
	} else if (hd->trackflags & SA_TRACK_CHANGES_ONLY) {
		itemnum = oc->m_n_in + oc->m_n_out;
		fill = retrieve_changes_only_buffer;
	} else {
		assert(0);
		return SA_OK;
	}

	if (itemnum > hd->itemnum) {
		hd->callbacks.saClmClusterTrackCallback(hd->nbuf
		,	hd->itemnum, oc->m_n_member, oc->m_instance
		,	SA_ERR_NO_SPACE);
		return SA_OK;
	}

	pthread_lock();
	fill(hd);
	pthread_unlock();

	hd->callbacks.saClmClusterTrackCallback(hd->nbuf, itemnum
	,	oc->m_n_member, oc->m_instance, SA_OK);
	return SA_OK;
}

/*
 * Destroy a handle: remove it from the handle table, unregister its OCF
 * event token and free its memory.
 *
 * Returns SA_ERR_BAD_HANDLE if the handle is NULL or unknown, else SA_OK.
 *
 * NOTE(review): the shared __ccm_data/__ccm_cookie cache is left untouched
 * here; whether it stays valid after oc_ev_unregister() depends on the OCF
 * implementation -- confirm.
 */
SaErrorT 
saClmFinalize(SaClmHandleT *clmHandle)
{
	gpointer hd = NULL, oldkey = NULL;

	if (!clmHandle || !__handle_hash)
		return SA_ERR_BAD_HANDLE;

	if (g_hash_table_lookup_extended(__handle_hash, clmHandle
	,	&oldkey, &hd) == FALSE) {
		return SA_ERR_BAD_HANDLE;
	}

	/*
	 * Drop the table entry before freeing it, otherwise later lookups
	 * would return freed memory and a second finalize would double free.
	 */
	g_hash_table_remove(__handle_hash, clmHandle);

	oc_ev_unregister(((__clm_handle_t *)hd)->ev_token);
	/* TODO: unregister saClmClusterNodeGetCall here */
	g_free(hd);
	g_free(oldkey);

	return SA_OK;
}

/*
 * Start membership tracking on a handle.
 *
 * Records trackFlags (with SA_TRACK_CURRENT cleared), the notification
 * buffer and its capacity. If SA_TRACK_CURRENT is set, the current
 * membership is delivered immediately through saClmClusterTrackCallback
 * (with SA_ERR_NO_SPACE if the buffer is too small).
 *
 * Returns SA_ERR_BAD_HANDLE for an unknown handle, SA_ERR_INIT when
 * tracking is requested without a track callback or when SA_TRACK_CURRENT
 * is requested before any membership data is cached, SA_ERR_INVALID_PARAM
 * for a NULL buffer with a non-zero numberOfItems, and SA_OK otherwise.
 * No handle state is modified when an error is returned.
 */
SaErrorT 
saClmClusterTrackStart(const SaClmHandleT *clmHandle,
                       SaUint8T trackFlags,
                       SaClmClusterNotificationT *notificationBuffer,
                       SaUint32T numberOfItems)
{
	const oc_ev_membership_t *oc;
	SaUint32T itemnum;
	__clm_handle_t *hd = GET_CLM_HANDLE(clmHandle);

	if (!hd)
		return SA_ERR_BAD_HANDLE;
	if (trackFlags != CLM_TRACK_STOP
	&&	!hd->callbacks.saClmClusterTrackCallback)
		return SA_ERR_INIT;
	if (!notificationBuffer && numberOfItems != 0)
		return SA_ERR_INVALID_PARAM;
	/* same error saClmDispatch() reports for an empty cache */
	if ((trackFlags & SA_TRACK_CURRENT) && !__ccm_data)
		return SA_ERR_INIT;

	hd->trackflags = trackFlags;
	hd->itemnum = numberOfItems;
	hd->nbuf = notificationBuffer;

	if (!(trackFlags & SA_TRACK_CURRENT))
		return SA_OK;

	/* Clear SA_TRACK_CURRENT, it's no use since now. */
	hd->trackflags &= ~SA_TRACK_CURRENT;

	oc = __ccm_data;
	itemnum = oc->m_n_member;
	if (itemnum > numberOfItems) {
		hd->callbacks.saClmClusterTrackCallback(hd->nbuf
		,	hd->itemnum, oc->m_n_member, oc->m_instance
		,	SA_ERR_NO_SPACE);
		return SA_OK;
	}

	pthread_lock();
	retrieve_current_buffer(hd);
	pthread_unlock();

	hd->callbacks.saClmClusterTrackCallback(hd->nbuf, itemnum
	,	oc->m_n_member, oc->m_instance, SA_OK);
	return SA_OK;
}

/*
 * Stop membership tracking on a handle by resetting its track flags.
 * Returns SA_ERR_BAD_HANDLE for an unknown handle, else SA_OK.
 */
SaErrorT 
saClmClusterTrackStop(const SaClmHandleT *clmHandle)
{
	__clm_handle_t *hd = GET_CLM_HANDLE(clmHandle);

	if (!hd)
		return SA_ERR_BAD_HANDLE;

	/* This is ugly. But we currently depends on OCF interface, we have
	 * no choice. This should be fixed in the next version after we remove
	 * the dependency with OCF.
	 */
	hd->trackflags = CLM_TRACK_STOP;
	return SA_OK;
}

/*
 * Find nodeId in the cached membership and fill *clusterNode.
 * Current members are searched first (member = 1), then the nodes that
 * left (member = 0). Intended to be called with the library mutex held.
 *
 * Returns SA_OK when found, SA_ERR_INIT when no membership data is cached,
 * SA_ERR_INVALID_PARAM when the node is not known.
 */
static SaErrorT
retrieve_node_buffer(SaClmNodeIdT nodeId, SaClmClusterNodeT *clusterNode)
{
	const oc_ev_membership_t *oc = __ccm_data;
	guint i;

	/* ccm_events() may have stored NULL since the caller's check */
	if (!oc)
		return SA_ERR_INIT;

	for (i = 0; i < oc->m_n_member; i++) {
		if (oc->m_array[oc->m_memb_idx+i].node_id == nodeId) {
			fill_node(clusterNode, nodeId
			,	oc->m_array[oc->m_memb_idx+i].node_uname, 1);
			return SA_OK;
		}
	}
	for (i = 0; i < oc->m_n_out; i++) {
		if (oc->m_array[oc->m_out_idx+i].node_id == nodeId) {
			fill_node(clusterNode, nodeId
			,	oc->m_array[oc->m_out_idx+i].node_uname, 0);
			return SA_OK;
		}
	}

	/* cast so the argument matches %lu whatever SaClmNodeIdT's width is */
	cl_log(LOG_WARNING, "%s: no record for nodeId [%lu]"
	,	__FUNCTION__, (unsigned long)nodeId);
	return SA_ERR_INVALID_PARAM;
}

/*
 * Synchronously fetch information about one node.
 *
 * Polls once per second, for at most `timeout` iterations, until membership
 * data is cached, then looks up nodeId. Note that `timeout` is counted in
 * one-second sleeps by this implementation.
 *
 * Returns SA_ERR_INVALID_PARAM for a NULL clusterNode or an unknown node,
 * SA_ERR_TIMEOUT if no membership data became available, or the result of
 * the lookup.
 */
SaErrorT 
saClmClusterNodeGet(SaClmNodeIdT nodeId, SaTimeT timeout,
                    SaClmClusterNodeT *clusterNode)
{
	SaTimeT waited;
	SaErrorT ret;

	if (!clusterNode) {
		cl_log(LOG_ERR, "Invalid parameter clusterNode <%p>"
		,	(void *)clusterNode);
		return SA_ERR_INVALID_PARAM;
	}

	for (waited = 0; !__ccm_data && waited < timeout; waited++)
		sleep(1);

	/*
	 * Decide on the cache itself rather than on the loop counter: a zero
	 * timeout with data already cached must succeed, and a non-positive
	 * timeout without data must not fall through to the lookup.
	 */
	if (!__ccm_data)
		return SA_ERR_TIMEOUT;

	pthread_lock();
	ret = retrieve_node_buffer(nodeId, clusterNode);
	pthread_unlock();
	return ret;
}

/*
 * This API is highly deprecated in version 1 implementation base on OCF.
 * It is actually _not_ an asynchronous call. TODO fix in version 2.
 *
 * Looks up nodeId in the cached membership and, on success, invokes
 * saClmClusterNodeGetCallback(invocation, clusterNode, SA_OK) before
 * returning.
 *
 * Returns SA_ERR_BAD_HANDLE for an unknown handle, SA_ERR_INVALID_PARAM for
 * a NULL clusterNode or an unknown node, SA_ERR_INIT when no callback was
 * supplied or no membership data is cached, SA_OK otherwise.
 */
SaErrorT
saClmClusterNodeGetAsync(const SaClmHandleT *clmHandle,
                         SaInvocationT invocation,
                         SaClmNodeIdT nodeId,
                         SaClmClusterNodeT *clusterNode)
{
	SaErrorT ret;
	__clm_handle_t *hd = GET_CLM_HANDLE(clmHandle);

	if (!hd)
		return SA_ERR_BAD_HANDLE;

	if (!clusterNode) {
		cl_log(LOG_ERR, "Invalid parameter clusterNode <%p>"
		,	(void *)clusterNode);
		return SA_ERR_INVALID_PARAM;
	}
	if (!hd->callbacks.saClmClusterNodeGetCallback)
		return SA_ERR_INIT;
	if (!__ccm_data) {
		cl_log(LOG_ERR, "__ccm_data is NULL");
		return SA_ERR_INIT;
	}

	pthread_lock();
	ret = retrieve_node_buffer(nodeId, clusterNode);
	pthread_unlock();

	if (ret != SA_OK) {
		cl_log(LOG_ERR, "retrieve_node_buffer error [%d]", (int)ret);
		return ret;
	}

	hd->callbacks.saClmClusterNodeGetCallback(invocation, clusterNode
	,	SA_OK);
	return SA_OK;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -