⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 eventd.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
	my_node_id = node_list.mynode;		event_id = my_node_id;	event_id = (event_id << 32);		event_id += local_event_id;	return event_id;}static void broadcast_event_msg_to_cluster(struct client_msg *msg){	struct ha_msg *m;	SaSizeT msg_len, str_len, publisher_len, number, tmp_size, i;	SaEvtEventPatternT *pattern;	void *bin_msg;	SaUint8T *tmp_char;	struct evt_event *event = msg->private.event;	SaTimeT tmp_time;	SaEvtEventIdT tmp_event_id;	if ((m=ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "Cannot broadcast event to cluster");		return;	}	str_len = strlen(msg->channel_name);	publisher_len = event->publisherName.length;	msg_len = 1+sizeof(SaSizeT)+str_len+1+sizeof(SaTimeT)+sizeof(SaSizeT)+				publisher_len+sizeof(SaTimeT)+sizeof(SaSizeT)+				event->pattern_size+sizeof(SaSizeT)+event->data_size+				sizeof(SaEvtEventIdT);	bin_msg = g_malloc(msg_len);	if(bin_msg == NULL){		return;	}	number = event->pattern_array->patternsNumber;	pattern = event->pattern_array->patterns;	tmp_char = (SaUint8T *)bin_msg;	*(tmp_char) = EVT_EVENT_MSG;	tmp_char++;	tmp_size = htonl(str_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, msg->channel_name, str_len);	tmp_char += str_len;	*(tmp_char) = event->priority;	tmp_char++;	hton_64(&(event->retention_time), &tmp_time);	memcpy(tmp_char, &tmp_time, sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	tmp_size = htonl(publisher_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event->publisherName.value, publisher_len);	tmp_char += publisher_len;	hton_64(&(event->publish_time), &tmp_time);	memcpy(tmp_char, &tmp_time, sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	tmp_size = htonl(event->pattern_size);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	tmp_size = htonl(number);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	for(i=0; i<number; i++){		tmp_size = htonl((pattern+i)->patternSize);		memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));		tmp_char += sizeof(SaSizeT);	}	for(i=0; i<number; i++){		memcpy(tmp_char, (pattern+i)->pattern,				(pattern+i)->patternSize);		tmp_char += (pattern+i)->patternSize;	}	tmp_size = htonl(event->data_size);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event->event_data, event->data_size);	tmp_char += event->data_size;	hton_64(&(event->event_id), &tmp_event_id);	memcpy(tmp_char, &tmp_event_id, sizeof(SaEvtEventIdT));	if((ha_msg_addbin(m, BIN_CONTENT, bin_msg, msg_len) == HA_FAIL)||		(ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){		cl_log(LOG_ERR, "Cannot create event");		g_free(bin_msg);		ha_msg_del(m);		return;	}	info->hb->llc_ops->sendclustermsg(info->hb, m);	g_free(bin_msg);	ha_msg_del(m);	return;}static void send_publish_reply_to_client(IPC_Channel *client, 		struct evt_event *event, 		SaErrorT ret_code,		SaEvtEventIdT event_id){	void *msg;	SaSizeT msg_size;	char *tmp_char;			msg_size = 1+sizeof(SaEvtEventHandleT)+				sizeof(SaEvtEventIdT)+sizeof(SaErrorT);	msg = g_malloc(msg_size);	tmp_char = msg;	*(tmp_char) = EVT_PUBLISH_REPLY;	tmp_char++;	memcpy(tmp_char, &(event->clt_event_hd), sizeof(SaEvtEventHandleT));	tmp_char += sizeof(SaEvtEventHandleT);	memcpy(tmp_char, &event_id, sizeof(SaEvtEventIdT));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(tmp_char, &ret_code, sizeof(SaErrorT));		send_to_client(client, msg, msg_size);	return;}static void search_cached_event(gpointer key,					gpointer value, gpointer user_data){	struct evt_subscription *subscription;	struct evt_event *event;	event = (struct evt_event *)value;	subscription = (struct evt_subscription *)user_data;	if(matched(event->pattern_array, subscription->filters)){		deliver_event_to_local_subscriber(subscription, event);	}	return;}static void send_cached_events_to_client(char *channel_name,		struct evt_subscription *subscription,		GHashTable *event_cache){	/*compare the pattern against the filter, deliver event to client if match*/	g_hash_table_foreach(event_cache,			search_cached_event,			subscription);	return;}static SaErrorT append_subscription(SaUint8T *channel_name, 		struct evt_subscription *subscription){	struct evt_channel *evt_ch;	struct channel_instance *ch_instance;		evt_ch = find_channel_by_name(channel_name);	if(evt_ch == NULL){		return SA_ERR_LIBRARY;	}	ch_instance = g_hash_table_lookup(evt_ch->channel_instances, 			subscription->ch_id);	g_hash_table_insert(ch_instance->subscriptions, 			(gpointer)(long)(subscription->subscription_id),			subscription);		return SA_OK;}static void send_retention_clear_reply_to_client(IPC_Channel *client,				char *channel_name,	SaEvtEventIdT event_id, SaErrorT reply){	SaSizeT str_len, msg_size;	void *msg;	char *tmp_char;				str_len = strlen(channel_name);	msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT)+				sizeof(SaErrorT);	msg = g_malloc(msg_size);	tmp_char = (char *)msg;	*(tmp_char) = EVT_CLEAR_RETENTION_TIME_REPLY;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	strncpy(tmp_char, channel_name, str_len);	tmp_char += str_len;	memcpy(tmp_char, &event_id, sizeof(SaEvtEventIdT));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(tmp_char, &reply, sizeof(SaErrorT));	send_to_client(client, msg, msg_size);	return;}static void broadcast_ch_open_req(SaUint8T *channel_name,		struct evt_ch_open_request *ch_open_req){	SaSizeT str_len, msg_len, tmp_size;	SaUint64T tmp_key, key;	SaUint8T *tmp_char;	void *msg;	struct ha_msg *m;	if ((m=ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "Cannot broadcast open_ch request to cluster");		return;	}	str_len = strlen(channel_name);	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T);	msg = g_malloc(msg_len);	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_CH_OPEN_REQUEST;	tmp_char++;	tmp_size = htonl(str_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, channel_name, str_len);	tmp_char += str_len;	key = (SaUint64T)(long)ch_open_req;	hton_64(&key, &tmp_key);	memcpy(tmp_char, &tmp_key, sizeof(SaUint64T));		if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) ||			(ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){		cl_log(LOG_ERR, "Cannot create open_ch request message");		g_free(msg);		ha_msg_del(m);		return;	}	info->hb->llc_ops->sendclustermsg(info->hb, m);	g_free(msg);	ha_msg_del(m);	return;}static void broadcast_new_subscription(SaUint8T *channel_name,		struct evt_subscription *subscription){	struct ha_msg *m;	SaSizeT str_len, msg_len, tmp_size, number, i;	void *msg;	SaUint8T *tmp_char;	SaUint64T tmp_64, key;	SaUint32T tmp_32;	SaEvtEventFilterT *filter;	SaEvtEventFilterTypeT tmp_filter_type;	if ((m=ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "Cannot broadcast new subscription to cluster");		return;	}	str_len = strlen(channel_name);	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaUint64T)+sizeof(SaUint32T)+				sizeof(SaSizeT)+subscription->filters_size; 	msg = g_malloc(msg_len);	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_NEW_SUBSCRIBE;	tmp_char++;	tmp_size = htonl(str_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, channel_name, str_len);	tmp_char += str_len;	key = (SaUint64T)(long)(subscription->ch_id);	hton_64(&key, &tmp_64);	memcpy(tmp_char, &tmp_64, sizeof(SaUint64T));	tmp_char += sizeof(SaUint64T);	tmp_32 = htonl(subscription->subscription_id);	memcpy(tmp_char, &tmp_32, sizeof(SaUint32T));	tmp_char += sizeof(SaUint32T);	tmp_size = htonl(subscription->filters_size);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	number = subscription->filters->filtersNumber;	tmp_size = htonl(number);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	filter = subscription->filters->filters;	for(i=0; i<number; i++){		tmp_filter_type = htonl(filter[i].filterType);		memcpy(tmp_char, &tmp_filter_type, sizeof(SaEvtEventFilterTypeT));		tmp_char += sizeof(SaEvtEventFilterTypeT);		tmp_size = htonl(filter[i].filter.patternSize);		memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));		tmp_char += sizeof(SaSizeT);		memcpy(tmp_char, filter[i].filter.pattern,				filter[i].filter.patternSize);		tmp_char += filter[i].filter.patternSize;	}	if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) ||		(ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){		cl_log(LOG_ERR, "Cannot create open_ch request message");		g_free(msg);		ha_msg_del(m);		return;	}	info->hb->llc_ops->sendclustermsg(info->hb, m);	g_free(msg);	ha_msg_del(m);	return;}struct event_timeout_s{	GHashTable *event_cache;	struct evt_event *event;	guint tag;};static void free_event(struct evt_event *event){	SaEvtEventPatternArrayT *pattern_array;	SaEvtEventPatternT *patterns;	SaSizeT number, i;	pattern_array = event->pattern_array;	patterns = pattern_array->patterns;	number = pattern_array->patternsNumber;		g_free(event->event_data);	for(i=0; i<number; i++){		g_free(patterns[i].pattern);	}	g_free(patterns);	g_free(pattern_array);	put_handle(&event_id_database, event->event_id);	g_free(event);	return;}static void free_remote_event(struct evt_event *event){	SaEvtEventPatternArrayT *pattern_array;	SaEvtEventPatternT *patterns;	SaSizeT number, i;	pattern_array = event->pattern_array;	patterns = pattern_array->patterns;	number = pattern_array->patternsNumber;	/*g_free(event->publisherName);*/	g_free(event->event_data);	for(i=0; i<number; i++){		g_free(patterns[i].pattern);	}	g_free(patterns);	g_free(pattern_array);		g_free(event);	return;}static gboolean timeout_for_retention_time(gpointer user_data){	struct event_timeout_s *event_timeout;	GHashTable *event_cache;	struct evt_event *event;	SaUint32T tmp_32;		event_timeout = (struct event_timeout_s *)user_data;	event_cache = event_timeout->event_cache;	event = event_timeout->event;	tmp_32 = event->event_id;	g_hash_table_remove(event_cache, (gpointer)(long)(tmp_32));	free_event(event);	Gmain_timeout_remove(event_timeout->tag);	g_free(event_timeout);	return TRUE;}static SaErrorT clear_retention_time(char *channel_name,									struct client_msg *msg){	struct evt_channel *evt_ch;	struct evt_event *event;	SaUint32T tmp_32;		evt_ch = g_hash_table_lookup(hash_table_for_channel_name,					channel_name);	tmp_32 = msg->private.retention_clear->event_id;		event = g_hash_table_lookup(evt_ch->event_cache, 				(gpointer)(long)(tmp_32));	if(event == NULL){		return SA_ERR_NOT_EXIST;	}	g_hash_table_remove(evt_ch->event_cache, 				(gpointer)(long)(tmp_32));	/*the event will be released in timeout*/	/*free_event(event);*/	return SA_OK;}struct clear_request{	IPC_Channel *client;	SaEvtEventIdT event_id;};static struct clear_request *append_clear_req(IPC_Channel *client,		SaEvtEventIdT event_id){	struct clear_request *clear_req;	SaUint32T tmp_32;	clear_req = (struct clear_request *)g_malloc(			sizeof(struct clear_request));	clear_req->client = client;	clear_req->event_id = event_id;	tmp_32 = event_id;	g_hash_table_insert(info->evt_pending_clear_requests,			(gpointer)(long)tmp_32, clear_req);	return clear_req;}static void send_clear_to_node(SaUint8T *channel_name,	SaEvtEventIdT event_id, SaInt32T node_id){	struct ha_msg *m;	void *msg;	char *to_id;	SaSizeT msg_len, str_len, tmp_size;	SaUint8T *tmp_char;	SaEvtEventIdT tmp_event_id;		if ((m=ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "Cannot send clear reply to remote");		return;	}	to_id = node_list.nodes[node_id].NodeID;	str_len = strlen(channel_name);	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT);	msg = g_malloc(msg_len);	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_RETENTION_CLEAR_REQUEST;	tmp_char++;	tmp_size = htonl(str_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, channel_name, str_len);	tmp_char += str_len;	hton_64(&(event_id), &tmp_event_id);	memcpy(tmp_char, &tmp_event_id, sizeof(SaEvtEventIdT));		if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) || 			(ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){		cl_log(LOG_ERR, "Cannot create clear reply message");		g_free(msg);		ha_msg_del(m);		return;	}	info->hb->llc_ops->sendnodemsg(info->hb, m, to_id);	g_free(msg);	ha_msg_del(m);	return;}static void broadcast_unlink(SaUint8T *channel_name){	struct ha_msg *m;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -