📄 ccmlib_clm.c
字号:
/* * libclm.c: SAForum AIS Membership Service library * * Copyright (c) 2003 Intel Corp. * Author: Zhu Yi (yi.zhu@intel.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */#include <portability.h>#include <strings.h>#include <unistd.h>#include <glib.h>#include <errno.h>#include <assert.h>#include <ocf/oc_event.h>#include <clplumbing/cl_log.h>#include <saf/ais.h>#ifdef POSIX_THREADS# include <pthread.h>#endif#include <sys/time.h>#include <string.h>#define CLM_TRACK_STOP 0#define CLM_DEBUG 0#define GET_CLM_HANDLE(x) (__clm_handle_t *)g_hash_table_lookup(__handle_hash,x)typedef struct __clm_handle_s { oc_ev_t *ev_token; SaClmCallbacksT callbacks; SaSelectionObjectT fd; SaUint8T trackflags; SaUint32T itemnum; SaClmClusterNotificationT *nbuf; SaSelectionObjectT st;} __clm_handle_t;static GHashTable *__handle_hash = NULL;static guint __handle_counter = 0;static const oc_ev_membership_t *__ccm_data = NULL;static oc_ev_t __ccm_event = OC_EV_MS_INVALID;static void *__ccm_cookie = NULL;#ifdef POSIX_THREADS static pthread_mutex_t __clmlib_mutex = PTHREAD_MUTEX_INITIALIZER;#endifstatic void pthread_lock(void);static void pthread_unlock(void);static void clm_init(void);extern void oc_ev_special(const oc_ev_t *, oc_ev_class_t , int );static void retrieve_current_buffer(__clm_handle_t *hd);static void retrieve_changes_buffer(__clm_handle_t *hd);static void retrieve_changes_only_buffer(__clm_handle_t *hd);static SaErrorT retrieve_node_buffer(SaClmNodeIdT nodeId, SaClmClusterNodeT *clusterNode);static void pthread_lock(){#ifdef POSIX_THREADS pthread_mutex_lock(&__clmlib_mutex);#endif}static void pthread_unlock(){#ifdef POSIX_THREADS pthread_mutex_unlock(&__clmlib_mutex);#endif}static voidclm_init(){ static gboolean clminit_flag = FALSE; if (clminit_flag == FALSE) { __handle_hash = g_hash_table_new(g_int_hash , g_int_equal); clminit_flag = TRUE; } return;}static voidccm_events(oc_ed_t event, void *cookie, size_t size, const void *data){ pthread_lock(); /* dereference old cache */ if (__ccm_cookie) oc_ev_callback_done(__ccm_cookie); __ccm_cookie = cookie; __ccm_event = event; __ccm_data = (const oc_ev_membership_t *)data;#if CLM_DEBUG cl_log(LOG_DEBUG, "__ccm_data = <0x%x>" , (unsigned int)data);#endif pthread_unlock(); if (event == OC_EV_MS_EVICTED || event == OC_EV_MS_NOT_PRIMARY || event == OC_EV_MS_PRIMARY_RESTORED) { /* We do not care about this info */ return; } if (!data) { cl_log(LOG_ERR, "CCM event callback return NULL data"); return; } /* * Note: No need to worry about the buffer free problem, OCF * callback mechanism did this for us. */}SaErrorT saClmInitialize(SaClmHandleT *clmHandle, const SaClmCallbacksT *clmCallbacks, const SaVersionT *version){ int ret; oc_ev_t *ev_token; __clm_handle_t *hd; __clm_handle_t **phd; SaClmHandleT *hash_key; fd_set rset; struct timeval tv; oc_ev_register(&ev_token); if ((ret = oc_ev_set_callback(ev_token, OC_EV_MEMB_CLASS , ccm_events, NULL)) != 0) { if (ret == ENOMEM) return SA_ERR_NO_MEMORY; else assert(0); /* Never runs here */ } /* We must call it to get non-quorum partition info */ oc_ev_special(ev_token, OC_EV_MEMB_CLASS, 0); clm_init(); phd = (__clm_handle_t **)g_malloc(sizeof(__clm_handle_t *)); if (!phd) return SA_ERR_NO_MEMORY; hash_key = (SaClmHandleT *)g_malloc(sizeof(SaClmHandleT)); if (!hash_key) return SA_ERR_NO_MEMORY; *phd = (__clm_handle_t *)g_malloc(sizeof(__clm_handle_t)); hd = *phd; if (!hd) return SA_ERR_NO_MEMORY; *clmHandle = __handle_counter++; *hash_key = *clmHandle; hd->ev_token = ev_token; hd->callbacks = *clmCallbacks; hd->trackflags = CLM_TRACK_STOP; cl_log(LOG_INFO, "g_hash_table_insert hd = [%p]", hd); g_hash_table_insert(__handle_hash, hash_key, hd); if ((ret = oc_ev_activate(hd->ev_token, &hd->fd)) != 0) { cl_log(LOG_ERR, "oc_ev_activate error [%d]", ret); return SA_ERR_LIBRARY; } /* Prepare information for saClmClusterNodeGet() series calls */ FD_ZERO(&rset); FD_SET(hd->fd, &rset); tv.tv_sec = 2; tv.tv_usec = 0; if ((ret = select(hd->fd + 1, &rset, NULL, NULL, &tv)) == -1) { cl_log(LOG_ERR, "%s: select error [%d]" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } else if (ret == 0) { cl_log(LOG_WARNING, "%s: select timeout", __FUNCTION__); return SA_ERR_TIMEOUT; } if ((ret = oc_ev_handle_event(hd->ev_token) != 0)) { cl_log(LOG_ERR, "%s: oc_ev_handle_event error [%d]" , __FUNCTION__, ret); return SA_ERR_LIBRARY; } return SA_OK;}SaErrorT saClmSelectionObjectGet(const SaClmHandleT *clmHandle, SaSelectionObjectT *selectionObject){ __clm_handle_t *hd = GET_CLM_HANDLE(clmHandle); if (!hd) return SA_ERR_BAD_HANDLE; *selectionObject = hd->fd; return SA_OK;}#define MEMCHANGE(x) hd->nbuf[x].clusterChanges#define MEMNODE(x) hd->nbuf[x].clusterNodestatic voidset_misc_node_info(SaClmClusterNodeT *cn){ cn->nodeAddress.length = 0; cn->nodeAddress.value[0] = '\0'; cn->nodeName.length = strlen((char*)cn->nodeName.value); cn->clusterName.length = 0; cn->clusterName.value[0] = '\0'; cn->bootTimestamp = 0;}static voidretrieve_current_buffer(__clm_handle_t *hd){ uint i; char *p; const oc_ev_membership_t *oc = __ccm_data; for (i = 0; i < oc->m_n_member; i++) { MEMCHANGE(i) = SA_CLM_NODE_NO_CHANGE; MEMNODE(i).nodeId = oc->m_array[oc->m_memb_idx+i].node_id; MEMNODE(i).member = 1; p = oc->m_array[oc->m_memb_idx+i].node_uname; if (p) { strncpy((char*)MEMNODE(i).nodeName.value, p, SA_MAX_NAME_LENGTH - 1); MEMNODE(i).nodeName.value[SA_MAX_NAME_LENGTH-1] = '\0'; } else { MEMNODE(i).nodeName.value[0] = '\0'; } set_misc_node_info(&MEMNODE(i)); }}static voidretrieve_changes_buffer(__clm_handle_t *hd){ uint i, j; int n; char *p; const oc_ev_membership_t *oc = __ccm_data; retrieve_current_buffer(hd); for (i = 0; i < oc->m_n_in; i++) { for (j = 0; j < oc->m_n_member; j++) { if (MEMNODE(j).nodeId == oc->m_array[oc->m_in_idx+i].node_id) { MEMCHANGE(j) = SA_CLM_NODE_JOINED; p = oc->m_array[oc->m_in_idx+i].node_uname; if (p) { strncpy((char*)MEMNODE(j).nodeName.value, p, SA_MAX_NAME_LENGTH-1); MEMNODE(j).nodeName.value \ [SA_MAX_NAME_LENGTH-1] = '\0'; } else { MEMNODE(j).nodeName.value[0] = '\0'; } break; } } assert(j < oc->m_n_member); /* must find new in all */ } for (j = 0, n = oc->m_n_member; j < oc->m_n_out; j++, n++) { MEMCHANGE(n) = SA_CLM_NODE_LEFT; MEMNODE(n).nodeId = oc->m_array[oc->m_out_idx+j].node_id; MEMNODE(n).member = 0; p = oc->m_array[oc->m_out_idx+j].node_uname; if (p) { strncpy((char*)MEMNODE(n).nodeName.value, p, SA_MAX_NAME_LENGTH - 1); MEMNODE(n).nodeName.value[SA_MAX_NAME_LENGTH-1] = '\0'; } else { MEMNODE(n).nodeName.value[0] = '\0'; } set_misc_node_info(&MEMNODE(n)); }}static voidretrieve_changes_only_buffer(__clm_handle_t *hd){ uint i; int n; char *p; const oc_ev_membership_t *oc = __ccm_data; for (i = 0, n = 0; i < oc->m_n_in; i++, n++) { MEMCHANGE(n) = SA_CLM_NODE_JOINED; MEMNODE(n).nodeId = oc->m_array[oc->m_in_idx+i].node_id; MEMNODE(n).member = 1; p = oc->m_array[oc->m_in_idx+i].node_uname; if (p) { strncpy((char*)MEMNODE(n).nodeName.value, p, SA_MAX_NAME_LENGTH - 1); MEMNODE(n).nodeName.value[SA_MAX_NAME_LENGTH-1] = '\0'; } else { MEMNODE(n).nodeName.value[0] = '\0'; } set_misc_node_info(&MEMNODE(n)); } for (i = 0; i < oc->m_n_out; i++, n++) { MEMCHANGE(n) = SA_CLM_NODE_LEFT; MEMNODE(n).nodeId = oc->m_array[oc->m_out_idx+i].node_id; MEMNODE(n).member = 0; p = oc->m_array[oc->m_out_idx+i].node_uname; if (p) { strncpy((char*)MEMNODE(n).nodeName.value, p, SA_MAX_NAME_LENGTH - 1); MEMNODE(n).nodeName.value[SA_MAX_NAME_LENGTH-1] = '\0'; } else { MEMNODE(n).nodeName.value[0] = '\0'; } set_misc_node_info(&MEMNODE(n)); }}SaErrorTsaClmDispatch(const SaClmHandleT *clmHandle, SaDispatchFlagsT dispatchFlags){ int ret; const oc_ev_membership_t *oc; uint itemnum; __clm_handle_t *hd = GET_CLM_HANDLE(clmHandle); if (!hd) return SA_ERR_BAD_HANDLE; if ((ret = oc_ev_handle_event(hd->ev_token)) != 0) { if (ret == EINVAL) return SA_ERR_BAD_HANDLE; /* else we must be evicted */ } /* We did not lock for read here because other writers will set it * with the same value (if there really exist some). Otherwise we * need to lock here. */ if (__ccm_event == OC_EV_MS_EVICTED) { cl_log(LOG_WARNING , "This node is evicted from the current partition!"); return SA_ERR_LIBRARY; } if (__ccm_event == OC_EV_MS_NOT_PRIMARY || __ccm_event == OC_EV_MS_PRIMARY_RESTORED) { cl_log(LOG_DEBUG, "Received not interested event [%d]" , __ccm_event); return SA_OK; } if (!__ccm_data) return SA_ERR_INIT; oc = __ccm_data; if(CLM_TRACK_STOP == hd->trackflags) return SA_OK; /* SA_TRACK_CURRENT is cleared in saClmClusterTrackStart, hence we * needn't to deal with it now*/ if (hd->trackflags & SA_TRACK_CHANGES) { itemnum = oc->m_n_member + oc->m_n_out; if (itemnum > hd->itemnum) { hd->callbacks.saClmClusterTrackCallback(hd->nbuf , hd->itemnum, oc->m_n_member, oc->m_instance , SA_ERR_NO_SPACE); return SA_OK; } pthread_lock(); retrieve_changes_buffer(hd); pthread_unlock(); hd->callbacks.saClmClusterTrackCallback(hd->nbuf, itemnum , oc->m_n_member, oc->m_instance, SA_OK); } else if (hd->trackflags & SA_TRACK_CHANGES_ONLY) { itemnum = oc->m_n_in + oc->m_n_out; if (itemnum > hd->itemnum) { hd->callbacks.saClmClusterTrackCallback(hd->nbuf , hd->itemnum, oc->m_n_member, oc->m_instance , SA_ERR_NO_SPACE); return SA_OK; } pthread_lock(); retrieve_changes_only_buffer(hd); pthread_unlock(); hd->callbacks.saClmClusterTrackCallback(hd->nbuf, itemnum , oc->m_n_member, oc->m_instance, SA_OK); } else { assert(0); } /* unlock */ return SA_OK;}SaErrorT saClmFinalize(SaClmHandleT *clmHandle){ gpointer hd, oldkey; if (g_hash_table_lookup_extended(__handle_hash, clmHandle , &oldkey, &hd) == FALSE) { return SA_ERR_BAD_HANDLE; } oc_ev_unregister(((__clm_handle_t *)hd)->ev_token); /* TODO: unregister saClmClusterNodeGetCall here */ g_free(hd); g_free(oldkey); return SA_OK;}SaErrorT saClmClusterTrackStart(const SaClmHandleT *clmHandle, SaUint8T trackFlags, SaClmClusterNotificationT *notificationBuffer, SaUint32T numberOfItems){ __clm_handle_t *hd = GET_CLM_HANDLE(clmHandle); if (!hd) return SA_ERR_BAD_HANDLE; hd->trackflags = trackFlags; hd->itemnum = numberOfItems; hd->nbuf = notificationBuffer; if (trackFlags & SA_TRACK_CURRENT) { const oc_ev_membership_t *oc; SaUint32T itemnum; /* Clear SA_TRACK_CURRENT, it's no use since now. */ hd->trackflags &= ~SA_TRACK_CURRENT; oc = __ccm_data; itemnum = oc->m_n_member; if (itemnum > numberOfItems) { hd->callbacks.saClmClusterTrackCallback(hd->nbuf , hd->itemnum, oc->m_n_member, oc->m_instance , SA_ERR_NO_SPACE); return SA_OK; } pthread_lock(); retrieve_current_buffer(hd); pthread_unlock(); hd->callbacks.saClmClusterTrackCallback(hd->nbuf, itemnum , oc->m_n_member, oc->m_instance, SA_OK); return SA_OK; } return SA_OK;}SaErrorT saClmClusterTrackStop(const SaClmHandleT *clmHandle){ __clm_handle_t *hd = GET_CLM_HANDLE(clmHandle); if (!hd) return SA_ERR_BAD_HANDLE; /* This is ugly. But we currently depends on OCF interface, we have * no choice. This should be fixed in the next version after we remove * the dependency with OCF. */ hd->trackflags = CLM_TRACK_STOP; return SA_OK;}static SaErrorTretrieve_node_buffer(SaClmNodeIdT nodeId, SaClmClusterNodeT *clusterNode){ const oc_ev_membership_t *oc; uint i; char *p; oc = (const oc_ev_membership_t *)__ccm_data; for (i = 0; i < oc->m_n_member; i++) { if (oc->m_array[oc->m_memb_idx+i].node_id == nodeId) { clusterNode->nodeId = nodeId; clusterNode->member = 1; p = oc->m_array[oc->m_memb_idx+i].node_uname; if (p) { strncpy((char*)clusterNode->nodeName.value, p, SA_MAX_NAME_LENGTH - 1); clusterNode->nodeName.value \ [SA_MAX_NAME_LENGTH-1] = '\0'; } else { clusterNode->nodeName.value[0] = '\0'; } goto found; } } for (i = 0; i < oc->m_n_out; i++) { if (oc->m_array[oc->m_out_idx+i].node_id == nodeId) { clusterNode->nodeId = nodeId; clusterNode->member = 0; p = oc->m_array[oc->m_out_idx+i].node_uname; if (p) { strncpy((char*)clusterNode->nodeName.value, p, SA_MAX_NAME_LENGTH - 1); clusterNode->nodeName.value \ [SA_MAX_NAME_LENGTH-1] = '\0'; } else { clusterNode->nodeName.value[0] = '\0'; } goto found; } } cl_log(LOG_WARNING, "%s: no record for nodeId [%lu]" , __FUNCTION__, nodeId); return SA_ERR_INVALID_PARAM;found: set_misc_node_info(clusterNode); return SA_OK;}SaErrorT saClmClusterNodeGet(SaClmNodeIdT nodeId, SaTimeT timeout, SaClmClusterNodeT *clusterNode){ int i; SaErrorT ret; if (!clusterNode) { cl_log(LOG_ERR, "Invalid parameter clusterNode <%p>" , clusterNode); return SA_ERR_INVALID_PARAM; } for (i = 0; i < timeout; i++) { if (__ccm_data) break; sleep(1); } if (i == timeout) return SA_ERR_TIMEOUT; pthread_lock(); ret = retrieve_node_buffer(nodeId, clusterNode); pthread_unlock(); return ret;}/* * This API is highly deprecated in version 1 implementation base on OCF. * It is actually _not_ an asynchronous call. TODO fix in version 2. */SaErrorTsaClmClusterNodeGetAsync(const SaClmHandleT *clmHandle, SaInvocationT invocation, SaClmNodeIdT nodeId, SaClmClusterNodeT *clusterNode){ int ret; __clm_handle_t *hd = GET_CLM_HANDLE(clmHandle); if (!hd) return SA_ERR_BAD_HANDLE; if (!clusterNode) { cl_log(LOG_ERR, "Invalid parameter clusterNode <%p>" , clusterNode); return SA_ERR_INVALID_PARAM; } if (!__ccm_data) { cl_log(LOG_ERR, "__ccm_data is NULL"); return SA_ERR_INIT; } pthread_lock(); if ((ret = retrieve_node_buffer(nodeId, clusterNode)) != SA_OK) { cl_log(LOG_ERR, "retrieve_node_buffer error [%d]", ret); pthread_unlock(); return ret; } pthread_unlock(); hd->callbacks.saClmClusterNodeGetCallback(invocation, clusterNode , SA_OK); return SA_OK;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -