📄 event_lib.c
字号:
/* $Id: event_lib.c,v 1.10 2004/11/22 20:06:42 gshi Exp $ *//* * event_lib.c: source file for event library * * Copyright (C) 2004 Forrest,Zhao <forrest.zhao@intel.com> * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <clplumbing/cl_signal.h>#include <event.h>int global_debug=0;int global_verbose=0;static GHashTable *evt_handle_hash = NULL;static GHashTable *evt_channel_hash = NULL;static GHashTable *evt_event_hash = NULL;static GHashTable *subscription_global = NULL;#define RELEASE_CODE 'A'#define MAJOR_VERSION 1#define MINOR_VERSION 1typedef struct evt_event_handle_s { SaSizeT event_size; void *patternArray; SaUint8T priority; SaTimeT retentionTime; SaNameT publisherName; SaTimeT publishTime; SaEvtEventIdT eventId; void *eventData; SaSizeT eventDataSize; SaEvtChannelHandleT ch_handle; SaEvtSubscriptionIdT subscription_id; SaEvtEventIdT evtId; SaEvtChannelHandleT channelId; struct evt_event_handle_s *next; int set_flag;} evt_event_handle;struct queue_head { evt_event_handle *head; evt_event_handle *tail; };struct open_channel_reply_queue { struct open_channel_reply *head; struct open_channel_reply *tail;};struct event_queue_s { SaSizeT event_number; struct queue_head queue[SA_EVT_HIGHEST_PRIORITY+1]; SaSizeT reply_number; struct open_channel_reply_queue open_ch_reply_queue;};typedef struct evt_handle_s { struct IPC_CHANNEL *ch; SaSelectionObjectT selectionObject; SaEvtCallbacksT callbacks; GHashTable *evt_channel_hash; struct event_queue_s *event_queue;} evt_handle;typedef struct evt_channel_handle_s{ SaNameT channelName; struct IPC_CHANNEL *ch; SaSelectionObjectT selectionObject; SaEvtEventIdT channelId; GHashTable *event_handle_hash; GHashTable *subscription_hash; SaEvtHandleT evt_handle; SaInvocationT invocation; SaEvtChannelOpenFlagsT open_flags; void *ch_instance; } evt_channel_handle;typedef struct evt_subscription_s { const SaNameT *channelName; SaEvtEventFilterArrayT *filters; SaEvtEventIdT channelId; SaEvtSubscriptionIdT subscriptionId;} evt_subscription;struct open_channel_reply{ SaNameT channel_name; SaEvtChannelHandleT clt_ch_handle; void* ch_instance; SaErrorT ret_code; struct open_channel_reply *next;};struct publish_reply{ SaEvtEventHandleT eventHandle; SaErrorT ret_code; SaEvtEventIdT event_id;};struct clear_retention_time_reply{ SaNameT channel_name; SaEvtEventIdT event_id; SaErrorT ret_code;};struct daemon_msg { enum evt_type msg_type; union { struct open_channel_reply *open_ch_reply; struct publish_reply *pub_reply; struct clear_retention_time_reply *clear_retention_reply; evt_event_handle *event; } private;};static struct sa_handle_database evt_handle_database;static struct sa_handle_database ch_handle_database;static struct sa_handle_database event_handle_database;static void init_global_variable(void){ evt_handle_hash = g_hash_table_new(g_direct_hash, g_direct_equal); evt_channel_hash = g_hash_table_new(g_direct_hash, g_direct_equal); evt_event_hash = g_hash_table_new(g_direct_hash, g_direct_equal); subscription_global = g_hash_table_new(g_direct_hash, g_direct_equal); evt_handle_database.handle_count = 0; ch_handle_database.handle_count = 0; event_handle_database.handle_count = 0; return;}static SaErrorT send_to_evt_daemon(struct IPC_CHANNEL *ch, void *msg, SaSizeT msg_size){ struct IPC_MESSAGE Msg; memset(&Msg, 0, sizeof(Msg)); Msg.msg_body = msg; Msg.msg_len = msg_size; Msg.msg_done = NULL; Msg.msg_private = NULL; Msg.msg_ch = ch; if(ch->ops->send(ch, &Msg) != IPC_OK){ return SA_ERR_LIBRARY; } return SA_OK;}static SaErrorT send_evt_init(struct IPC_CHANNEL *ch){ SaSizeT msg_size; SaUint8T *tmp_char; void *msg; msg_size = 1; msg = g_malloc(msg_size); tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_INITIALIZE; printf("%d", *(tmp_char)); send_to_evt_daemon(ch, msg, msg_size); return SA_OK;}static void free_event(evt_event_handle *event_hd);SaErrorT saEvtInitialize(SaEvtHandleT *evtHandle, const SaEvtCallbacksT *callbacks, SaVersionT *version){ struct IPC_CHANNEL *ch; evt_handle *evt_hd; GHashTable *attrs, *channel_hash; char path[] = IPC_PATH_ATTR; char sockpath[] = EVTFIFO; struct event_queue_s *event_queue; static int init = 0; if((evtHandle == NULL) || (callbacks == NULL) || (version == NULL)){ return SA_ERR_INVALID_PARAM; } if(init == 0){ init_global_variable(); init++; } if((version->releaseCode == RELEASE_CODE) && (MAJOR_VERSION >= version->major)){ version->major = MAJOR_VERSION; version->minor = MINOR_VERSION; }else{ version->releaseCode = RELEASE_CODE; version->major = MAJOR_VERSION; version->minor = MINOR_VERSION; return SA_ERR_VERSION; } if(get_handle(&evt_handle_database, evtHandle) != SA_OK) return SA_ERR_NO_MEMORY; evt_hd = (evt_handle *)g_malloc(sizeof(evt_handle)); if (!evt_hd){ put_handle(&evt_handle_database, *evtHandle); return SA_ERR_NO_MEMORY; } channel_hash = g_hash_table_new(g_direct_hash,g_direct_equal); if(!channel_hash){ g_free(evt_hd); put_handle(&evt_handle_database, *evtHandle); return SA_ERR_NO_MEMORY; } event_queue = (struct event_queue_s *)g_malloc0( sizeof(struct event_queue_s)); attrs = g_hash_table_new(g_str_hash,g_str_equal); if(!attrs){ g_free(evt_hd); put_handle(&evt_handle_database, *evtHandle); g_hash_table_destroy(channel_hash); return SA_ERR_NO_MEMORY; } g_hash_table_insert(attrs, path, sockpath); ch = ipc_channel_constructor(IPC_DOMAIN_SOCKET, attrs); g_hash_table_destroy(attrs); if(!ch || ch->ops->initiate_connection(ch) != IPC_OK){ g_free(evt_hd); put_handle(&evt_handle_database, *evtHandle); g_hash_table_destroy(channel_hash); return SA_ERR_LIBRARY; } ch->ops->set_recv_qlen(ch, 0); if(send_evt_init(ch) != SA_OK){ g_free(evt_hd); put_handle(&evt_handle_database, *evtHandle); g_hash_table_destroy(channel_hash); ch->ops->destroy(ch); return SA_ERR_LIBRARY; } evt_hd->ch = ch; evt_hd->selectionObject = ch->ops->get_recv_select_fd(ch); evt_hd->callbacks.saEvtChannelOpenCallback = callbacks->saEvtChannelOpenCallback; evt_hd->callbacks.saEvtEventDeliverCallback = callbacks->saEvtEventDeliverCallback; evt_hd->evt_channel_hash = channel_hash; evt_hd->event_queue = event_queue; g_hash_table_insert(evt_handle_hash, (gpointer)*evtHandle, evt_hd); return SA_OK; }static void read_normal_event(void *msg, struct daemon_msg *ret){ evt_event_handle *event; SaSizeT publisher_len; SaUint8T *tmp_char; event = (evt_event_handle *)g_malloc(sizeof(evt_event_handle)); ret->private.event = event; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&(event->channelId), tmp_char, sizeof(SaEvtChannelHandleT)); tmp_char += sizeof(SaEvtChannelHandleT); memcpy(&(event->subscription_id), tmp_char, sizeof(SaEvtSubscriptionIdT)); tmp_char += sizeof(SaEvtSubscriptionIdT); memcpy(&(event->event_size), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); event->patternArray = g_malloc(event->event_size); memcpy(event->patternArray, tmp_char, event->event_size); tmp_char += event->event_size; event->priority = *(tmp_char); tmp_char++; memcpy(&(event->retentionTime), tmp_char, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(&(publisher_len), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); event->publisherName.length = publisher_len; memcpy(event->publisherName.value, tmp_char, publisher_len); event->publisherName.value[publisher_len] = '\0'; tmp_char += publisher_len; memcpy(&(event->publishTime), tmp_char, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(&(event->eventId), tmp_char, sizeof(SaEvtEventIdT)); tmp_char += sizeof(SaEvtEventIdT); memcpy(&(event->eventDataSize), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); event->eventData = g_malloc(event->eventDataSize); memcpy(event->eventData, tmp_char, event->eventDataSize); return;}static void read_ch_open_reply(void *msg, struct daemon_msg *ret){ SaUint8T *tmp_char; SaSizeT str_len; struct open_channel_reply *open_ch_reply; open_ch_reply = (struct open_channel_reply *)g_malloc( sizeof(struct open_channel_reply)); ret->private.open_ch_reply = open_ch_reply; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&(str_len), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); open_ch_reply->channel_name.length = str_len; memcpy(open_ch_reply->channel_name.value, tmp_char, str_len); open_ch_reply->channel_name.value[str_len] = '\0'; tmp_char += str_len; memcpy(&(open_ch_reply->clt_ch_handle), tmp_char, sizeof(SaEvtChannelHandleT)); tmp_char += sizeof(SaEvtChannelHandleT); memcpy(&(open_ch_reply->ch_instance), tmp_char, sizeof(void *)); tmp_char += sizeof(void *); memcpy(&(open_ch_reply->ret_code), tmp_char, sizeof(SaErrorT)); return;}static void read_publish_reply(void *msg, struct daemon_msg *ret){ struct publish_reply *pub_reply; SaUint8T *tmp_char; pub_reply = (struct publish_reply *)g_malloc( sizeof(struct publish_reply)); ret->private.pub_reply = pub_reply; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&(pub_reply->eventHandle), tmp_char, sizeof(SaEvtEventHandleT)); tmp_char += sizeof(SaEvtEventHandleT); memcpy(&(pub_reply->event_id), tmp_char, sizeof(SaEvtEventIdT)); tmp_char += sizeof(SaEvtEventIdT); memcpy(&(pub_reply->ret_code), tmp_char, sizeof(SaErrorT)); return;}static void read_clear_retention_reply(void *msg, struct daemon_msg *ret){ struct clear_retention_time_reply *clear_reply; SaUint8T *tmp_char; SaSizeT str_len; clear_reply = (struct clear_retention_time_reply *)g_malloc( sizeof(struct clear_retention_time_reply)); ret->private.clear_retention_reply = clear_reply; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); clear_reply->channel_name.length = str_len; tmp_char += sizeof(SaSizeT); memcpy(clear_reply->channel_name.value, tmp_char, str_len); clear_reply->channel_name.value[str_len] = '\0'; tmp_char += str_len; memcpy(&(clear_reply->event_id), tmp_char, sizeof(SaEvtEventIdT)); tmp_char += sizeof(SaEvtEventIdT); memcpy(&(clear_reply->ret_code), tmp_char, sizeof(SaErrorT)); return;}static struct daemon_msg *read_from_ipc(IPC_Channel *ch){ IPC_Message* ipc_msg; char* msg_type; struct daemon_msg *ret; ret = (struct daemon_msg *)g_malloc(sizeof(struct daemon_msg)); ch->ops->is_message_pending(ch); if(ch->ops->recv(ch, &ipc_msg) != IPC_OK){ return NULL; } msg_type = (char *)ipc_msg->msg_body; ret->msg_type = *(msg_type); printf("the msg type is: %d\n", ret->msg_type); switch(*(msg_type)){ case EVT_NORMAL_EVENT: read_normal_event(ipc_msg->msg_body, ret); break; case EVT_CH_OPEN_REPLY_FROM_DAEMON: read_ch_open_reply(ipc_msg->msg_body, ret); break; case EVT_PUBLISH_REPLY: read_publish_reply(ipc_msg->msg_body, ret); break; case EVT_CLEAR_RETENTION_TIME_REPLY: read_clear_retention_reply(ipc_msg->msg_body, ret); break; default: break; } ipc_msg->msg_done(ipc_msg); return ret;}static SaErrorT append_to_event_queue(struct event_queue_s *event_queue, evt_event_handle *event_hd){ SaUint8T priority; struct queue_head *queue; priority = event_hd->priority; if(priority > SA_EVT_LOWEST_PRIORITY) { return SA_ERR_INVALID_PARAM; } event_queue->event_number++; queue = &(event_queue->queue[priority]); if((queue->head == NULL) && (queue->tail == NULL)){ queue->head = event_hd; queue->tail = event_hd; }else{ queue->tail->next = event_hd; queue->tail = event_hd; } return SA_OK;}static SaErrorT append_to_reply_queue(struct event_queue_s *event_queue, struct open_channel_reply *open_ch_reply){ event_queue->reply_number++; if((event_queue->open_ch_reply_queue.tail == NULL) && (event_queue->open_ch_reply_queue.head == NULL)){ event_queue->open_ch_reply_queue.head = open_ch_reply; event_queue->open_ch_reply_queue.tail = open_ch_reply; }else{ event_queue->open_ch_reply_queue.tail->next = open_ch_reply; event_queue->open_ch_reply_queue.tail = open_ch_reply; } return SA_OK;}SaErrorT saEvtChannelOpen(const SaEvtHandleT evtHandle, const SaNameT *channelName, SaEvtChannelOpenFlagsT channelOpenFlags, SaTimeT timeout, SaEvtChannelHandleT *channelHandle){ evt_handle *evt_hd; evt_channel_handle *evt_channel_hd; GHashTable *channel_hash; struct IPC_CHANNEL *ch; fd_set rset; struct timeval time_out; int fd, select_ret; SaEvtChannelHandleT channel_handle; void *msg; struct daemon_msg *msg_reply; evt_event_handle *event_hd; struct open_channel_reply *open_ch_reply; SaSizeT msg_len, str_len; SaUint8T *tmp_char; if((channelHandle == NULL) || (channelName == NULL)){ return SA_ERR_INVALID_PARAM; } if(channelOpenFlags > 7){ return SA_ERR_BAD_FLAGS; } evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash, (gpointer)evtHandle); if( evt_hd == NULL){ return SA_ERR_BAD_HANDLE; } ch = evt_hd->ch; channel_hash = evt_hd->evt_channel_hash; evt_channel_hd = (evt_channel_handle *)g_malloc( sizeof(evt_channel_handle)); if (!evt_channel_hd) return SA_ERR_NO_MEMORY; if(get_handle(&ch_handle_database, &channel_handle) != SA_OK){ g_free(evt_channel_hd); return SA_ERR_LIBRARY; } /*send channel_open request */ str_len = channelName->length; msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtChannelHandleT) +sizeof(SaEvtChannelOpenFlagsT); msg = g_malloc(msg_len); if(msg == NULL){ return SA_ERR_NO_MEMORY; } tmp_char = (SaUint8T *)msg; *tmp_char = EVT_OPEN_EVENT_CHANNEL; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); strncpy(tmp_char, channelName->value, str_len); tmp_char += str_len; memcpy(tmp_char, &(channel_handle), sizeof(SaEvtChannelHandleT)); tmp_char += sizeof(SaEvtChannelHandleT); memcpy(tmp_char, &(channelOpenFlags), sizeof(SaEvtChannelOpenFlagsT)); send_to_evt_daemon(evt_hd->ch, msg, msg_len);/* sleep(1); */ g_free(msg); /*wait for reply */ fd = evt_hd->selectionObject; time_out.tv_sec = 0; time_out.tv_usec = timeout; /*if msg_type is normal event, buffer it and continue */ /*if msg_type is open_channel_reply, break */ for(;;){ FD_ZERO(&rset); FD_SET(fd, &rset); select_ret = select(fd + 1, &rset, NULL,NULL, NULL); if(select_ret == -1){ /*perror("select"); */ return SA_ERR_LIBRARY; }else if(select_ret == 0){ return SA_ERR_TIMEOUT; } msg_reply = read_from_ipc(ch);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -