⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 eventd.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
	pattern_array = (SaEvtEventPatternArrayT *)g_malloc(			sizeof(SaEvtEventPatternArrayT));	event->pattern_array = pattern_array;	tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	tmp_char += str_len;	event->priority = *(tmp_char);	tmp_char++;	memcpy(&tmp_time, tmp_char, sizeof(SaTimeT));	ntoh_64(&tmp_time, &(event->retention_time));	tmp_char += sizeof(SaTimeT);	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	event->publisherName.length = str_len;	memcpy(event->publisherName.value, tmp_char, str_len);	tmp_char += str_len;	memcpy(&tmp_time, tmp_char, sizeof(SaTimeT));	ntoh_64(&tmp_time, &(event->publish_time));	tmp_char += sizeof(SaTimeT);	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	event->pattern_size = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	number = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	pattern_array->patternsNumber = number;	patterns = (SaEvtEventPatternT *)g_malloc(			sizeof(SaEvtEventPatternT)*number);	pattern_array->patterns = patterns;	tmp_char_pattern = tmp_char + number*sizeof(SaSizeT);	for(i=0; i<number; i++){		memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));		patterns[i].patternSize = ntohl(tmp_size);		tmp_char += sizeof(SaSizeT);		patterns[i].pattern = g_malloc(patterns[i].patternSize);		memcpy(patterns[i].pattern, tmp_char_pattern, patterns[i].patternSize);		tmp_char_pattern += patterns[i].patternSize;	}	tmp_char = tmp_char_pattern;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	event->data_size = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	event->event_data = g_malloc(event->data_size);	memcpy(event->event_data, tmp_char, event->data_size);	tmp_char += event->data_size;	memcpy(&tmp_event_id, tmp_char, sizeof(SaEvtEventIdT));	ntoh_64(&tmp_event_id, &(event->event_id));	return;}static void read_unlink(const void *bin_msg, struct client_msg *ret){	const SaUint8T *tmp_char;	SaSizeT str_len;	SaSizeT tmp_size;		tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	return;}static void read_new_subscribe(const void *bin_msg, struct client_msg *ret){	const SaUint8T *tmp_char;	SaSizeT str_len, number, i;	SaSizeT tmp_size;	SaUint64T tmp_64;	struct evt_new_subscription *new_sub;	SaUint32T tmp_32;	SaEvtEventFilterT *filter;	SaEvtEventFilterTypeT tmp_filter_type;		new_sub = (struct evt_new_subscription *)g_malloc(					sizeof(struct evt_new_subscription));	ret->private.new_subscription = new_sub;	new_sub->filters = (SaEvtEventFilterArrayT *)g_malloc(			sizeof(SaEvtEventFilterArrayT));	tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	new_sub->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	memcpy(new_sub->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	new_sub->channel_name[str_len] = '\0';	tmp_char += str_len;	memcpy(&tmp_64, tmp_char, sizeof(SaUint64T));	ntoh_64(&tmp_64, &(new_sub->ch_id));	tmp_char += sizeof(SaUint64T);	memcpy(&tmp_32, tmp_char, sizeof(SaUint32T));	new_sub->subscription_id = ntohl(tmp_32);	tmp_char += sizeof(SaUint32T);	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	number = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	new_sub->filters->filtersNumber = number;	filter = (SaEvtEventFilterT *)g_malloc(			sizeof(SaEvtEventFilterT)*number);	new_sub->filters->filters = filter;	for(i=0; i<number; i++){		memcpy(&tmp_filter_type, tmp_char, sizeof(SaEvtEventFilterTypeT));		filter[i].filterType = ntohl(tmp_filter_type);		tmp_char += sizeof(SaEvtEventFilterTypeT);		memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));		str_len = ntohl(tmp_size);		tmp_char += sizeof(SaSizeT);		filter[i].filter.patternSize = str_len;		filter[i].filter.pattern = (SaUint8T *)g_malloc(str_len);		memcpy(filter[i].filter.pattern, tmp_char, str_len);		tmp_char += str_len;	}	return;}static void read_new_sub_reply(const void *bin_msg, struct client_msg *ret){	const SaUint8T *tmp_char, *tmp_char_pattern;	SaSizeT str_len, number, i, publisher_len;	SaSizeT tmp_size;	struct evt_new_subscription_reply *new_sub_reply;	SaUint64T tmp_64;	SaUint32T tmp_32;	struct evt_event *event;	SaEvtEventPatternArrayT *pattern_array;	SaEvtEventPatternT *patterns;	SaTimeT tmp_time;	SaEvtEventIdT tmp_evt_id;	tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	tmp_char += str_len;		new_sub_reply = (struct evt_new_subscription_reply *)g_malloc(		sizeof(struct evt_new_subscription_reply));	if(new_sub_reply == NULL){		return;	}	ret->private.new_sub_reply = new_sub_reply;	event = (struct evt_event *)g_malloc(sizeof(struct evt_event));	new_sub_reply->event = event;	pattern_array = (SaEvtEventPatternArrayT *)g_malloc(			sizeof(SaEvtEventPatternArrayT));	event->pattern_array = pattern_array;	memcpy(&tmp_64, tmp_char, sizeof(SaUint64T));	ntoh_64(&tmp_64, &(new_sub_reply->ch_id));	tmp_char += sizeof(SaUint64T);	memcpy(&tmp_32, tmp_char, sizeof(SaUint32T));	new_sub_reply->subscription_id = ntohl(tmp_32);	tmp_char += sizeof(SaUint32T);	tmp_char += sizeof(SaSizeT);	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	number = htonl(tmp_size);	tmp_char += sizeof(SaSizeT);	pattern_array->patternsNumber = number;	patterns = (SaEvtEventPatternT *)g_malloc(			sizeof(SaEvtEventPatternT)*number);	pattern_array->patterns = patterns;	tmp_char_pattern = tmp_char + number*sizeof(SaSizeT);	for(i=0; i<number; i++){/*		patterns[i].patternSize = ntohl(*(tmp_size));*//*		tmp_size++;*/		memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));		patterns[i].patternSize = ntohl(tmp_size);		tmp_char += sizeof(SaSizeT);		patterns[i].pattern = g_malloc(patterns[i].patternSize);		memcpy(patterns[i].pattern, tmp_char_pattern, patterns[i].patternSize);		tmp_char_pattern += patterns[i].patternSize;	}	tmp_char = tmp_char_pattern;	event->priority = *(tmp_char);	tmp_char++;	memcpy(&tmp_time, tmp_char, sizeof(SaTimeT));	ntoh_64(&tmp_time, &(event->retention_time));	tmp_char += sizeof(SaTimeT);	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	publisher_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	event->publisherName.length = publisher_len;	memcpy(event->publisherName.value, tmp_char, publisher_len);	tmp_char += publisher_len;	memcpy(&tmp_time, tmp_char, sizeof(SaTimeT));	ntoh_64(&tmp_time, &(event->publish_time));	tmp_char += sizeof(SaTimeT);	memcpy(&tmp_evt_id, tmp_char, sizeof(SaEvtEventIdT));	ntoh_64(&tmp_evt_id, &(event->event_id));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	event->data_size = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	memcpy(event->event_data, tmp_char, event->data_size);	return;}static void read_clear_req(const void *bin_msg, struct client_msg *ret){	const SaUint8T *tmp_char;	SaSizeT tmp_size;	SaSizeT str_len;	struct evt_retention_clear *clear_req;	SaEvtEventIdT tmp_event_id;	tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	tmp_char += str_len;	clear_req = (struct evt_retention_clear *)g_malloc(			sizeof(struct evt_retention_clear));	ret->private.retention_clear = clear_req;	memcpy(&tmp_event_id, tmp_char, sizeof(SaEvtEventIdT));	ntoh_64(&tmp_event_id, &(clear_req->event_id));	return;}static void read_clear_reply_remote(const void *bin_msg,								struct client_msg *ret){	const SaUint8T *tmp_char;	SaSizeT tmp_size;	SaSizeT str_len;	struct evt_retention_clear_reply *clear_reply_remote;	SaEvtEventIdT tmp_event_id;	SaErrorT tmp_err;	tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	tmp_char += str_len;	clear_reply_remote = (struct evt_retention_clear_reply *)g_malloc(						sizeof(struct evt_retention_clear_reply));	ret->private.retention_clear_reply = clear_reply_remote;	memcpy(&tmp_event_id, tmp_char, sizeof(SaEvtEventIdT));	ntoh_64(&tmp_event_id, &(clear_reply_remote->event_id));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(&tmp_err, tmp_char, sizeof(SaErrorT));	clear_reply_remote->ret_code = ntohl(tmp_err);	return;}static struct client_msg *evt_read_hb_msg(struct ha_msg *m){		size_t			msg_len;	const void*		bin_msg;	const SaUint8T*		tmp_char;	struct client_msg*	ret;		ret = (struct client_msg *)g_malloc0(sizeof(struct client_msg));	if(ret == NULL){		return NULL;	}	bin_msg = cl_get_binary(m, BIN_CONTENT, &msg_len);	tmp_char = (const SaUint8T *)bin_msg;	ret->msg_type = *(tmp_char);		switch(*(tmp_char)){		case EVT_CH_OPEN_REQUEST:			read_ch_open_request_remote(bin_msg, ret);			break;		case EVT_CH_OPEN_REPLY:			read_ch_open_reply(bin_msg, ret);			break;		case EVT_EVENT_MSG:			read_event_msg(bin_msg, ret);			break;		case EVT_CHANNEL_UNLINK_NOTIFY:			read_unlink(bin_msg, ret);			break;		case EVT_NEW_SUBSCRIBE:			read_new_subscribe(bin_msg, ret);			break;		case EVT_NEW_SUBSCRIBE_REPLY:			read_new_sub_reply(bin_msg, ret);			break;		case EVT_RETENTION_CLEAR_REQUEST:			read_clear_req(bin_msg, ret);			break;		case EVT_RETENTION_CLEAR_REPLY:			read_clear_reply_remote(bin_msg, ret);			break;	}	return ret;}static void evt_send_open_reply(const char *orig,					char *channel_name, SaUint64T key){	SaSizeT str_len, msg_len, tmp_size;	SaUint64T tmp_key;	SaUint8T *tmp_char;	void *msg;	struct ha_msg *m;	if ((m=ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "Cannot send open_ch reply to remote");		return;	}	str_len = strlen(channel_name);	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T);	msg = g_malloc(msg_len);	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_CH_OPEN_REPLY;	tmp_char++;	tmp_size = htonl(str_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, channel_name, str_len);	tmp_char += str_len;	hton_64(&key, &tmp_key);	memcpy(tmp_char, &tmp_key, sizeof(SaUint64T));	if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) ||			(ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){		cl_log(LOG_ERR, "Cannot create open_ch reply to remote message");		g_free(msg);		ha_msg_del(m);		return;	}	info->hb->llc_ops->sendnodemsg(info->hb, m, orig);	g_free(msg);	ha_msg_del(m);	return;}static void deliver_event_to_remote_subscriber(		struct evt_new_subscription *new_sub, struct evt_event *event){	struct ha_msg *m;	SaSizeT str_len, publisher_len, msg_len, tmp_size, number, i;	void *msg;	SaUint8T *tmp_char;	SaUint64T tmp_64;	SaUint32T tmp_32;	SaEvtEventPatternT *pattern;	SaTimeT tmp_time;	SaEvtEventIdT tmp_evt_id;	if ((m=ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "Cannot deliver event to remote");		return;	}	str_len = strlen(new_sub->channel_name);	publisher_len = event->publisherName.length;	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T)+sizeof(SaUint32T)+				sizeof(SaSizeT)+event->pattern_size+1+sizeof(SaTimeT)+				sizeof(SaSizeT)+publisher_len+sizeof(SaTimeT)+				sizeof(SaEvtEventIdT)+sizeof(SaSizeT)+event->data_size;	msg = g_malloc(msg_len);	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_NEW_SUBSCRIBE_REPLY;	tmp_char++;	tmp_size = htonl(str_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, new_sub->channel_name, str_len);	tmp_char += str_len;	hton_64(&(new_sub->ch_id), &tmp_64);	memcpy(tmp_char, &tmp_64, sizeof(SaUint64T));	tmp_char += sizeof(SaUint64T);	tmp_32 = htonl(new_sub->subscription_id);	memcpy(tmp_char, &tmp_32, sizeof(SaUint32T));	tmp_char += sizeof(SaUint32T);	number = event->pattern_array->patternsNumber;	pattern = event->pattern_array->patterns;	tmp_size = htonl(event->pattern_size);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	tmp_size = htonl(number);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);		for(i=0; i<number; i++){		tmp_size = htonl((pattern+i)->patternSize);		memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));		tmp_char += sizeof(SaSizeT);	}	for(i=0; i<number; i++){		memcpy(tmp_char, (pattern+i)->pattern, 				(pattern+i)->patternSize);		tmp_char += (pattern+i)->patternSize;	}	*(tmp_char) = event->priority;	tmp_char++;	hton_64(&(event->retention_time), &tmp_time);	memcpy(tmp_char, &tmp_time, sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	tmp_size = htonl(publisher_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event->publisherName.value, publisher_len);	tmp_char += publisher_len;	hton_64(&(event->publish_time), &tmp_time);	memcpy(tmp_char, &tmp_time, sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	hton_64(&(event->event_id), &tmp_evt_id);	memcpy(tmp_char, &tmp_evt_id, sizeof(SaEvtEventIdT));	tmp_char += sizeof(SaEvtEventIdT);	tmp_size = htonl(event->data_size);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event->event_data, event->data_size);	if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) ||		(ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){		cl_log(LOG_ERR, "Cannot create event to remote");		g_free(msg);		ha_msg_del(m);		return;	}	info->h

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -