📄 eventd.c
字号:
pattern_array = (SaEvtEventPatternArrayT *)g_malloc( sizeof(SaEvtEventPatternArrayT)); event->pattern_array = pattern_array; tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; event->priority = *(tmp_char); tmp_char++; memcpy(&tmp_time, tmp_char, sizeof(SaTimeT)); ntoh_64(&tmp_time, &(event->retention_time)); tmp_char += sizeof(SaTimeT); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); event->publisherName.length = str_len; memcpy(event->publisherName.value, tmp_char, str_len); tmp_char += str_len; memcpy(&tmp_time, tmp_char, sizeof(SaTimeT)); ntoh_64(&tmp_time, &(event->publish_time)); tmp_char += sizeof(SaTimeT); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); event->pattern_size = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); number = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); pattern_array->patternsNumber = number; patterns = (SaEvtEventPatternT *)g_malloc( sizeof(SaEvtEventPatternT)*number); pattern_array->patterns = patterns; tmp_char_pattern = tmp_char + number*sizeof(SaSizeT); for(i=0; i<number; i++){ memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); patterns[i].patternSize = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); patterns[i].pattern = g_malloc(patterns[i].patternSize); memcpy(patterns[i].pattern, tmp_char_pattern, patterns[i].patternSize); tmp_char_pattern += patterns[i].patternSize; } tmp_char = tmp_char_pattern; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); event->data_size = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); event->event_data = g_malloc(event->data_size); memcpy(event->event_data, tmp_char, event->data_size); tmp_char += event->data_size; memcpy(&tmp_event_id, tmp_char, sizeof(SaEvtEventIdT)); ntoh_64(&tmp_event_id, &(event->event_id)); return;}static void read_unlink(const void *bin_msg, struct client_msg *ret){ const SaUint8T *tmp_char; SaSizeT str_len; SaSizeT tmp_size; tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; return;}static void read_new_subscribe(const void *bin_msg, struct client_msg *ret){ const SaUint8T *tmp_char; SaSizeT str_len, number, i; SaSizeT tmp_size; SaUint64T tmp_64; struct evt_new_subscription *new_sub; SaUint32T tmp_32; SaEvtEventFilterT *filter; SaEvtEventFilterTypeT tmp_filter_type; new_sub = (struct evt_new_subscription *)g_malloc( sizeof(struct evt_new_subscription)); ret->private.new_subscription = new_sub; new_sub->filters = (SaEvtEventFilterArrayT *)g_malloc( sizeof(SaEvtEventFilterArrayT)); tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); new_sub->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); memcpy(new_sub->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; new_sub->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&tmp_64, tmp_char, sizeof(SaUint64T)); ntoh_64(&tmp_64, &(new_sub->ch_id)); tmp_char += sizeof(SaUint64T); memcpy(&tmp_32, tmp_char, sizeof(SaUint32T)); new_sub->subscription_id = ntohl(tmp_32); tmp_char += sizeof(SaUint32T); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); number = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); new_sub->filters->filtersNumber = number; filter = (SaEvtEventFilterT *)g_malloc( sizeof(SaEvtEventFilterT)*number); new_sub->filters->filters = filter; for(i=0; i<number; i++){ memcpy(&tmp_filter_type, tmp_char, sizeof(SaEvtEventFilterTypeT)); filter[i].filterType = ntohl(tmp_filter_type); tmp_char += sizeof(SaEvtEventFilterTypeT); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); filter[i].filter.patternSize = str_len; filter[i].filter.pattern = (SaUint8T *)g_malloc(str_len); memcpy(filter[i].filter.pattern, tmp_char, str_len); tmp_char += str_len; } return;}static void read_new_sub_reply(const void *bin_msg, struct client_msg *ret){ const SaUint8T *tmp_char, *tmp_char_pattern; SaSizeT str_len, number, i, publisher_len; SaSizeT tmp_size; struct evt_new_subscription_reply *new_sub_reply; SaUint64T tmp_64; SaUint32T tmp_32; struct evt_event *event; SaEvtEventPatternArrayT *pattern_array; SaEvtEventPatternT *patterns; SaTimeT tmp_time; SaEvtEventIdT tmp_evt_id; tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; new_sub_reply = (struct evt_new_subscription_reply *)g_malloc( sizeof(struct evt_new_subscription_reply)); if(new_sub_reply == NULL){ return; } ret->private.new_sub_reply = new_sub_reply; event = (struct evt_event *)g_malloc(sizeof(struct evt_event)); new_sub_reply->event = event; pattern_array = (SaEvtEventPatternArrayT *)g_malloc( sizeof(SaEvtEventPatternArrayT)); event->pattern_array = pattern_array; memcpy(&tmp_64, tmp_char, sizeof(SaUint64T)); ntoh_64(&tmp_64, &(new_sub_reply->ch_id)); tmp_char += sizeof(SaUint64T); memcpy(&tmp_32, tmp_char, sizeof(SaUint32T)); new_sub_reply->subscription_id = ntohl(tmp_32); tmp_char += sizeof(SaUint32T); tmp_char += sizeof(SaSizeT); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); number = htonl(tmp_size); tmp_char += sizeof(SaSizeT); pattern_array->patternsNumber = number; patterns = (SaEvtEventPatternT *)g_malloc( sizeof(SaEvtEventPatternT)*number); pattern_array->patterns = patterns; tmp_char_pattern = tmp_char + number*sizeof(SaSizeT); for(i=0; i<number; i++){/* patterns[i].patternSize = ntohl(*(tmp_size));*//* tmp_size++;*/ memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); patterns[i].patternSize = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); patterns[i].pattern = g_malloc(patterns[i].patternSize); memcpy(patterns[i].pattern, tmp_char_pattern, patterns[i].patternSize); tmp_char_pattern += patterns[i].patternSize; } tmp_char = tmp_char_pattern; event->priority = *(tmp_char); tmp_char++; memcpy(&tmp_time, tmp_char, sizeof(SaTimeT)); ntoh_64(&tmp_time, &(event->retention_time)); tmp_char += sizeof(SaTimeT); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); publisher_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); event->publisherName.length = publisher_len; memcpy(event->publisherName.value, tmp_char, publisher_len); tmp_char += publisher_len; memcpy(&tmp_time, tmp_char, sizeof(SaTimeT)); ntoh_64(&tmp_time, &(event->publish_time)); tmp_char += sizeof(SaTimeT); memcpy(&tmp_evt_id, tmp_char, sizeof(SaEvtEventIdT)); ntoh_64(&tmp_evt_id, &(event->event_id)); tmp_char += sizeof(SaEvtEventIdT); memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); event->data_size = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); memcpy(event->event_data, tmp_char, event->data_size); return;}static void read_clear_req(const void *bin_msg, struct client_msg *ret){ const SaUint8T *tmp_char; SaSizeT tmp_size; SaSizeT str_len; struct evt_retention_clear *clear_req; SaEvtEventIdT tmp_event_id; tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; clear_req = (struct evt_retention_clear *)g_malloc( sizeof(struct evt_retention_clear)); ret->private.retention_clear = clear_req; memcpy(&tmp_event_id, tmp_char, sizeof(SaEvtEventIdT)); ntoh_64(&tmp_event_id, &(clear_req->event_id)); return;}static void read_clear_reply_remote(const void *bin_msg, struct client_msg *ret){ const SaUint8T *tmp_char; SaSizeT tmp_size; SaSizeT str_len; struct evt_retention_clear_reply *clear_reply_remote; SaEvtEventIdT tmp_event_id; SaErrorT tmp_err; tmp_char = (const SaUint8T *)bin_msg; tmp_char++; memcpy(&tmp_size, tmp_char, sizeof(SaSizeT)); str_len = ntohl(tmp_size); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; clear_reply_remote = (struct evt_retention_clear_reply *)g_malloc( sizeof(struct evt_retention_clear_reply)); ret->private.retention_clear_reply = clear_reply_remote; memcpy(&tmp_event_id, tmp_char, sizeof(SaEvtEventIdT)); ntoh_64(&tmp_event_id, &(clear_reply_remote->event_id)); tmp_char += sizeof(SaEvtEventIdT); memcpy(&tmp_err, tmp_char, sizeof(SaErrorT)); clear_reply_remote->ret_code = ntohl(tmp_err); return;}static struct client_msg *evt_read_hb_msg(struct ha_msg *m){ size_t msg_len; const void* bin_msg; const SaUint8T* tmp_char; struct client_msg* ret; ret = (struct client_msg *)g_malloc0(sizeof(struct client_msg)); if(ret == NULL){ return NULL; } bin_msg = cl_get_binary(m, BIN_CONTENT, &msg_len); tmp_char = (const SaUint8T *)bin_msg; ret->msg_type = *(tmp_char); switch(*(tmp_char)){ case EVT_CH_OPEN_REQUEST: read_ch_open_request_remote(bin_msg, ret); break; case EVT_CH_OPEN_REPLY: read_ch_open_reply(bin_msg, ret); break; case EVT_EVENT_MSG: read_event_msg(bin_msg, ret); break; case EVT_CHANNEL_UNLINK_NOTIFY: read_unlink(bin_msg, ret); break; case EVT_NEW_SUBSCRIBE: read_new_subscribe(bin_msg, ret); break; case EVT_NEW_SUBSCRIBE_REPLY: read_new_sub_reply(bin_msg, ret); break; case EVT_RETENTION_CLEAR_REQUEST: read_clear_req(bin_msg, ret); break; case EVT_RETENTION_CLEAR_REPLY: read_clear_reply_remote(bin_msg, ret); break; } return ret;}static void evt_send_open_reply(const char *orig, char *channel_name, SaUint64T key){ SaSizeT str_len, msg_len, tmp_size; SaUint64T tmp_key; SaUint8T *tmp_char; void *msg; struct ha_msg *m; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot send open_ch reply to remote"); return; } str_len = strlen(channel_name); msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T); msg = g_malloc(msg_len); tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_CH_OPEN_REPLY; tmp_char++; tmp_size = htonl(str_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, channel_name, str_len); tmp_char += str_len; hton_64(&key, &tmp_key); memcpy(tmp_char, &tmp_key, sizeof(SaUint64T)); if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) || (ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){ cl_log(LOG_ERR, "Cannot create open_ch reply to remote message"); g_free(msg); ha_msg_del(m); return; } info->hb->llc_ops->sendnodemsg(info->hb, m, orig); g_free(msg); ha_msg_del(m); return;}static void deliver_event_to_remote_subscriber( struct evt_new_subscription *new_sub, struct evt_event *event){ struct ha_msg *m; SaSizeT str_len, publisher_len, msg_len, tmp_size, number, i; void *msg; SaUint8T *tmp_char; SaUint64T tmp_64; SaUint32T tmp_32; SaEvtEventPatternT *pattern; SaTimeT tmp_time; SaEvtEventIdT tmp_evt_id; if ((m=ha_msg_new(0)) == NULL) { cl_log(LOG_ERR, "Cannot deliver event to remote"); return; } str_len = strlen(new_sub->channel_name); publisher_len = event->publisherName.length; msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T)+sizeof(SaUint32T)+ sizeof(SaSizeT)+event->pattern_size+1+sizeof(SaTimeT)+ sizeof(SaSizeT)+publisher_len+sizeof(SaTimeT)+ sizeof(SaEvtEventIdT)+sizeof(SaSizeT)+event->data_size; msg = g_malloc(msg_len); tmp_char = (SaUint8T *)msg; *(tmp_char) = EVT_NEW_SUBSCRIBE_REPLY; tmp_char++; tmp_size = htonl(str_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, new_sub->channel_name, str_len); tmp_char += str_len; hton_64(&(new_sub->ch_id), &tmp_64); memcpy(tmp_char, &tmp_64, sizeof(SaUint64T)); tmp_char += sizeof(SaUint64T); tmp_32 = htonl(new_sub->subscription_id); memcpy(tmp_char, &tmp_32, sizeof(SaUint32T)); tmp_char += sizeof(SaUint32T); number = event->pattern_array->patternsNumber; pattern = event->pattern_array->patterns; tmp_size = htonl(event->pattern_size); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); tmp_size = htonl(number); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); for(i=0; i<number; i++){ tmp_size = htonl((pattern+i)->patternSize); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); } for(i=0; i<number; i++){ memcpy(tmp_char, (pattern+i)->pattern, (pattern+i)->patternSize); tmp_char += (pattern+i)->patternSize; } *(tmp_char) = event->priority; tmp_char++; hton_64(&(event->retention_time), &tmp_time); memcpy(tmp_char, &tmp_time, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); tmp_size = htonl(publisher_len); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event->publisherName.value, publisher_len); tmp_char += publisher_len; hton_64(&(event->publish_time), &tmp_time); memcpy(tmp_char, &tmp_time, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); hton_64(&(event->event_id), &tmp_evt_id); memcpy(tmp_char, &tmp_evt_id, sizeof(SaEvtEventIdT)); tmp_char += sizeof(SaEvtEventIdT); tmp_size = htonl(event->data_size); memcpy(tmp_char, &tmp_size, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(tmp_char, event->event_data, event->data_size); if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) || (ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){ cl_log(LOG_ERR, "Cannot create event to remote"); g_free(msg); ha_msg_del(m); return; } info->h
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -