📄 eventd.c
字号:
void *msg; SaSizeT str_len, msg_len, tmp_size; SaUint8T *tmp_char; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot broadcast unlink msg to cluster"); return; } str_len = strlen(channel_name); msg_len = 1+sizeof(SaSizeT)+str_len; msg = g_malloc(msg_len); tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_CHANNEL_UNLINK_NOTIFY; tmp_char++; tmp_size = htonl(str_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, channel_name, str_len); if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) || (ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){ cl_log(LOG_ERR, "Cannot create unlink msg"); g_free(msg); ha_msg_del(m); return; } info->hb->llc_ops->sendclustermsg(info->hb, m); g_free(msg); ha_msg_del(m); return;}static int handle_msg_from_client(IPC_Channel *client, gpointer user_data){ struct client_msg *msg; char *channel_name = NULL; struct evt_channel *evt_ch; struct channel_instance *ch_ins; struct ipc *evt_ipc; struct evt_ch_open_request *ch_open_req; struct evt_event *event; SaEvtEventPatternArrayT *pattern_array; SaErrorT reply; SaEvtEventIdT event_id; unsigned int node_id; struct clear_request *clear_req; int str_len; struct event_timeout_s *event_timeout; SaUint32T local_event_id; msg = evt_read_client_msg(client); if(msg == NULL){ printf("received NULL msg\n"); return 0; } printf("msg_type == %d\n", (char)msg->msg_type); channel_name = msg->channel_name; switch(msg->msg_type){ case EVT_INITIALIZE: evt_ipc = (struct ipc *)g_malloc(sizeof(struct ipc)); evt_ipc->client = client; evt_ipc->channel_instances = g_hash_table_new( g_direct_hash, g_direct_equal); g_hash_table_insert(hash_table_for_ipc, (gpointer)client, (gpointer)evt_ipc); printf("after handle initialize!!\n"); break; case EVT_FINALIZE: evt_ipc = (struct ipc *)g_hash_table_lookup( hash_table_for_ipc, (gpointer)client); if(evt_ipc == NULL){ break; } g_hash_table_remove(hash_table_for_ipc, (gpointer)client); printf("the hash tabe size == %d\n", g_hash_table_size(evt_ipc->channel_instances)); if(evt_ipc->channel_instances != NULL){ g_hash_table_foreach(evt_ipc->channel_instances, free_ch_instance, evt_ipc->channel_instances); g_hash_table_destroy(evt_ipc->channel_instances); } g_free(evt_ipc); break; case EVT_OPEN_EVENT_CHANNEL: evt_ch = find_channel_by_name(channel_name); if((evt_ch != NULL)&&(evt_ch->unlink = FALSE)){ ch_ins = (struct channel_instance *) g_malloc0(sizeof(struct channel_instance)); str_len = strlen(channel_name); ch_ins->ch_name = (char *)g_malloc(str_len+1); memcpy(ch_ins->ch_name, channel_name, str_len); ch_ins->ch_name[str_len] = '\0'; ch_ins->clt_ch_handle = msg->private.ch_open->clt_ch_handle; ch_ins->subscriptions = g_hash_table_new(g_direct_hash, g_direct_equal); g_hash_table_insert(evt_ch->channel_instances, (gpointer)ch_ins, (gpointer)ch_ins); evt_ipc = find_ipc(client); g_hash_table_insert(evt_ipc->channel_instances, (gpointer)ch_ins, (gpointer)ch_ins); /*sleep(1);*/ send_open_channel_reply(client, msg->private.ch_open, ch_ins, SA_OK); }else if((evt_ch != NULL)&&(evt_ch->unlink = TRUE)){ if((msg->private.ch_open->ch_open_flags & SA_EVT_CHANNEL_CREATE) == SA_EVT_CHANNEL_CREATE){ evt_ch->unlink = FALSE; /*be the same as the above brach*/ ch_ins = (struct channel_instance *) g_malloc0(sizeof(struct channel_instance)); ch_ins->clt_ch_handle = msg->private.ch_open->clt_ch_handle; str_len = strlen(channel_name); ch_ins->ch_name = (char *)g_malloc(str_len+1); memcpy(ch_ins->ch_name, channel_name, str_len); ch_ins->ch_name[str_len] = '\0'; ch_ins->subscriptions = g_hash_table_new(g_direct_hash, g_direct_equal); g_hash_table_insert(evt_ch->channel_instances, (gpointer)ch_ins, (gpointer)ch_ins); evt_ipc = find_ipc(client); g_hash_table_insert(evt_ipc->channel_instances, (gpointer)ch_ins, (gpointer)ch_ins); send_open_channel_reply(client, msg->private.ch_open, ch_ins, SA_OK); }else{ ch_open_req = add_pending_ch_open_request( client, channel_name, msg); broadcast_ch_open_req(channel_name, ch_open_req); } }else if((evt_ch == NULL) && ((msg->private.ch_open->ch_open_flags & SA_EVT_CHANNEL_CREATE) == SA_EVT_CHANNEL_CREATE)){ evt_ch = (struct evt_channel *)g_malloc0( sizeof(struct evt_channel)); str_len = strlen(channel_name); evt_ch->channel_name = (char *)g_malloc(str_len+1); memcpy(evt_ch->channel_name, channel_name, str_len); evt_ch->channel_name[str_len] = '\0'; evt_ch->channel_instances = g_hash_table_new(g_direct_hash, g_direct_equal); evt_ch->event_cache = g_hash_table_new(g_direct_hash, g_direct_equal); evt_ch->use_count = 1; g_hash_table_insert(hash_table_for_channel_name, (gpointer)evt_ch->channel_name, (gpointer)evt_ch); ch_ins = (struct channel_instance *) g_malloc0(sizeof(struct channel_instance)); ch_ins->ch_name = (char *)g_malloc(str_len+1); memcpy(ch_ins->ch_name, channel_name, str_len); ch_ins->ch_name[str_len] = '\0'; ch_ins->clt_ch_handle = msg->private.ch_open->clt_ch_handle; ch_ins->subscriptions = g_hash_table_new(g_direct_hash, g_direct_equal); g_hash_table_insert(evt_ch->channel_instances, (gpointer)ch_ins, (gpointer)ch_ins); evt_ipc = find_ipc(client); g_hash_table_insert(evt_ipc->channel_instances, (gpointer)ch_ins, (gpointer)ch_ins); /*sleep(1);*/ send_open_channel_reply(client, msg->private.ch_open, ch_ins, SA_OK); }else{ ch_open_req = add_pending_ch_open_request(client, channel_name, msg); broadcast_ch_open_req(channel_name, ch_open_req); } if(msg->private.ch_open->channel_name != NULL){ g_free(msg->private.ch_open->channel_name); } g_free(msg->private.ch_open); break; case EVT_CLOSE_EVENT_CHANNEL: evt_ch = find_channel_by_name(channel_name); if(evt_ch == NULL){ /*error msg*/ return 1; } ch_ins = g_hash_table_lookup(evt_ch->channel_instances, msg->private.ch_close->ch_ins); if(ch_ins == NULL){ /*error msg*/ return 1; } if(ch_ins->subscriptions != NULL){ g_hash_table_foreach(ch_ins->subscriptions, free_subscriptions, ch_ins->subscriptions); g_hash_table_destroy(ch_ins->subscriptions); } g_hash_table_remove(evt_ch->channel_instances, msg->private.ch_close->ch_ins); evt_ipc = (struct ipc *)g_hash_table_lookup( hash_table_for_ipc, (gpointer)client); g_hash_table_remove(evt_ipc->channel_instances, (gpointer)msg->private.ch_close->ch_ins); g_free(ch_ins); g_free(msg->private.ch_close); break; case EVT_PUBLISH: /*1 forward the event to local subscriber*/ /*2 broadcast the event to cluster*/ event = msg->private.event; evt_ch = find_channel_by_name(channel_name); if(evt_ch == NULL){ send_publish_reply_to_client(client, event, SA_ERR_LIBRARY, 0); return 1; } local_event_id = get_local_event_id(); event_id = get_event_id(local_event_id); printf("the event_id == %Ld\n", event_id); send_publish_reply_to_client(client, event, SA_OK, event_id); /*sleep(1);*/ event->event_id = event_id; pattern_array = msg->private.event->pattern_array; publish_to_local_subscriber(evt_ch, pattern_array, event); broadcast_event_msg_to_cluster(msg); if(event->retention_time != 0){ evt_ch = g_hash_table_lookup( hash_table_for_channel_name, channel_name); g_hash_table_insert(evt_ch->event_cache, (gpointer)local_event_id, (gpointer)event); /*TODO: should start timer in order to remove event when timeout*/ event_timeout = (struct event_timeout_s *)g_malloc( sizeof(struct event_timeout_s)); event_timeout->event_cache = evt_ch->event_cache; event_timeout->event = event; event_timeout->tag = Gmain_timeout_add_full(G_PRIORITY_HIGH, event->retention_time, timeout_for_retention_time, event_timeout, NULL); }else{ /* free event*/ free_event(event); } break; case EVT_SUBSCRIBE: /*1 record the subscription*/ /*2 send the events within the retention time to client */ append_subscription(channel_name, msg->private.subscription); evt_ch = find_channel_by_name(channel_name); if(evt_ch->event_cache != NULL){ /*sleep(1);*/ send_cached_events_to_client(channel_name, msg->private.subscription, evt_ch->event_cache); } broadcast_new_subscription(channel_name, msg->private.subscription); break; case EVT_UNSUBSCRIBE: evt_ch = find_channel_by_name(channel_name); if(evt_ch == NULL){ cl_log(LOG_ERR, "SA_ERR_LIBRARY"); g_free(msg->private.subscription); break; } ch_ins = g_hash_table_lookup(evt_ch->channel_instances, msg->private.subscription->ch_id); if(ch_ins == NULL){ cl_log(LOG_ERR, "SA_ERR_LIBRARY"); g_free(msg->private.subscription); break; } g_hash_table_remove(ch_ins->subscriptions, (gpointer)(long)(msg->private.subscription->subscription_id)); g_free(msg->private.subscription); break; case EVT_CLEAR_RETENTION_TIME: event_id = msg->private.retention_clear->event_id; node_id = (event_id >> 32); if(node_id == node_list.mynode){ reply = clear_retention_time(channel_name, msg); send_retention_clear_reply_to_client(client, channel_name, msg->private.retention_clear->event_id, reply); }else{ send_clear_to_node(channel_name, event_id, node_id); clear_req = append_clear_req(client, event_id); /*TODO: timeout function associated with clear*/ } g_free(msg->private.retention_clear); break; case EVT_CHANNEL_UNLINK: evt_ch = find_channel_by_name(channel_name); if(evt_ch != NULL){ evt_ch->unlink = TRUE; } broadcast_unlink(channel_name); break; default: break; } if(channel_name != NULL){ g_free(channel_name); } g_free(msg); return 0;}void hton_64(const SaUint64T *src_64, SaUint64T *dst_64){ SaUint32T high_value, low_value, *tmp_32; const SaUint32T *tmp_32_const; if(byte_order() == 0){ tmp_32_const = (const SaUint32T *)src_64; high_value = *tmp_32_const; tmp_32_const++; low_value = *tmp_32_const; tmp_32 = (SaUint32T *)dst_64; *(tmp_32) = htonl(low_value); tmp_32++; *(tmp_32) = htonl(high_value); } return;}void ntoh_64(const SaUint64T *src_64, SaUint64T *dst_64){ SaUint32T high_value, low_value, *tmp_32; const SaUint32T *tmp_32_const; if(byte_order() == 0){ tmp_32_const = (const SaUint32T *)src_64; low_value = ntohl(*(tmp_32_const)); tmp_32_const++; high_value = ntohl(*(tmp_32_const)); tmp_32 = (SaUint32T *)dst_64; *tmp_32 = high_value; tmp_32++; *tmp_32 = low_value; } return;}static void read_ch_open_reply(const void *bin_msg, struct client_msg *ret){ struct evt_ch_open_reply_remote *ch_open_reply; const SaUint8T *tmp_char; SaSizeT str_len; SaSizeT tmp_size; SaUint64T tmp_key; ch_open_reply = (struct evt_ch_open_reply_remote *)g_malloc0( sizeof(struct evt_ch_open_reply_remote)); ret->private.ch_open_reply_remote = ch_open_reply; tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); ch_open_reply->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); memcpy(ch_open_reply->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; ch_open_reply->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&tmp_key, tmp_char, sizeof(SaUint64T)); ntoh_64(&tmp_key, &(ch_open_reply->key)); return;}static void read_ch_open_request_remote(const void *bin_msg, struct client_msg *ret){ struct evt_ch_open_request_remote *ch_open_request; const SaUint8T *tmp_char; SaSizeT str_len; SaSizeT tmp_size; SaUint64T tmp_key; ch_open_request = (struct evt_ch_open_request_remote *)g_malloc0( sizeof(struct evt_ch_open_request_remote)); ret->private.ch_open_request_remote = ch_open_request; tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); ch_open_request->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); memcpy(ch_open_request->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; ch_open_request->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&tmp_key, tmp_char, sizeof(SaUint64T)); ntoh_64(&tmp_key, &(ch_open_request->key)); return; }static void read_event_msg(const void *bin_msg, struct client_msg *ret){ struct evt_event *event; SaEvtEventPatternArrayT *pattern_array; const SaUint8T *tmp_char, *tmp_char_pattern; SaSizeT str_len, number, i; SaSizeT tmp_size; SaEvtEventPatternT *patterns; SaEvtEventIdT tmp_event_id; SaTimeT tmp_time; event = (struct evt_event *)g_malloc(sizeof(struct evt_event)); ret->private.event = event;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -