📄 event_lib.c
字号:
FD_SET(fd, &rset); if(select(fd + 1, &rset, NULL,NULL,NULL) == -1){ /*perror("select"); */ return(1); } msg_reply = read_from_ipc(ch); if(msg_reply->msg_type == EVT_PUBLISH_REPLY){ if(eventHandle == msg_reply->private.pub_reply->eventHandle){ if(msg_reply->private.pub_reply->ret_code == SA_OK){ break; }else{ return msg_reply->private.pub_reply->ret_code; } }else{ /*update timeout, continue */ } }else if(msg_reply->msg_type == EVT_NORMAL_EVENT) { /*TODO: update timeout, continue */ event_hd = msg_reply->private.event; append_to_event_queue(evt_hd->event_queue, event_hd); }else if(msg_reply->msg_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON) { open_ch_reply = msg_reply->private.open_ch_reply; append_to_reply_queue(evt_hd->event_queue, open_ch_reply); }else { /*update timeout, continue */ /*error msg */ } /*if msg_type is normal event, buffer it and continue */ /*if msg_type is publish_reply, break */ } *(eventId) = (SaEvtEventIdT)msg_reply->private.pub_reply->event_id; return SA_OK; }SaErrorT saEvtEventSubscribe(const SaEvtChannelHandleT channelHandle, const SaEvtEventFilterArrayT *filters, SaEvtSubscriptionIdT subscriptionId){ evt_channel_handle *evt_channel_hd; struct IPC_CHANNEL *ch; GHashTable *subscription_hash; SaUint8T *tmp_char; SaSizeT msg_size; SaSizeT str_len, number, i, filter_size=0; void *msg; SaEvtEventFilterT *filter; void *tmp_pointer; evt_channel_hd = (evt_channel_handle *)g_hash_table_lookup( evt_channel_hash, (gpointer)channelHandle); if(evt_channel_hd == NULL){ return SA_ERR_BAD_HANDLE; } if((evt_channel_hd->open_flags & SA_EVT_CHANNEL_SUBSCRIBER) != SA_EVT_CHANNEL_SUBSCRIBER){ return SA_ERR_INVALID_PARAM; } ch = evt_channel_hd->ch; subscription_hash = evt_channel_hd->subscription_hash; tmp_pointer = g_hash_table_lookup(subscription_global, (gpointer)subscriptionId); if(tmp_pointer != NULL){ return SA_ERR_EXIST; } g_hash_table_insert(subscription_hash, (gpointer)subscriptionId, (gpointer)subscriptionId); g_hash_table_insert(subscription_global, (gpointer)subscriptionId, (gpointer)subscriptionId); /*msg_size= msg_type+ch_name_len+ch_name+channel_id+sub_id+filter_len+filters_number+pattern */ str_len = evt_channel_hd->channelName.length; /* calculate filter length, then copy it to msg */ number = filters->filtersNumber; filter = filters->filters; for(i=0; i<number; i++){ filter_size += filter[i].filter.patternSize; } filter_size = sizeof(SaSizeT)+(number*sizeof(SaSizeT))+ (number*sizeof(SaEvtEventFilterTypeT))+filter_size; msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(void *)+ sizeof(subscriptionId)+sizeof(SaSizeT)+ filter_size+sizeof(SaEvtChannelHandleT); msg = g_malloc(msg_size); if(msg == NULL){ return SA_ERR_NO_MEMORY; } tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_SUBSCRIBE; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, evt_channel_hd->channelName.value, str_len); tmp_char += str_len; memcpy(tmp_char, &(evt_channel_hd->ch_instance), sizeof(void *)); tmp_char += sizeof(void *); memcpy(tmp_char, &subscriptionId, sizeof(SaEvtSubscriptionIdT)); tmp_char += sizeof(SaEvtSubscriptionIdT); memcpy(tmp_char, &filter_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, &number, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); for(i=0; i<number; i++){ memcpy(tmp_char, &(filter[i].filterType), sizeof(SaEvtEventFilterTypeT)); tmp_char += sizeof(SaEvtEventFilterTypeT); memcpy(tmp_char, &(filter[i].filter.patternSize), sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, filter[i].filter.pattern, filter[i].filter.patternSize); tmp_char += filter[i].filter.patternSize; } memcpy(tmp_char, &channelHandle, sizeof(SaEvtChannelHandleT)); send_to_evt_daemon(ch, msg, msg_size); return SA_OK;}SaErrorT saEvtEventUnsubscribe( SaEvtChannelHandleT channelHandle, SaEvtSubscriptionIdT subscriptionId ){ evt_channel_handle *evt_channel_hd; GHashTable *subscription_hash; SaSizeT msg_size; SaSizeT str_len; void *msg; char *tmp_char; struct IPC_CHANNEL *ch; evt_channel_hd = (evt_channel_handle *)g_hash_table_lookup( evt_channel_hash, (gpointer)channelHandle); if(evt_channel_hd == NULL){ return SA_ERR_BAD_HANDLE; } ch = evt_channel_hd->ch; subscription_hash = evt_channel_hd->subscription_hash; if(g_hash_table_lookup(subscription_hash,(gpointer)subscriptionId) == NULL){ return SA_ERR_NAME_NOT_FOUND; } g_hash_table_remove(subscription_hash, (gpointer)subscriptionId); /* construct the msg and send to daemon */ str_len = evt_channel_hd->channelName.length; msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(void *)+ sizeof(SaEvtSubscriptionIdT); msg = g_malloc(msg_size); if(msg == NULL){ return SA_ERR_LIBRARY; } tmp_char = msg; *(tmp_char) = EVT_UNSUBSCRIBE; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); strncpy(tmp_char, evt_channel_hd->channelName.value, str_len); tmp_char += str_len; memcpy(tmp_char, &(evt_channel_hd->ch_instance), sizeof(void *)); tmp_char += sizeof(void *); memcpy(tmp_char, &subscriptionId, sizeof(SaEvtSubscriptionIdT)); send_to_evt_daemon(ch, msg, msg_size); return SA_OK;}SaErrorT saEvtSelectionObjectGet(const SaEvtHandleT evtHandle, SaSelectionObjectT *selectionObject){ evt_handle *evt_hd; if(selectionObject == NULL){ return SA_ERR_INVALID_PARAM; } evt_hd = g_hash_table_lookup(evt_handle_hash, (gpointer)evtHandle); if( evt_hd == NULL){ return SA_ERR_BAD_HANDLE; } *selectionObject = evt_hd->selectionObject; return SA_OK;}static void *read_from_event_queue(struct event_queue_s *event_queue, SaUint8T *type){ SaSizeT i; struct queue_head *queue; evt_event_handle *event_tmp = NULL; struct open_channel_reply *reply_tmp = NULL; void *ret = NULL; if(event_queue->event_number != 0){ for(i=SA_EVT_HIGHEST_PRIORITY;i<=SA_EVT_LOWEST_PRIORITY;i++){ queue = &(event_queue->queue[i]); if(queue->head != NULL){ event_tmp = queue->head; queue->head = queue->head->next; if(queue->head == NULL){ queue->tail = NULL; } event_tmp->next = NULL; event_queue->event_number--; *(type) = EVT_NORMAL_EVENT; ret = (void *)event_tmp; break; }else{ continue; } } }else if(event_queue->reply_number != 0){ reply_tmp = event_queue->open_ch_reply_queue.head; event_queue->open_ch_reply_queue.head = event_queue->open_ch_reply_queue.head->next; if(event_queue->open_ch_reply_queue.head == NULL){ event_queue->open_ch_reply_queue.tail = NULL; } reply_tmp->next = NULL; event_queue->reply_number--; *(type) = EVT_CH_OPEN_REPLY_FROM_DAEMON; ret = (void *)reply_tmp; } return ret;}SaErrorT saEvtDispatch(const SaEvtHandleT evtHandle, SaDispatchFlagsT dispatchFlags){ evt_handle *evt_hd; struct IPC_CHANNEL *ch; evt_event_handle *event_hd; evt_channel_handle *evt_channel_hd; struct daemon_msg *msg; SaEvtEventHandleT event_handle; SaUint8T event_type; void *tmp_void; struct open_channel_reply *open_ch_reply; int fd; fd_set rset; struct timeval time_out; int ret; evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash, (gpointer)evtHandle); if( evt_hd == NULL){ return SA_ERR_BAD_HANDLE; } if((dispatchFlags < 1) || (dispatchFlags > 3)){ return SA_ERR_BAD_FLAGS; } /*can read two types of message from IPC: */ /*1 EVT_NORMAL_EVENT */ /*2 EVT_CH_OPEN_REPLY_FROM_DAEMON: for channel_open_async */ switch(dispatchFlags){ case SA_DISPATCH_ONE: tmp_void = read_from_event_queue(evt_hd->event_queue, &event_type); if(tmp_void == NULL){ ch = evt_hd->ch; msg = read_from_ipc(ch); if(msg == NULL){ ch->ops->destroy(ch); return SA_ERR_LIBRARY; } event_type = msg->msg_type; tmp_void = (void *)msg->private.event; } if(event_type == EVT_NORMAL_EVENT){ event_hd = (evt_event_handle *)tmp_void; evt_channel_hd = g_hash_table_lookup( evt_channel_hash, (gpointer)event_hd->channelId); if(evt_channel_hd == NULL){ return SA_ERR_LIBRARY; } if(get_handle(&event_handle_database, &event_handle) != SA_OK){ return SA_ERR_LIBRARY; } g_hash_table_insert(evt_event_hash, (gpointer)event_handle, event_hd); g_hash_table_insert( evt_channel_hd->event_handle_hash, (gpointer)event_handle, event_hd); evt_hd->callbacks.saEvtEventDeliverCallback( event_hd->subscription_id, event_handle, event_hd->eventDataSize); }else if(event_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON){ open_ch_reply = (struct open_channel_reply *) tmp_void; evt_channel_hd = g_hash_table_lookup( evt_channel_hash, (gpointer)open_ch_reply->clt_ch_handle); if(evt_channel_hd == NULL){ /*TODO: free open_ch_reply */ return SA_ERR_LIBRARY; } evt_channel_hd->ch_instance = open_ch_reply->ch_instance; evt_hd->callbacks.saEvtChannelOpenCallback( evt_channel_hd->invocation, open_ch_reply->clt_ch_handle, open_ch_reply->ret_code); } break; case SA_DISPATCH_ALL: fd = evt_hd->selectionObject; time_out.tv_sec = 0; for(;;){ tmp_void = read_from_event_queue( evt_hd->event_queue, &event_type); if(tmp_void == NULL){ FD_ZERO(&rset); FD_SET(fd, &rset); ret = select(fd + 1, &rset,NULL,NULL, &time_out); if( ret == -1){ /*error */ return SA_ERR_LIBRARY; }else if(ret == 0){ return SA_OK; } ch = evt_hd->ch; msg = read_from_ipc(ch); if(msg == NULL){ ch->ops->destroy(ch); return SA_ERR_LIBRARY; } event_type = msg->msg_type; tmp_void = (void *)msg->private.event; } if(event_type == EVT_NORMAL_EVENT){ event_hd = (evt_event_handle *) tmp_void; evt_channel_hd = g_hash_table_lookup( evt_channel_hash, (gpointer)event_hd->channelId); if(evt_channel_hd == NULL){ return SA_ERR_LIBRARY; } if(get_handle(&event_handle_database, &event_handle) != SA_OK){ return SA_ERR_LIBRARY; } g_hash_table_insert(evt_event_hash, (gpointer)event_handle, event_hd); g_hash_table_insert( evt_channel_hd->event_handle_hash, (gpointer)event_handle, event_hd); evt_hd->callbacks. saEvtEventDeliverCallback( event_hd->subscription_id, event_handle, event_hd->eventDataSize); }else if(event_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON){ open_ch_reply = (struct open_channel_reply *) tmp_void; evt_channel_hd = g_hash_table_lookup( evt_channel_hash, (gpointer)open_ch_reply->clt_ch_handle); if(evt_channel_hd == NULL){ /*TODO: free open_ch_reply */ return SA_ERR_LIBRARY; } evt_channel_hd->ch_instance = open_ch_reply->ch_instance; evt_hd->callbacks. saEvtChannelOpenCallback( evt_channel_hd->invocation, open_ch_reply->clt_ch_handle, open_ch_reply->ret_code); } } break; case SA_DISPATCH_BLOCKING: break; } return SA_OK;}SaErrorTsaEvtEventDataGet(SaEvtEventHandleT eventHandle, void *eventData, SaSizeT *eventDataSize){ evt_event_handle *event_hd; SaSizeT data_size; event_hd = (evt_event_handle *)g_hash_table_lookup(evt_event_hash, (gpointer)(eventHandle)); if(event_hd == NULL){ return SA_ERR_BAD_HANDLE; } data_size = (event_hd->eventDataSize > *eventDataSize) ? *eventDataSize : event_hd->eventDataSize; *eventDataSize = data_size; memcpy(eventData, event_hd->eventData, data_size); return SA_OK;}SaErrorT saEvtChannelUnlink(SaEvtHandleT evtHandle, const SaNameT *channelName){ void *msg; char *tmp_char; SaSizeT str_len, msg_len; evt_handle *evt_hd; evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash, (gpointer)(long)evtHandle); if(evt_hd == NULL){ return SA_ERR_BAD_HANDLE; } str_len = channelName->length; msg_len = sizeof(char)+sizeof(SaSizeT)+str_len; msg = g_malloc(msg_len); if(msg == NULL){ return SA_ERR_NO_MEMORY; } tmp_char = (char *)msg; *(tmp_char) = EVT_CHANNEL_UNLINK; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); strncpy(tmp_char, channelName->value, str_len); send_to_evt_daemon(evt_hd->ch, msg, msg_len); return SA_OK;}SaErrorT saEvtEventRetentionTimeClear( SaEvtChannelHandleT channelHandle, const SaEvtEventIdT eventId ){ evt_channel_handle *evt_channel_hd; SaSizeT str_len, msg_size; void *msg; char *tmp_char; struct IPC_CHANNEL *ch; int fd; fd_set rset; struct daemon_msg *msg_reply; struct clear_retention_time_reply *clear_reply; evt_event_handle *event_hd; struct open_channel_reply *open_ch_reply; evt_handle *evt_hd; evt_channel_hd = (evt_channel_handle *)g_hash_table_lookup( evt_channel_hash, (gpointer)(long)channelHandle); if(evt_channel_hd == NULL){ return SA_ERR_BAD_HANDLE; } evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash, (gpointer)(long)(evt_channel_hd->evt_handle)); if(evt_hd == NULL){ return SA_ERR_LIBRARY; } /*msg_size=1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT) */ str_len = evt_channel_hd->channelName.length; msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT); msg = g_malloc(msg_size); tmp_char = (char *)msg; *(tmp_char) = EVT_CLEAR_RETENTION_TIME; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); strncpy(tmp_char, evt_channel_hd->channelName.value, str_len); tmp_char += str_len; memcpy(tmp_char, &eventId, sizeof(SaEvtEventIdT)); send_to_evt_daemon(evt_channel_hd->ch, msg, msg_size); /*select wait until receiving the reply */ ch = evt_channel_hd->ch; fd = evt_channel_hd->selectionObject; for(;;){ FD_ZERO(&rset); FD_SET(fd, &rset); if(select(fd + 1, &rset, NULL,NULL,NULL) == -1){ /*perror("select"); */ return SA_ERR_LIBRARY; } msg_reply = read_from_ipc(ch); if(msg_reply->msg_type == EVT_CLEAR_RETENTION_TIME_REPLY){ clear_reply = msg_reply->private.clear_retention_reply; if(clear_reply->event_id == eventId){ return clear_reply->ret_code; }else{ /*error */ /*continue to wait reply from daemon */ continue; } }else if(msg_reply->msg_type == EVT_NORMAL_EVENT){ event_hd = msg_reply->private.event; append_to_event_queue(evt_hd->event_queue, event_hd); }else if(msg_reply->msg_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON){ open_ch_reply = msg_reply->private.open_ch_reply; append_to_reply_queue(evt_hd->event_queue, open_ch_reply); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -