📄 event_lib.c
字号:
if(msg_reply == NULL){ printf("received NULL msg from daemon\n"); return 0; } if(msg_reply->msg_type == EVT_CH_OPEN_REPLY_FROM_DAEMON){ if(channel_handle == msg_reply->private.open_ch_reply->clt_ch_handle){ if(msg_reply->private.open_ch_reply->ret_code == SA_OK){ break; }else{ return msg_reply->private.open_ch_reply->ret_code; } }else{ /*update timeout, continue */ } }else if(msg_reply->msg_type == EVT_NORMAL_EVENT) { event_hd = msg_reply->private.event; append_to_event_queue(evt_hd->event_queue, event_hd); /*TODO: update timeout, continue */ }else if(msg_reply->msg_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON) { open_ch_reply = msg_reply->private.open_ch_reply; append_to_reply_queue(evt_hd->event_queue, open_ch_reply); }else { /*update timeout, continue */ /*error msg */ } } /*if open evt_channel succeed */ evt_channel_hd->channelName.length = str_len; strncpy(evt_channel_hd->channelName.value, channelName->value, str_len); evt_channel_hd->evt_handle = evtHandle; evt_channel_hd->ch = evt_hd->ch; evt_channel_hd->selectionObject = fd; evt_channel_hd->open_flags = channelOpenFlags; evt_channel_hd->ch_instance = msg_reply->private.open_ch_reply->ch_instance; evt_channel_hd->event_handle_hash = g_hash_table_new(g_direct_hash, g_direct_equal); evt_channel_hd->subscription_hash = g_hash_table_new(g_direct_hash, g_direct_equal); g_hash_table_insert(channel_hash, (gpointer)channel_handle, evt_channel_hd); g_hash_table_insert(evt_channel_hash, (gpointer)channel_handle, evt_channel_hd); *(channelHandle) = channel_handle; return SA_OK;}static void free_open_ch_reply(struct open_channel_reply *open_ch_reply){ g_free(open_ch_reply); return;}static void free_event_queue(struct event_queue_s *event_queue){ SaSizeT i; struct queue_head *queue; evt_event_handle *event_tmp; struct open_channel_reply *open_ch_reply; if(event_queue->event_number != 0){ for(i=SA_EVT_HIGHEST_PRIORITY;i<=SA_EVT_LOWEST_PRIORITY;i++){ queue = &(event_queue->queue[i]); while(queue->head != NULL){ event_tmp = queue->head; queue->head = event_tmp->next; free_event(event_tmp); } g_free(queue); } } if(event_queue->reply_number != 0){ while(event_queue->open_ch_reply_queue.head != NULL){ open_ch_reply = event_queue->open_ch_reply_queue.head; event_queue->open_ch_reply_queue.head = open_ch_reply->next; free_open_ch_reply(open_ch_reply); } } g_free(event_queue);}static void free_event(evt_event_handle *event_hd){ /*free patternArray, publisherName, then free event_hd */ g_free(event_hd->patternArray); g_free(event_hd->eventData); g_free(event_hd); return;}static void free_subscription_resource(gpointer key, gpointer value, gpointer user_data){ GHashTable *sub_hash = user_data; g_hash_table_remove(sub_hash, key); g_hash_table_remove(subscription_global, key); return;}static void free_event_resource(gpointer key, gpointer value, gpointer user_data){ SaEvtEventHandleT event_handle; evt_event_handle *event_hd; GHashTable *event_handle_hash; event_handle = (SaEvtEventHandleT)key; event_hd = (evt_event_handle *)value; event_handle_hash = (GHashTable *)user_data; free_event(event_hd); g_hash_table_remove(evt_event_hash, key); return;}static void free_ch_resource(gpointer key, gpointer value, gpointer user_data){ SaEvtChannelHandleT ch_handle; evt_channel_handle *ch_hd; GHashTable *ch_hash, *event_handle_hash, *subscription_hash; ch_handle = (SaEvtChannelHandleT)key; ch_hd = (evt_channel_handle *)value; ch_hash = (GHashTable *)user_data; g_hash_table_remove(evt_channel_hash, (gpointer)ch_handle); g_hash_table_remove(ch_hash, (gpointer)ch_handle); event_handle_hash = ch_hd->event_handle_hash; if(event_handle_hash != NULL){ g_hash_table_foreach(event_handle_hash, free_event_resource, event_handle_hash); } subscription_hash = ch_hd->subscription_hash; if(subscription_hash != NULL){ g_hash_table_foreach(subscription_hash, free_subscription_resource, subscription_hash); } return;}static SaErrorT send_evt_finalize(struct IPC_CHANNEL *ch, evt_handle *evt_hd){ SaSizeT msg_len; void *msg; SaUint8T *tmp_char; msg_len = 1; msg = g_malloc(msg_len); if(msg == NULL){ return SA_ERR_NO_MEMORY; } tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_FINALIZE; send_to_evt_daemon(ch, msg, msg_len); return SA_OK;}SaErrorT saEvtFinalize(SaEvtHandleT evtHandle){ evt_handle *evt_hd; struct IPC_CHANNEL *ch; evt_hd = g_hash_table_lookup(evt_handle_hash, (gpointer)evtHandle); if( evt_hd == NULL){ return SA_ERR_BAD_HANDLE; } /*close the connection */ ch = evt_hd->ch; send_evt_finalize(ch, evt_hd); ch->ops->destroy(ch); /*free up the resources, free the channel, the events */ g_hash_table_remove(evt_handle_hash, (gpointer)evtHandle); g_hash_table_foreach(evt_hd->evt_channel_hash, free_ch_resource, evt_hd->evt_channel_hash); free_event_queue(evt_hd->event_queue); g_free(evt_hd); return SA_OK;}static SaErrorT send_channel_close(IPC_Channel *ch, evt_channel_handle *evt_channel_hd){ void *msg; char *tmp_char; SaSizeT str_len, msg_len; str_len = evt_channel_hd->channelName.length; msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(void *); msg = g_malloc(msg_len); if(msg == NULL){ return SA_ERR_NO_MEMORY; } tmp_char = msg; *tmp_char = EVT_CLOSE_EVENT_CHANNEL; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); strncpy(tmp_char, evt_channel_hd->channelName.value, str_len); tmp_char = tmp_char + str_len; memcpy(tmp_char, &(evt_channel_hd->ch_instance), sizeof(void *)); send_to_evt_daemon(ch, msg, msg_len); return SA_OK;}SaErrorT saEvtChannelClose(SaEvtChannelHandleT channelHandle){ evt_channel_handle *evt_channel_hd; evt_handle *evt_hd; GHashTable *event_handle_hash, *subscription_hash; /*free events, free subscriptions, free item in hash table, free evt_channel_hd */ evt_channel_hd = g_hash_table_lookup(evt_channel_hash, (gpointer)channelHandle); if(evt_channel_hd == NULL){ return SA_ERR_BAD_HANDLE; } send_channel_close(evt_channel_hd->ch ,evt_channel_hd); event_handle_hash = evt_channel_hd->event_handle_hash; if(event_handle_hash != NULL){ g_hash_table_foreach(event_handle_hash, free_event_resource, event_handle_hash); g_hash_table_destroy(event_handle_hash); } subscription_hash = evt_channel_hd->subscription_hash; if(subscription_hash != NULL){ g_hash_table_foreach(subscription_hash, free_subscription_resource, subscription_hash); g_hash_table_destroy(subscription_hash); } evt_hd = g_hash_table_lookup(evt_handle_hash, (gpointer)evt_channel_hd->evt_handle); g_hash_table_remove(evt_hd->evt_channel_hash, (gpointer)channelHandle); g_hash_table_remove(evt_channel_hash, (gpointer)(channelHandle)); g_free(evt_channel_hd); return SA_OK;}SaErrorT saEvtEventAllocate(const SaEvtChannelHandleT channelHandle, SaEvtEventHandleT *eventHandle){ evt_event_handle *event_hd; evt_channel_handle *evt_channel_hd; GHashTable *event_handle_hash; if(eventHandle == NULL){ return SA_ERR_INVALID_PARAM; } evt_channel_hd = g_hash_table_lookup(evt_channel_hash, (gpointer)channelHandle); if(evt_channel_hd == NULL){ return SA_ERR_BAD_HANDLE; } event_hd = (evt_event_handle *)g_malloc0(sizeof(evt_event_handle)); if (!event_hd) return SA_ERR_NO_MEMORY; event_handle_hash = evt_channel_hd->event_handle_hash; event_hd->channelId = channelHandle; if(get_handle(&event_handle_database, eventHandle) != SA_OK){ g_free(event_hd); return SA_ERR_LIBRARY; } g_hash_table_insert(evt_event_hash, (gpointer)*eventHandle, event_hd); g_hash_table_insert(evt_channel_hd->event_handle_hash, (gpointer)*eventHandle, event_hd); return SA_OK;}SaErrorT saEvtEventFree(SaEvtEventHandleT eventHandle){ evt_event_handle *event_hd; evt_channel_handle *evt_channel_hd; event_hd = g_hash_table_lookup(evt_event_hash, (gpointer)eventHandle); if(event_hd == NULL){ return SA_ERR_BAD_HANDLE; } free_event(event_hd); evt_channel_hd = g_hash_table_lookup(evt_channel_hash, (gpointer)event_hd->channelId); if(evt_channel_hd == NULL){ return SA_ERR_LIBRARY; } g_hash_table_remove(evt_channel_hd->event_handle_hash, (gpointer)eventHandle); g_hash_table_remove(evt_event_hash, (gpointer)eventHandle); put_handle(&event_handle_database, eventHandle); return SA_OK;}static SaErrorT copy_patternarray(evt_event_handle *event_hd, const SaEvtEventPatternArrayT *patternArray){ SaSizeT number, i, size=0; SaEvtEventPatternT *pattern; SaSizeT *tmp_size; SaUint8T *tmp_char; number = patternArray->patternsNumber; pattern = patternArray->patterns; /*size = field1(number of patterns)+ field2(length of each pattern)+ field3(patterns) */ for(i=0; i<number; i++){ size = size + (pattern+i)->patternSize; } size = size + (number+1)*sizeof(SaSizeT); event_hd->patternArray = g_malloc(size); event_hd->event_size = size; if(event_hd->patternArray == NULL){ return SA_ERR_NO_MEMORY; } tmp_size = event_hd->patternArray; *(tmp_size) = number; tmp_size++; for(i=0; i<number; i++){ *(tmp_size) = (pattern+i)->patternSize; tmp_size++; } tmp_char = (SaUint8T *)tmp_size; for(i=0; i<number; i++){ strncpy(tmp_char, (pattern+i)->pattern, (pattern+i)->patternSize); tmp_char = tmp_char + (pattern+i)->patternSize; } return SA_OK;}SaErrorT saEvtEventAttributesSet(SaEvtEventHandleT eventHandle, const SaEvtEventPatternArrayT *patternArray, SaUint8T priority, SaTimeT retentionTime, const SaNameT *publisherName){ evt_event_handle *event_hd; if((patternArray == NULL) || (publisherName == NULL)){ return SA_ERR_INVALID_PARAM; } if(priority > SA_EVT_LOWEST_PRIORITY){ return SA_ERR_INVALID_PARAM; } event_hd = (evt_event_handle *)g_hash_table_lookup(evt_event_hash, (gpointer)(eventHandle)); if(event_hd == NULL){ return SA_ERR_BAD_HANDLE; } copy_patternarray(event_hd, patternArray); event_hd->priority = priority; event_hd->retentionTime = retentionTime; event_hd->publisherName.length = publisherName->length; memcpy(event_hd->publisherName.value, publisherName->value, publisherName->length); event_hd->publishTime = (SaTimeT)time(NULL); event_hd->set_flag = 1; return SA_OK;}#define min(A,B) ((A)<(B) ? (A) : (B))SaErrorT saEvtEventAttributesGet(const SaEvtEventHandleT eventHandle, SaEvtEventPatternArrayT *patternArray, SaUint8T *priority, SaTimeT *retentionTime, SaNameT *publisherName, SaTimeT *publishTime, SaEvtEventIdT *eventId){ evt_event_handle *event_hd; SaSizeT number, *tmp_size, min_number, i; SaUint8T *tmp_char; SaEvtEventPatternT *patterns; event_hd = (evt_event_handle *)g_hash_table_lookup( evt_event_hash, (gpointer)eventHandle); if(event_hd == NULL){ return SA_ERR_BAD_HANDLE; } /*TODO: what should be done if patterSize conflicts */ if(patternArray != NULL){ tmp_size = (SaSizeT *)event_hd->patternArray; number = *(tmp_size); tmp_size++; tmp_char = (SaUint8T *)(tmp_size+number); min_number = min(patternArray->patternsNumber, number); patternArray->patternsNumber = min_number; patterns = patternArray->patterns; for(i=0; i<min_number; i++){ patterns[i].patternSize = min(patterns[i].patternSize, *(tmp_size)); memcpy(patterns[i].pattern, tmp_char, patterns[i].patternSize); tmp_char += *(tmp_size); tmp_size++; } } if(priority != NULL){ *(priority) = event_hd->priority; } if(retentionTime != NULL){ *(retentionTime) = event_hd->retentionTime; } if(publisherName != NULL){ publisherName->length = event_hd->publisherName.length; memcpy(publisherName->value, event_hd->publisherName.value, publisherName->length); } if(publishTime != NULL){ *(publishTime) = event_hd->publishTime; } if(eventId != NULL){ *(eventId) = event_hd->eventId; } return SA_OK;}static SaErrorT send_publish(IPC_Channel *ch, SaNameT *channel_name, SaEvtEventHandleT eventHandle, evt_event_handle *event_hd, const void *eventData, SaSizeT eventDataSize){ void *msg; char *tmp_char; SaSizeT str_len, publisher_len, msg_len; str_len = channel_name->length; publisher_len = event_hd->publisherName.length; msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventHandleT)+ sizeof(SaUint8T)+sizeof(SaTimeT)+sizeof(SaSizeT)+ publisher_len+sizeof(SaTimeT)+sizeof(SaSizeT)+ event_hd->event_size+sizeof(SaSizeT)+eventDataSize; msg = g_malloc(msg_len); if(msg == NULL){ return SA_ERR_NO_MEMORY; } tmp_char = (char *)msg; *(tmp_char) = EVT_PUBLISH; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, channel_name->value, str_len); tmp_char += str_len; memcpy(tmp_char, &eventHandle, sizeof(SaEvtEventHandleT)); tmp_char += sizeof(SaEvtEventHandleT); memcpy(tmp_char, &(event_hd->priority), sizeof(SaUint8T)); tmp_char += sizeof(SaUint8T); memcpy(tmp_char, &(event_hd->retentionTime), sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(tmp_char, &publisher_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event_hd->publisherName.value, publisher_len); tmp_char += publisher_len; memcpy(tmp_char, &(event_hd->publishTime), sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(tmp_char, &(event_hd->event_size), sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event_hd->patternArray, event_hd->event_size); tmp_char += event_hd->event_size; memcpy(tmp_char, &eventDataSize, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, eventData, eventDataSize); tmp_char += eventDataSize; send_to_evt_daemon(ch, msg, msg_len); return SA_OK;}SaErrorT saEvtEventPublish(const SaEvtEventHandleT eventHandle, const void *eventData, SaSizeT eventDataSize, SaEvtEventIdT *eventId){ evt_channel_handle *evt_channel_hd; evt_event_handle *event_hd; evt_handle *evt_hd; struct IPC_CHANNEL *ch; SaNameT *channel_name; int fd; fd_set rset; struct daemon_msg *msg_reply; struct open_channel_reply *open_ch_reply; event_hd = g_hash_table_lookup(evt_event_hash, (gpointer)eventHandle); if(event_hd == NULL){ return SA_ERR_BAD_HANDLE; } if(event_hd->set_flag == 0){ return SA_ERR_INVALID_PARAM; } evt_channel_hd = g_hash_table_lookup(evt_channel_hash, (gpointer)event_hd->channelId); if(evt_channel_hd == NULL){ return SA_ERR_LIBRARY; } evt_hd = g_hash_table_lookup(evt_handle_hash, (gpointer)(evt_channel_hd->evt_handle)); if(evt_hd == NULL){ return SA_ERR_LIBRARY; } ch = evt_channel_hd->ch; channel_name = &(evt_channel_hd->channelName); send_publish(ch, channel_name, eventHandle, event_hd, eventData, eventDataSize); sleep(1); fd = evt_channel_hd->selectionObject; for(;;){ FD_ZERO(&rset);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -