⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_lib.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 3 页
字号:
/* $Id: event_lib.c,v 1.10 2004/11/22 20:06:42 gshi Exp $ *//*  * event_lib.c: source file for event library * * Copyright (C) 2004 Forrest,Zhao <forrest.zhao@intel.com> *  * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. *  * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * General Public License for more details. *  * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <clplumbing/cl_signal.h>#include <event.h>int global_debug=0;int global_verbose=0;static GHashTable *evt_handle_hash = NULL;static GHashTable *evt_channel_hash = NULL;static GHashTable *evt_event_hash = NULL;static GHashTable *subscription_global = NULL;#define RELEASE_CODE 'A'#define MAJOR_VERSION 1#define MINOR_VERSION 1typedef struct evt_event_handle_s {	SaSizeT event_size;	void *patternArray;	SaUint8T priority;	SaTimeT retentionTime;	SaNameT publisherName;	SaTimeT publishTime;	SaEvtEventIdT eventId;	void *eventData;	SaSizeT eventDataSize;	SaEvtChannelHandleT ch_handle;	SaEvtSubscriptionIdT subscription_id;	SaEvtEventIdT evtId;	SaEvtChannelHandleT channelId;	struct evt_event_handle_s *next;	int set_flag;} evt_event_handle;struct queue_head {	evt_event_handle *head;	evt_event_handle *tail; };struct open_channel_reply_queue {	struct open_channel_reply *head;	struct open_channel_reply *tail;};struct event_queue_s {	SaSizeT event_number;	struct queue_head queue[SA_EVT_HIGHEST_PRIORITY+1];	SaSizeT reply_number;	struct open_channel_reply_queue open_ch_reply_queue;};typedef struct evt_handle_s {	struct IPC_CHANNEL *ch;	SaSelectionObjectT selectionObject;	SaEvtCallbacksT callbacks;	GHashTable *evt_channel_hash;	struct event_queue_s *event_queue;} evt_handle;typedef struct evt_channel_handle_s{	SaNameT channelName;	struct IPC_CHANNEL *ch;	SaSelectionObjectT selectionObject;	SaEvtEventIdT channelId;		GHashTable *event_handle_hash;	GHashTable *subscription_hash;	SaEvtHandleT evt_handle;	SaInvocationT invocation; 	SaEvtChannelOpenFlagsT open_flags;	void *ch_instance; } evt_channel_handle;typedef struct evt_subscription_s {	const SaNameT *channelName;	SaEvtEventFilterArrayT *filters;	SaEvtEventIdT channelId;	SaEvtSubscriptionIdT subscriptionId;} evt_subscription;struct open_channel_reply{	SaNameT channel_name;	SaEvtChannelHandleT clt_ch_handle;	void* ch_instance;	SaErrorT ret_code;	struct open_channel_reply *next;};struct publish_reply{	SaEvtEventHandleT eventHandle;	SaErrorT ret_code;	SaEvtEventIdT event_id;};struct clear_retention_time_reply{	SaNameT channel_name;	SaEvtEventIdT event_id;	SaErrorT ret_code;};struct daemon_msg {	enum evt_type msg_type;		union {		struct open_channel_reply *open_ch_reply;		struct publish_reply *pub_reply;		struct clear_retention_time_reply *clear_retention_reply;		evt_event_handle *event;	} private;};static struct sa_handle_database evt_handle_database;static struct sa_handle_database ch_handle_database;static struct sa_handle_database event_handle_database;static void init_global_variable(void){	evt_handle_hash = g_hash_table_new(g_direct_hash, g_direct_equal);	evt_channel_hash = g_hash_table_new(g_direct_hash, g_direct_equal);	evt_event_hash = g_hash_table_new(g_direct_hash, g_direct_equal);	subscription_global = g_hash_table_new(g_direct_hash, g_direct_equal);	evt_handle_database.handle_count = 0;	ch_handle_database.handle_count = 0;	event_handle_database.handle_count = 0;	return;}static SaErrorT send_to_evt_daemon(struct IPC_CHANNEL *ch,				void *msg, SaSizeT msg_size){	struct IPC_MESSAGE	Msg;		memset(&Msg, 0, sizeof(Msg));	Msg.msg_body = msg;	Msg.msg_len = msg_size;	Msg.msg_done = NULL;	Msg.msg_private = NULL;	Msg.msg_ch = ch;	if(ch->ops->send(ch, &Msg) != IPC_OK){		return SA_ERR_LIBRARY;	}	return SA_OK;}static SaErrorT send_evt_init(struct IPC_CHANNEL *ch){	SaSizeT msg_size;	SaUint8T *tmp_char;	void *msg;					msg_size = 1;	msg = g_malloc(msg_size);	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_INITIALIZE;	printf("%d", *(tmp_char));	send_to_evt_daemon(ch, msg, msg_size);		return SA_OK;}static void free_event(evt_event_handle *event_hd);SaErrorT saEvtInitialize(SaEvtHandleT *evtHandle,				const SaEvtCallbacksT *callbacks,				                SaVersionT *version){	struct IPC_CHANNEL *ch;	evt_handle *evt_hd;	GHashTable *attrs, *channel_hash;	char	path[] = IPC_PATH_ATTR;	char	sockpath[] = EVTFIFO;	struct event_queue_s *event_queue;	static int init = 0;	if((evtHandle == NULL) || (callbacks == NULL) || (version == NULL)){		return SA_ERR_INVALID_PARAM;	}	if(init == 0){		init_global_variable();		init++;	}	if((version->releaseCode == RELEASE_CODE) &&						(MAJOR_VERSION >= version->major)){		version->major = MAJOR_VERSION;		version->minor = MINOR_VERSION;	}else{		version->releaseCode = RELEASE_CODE;		version->major = MAJOR_VERSION;		version->minor = MINOR_VERSION;		return SA_ERR_VERSION;	}	if(get_handle(&evt_handle_database, evtHandle) != SA_OK)		return SA_ERR_NO_MEMORY;				evt_hd = (evt_handle *)g_malloc(sizeof(evt_handle));	if (!evt_hd){		put_handle(&evt_handle_database, *evtHandle);		return SA_ERR_NO_MEMORY;	}	channel_hash = g_hash_table_new(g_direct_hash,g_direct_equal);	if(!channel_hash){		g_free(evt_hd);		put_handle(&evt_handle_database, *evtHandle);		return SA_ERR_NO_MEMORY;	}	event_queue = (struct event_queue_s *)g_malloc0(			sizeof(struct event_queue_s));	attrs = g_hash_table_new(g_str_hash,g_str_equal);	if(!attrs){		g_free(evt_hd);		put_handle(&evt_handle_database, *evtHandle);		g_hash_table_destroy(channel_hash);		return SA_ERR_NO_MEMORY;	}	g_hash_table_insert(attrs, path, sockpath);	ch = ipc_channel_constructor(IPC_DOMAIN_SOCKET, attrs);	g_hash_table_destroy(attrs);	if(!ch || ch->ops->initiate_connection(ch) != IPC_OK){		g_free(evt_hd);		put_handle(&evt_handle_database, *evtHandle);		g_hash_table_destroy(channel_hash);		return SA_ERR_LIBRARY;	}	ch->ops->set_recv_qlen(ch, 0);	if(send_evt_init(ch) != SA_OK){		g_free(evt_hd);		put_handle(&evt_handle_database, *evtHandle);		g_hash_table_destroy(channel_hash);		ch->ops->destroy(ch);		return SA_ERR_LIBRARY;	}							evt_hd->ch = ch;	evt_hd->selectionObject = ch->ops->get_recv_select_fd(ch);		evt_hd->callbacks.saEvtChannelOpenCallback = 							callbacks->saEvtChannelOpenCallback;	evt_hd->callbacks.saEvtEventDeliverCallback = 							callbacks->saEvtEventDeliverCallback;	evt_hd->evt_channel_hash = channel_hash;		evt_hd->event_queue = event_queue;	g_hash_table_insert(evt_handle_hash, (gpointer)*evtHandle, evt_hd);		return SA_OK;	}static void read_normal_event(void *msg, struct daemon_msg *ret){	evt_event_handle *event;	SaSizeT  publisher_len;	SaUint8T *tmp_char;		event = (evt_event_handle *)g_malloc(sizeof(evt_event_handle));	ret->private.event = event;	tmp_char = (SaUint8T *)msg;	tmp_char++;	memcpy(&(event->channelId), tmp_char, sizeof(SaEvtChannelHandleT));	tmp_char += sizeof(SaEvtChannelHandleT);	memcpy(&(event->subscription_id), tmp_char, sizeof(SaEvtSubscriptionIdT));	tmp_char += sizeof(SaEvtSubscriptionIdT);	memcpy(&(event->event_size), tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	event->patternArray = g_malloc(event->event_size);	memcpy(event->patternArray, tmp_char, event->event_size);	tmp_char += event->event_size;	event->priority = *(tmp_char);	tmp_char++;	memcpy(&(event->retentionTime), tmp_char, sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	memcpy(&(publisher_len), tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	event->publisherName.length = publisher_len;	memcpy(event->publisherName.value, tmp_char, publisher_len);	event->publisherName.value[publisher_len] = '\0';	tmp_char += publisher_len;	memcpy(&(event->publishTime), tmp_char, sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	memcpy(&(event->eventId), tmp_char, sizeof(SaEvtEventIdT));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(&(event->eventDataSize), tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	event->eventData = g_malloc(event->eventDataSize);	memcpy(event->eventData, tmp_char, event->eventDataSize);	return;}static void read_ch_open_reply(void *msg, struct daemon_msg *ret){	SaUint8T *tmp_char;	SaSizeT str_len;	struct open_channel_reply *open_ch_reply;		open_ch_reply = (struct open_channel_reply *)g_malloc(			sizeof(struct open_channel_reply));	ret->private.open_ch_reply = open_ch_reply;	tmp_char = (SaUint8T *)msg;	tmp_char++;	memcpy(&(str_len), tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	open_ch_reply->channel_name.length = str_len;	memcpy(open_ch_reply->channel_name.value, tmp_char, str_len);	open_ch_reply->channel_name.value[str_len] = '\0';	tmp_char += str_len;	memcpy(&(open_ch_reply->clt_ch_handle), tmp_char, sizeof(SaEvtChannelHandleT));	tmp_char += sizeof(SaEvtChannelHandleT);	memcpy(&(open_ch_reply->ch_instance), tmp_char, sizeof(void *));	tmp_char += sizeof(void *);	memcpy(&(open_ch_reply->ret_code), tmp_char, sizeof(SaErrorT));	return;}static void read_publish_reply(void *msg, struct daemon_msg *ret){	struct publish_reply *pub_reply;	SaUint8T *tmp_char;		pub_reply = (struct publish_reply *)g_malloc(			sizeof(struct publish_reply));	ret->private.pub_reply = pub_reply;	tmp_char = (SaUint8T *)msg;	tmp_char++;	memcpy(&(pub_reply->eventHandle), tmp_char, sizeof(SaEvtEventHandleT));	tmp_char += sizeof(SaEvtEventHandleT);	memcpy(&(pub_reply->event_id), tmp_char, sizeof(SaEvtEventIdT));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(&(pub_reply->ret_code), tmp_char, sizeof(SaErrorT));	return;}static void read_clear_retention_reply(void *msg, struct daemon_msg *ret){	struct clear_retention_time_reply *clear_reply;	SaUint8T *tmp_char;	SaSizeT str_len;		clear_reply = (struct clear_retention_time_reply *)g_malloc(			sizeof(struct clear_retention_time_reply));	ret->private.clear_retention_reply = clear_reply;	tmp_char = (SaUint8T *)msg;	tmp_char++;	memcpy(&str_len, tmp_char, sizeof(SaSizeT));	clear_reply->channel_name.length = str_len;		tmp_char += sizeof(SaSizeT);	memcpy(clear_reply->channel_name.value, tmp_char, str_len);	clear_reply->channel_name.value[str_len] = '\0';	tmp_char += str_len;	memcpy(&(clear_reply->event_id), tmp_char, sizeof(SaEvtEventIdT));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(&(clear_reply->ret_code), tmp_char, sizeof(SaErrorT));	return;}static struct daemon_msg *read_from_ipc(IPC_Channel *ch){	IPC_Message* ipc_msg;	char* msg_type;	struct daemon_msg *ret;		ret = (struct daemon_msg *)g_malloc(sizeof(struct daemon_msg));	ch->ops->is_message_pending(ch);	if(ch->ops->recv(ch, &ipc_msg) != IPC_OK){		return NULL;		}		msg_type = (char *)ipc_msg->msg_body;	ret->msg_type = *(msg_type);	printf("the msg type is: %d\n", ret->msg_type);	switch(*(msg_type)){		case EVT_NORMAL_EVENT:						read_normal_event(ipc_msg->msg_body, ret);			break;		case EVT_CH_OPEN_REPLY_FROM_DAEMON:			read_ch_open_reply(ipc_msg->msg_body, ret);			break;		case EVT_PUBLISH_REPLY:			read_publish_reply(ipc_msg->msg_body, ret);			break;		case EVT_CLEAR_RETENTION_TIME_REPLY:			read_clear_retention_reply(ipc_msg->msg_body, ret);			break;		default:			break;	}	ipc_msg->msg_done(ipc_msg);	return ret;}static SaErrorT append_to_event_queue(struct event_queue_s *event_queue,		evt_event_handle *event_hd){	SaUint8T priority;	struct queue_head *queue;	priority = event_hd->priority;	if(priority > SA_EVT_LOWEST_PRIORITY)	{		return SA_ERR_INVALID_PARAM;	}	event_queue->event_number++;	queue = &(event_queue->queue[priority]);	if((queue->head == NULL) && (queue->tail == NULL)){		queue->head = event_hd;		queue->tail = event_hd;	}else{			queue->tail->next = event_hd;		queue->tail = event_hd;	}	return SA_OK;}static SaErrorT append_to_reply_queue(struct event_queue_s *event_queue,		struct open_channel_reply *open_ch_reply){	event_queue->reply_number++;	if((event_queue->open_ch_reply_queue.tail == NULL) &&			(event_queue->open_ch_reply_queue.head == NULL)){		event_queue->open_ch_reply_queue.head = open_ch_reply;		event_queue->open_ch_reply_queue.tail = open_ch_reply;	}else{		event_queue->open_ch_reply_queue.tail->next = open_ch_reply;		event_queue->open_ch_reply_queue.tail = open_ch_reply;	}	return SA_OK;}SaErrorT saEvtChannelOpen(const SaEvtHandleT evtHandle, const SaNameT *channelName,                 SaEvtChannelOpenFlagsT channelOpenFlags, SaTimeT timeout,                 SaEvtChannelHandleT *channelHandle){	evt_handle *evt_hd;	evt_channel_handle *evt_channel_hd;	GHashTable *channel_hash;	struct IPC_CHANNEL *ch;	fd_set rset;	struct timeval time_out;	int fd, select_ret;	SaEvtChannelHandleT channel_handle;	void *msg;	struct daemon_msg *msg_reply;		evt_event_handle *event_hd;	struct open_channel_reply *open_ch_reply;	SaSizeT msg_len, str_len;	SaUint8T *tmp_char;			if((channelHandle == NULL) || (channelName == NULL)){		return SA_ERR_INVALID_PARAM;	}	if(channelOpenFlags > 7){		return SA_ERR_BAD_FLAGS;	}	evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash,			(gpointer)evtHandle);	if( evt_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	ch = evt_hd->ch;	channel_hash = evt_hd->evt_channel_hash;	evt_channel_hd = (evt_channel_handle *)g_malloc(			sizeof(evt_channel_handle));	if (!evt_channel_hd)		return SA_ERR_NO_MEMORY;	if(get_handle(&ch_handle_database, &channel_handle) != SA_OK){		g_free(evt_channel_hd);		return SA_ERR_LIBRARY;	}		/*send channel_open request */	str_len = channelName->length;	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtChannelHandleT)		+sizeof(SaEvtChannelOpenFlagsT);	msg = g_malloc(msg_len);	if(msg == NULL){		return SA_ERR_NO_MEMORY;	}	tmp_char = (SaUint8T *)msg;	*tmp_char = EVT_OPEN_EVENT_CHANNEL;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	strncpy(tmp_char, channelName->value, str_len);	tmp_char += str_len;	memcpy(tmp_char, &(channel_handle), sizeof(SaEvtChannelHandleT));	tmp_char += sizeof(SaEvtChannelHandleT);	memcpy(tmp_char, &(channelOpenFlags), sizeof(SaEvtChannelOpenFlagsT));	send_to_evt_daemon(evt_hd->ch, msg, msg_len);/*	sleep(1); */	g_free(msg);	/*wait for reply */	fd = evt_hd->selectionObject;	time_out.tv_sec = 0;	time_out.tv_usec = timeout;	/*if msg_type is normal event, buffer it and continue */	/*if msg_type is open_channel_reply, break */	for(;;){		FD_ZERO(&rset);		FD_SET(fd, &rset);		select_ret = select(fd + 1, &rset, NULL,NULL, NULL);		if(select_ret == -1){			/*perror("select"); */			return SA_ERR_LIBRARY;				}else if(select_ret == 0){			return SA_ERR_TIMEOUT;		}		msg_reply = read_from_ipc(ch);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -