⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmslib_client.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 3 页
字号:
/* * cmslib_client.c: SAForum AIS Message Service client library * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA * */#include <stdio.h>#include <stdlib.h>#include <unistd.h>	/* dup, dup2 */#include <string.h>#include <assert.h>#include <clplumbing/realtime.h>#include <cl_log.h>#include <heartbeat.h>#include <saf/ais.h>#include "cmslib_client.h"#include "cms_client_types.h"#define PIPETRICK_DEBUG	0#ifdef DEBUG_LIBRARY#define dprintf(arg...)		fprintf(stderr, ##arg)#else#define dprintf(arg...)		{}#endif#define GET_CMS_HANDLE(x) ((x == NULL) ? NULL : \			   (__cms_handle_t *)g_hash_table_lookup( \			   __cmshandle_hash, x))#define GET_MQ_HANDLE(x)  ((x == NULL) ? NULL : \			   (__cms_handle_t *)g_hash_table_lookup( \			   __mqhandle_hash, x))static GHashTable * __cmshandle_hash;static GHashTable * __mqhandle_hash;static GHashTable * __group_tracking_hash;static guint __cmshandle_counter = 0;static gboolean __cmsclient_init_flag = FALSE;static gboolean __notify_acked = TRUE;void cmsclient_hash_init(void);IPC_Channel *cms_channel_conn(void);int enqueue_dispatch_msg(__cms_handle_t * hd, client_header_t * msg);client_header_t * dequeue_dispatch_msg(GList ** queue);int read_and_queue_ipc_msg(__cms_handle_t * handle);int dispatch_msg(__cms_handle_t * handle, client_header_t * msg);int wait_for_msg(__cms_handle_t * handle, size_t msgtype,		 const SaNameT * name, client_header_t ** msg, SaTimeT timeout);int get_timeout_value(SaTimeT timeout, struct timeval * tv);static intsaname_cmp(const SaNameT s1, const SaNameT s2){	SaUint16T len1, len2;	// dprintf("Length of s1: %d, s2: %d\n", s1.length, s2.length);	len1 = s1.value[s1.length - 1] ? s1.length : s1.length - 1;	len2 = s2.value[s2.length - 1] ? s2.length : s2.length - 1;	if (len1 != len2)		return len2 - len1;	return strncmp(s1.value, s2.value, len1);}static intbad_saname(const SaNameT * name){	int i;	if (!name || name->length <= 0	||	name->length > SA_MAX_NAME_LENGTH - 1)		return TRUE;	/*	 * We don't support '\0' inside a SaNameT.value.	 */	for (i = 0; i < name->length; i++)		if (name->value[i] == '\0')			return TRUE;	return FALSE;}static char *saname2str(SaNameT name){	char * str;		if (name.length <= 0)		return NULL;		if (name.length > SA_MAX_NAME_LENGTH - 1)		name.length = SA_MAX_NAME_LENGTH - 1;		if ((str = (char *)ha_malloc(name.length + 1)) == NULL)		return NULL;		strncpy(str, name.value, name.length);	str[name.length] = '\0';		return str;}static intactive_poll(__cms_handle_t * hd){	int fd;	if (hd->backup_fd >= 0) {		cl_log(LOG_WARNING, "%s: recursion detected", __FUNCTION__);		return 1;	}	if ((fd = hd->ch->ops->get_recv_select_fd(hd->ch)) < 0) {		cl_log(LOG_ERR, "%s: get_recv_select_fd failed", __FUNCTION__);		return 1;	}	if ((hd->backup_fd = dup(fd)) == -1) {		cl_log(LOG_ERR, "%s: dup2 failed", __FUNCTION__);		perror("dup2");		return 1;	}	close(fd);	if (dup2(hd->active_fd, fd) == -1) {		cl_log(LOG_ERR, "%s: dup2 failed", __FUNCTION__);		perror("dup2");		return 1;	}#if PIPETRICK_DEBUG	dprintf("acitve_poll for <%p>\n", hd);#endif	return 0;}static intrestore_poll(__cms_handle_t * hd){	int fd;	if (hd->backup_fd < 0) {		cl_log(LOG_WARNING, "%s: recursion detected", __FUNCTION__);		return 1;	}	if ((fd = hd->ch->ops->get_recv_select_fd(hd->ch)) < 0) {		cl_log(LOG_ERR, "%s: get_recv_select_fd failed", __FUNCTION__);		return 1;	}	if (dup2(hd->backup_fd, fd) == -1) {		cl_log(LOG_ERR, "%s: dup2 failed", __FUNCTION__);		return 1;	}	hd->backup_fd = -1;	/* mark as unused */	#if PIPETRICK_DEBUG	dprintf("restore_poll for <%p>\n", hd);#endif	return 0;}static intcmsclient_message_recv(__cms_handle_t * hd, client_header_t ** data){	int ret;	IPC_Message * ipc_msg;	if (hd->backup_fd >= 0)		restore_poll(hd);	ret = hd->ch->ops->recv(hd->ch, &ipc_msg);	if (ret != IPC_OK)		return ret;	*data = ha_malloc(ipc_msg->msg_len);	memcpy(*data, ipc_msg->msg_body, ipc_msg->msg_len);	ipc_msg->msg_done(ipc_msg);	return ret;}static voidcmsclient_message_done(IPC_Message * msg){	char * name;	client_header_t * message;	message = msg->msg_body;	name = saname2str(message->name);	ha_free(msg->msg_private);	ha_free(name);}static intcmsclient_message_send(__cms_handle_t * hd, size_t len, gpointer data){	IPC_Message * msg;	if ((msg = ha_malloc(sizeof(IPC_Message) + len)) == NULL) {		cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);		return FALSE;	}	if (hd->backup_fd >= 0)		restore_poll(hd);	msg->msg_body = msg + 1;	memcpy(msg->msg_body, data, len);	msg->msg_len = len;	msg->msg_private = msg;	msg->msg_done = cmsclient_message_done;	msg->msg_buf = 0;	return hd->ch->ops->send(hd->ch, msg);}static gbooleanmsgqueue_remove(gpointer key, gpointer value, gpointer user_data){	__cms_queue_handle_t * qhd = (__cms_queue_handle_t *) value;	client_mqueue_close_t cmg;	SaNameT * qname;	CMS_LIBRARY_TRACE();	qname = &(qhd->queue_name);	cmg.header.type = CMS_QUEUE_CLOSE;	cmg.header.name = *qname;	cmg.handle = qhd->queue_handle;	cmg.silent = TRUE;	cmsclient_message_send(qhd->cms_handle, sizeof(cmg), &cmg);	g_hash_table_remove(__mqhandle_hash, key);	ha_free((__cms_queue_handle_t *) qhd);	return TRUE;}static gbooleanlibrary_initialized(void){	return __cmsclient_init_flag;}voidcmsclient_hash_init(){	if (library_initialized())		return;	__cmshandle_hash = g_hash_table_new(g_int_hash, g_int_equal);	__mqhandle_hash = g_hash_table_new(g_int_hash, g_int_equal);	__group_tracking_hash = g_hash_table_new(g_str_hash, g_str_equal);	__cmsclient_init_flag = TRUE;}/* * This is a blocking wait for a particular type of msg on a particular queue.  * Note: memory allocated in this function.  caller needs to free(). */intwait_for_msg(__cms_handle_t * handle, size_t msgtype,             const SaNameT * queueName, client_header_t ** msg,             SaTimeT timeout){	int fd;	client_header_t * cms_msg;	longclock_t t_start = 0, t_end = 0;	if (timeout < 0)		return SA_ERR_INVALID_PARAM;	if (timeout != SA_TIME_END) {		t_start = time_longclock();		t_end = t_start + msto_longclock(timeout/1000);	}	if (handle->backup_fd >= 0)		restore_poll(handle);	fd = handle->ch->ops->get_recv_select_fd(handle->ch);	dprintf("In %s for message type 0x%x\n", __FUNCTION__, msgtype);	while (1) {		int ret = -1;		struct timeval * tv, to;		fd_set rset;		FD_ZERO(&rset);		FD_SET(fd, &rset);		tv = NULL;		if (timeout != SA_TIME_END) {			to.tv_sec = longclockto_ms((t_end - t_start))/1000;			to.tv_usec = (((t_end - t_start) -					secsto_longclock(to.tv_sec)))/1000;			tv = &to;		}		if (!handle->ch->ops->is_message_pending(handle->ch)		&&	(ret = select(fd + 1, &rset, NULL, NULL, tv)) == -1) {			cl_log(LOG_ERR, "%s: select error", __FUNCTION__);			return SA_ERR_LIBRARY;		} else if (ret == 0) {			cl_log(LOG_WARNING, "%s: timeout!", __FUNCTION__);			return SA_ERR_TIMEOUT;		} 		if ((ret = cmsclient_message_recv(handle, &cms_msg))!= IPC_OK) {			if (ret == IPC_FAIL) {				cl_shortsleep();				continue;			}			cl_log(LOG_ERR, "%s: cmsclient_message_recv failed, "					"rc = %d", __FUNCTION__, ret);			return SA_ERR_LIBRARY;		}		if (cms_msg->type & msgtype) {			if (!queueName || (queueName && (saname_cmp(cms_msg->name, *queueName) == 0))) {				*msg = cms_msg;				if (g_list_length(handle->dispatch_queue))					active_poll(handle);				return SA_OK;			} 		} 		enqueue_dispatch_msg(handle, cms_msg);		t_start = time_longclock();	}}IPC_Channel *cms_channel_conn(void){	IPC_Channel * ch;	GHashTable * attrs;	char path[] = IPC_PATH_ATTR;	char cms_socket[] = CMS_DOMAIN_SOCKET;	int ret;	attrs = g_hash_table_new(g_str_hash,g_str_equal);	g_hash_table_insert(attrs, path, cms_socket);	ch = ipc_channel_constructor(IPC_DOMAIN_SOCKET, attrs);	g_hash_table_destroy(attrs);	if (ch ) {		ret = ch->ops->initiate_connection(ch);		if (ret != IPC_OK) {			cl_log(LOG_ERR, "cms_channel_conn failed, maybe "					"you don't have cms server running...");			return NULL;		}		return ch;	}	else 		return NULL;}static intenqueue_dispatch_item(GList **queue, client_header_t * item){	*queue = g_list_append(*queue, item);	return SA_OK;}intenqueue_dispatch_msg(__cms_handle_t * hd, client_header_t * msg){	client_message_t * fmsg = (client_message_t *)msg;	dprintf("calling enqueue_dispatch_msg ..... \n");	/*	 * If it is a message, then add it to the msg queue.	 */	if (msg->type == CMS_MSG_NOTIFY) {		dprintf("got a CMS_MSG_NOTIFY msg\n");		__notify_acked = FALSE;	} 	return enqueue_dispatch_item(&(hd->dispatch_queue),				     (client_header_t *) fmsg);}client_header_t *dequeue_dispatch_msg(GList ** queue){	client_header_t * msg = NULL;	GList * head;	if (!g_list_length(*queue))		return NULL;	head = g_list_first(*queue);	*queue = g_list_remove_link(*queue, head);	msg = head->data;	g_list_free_1(head);	return msg;}/** * Read all the ipc msg in the buffer and queue them to  * the msg queue or the dispatch queue. */intread_and_queue_ipc_msg(__cms_handle_t * handle){	int ret, count = 0;	client_header_t *rcmg;	__mqgroup_track_t * track;	client_mqgroup_notify_t *nsg, *m;	dprintf("b4 the do loop of the read_and_queue_ipc_msg ...\n");	if (handle->backup_fd >= 0)		restore_poll(handle);	while (handle->ch->ops->is_message_pending(handle->ch)) {		ret = cmsclient_message_recv(handle, &rcmg);		if (ret == IPC_FAIL) {			cl_shortsleep();			cl_log(LOG_WARNING, "%s: cmsclient_message_recv "					"failed, rc = %d", __FUNCTION__, ret);			break;		} 		switch (rcmg->type) {		case CMS_QUEUEGROUP_NOTIFY:			/*			 * prepare the notify buffer			 */			m = (client_mqgroup_notify_t *)rcmg;			m->data = (char *)rcmg +				sizeof(client_mqgroup_notify_t);			track = g_hash_table_lookup(__group_tracking_hash,					(m->group_name).value);			if (track == NULL) {				/*				 * This is possible, because TrackStop				 * may be called before we get here.				 */				cl_log(LOG_INFO, "No one tracks the group"						 " [%s] membership now!"				,	m->group_name.value);				return TRUE;			}			track->policy = m->policy;			track->buf.numberOfItems = m->number;			track->buf.notification =				(SaMsgQueueGroupNotificationT *)				ha_malloc(m->number					* sizeof(SaMsgQueueGroupNotificationT));			memcpy(track->buf.notification, m->data, m->number *				sizeof(SaMsgQueueGroupNotificationT));			/*			 * only enqueue head is enough for us			 */			dprintf("enqueue group notify msg head\n");			nsg = (client_mqgroup_notify_t *)				ha_malloc(sizeof(client_mqgroup_notify_t));			memcpy(nsg, m, sizeof(client_mqgroup_notify_t));			enqueue_dispatch_msg(handle, (client_header_t *)nsg);			ha_free(rcmg);			break;		default:			enqueue_dispatch_msg(handle, rcmg);			/* TODO: we have a memory leak here 			   need to call the msg_done()			 */			break;		}	} 	return count;}intdispatch_msg(__cms_handle_t * handle, client_header_t * msg) {	client_mqueue_open_t * omsg;	client_mqgroup_notify_t * nmsg;	client_message_ack_t * amsg;	__mqgroup_track_t * track;	client_message_t * gmsg;	char * name;	__cms_queue_handle_t * qhd;	dprintf("In Function %s..\n", __FUNCTION__);	dprintf("handle=<%p> msg->type=<%d>\n", handle, msg->type);	if (handle == NULL || msg == NULL) 		return HA_FAIL;	switch (msg->type) {	case CMS_QUEUE_OPEN_ASYNC:		omsg = (client_mqueue_open_t *) msg;		if ((handle->callbacks).saMsgQueueOpenCallback) {			if (omsg->header.flag != SA_OK) {				omsg->handle = 0;			}			(handle->callbacks).saMsgQueueOpenCallback(			   	omsg->invocation, &(omsg->handle),				omsg->header.flag);		}		ha_free(omsg);		break;  	case CMS_MSG_NOTIFY:		gmsg = (client_message_t *) msg;		qhd = g_hash_table_lookup(handle->queue_handle_hash, 				&(gmsg->handle));		if (handle->callbacks.saMsgMessageReceivedCallback)			handle->callbacks.saMsgMessageReceivedCallback(					&(qhd->queue_handle));					ha_free(gmsg); 		break; 	case CMS_MSG_ACK:		amsg = (client_message_ack_t *) msg;		if ((handle->callbacks).saMsgMessageDeliveredCallback) {			(handle->callbacks).saMsgMessageDeliveredCallback(					amsg->invocation,					msg->flag);		}		ha_free(amsg);		break; 	case CMS_QUEUEGROUP_NOTIFY: 		nmsg = (client_mqgroup_notify_t *)msg;  		name = (char *) ha_malloc(nmsg->group_name.length + 1); 		if (name == NULL) { 			cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); 			return FALSE; 		} 		dprintf("group name [%s], length [%d]\n" 		,	nmsg->group_name.value, nmsg->group_name.length); 		strncpy(name, nmsg->group_name.value, nmsg->group_name.length); 		name[nmsg->group_name.length] = '\0'; 		dprintf("name = [%s]\n", name);  		track = g_hash_table_lookup(__group_tracking_hash, name);  		if (track == NULL) { 			cl_log(LOG_ERR, "Cannot find track buffer"); 			return FALSE; 		}  		if ((handle->callbacks).saMsgQueueGroupTrackCallback == NULL) 			return FALSE;  		(handle->callbacks).saMsgQueueGroupTrackCallback( 			track->name, &(track->buf), track->policy, 			track->buf.numberOfItems, SA_OK);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -