📄 eventd.c
字号:
my_node_id = node_list.mynode; event_id = my_node_id; event_id = (event_id << 32); event_id += local_event_id; return event_id;}static void broadcast_event_msg_to_cluster(struct client_msg *msg){ struct ha_msg *m; SaSizeT msg_len, str_len, publisher_len, number, tmp_size, i; SaEvtEventPatternT *pattern; void *bin_msg; SaUint8T *tmp_char; struct evt_event *event = msg->private.event; SaTimeT tmp_time; SaEvtEventIdT tmp_event_id; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot broadcast event to cluster"); return; } str_len = strlen(msg->channel_name); publisher_len = event->publisherName.length; msg_len = 1+sizeof(SaSizeT)+str_len+1+sizeof(SaTimeT)+sizeof(SaSizeT)+ publisher_len+sizeof(SaTimeT)+sizeof(SaSizeT)+ event->pattern_size+sizeof(SaSizeT)+event->data_size+ sizeof(SaEvtEventIdT); bin_msg = g_malloc(msg_len); if(bin_msg == NULL){ return; } number = event->pattern_array->patternsNumber; pattern = event->pattern_array->patterns; tmp_char = (SaUint8T *)bin_msg; *(tmp_char) = EVT_EVENT_MSG; tmp_char++; tmp_size = htonl(str_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, msg->channel_name, str_len); tmp_char += str_len; *(tmp_char) = event->priority; tmp_char++; hton_64(&(event->retention_time), &tmp_time); memcpy(tmp_char, &tmp_time, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); tmp_size = htonl(publisher_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event->publisherName.value, publisher_len); tmp_char += publisher_len; hton_64(&(event->publish_time), &tmp_time); memcpy(tmp_char, &tmp_time, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); tmp_size = htonl(event->pattern_size); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); tmp_size = htonl(number); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); for(i=0; i<number; i++){ tmp_size = htonl((pattern+i)->patternSize); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); } for(i=0; i<number; i++){ memcpy(tmp_char, (pattern+i)->pattern, (pattern+i)->patternSize); tmp_char += (pattern+i)->patternSize; } tmp_size = htonl(event->data_size); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event->event_data, event->data_size); tmp_char += event->data_size; hton_64(&(event->event_id), &tmp_event_id); memcpy(tmp_char, &tmp_event_id, sizeof(SaEvtEventIdT)); if((ha_msg_addbin(m, BIN_CONTENT, bin_msg, msg_len) == HA_FAIL)|| (ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){ cl_log(LOG_ERR, "Cannot create event"); g_free(bin_msg); ha_msg_del(m); return; } info->hb->llc_ops->sendclustermsg(info->hb, m); g_free(bin_msg); ha_msg_del(m); return;}static void send_publish_reply_to_client(IPC_Channel *client, struct evt_event *event, SaErrorT ret_code, SaEvtEventIdT event_id){ void *msg; SaSizeT msg_size; char *tmp_char; msg_size = 1+sizeof(SaEvtEventHandleT)+ sizeof(SaEvtEventIdT)+sizeof(SaErrorT); msg = g_malloc(msg_size); tmp_char = msg; *(tmp_char) = EVT_PUBLISH_REPLY; tmp_char++; memcpy(tmp_char, &(event->clt_event_hd), sizeof(SaEvtEventHandleT)); tmp_char += sizeof(SaEvtEventHandleT); memcpy(tmp_char, &event_id, sizeof(SaEvtEventIdT)); tmp_char += sizeof(SaEvtEventIdT); memcpy(tmp_char, &ret_code, sizeof(SaErrorT)); send_to_client(client, msg, msg_size); return;}static void search_cached_event(gpointer key, gpointer value, gpointer user_data){ struct evt_subscription *subscription; struct evt_event *event; event = (struct evt_event *)value; subscription = (struct evt_subscription *)user_data; if(matched(event->pattern_array, subscription->filters)){ deliver_event_to_local_subscriber(subscription, event); } return;}static void send_cached_events_to_client(char *channel_name, struct evt_subscription *subscription, GHashTable *event_cache){ /*compare the pattern against the filter, deliver event to client if match*/ g_hash_table_foreach(event_cache, search_cached_event, subscription); return;}static SaErrorT append_subscription(SaUint8T *channel_name, struct evt_subscription *subscription){ struct evt_channel *evt_ch; struct channel_instance *ch_instance; evt_ch = find_channel_by_name(channel_name); if(evt_ch == NULL){ return SA_ERR_LIBRARY; } ch_instance = g_hash_table_lookup(evt_ch->channel_instances, subscription->ch_id); g_hash_table_insert(ch_instance->subscriptions, (gpointer)(long)(subscription->subscription_id), subscription); return SA_OK;}static void send_retention_clear_reply_to_client(IPC_Channel *client, char *channel_name, SaEvtEventIdT event_id, SaErrorT reply){ SaSizeT str_len, msg_size; void *msg; char *tmp_char; str_len = strlen(channel_name); msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT)+ sizeof(SaErrorT); msg = g_malloc(msg_size); tmp_char = (char *)msg; *(tmp_char) = EVT_CLEAR_RETENTION_TIME_REPLY; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); strncpy(tmp_char, channel_name, str_len); tmp_char += str_len; memcpy(tmp_char, &event_id, sizeof(SaEvtEventIdT)); tmp_char += sizeof(SaEvtEventIdT); memcpy(tmp_char, &reply, sizeof(SaErrorT)); send_to_client(client, msg, msg_size); return;}static void broadcast_ch_open_req(SaUint8T *channel_name, struct evt_ch_open_request *ch_open_req){ SaSizeT str_len, msg_len, tmp_size; SaUint64T tmp_key, key; SaUint8T *tmp_char; void *msg; struct ha_msg *m; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot broadcast open_ch request to cluster"); return; } str_len = strlen(channel_name); msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T); msg = g_malloc(msg_len); tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_CH_OPEN_REQUEST; tmp_char++; tmp_size = htonl(str_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, channel_name, str_len); tmp_char += str_len; key = (SaUint64T)(long)ch_open_req; hton_64(&key, &tmp_key); memcpy(tmp_char, &tmp_key, sizeof(SaUint64T)); if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) || (ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){ cl_log(LOG_ERR, "Cannot create open_ch request message"); g_free(msg); ha_msg_del(m); return; } info->hb->llc_ops->sendclustermsg(info->hb, m); g_free(msg); ha_msg_del(m); return;}static void broadcast_new_subscription(SaUint8T *channel_name, struct evt_subscription *subscription){ struct ha_msg *m; SaSizeT str_len, msg_len, tmp_size, number, i; void *msg; SaUint8T *tmp_char; SaUint64T tmp_64, key; SaUint32T tmp_32; SaEvtEventFilterT *filter; SaEvtEventFilterTypeT tmp_filter_type; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot broadcast new subscription to cluster"); return; } str_len = strlen(channel_name); msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T)+sizeof(SaUint32T)+ sizeof(SaSizeT)+subscription->filters_size; msg = g_malloc(msg_len); tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_NEW_SUBSCRIBE; tmp_char++; tmp_size = htonl(str_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, channel_name, str_len); tmp_char += str_len; key = (SaUint64T)(long)(subscription->ch_id); hton_64(&key, &tmp_64); memcpy(tmp_char, &tmp_64, sizeof(SaUint64T)); tmp_char += sizeof(SaUint64T); tmp_32 = htonl(subscription->subscription_id); memcpy(tmp_char, &tmp_32, sizeof(SaUint32T)); tmp_char += sizeof(SaUint32T); tmp_size = htonl(subscription->filters_size); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); number = subscription->filters->filtersNumber; tmp_size = htonl(number); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); filter = subscription->filters->filters; for(i=0; i<number; i++){ tmp_filter_type = htonl(filter[i].filterType); memcpy(tmp_char, &tmp_filter_type, sizeof(SaEvtEventFilterTypeT)); tmp_char += sizeof(SaEvtEventFilterTypeT); tmp_size = htonl(filter[i].filter.patternSize); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, filter[i].filter.pattern, filter[i].filter.patternSize); tmp_char += filter[i].filter.patternSize; } if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) || (ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){ cl_log(LOG_ERR, "Cannot create open_ch request message"); g_free(msg); ha_msg_del(m); return; } info->hb->llc_ops->sendclustermsg(info->hb, m); g_free(msg); ha_msg_del(m); return;}struct event_timeout_s{ GHashTable *event_cache; struct evt_event *event; guint tag;};static void free_event(struct evt_event *event){ SaEvtEventPatternArrayT *pattern_array; SaEvtEventPatternT *patterns; SaSizeT number, i; pattern_array = event->pattern_array; patterns = pattern_array->patterns; number = pattern_array->patternsNumber; g_free(event->event_data); for(i=0; i<number; i++){ g_free(patterns[i].pattern); } g_free(patterns); g_free(pattern_array); put_handle(&event_id_database, event->event_id); g_free(event); return;}static void free_remote_event(struct evt_event *event){ SaEvtEventPatternArrayT *pattern_array; SaEvtEventPatternT *patterns; SaSizeT number, i; pattern_array = event->pattern_array; patterns = pattern_array->patterns; number = pattern_array->patternsNumber; /*g_free(event->publisherName);*/ g_free(event->event_data); for(i=0; i<number; i++){ g_free(patterns[i].pattern); } g_free(patterns); g_free(pattern_array); g_free(event); return;}static gboolean timeout_for_retention_time(gpointer user_data){ struct event_timeout_s *event_timeout; GHashTable *event_cache; struct evt_event *event; SaUint32T tmp_32; event_timeout = (struct event_timeout_s *)user_data; event_cache = event_timeout->event_cache; event = event_timeout->event; tmp_32 = event->event_id; g_hash_table_remove(event_cache, (gpointer)(long)(tmp_32)); free_event(event); Gmain_timeout_remove(event_timeout->tag); g_free(event_timeout); return TRUE;}static SaErrorT clear_retention_time(char *channel_name, struct client_msg *msg){ struct evt_channel *evt_ch; struct evt_event *event; SaUint32T tmp_32; evt_ch = g_hash_table_lookup(hash_table_for_channel_name, channel_name); tmp_32 = msg->private.retention_clear->event_id; event = g_hash_table_lookup(evt_ch->event_cache, (gpointer)(long)(tmp_32)); if(event == NULL){ return SA_ERR_NOT_EXIST; } g_hash_table_remove(evt_ch->event_cache, (gpointer)(long)(tmp_32)); /*the event will be released in timeout*/ /*free_event(event);*/ return SA_OK;}struct clear_request{ IPC_Channel *client; SaEvtEventIdT event_id;};static struct clear_request *append_clear_req(IPC_Channel *client, SaEvtEventIdT event_id){ struct clear_request *clear_req; SaUint32T tmp_32; clear_req = (struct clear_request *)g_malloc( sizeof(struct clear_request)); clear_req->client = client; clear_req->event_id = event_id; tmp_32 = event_id; g_hash_table_insert(info->evt_pending_clear_requests, (gpointer)(long)tmp_32, clear_req); return clear_req;}static void send_clear_to_node(SaUint8T *channel_name, SaEvtEventIdT event_id, SaInt32T node_id){ struct ha_msg *m; void *msg; char *to_id; SaSizeT msg_len, str_len, tmp_size; SaUint8T *tmp_char; SaEvtEventIdT tmp_event_id; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot send clear reply to remote"); return; } to_id = node_list.nodes[node_id].NodeID; str_len = strlen(channel_name); msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT); msg = g_malloc(msg_len); tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_RETENTION_CLEAR_REQUEST; tmp_char++; tmp_size = htonl(str_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, channel_name, str_len); tmp_char += str_len; hton_64(&(event_id), &tmp_event_id); memcpy(tmp_char, &tmp_event_id, sizeof(SaEvtEventIdT)); if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) || (ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){ cl_log(LOG_ERR, "Cannot create clear reply message"); g_free(msg); ha_msg_del(m); return; } info->hb->llc_ops->sendnodemsg(info->hb, m, to_id); g_free(msg); ha_msg_del(m); return;}static void broadcast_unlink(SaUint8T *channel_name){ struct ha_msg *m;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -