📄 eventd.c
字号:
SaUint8T *tmp_char; SaSizeT str_len, number, i; SaEvtEventFilterT *filter; subscription = (struct evt_subscription *)g_malloc( sizeof(struct evt_subscription)); subscription->filters = (SaEvtEventFilterArrayT *)g_malloc( sizeof(SaEvtEventFilterArrayT)); subscription->client = ch; ret->private.subscription = subscription; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&(subscription->ch_id), tmp_char, sizeof(void *)); tmp_char += sizeof(void *); memcpy(&(subscription->subscription_id), tmp_char, sizeof(SaEvtSubscriptionIdT)); tmp_char += sizeof(SaEvtSubscriptionIdT); memcpy(&(subscription->filters_size), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(&number, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); subscription->filters->filtersNumber = number; filter = (SaEvtEventFilterT *)g_malloc( sizeof(SaEvtEventFilterT)*number); subscription->filters->filters = filter; for(i=0; i<number; i++){ memcpy(&(filter[i].filterType), tmp_char, sizeof(SaEvtEventFilterTypeT)); tmp_char += sizeof(SaEvtEventFilterTypeT); memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); filter[i].filter.patternSize = str_len; filter[i].filter.pattern = (SaUint8T *)g_malloc(str_len); memcpy(filter[i].filter.pattern, tmp_char, str_len); tmp_char += str_len; } memcpy(&(subscription->clt_ch_handle), tmp_char, sizeof(SaEvtChannelHandleT)); return; }static void read_unlink_client(void *msg, struct client_msg *ret){ SaUint8T *tmp_char; SaSizeT str_len; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; return;}static struct client_msg *evt_read_client_msg(IPC_Channel *ch){ IPC_Message* ipc_msg; struct client_msg *ret; char *tmp_char; ret = (struct client_msg *)g_malloc0(sizeof(struct client_msg)); ch->ops->waitin(ch); if(ch->ops->recv(ch, &ipc_msg) != IPC_OK){ return NULL; } tmp_char = (char *)ipc_msg->msg_body; ret->msg_type = *(tmp_char);/* printf("msg_type == %d\n", (char)ret->msg_type);*/ switch(*(tmp_char)){ case EVT_INITIALIZE: break; case EVT_FINALIZE: break; case EVT_PUBLISH: read_publish(ipc_msg->msg_body, ret); break; case EVT_SUBSCRIBE: read_subscribe(ch, ipc_msg->msg_body, ret); break; case EVT_UNSUBSCRIBE: read_unsubscribe(ipc_msg->msg_body, ret); break; case EVT_OPEN_EVENT_CHANNEL: read_open_channel(ipc_msg->msg_body, ret); break; case EVT_CLOSE_EVENT_CHANNEL: read_close_channel(ipc_msg->msg_body, ret); break; case EVT_CLEAR_RETENTION_TIME: read_clear_retention_time(ipc_msg->msg_body, ret); break; case EVT_CHANNEL_UNLINK: read_unlink_client(ipc_msg->msg_body, ret); break; default: break; } return ret;}static void free_filters(SaEvtEventFilterArrayT *filters){ SaSizeT filters_number, i; filters_number = filters->filtersNumber; for(i=0; i<filters_number; i++){ if((filters->filters+i)->filter.pattern != NULL){ g_free((filters->filters+i)->filter.pattern); } } g_free(filters->filters); g_free(filters); return;}static void free_subscriptions(gpointer key, gpointer value, gpointer user_data){ struct evt_subscription *sub = (struct evt_subscription *)value; free_filters(sub->filters); g_free(sub); return;}static struct evt_channel *find_channel_by_name(SaUint8T *channel_name){ return (struct evt_channel *)g_hash_table_lookup( hash_table_for_channel_name, (gpointer)channel_name);}static void free_ch_instance(gpointer key, gpointer value, gpointer user_data){ struct channel_instance *ch_instance; GHashTable *channel_instances; struct evt_channel *evt_ch; ch_instance = (struct channel_instance *)value; channel_instances = (GHashTable *)user_data; evt_ch = find_channel_by_name(ch_instance->ch_name); g_hash_table_remove(evt_ch->channel_instances, key); if(ch_instance->subscriptions != NULL){ g_hash_table_foreach(ch_instance->subscriptions, free_subscriptions, ch_instance->subscriptions); g_hash_table_destroy(ch_instance->subscriptions); } g_free(ch_instance->ch_name); g_free(ch_instance); return;}static struct evt_ch_open_request *add_pending_ch_open_request(IPC_Channel *client, char *ch_name, struct client_msg *msg){ struct evt_ch_open_request *ch_open_req; ch_open_req = (struct evt_ch_open_request *)g_malloc( sizeof(struct evt_ch_open_request)); /*TODO: copy channel name*/ ch_open_req->clt_ch_handle = msg->private.ch_open->clt_ch_handle; ch_open_req->client = client; ch_open_req->time_out = msg->private.ch_open->time_out; /*TODO: we need compare client+clt_ch_handle+ch_name to determine a key*/ g_hash_table_insert(info->evt_pending_ch_open_requests, (gpointer)ch_open_req, (gpointer)ch_open_req); /*TODO: start a timer*/ return ch_open_req; }static struct ipc *find_ipc(gpointer key){ struct ipc *ret; ret = (struct ipc *)g_hash_table_lookup(hash_table_for_ipc, key); return ret;}static void send_to_client(struct IPC_CHANNEL *client, void *msg, SaSizeT msg_size){ struct IPC_MESSAGE Msg; memset(&Msg, 0, sizeof(Msg)); Msg.msg_body = msg; Msg.msg_len = msg_size; Msg.msg_done = NULL; Msg.msg_private = NULL; Msg.msg_ch = client; client->ops->send(client, &Msg); return;}static int send_open_channel_reply(IPC_Channel *client, struct evt_ch_open *ch_open, struct channel_instance *ch_ins, SaErrorT ret_code){ SaSizeT str_len, msg_size; void *msg; char *tmp_char; str_len = strlen(ch_open->channel_name); msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtChannelHandleT) +sizeof(void *)+sizeof(SaErrorT); msg = g_malloc(msg_size); tmp_char = (char *)msg; *(tmp_char) = EVT_CH_OPEN_REPLY_FROM_DAEMON; tmp_char++; memcpy(tmp_char, &str_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, ch_open->channel_name, str_len); tmp_char += str_len; memcpy(tmp_char, &(ch_open->clt_ch_handle), sizeof(SaEvtChannelHandleT)); tmp_char += sizeof(SaEvtChannelHandleT); memcpy(tmp_char, &(ch_ins), sizeof(void *)); tmp_char += sizeof(void *); memcpy(tmp_char, &ret_code, sizeof(SaErrorT)); tmp_char += sizeof(SaErrorT); send_to_client(client, msg, msg_size); return 0;}/*determine the byte order of platform*//*0 indicate big-endian; 1 indicate little-endian*/static int byte_order(void){ union{ short s; char c[2]; } un; un.s = 0x0102; if((un.c[0] == 1) && (un.c[1] == 2)){ return 0; }else if((un.c[0] == 2) && (un.c[1] == 1)){ return 1; } return -1;}static void deliver_event_to_local_subscriber(struct evt_subscription *subscription, struct evt_event *event){ SaSizeT number, msg_size, i, size=0, publisher_len; SaEvtEventPatternT *pattern; SaUint8T *tmp_char; void *msg; /*calculate the size of pattern_array*/ number = event->pattern_array->patternsNumber; pattern = event->pattern_array->patterns; for(i=0; i<number; i++){ size = size + (pattern+i)->patternSize; } size = size + (number+1)*sizeof(SaSizeT); /*calculate the size of publisher_name*/ publisher_len = event->publisherName.length; msg_size = 1+sizeof(SaEvtChannelHandleT)+sizeof(SaEvtSubscriptionIdT)+ sizeof(SaSizeT)+size+sizeof(SaUint8T)+sizeof(SaTimeT)+ sizeof(SaSizeT)+publisher_len+sizeof(SaTimeT)+ sizeof(SaEvtEventIdT)+sizeof(SaSizeT)+event->data_size; msg = g_malloc(msg_size); /*msg type == EVT_NORMAL_EVENT*/ tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_NORMAL_EVENT; tmp_char++; memcpy(tmp_char, &(subscription->clt_ch_handle), sizeof(SaEvtChannelHandleT)); tmp_char += sizeof(SaEvtChannelHandleT); memcpy(tmp_char, &(subscription->subscription_id), sizeof(SaEvtSubscriptionIdT)); tmp_char += sizeof(SaEvtSubscriptionIdT); memcpy(tmp_char, &size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, &number, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); for(i=0; i<number; i++){ memcpy(tmp_char, &((pattern+i)->patternSize), sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); } for(i=0; i<number; i++){ strncpy(tmp_char, (pattern+i)->pattern, (pattern+i)->patternSize); tmp_char += (pattern+i)->patternSize; } memcpy(tmp_char, &(event->priority), 1); tmp_char++; memcpy(tmp_char, &(event->retention_time), sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(tmp_char, &publisher_len, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event->publisherName.value, publisher_len); tmp_char += publisher_len; memcpy(tmp_char, &(event->publish_time), sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(tmp_char, &(event->event_id), sizeof(SaEvtEventIdT)); tmp_char += sizeof(SaEvtEventIdT); memcpy(tmp_char, &(event->data_size), sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event->event_data, event->data_size); send_to_client(subscription->client, msg, msg_size); return;}static int fliter_match(SaEvtEventPatternT *pattern, SaEvtEventFilterT *filter){ SaSizeT filter_len = filter->filter.patternSize; SaSizeT pattern_len = pattern->patternSize; SaSizeT i; switch(filter->filterType){ case SA_EVT_PREFIX_FILTER: if(pattern_len < filter_len){ return 0; } for(i=0; i<filter_len; i++){ if(pattern->pattern[i] == filter->filter.pattern[i]){ continue; }else{ return 0; } } break; case SA_EVT_EXACT_FILTER: if(pattern_len != filter_len){ return 0; } for(i=0; i<filter_len; i++){ if(pattern->pattern[i] == filter->filter.pattern[i]){ continue; }else{ return 0; } } break; case SA_EVT_SUFFIX_FILTER: if(pattern_len < filter_len){ return 0; } for(i=0; i<filter_len; i++){ if(filter->filter.pattern[filter_len-1-i] == pattern->pattern[pattern_len-1-i]){ continue; }else{ return 0; } } break; case SA_EVT_PASS_ALL_FILTER: break; default: break; /*error message*/ } return 1;}static int matched(SaEvtEventPatternArrayT *patterns, SaEvtEventFilterArrayT *filters){ SaSizeT patterns_num = patterns->patternsNumber; SaSizeT filters_num = filters->filtersNumber; int i; SaSizeT len = (patterns_num < filters_num) ? patterns_num:filters_num; if(filters_num > patterns_num){ for(i=len; i<filters_num; i++){ if(((filters->filters+i)->filterType == SA_EVT_PASS_ALL_FILTER)|| ((filters->filters+i)->filter.patternSize == 0)){ continue; }else{ return 0; } } } for(i=0; i<len; i++){ if(fliter_match(patterns->patterns+i, filters->filters+i)){ continue; }else{ return 0; } } return 1;}static void search_subscription(gpointer key, gpointer value, gpointer user_data){ struct evt_subscription *subscription; struct evt_event *event; subscription = (struct evt_subscription *)value; event = (struct evt_event *)user_data; if(matched(event->pattern_array, subscription->filters)){ deliver_event_to_local_subscriber(subscription, event); } return;}static void search_ch_instance(gpointer key, gpointer value, gpointer user_data){ struct channel_instance *ch_ins; ch_ins = (struct channel_instance *)value; /*user_data is event to be published*/ g_hash_table_foreach(ch_ins->subscriptions, search_subscription, user_data); return;}static void publish_to_local_subscriber(struct evt_channel *evt_ch, SaEvtEventPatternArrayT *pattern_array, struct evt_event *event){ g_hash_table_foreach(evt_ch->channel_instances, search_ch_instance, event); return;}static SaUint32T get_local_event_id(void){ SaUint32T local_id; get_handle(&event_id_database, &local_id); return local_id;}static SaEvtEventIdT get_event_id(SaUint32T local_event_id){ SaUint32T my_node_id; SaEvtEventIdT event_id;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -