⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 eventd.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
	void *msg;	SaSizeT str_len, msg_len, tmp_size;	SaUint8T *tmp_char;		if ((m=ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "Cannot broadcast unlink msg to cluster");		return;	}	str_len = strlen(channel_name);	msg_len = 1+sizeof(SaSizeT)+str_len;	msg = g_malloc(msg_len);	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_CHANNEL_UNLINK_NOTIFY;	tmp_char++;	tmp_size = htonl(str_len);	memcpy(tmp_char, &tmp_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, channel_name, str_len);	if((ha_msg_addbin(m, BIN_CONTENT, msg, msg_len) == HA_FAIL) ||			(ha_msg_add(m, F_TYPE, EVT_SERVICE) == HA_FAIL)){		cl_log(LOG_ERR, "Cannot create unlink msg");		g_free(msg);		ha_msg_del(m);		return;	}	info->hb->llc_ops->sendclustermsg(info->hb, m);	g_free(msg);	ha_msg_del(m);	return;}static int handle_msg_from_client(IPC_Channel *client, gpointer user_data){	struct client_msg *msg;	char *channel_name = NULL;	struct evt_channel *evt_ch;	struct channel_instance *ch_ins;	struct ipc *evt_ipc;	struct evt_ch_open_request *ch_open_req;	struct evt_event *event;	SaEvtEventPatternArrayT *pattern_array;	SaErrorT reply;	SaEvtEventIdT event_id;	unsigned int node_id;	struct clear_request *clear_req;	int str_len;	struct event_timeout_s *event_timeout;	SaUint32T local_event_id;		msg = evt_read_client_msg(client);	if(msg == NULL){		printf("received NULL msg\n");		return 0;	}	printf("msg_type == %d\n", (char)msg->msg_type);	channel_name = msg->channel_name;		switch(msg->msg_type){		case EVT_INITIALIZE:			evt_ipc = (struct ipc *)g_malloc(sizeof(struct ipc));			evt_ipc->client = client;			evt_ipc->channel_instances = g_hash_table_new(					g_direct_hash,					g_direct_equal);			g_hash_table_insert(hash_table_for_ipc,						(gpointer)client,						(gpointer)evt_ipc);			printf("after handle initialize!!\n");							break;		case EVT_FINALIZE:			evt_ipc = (struct ipc *)g_hash_table_lookup(					hash_table_for_ipc,					(gpointer)client);			if(evt_ipc == NULL){				break;			}			g_hash_table_remove(hash_table_for_ipc,					(gpointer)client);			printf("the hash tabe size == %d\n", g_hash_table_size(evt_ipc->channel_instances));			if(evt_ipc->channel_instances != NULL){			g_hash_table_foreach(evt_ipc->channel_instances, 					free_ch_instance,					evt_ipc->channel_instances);			g_hash_table_destroy(evt_ipc->channel_instances);			}			g_free(evt_ipc);			break;		case EVT_OPEN_EVENT_CHANNEL:			evt_ch = find_channel_by_name(channel_name);			if((evt_ch != NULL)&&(evt_ch->unlink = FALSE)){				ch_ins = (struct channel_instance *)					g_malloc0(sizeof(struct channel_instance));								str_len = strlen(channel_name);				ch_ins->ch_name = (char *)g_malloc(str_len+1);				memcpy(ch_ins->ch_name, channel_name, str_len);				ch_ins->ch_name[str_len] = '\0';				ch_ins->clt_ch_handle = msg->private.ch_open->clt_ch_handle;				ch_ins->subscriptions = g_hash_table_new(g_direct_hash, g_direct_equal);				g_hash_table_insert(evt_ch->channel_instances,						(gpointer)ch_ins,						(gpointer)ch_ins);				evt_ipc = find_ipc(client);				g_hash_table_insert(evt_ipc->channel_instances,						(gpointer)ch_ins,						(gpointer)ch_ins);								/*sleep(1);*/				send_open_channel_reply(client, 					msg->private.ch_open, 					ch_ins,					SA_OK);			}else if((evt_ch != NULL)&&(evt_ch->unlink = TRUE)){				if((msg->private.ch_open->ch_open_flags & SA_EVT_CHANNEL_CREATE)					== SA_EVT_CHANNEL_CREATE){					evt_ch->unlink = FALSE;					/*be the same as the above brach*/					ch_ins = (struct channel_instance *)						g_malloc0(sizeof(struct channel_instance));					ch_ins->clt_ch_handle = msg->private.ch_open->clt_ch_handle;					str_len = strlen(channel_name);					ch_ins->ch_name = (char *)g_malloc(str_len+1);					memcpy(ch_ins->ch_name, channel_name, str_len);					ch_ins->ch_name[str_len] = '\0';					ch_ins->subscriptions = g_hash_table_new(g_direct_hash,												g_direct_equal);					g_hash_table_insert(evt_ch->channel_instances,						(gpointer)ch_ins,						(gpointer)ch_ins);					evt_ipc = find_ipc(client);					g_hash_table_insert(evt_ipc->channel_instances,						(gpointer)ch_ins,						(gpointer)ch_ins);					send_open_channel_reply(client, 							msg->private.ch_open, 							ch_ins,							SA_OK);				}else{					ch_open_req = add_pending_ch_open_request(							client, channel_name, msg);					broadcast_ch_open_req(channel_name,							ch_open_req);				}			}else if((evt_ch == NULL) && ((msg->private.ch_open->ch_open_flags						 & SA_EVT_CHANNEL_CREATE) == SA_EVT_CHANNEL_CREATE)){				evt_ch = (struct evt_channel *)g_malloc0(								sizeof(struct evt_channel));				str_len = strlen(channel_name);				evt_ch->channel_name = (char *)g_malloc(str_len+1);				memcpy(evt_ch->channel_name, channel_name, str_len);				evt_ch->channel_name[str_len] = '\0';				evt_ch->channel_instances = g_hash_table_new(g_direct_hash,												g_direct_equal);				evt_ch->event_cache = g_hash_table_new(g_direct_hash,											g_direct_equal);				evt_ch->use_count = 1;				g_hash_table_insert(hash_table_for_channel_name,						(gpointer)evt_ch->channel_name,						(gpointer)evt_ch);				ch_ins = (struct channel_instance *)						g_malloc0(sizeof(struct channel_instance));				ch_ins->ch_name = (char *)g_malloc(str_len+1);				memcpy(ch_ins->ch_name, channel_name, str_len);				ch_ins->ch_name[str_len] = '\0';				ch_ins->clt_ch_handle = msg->private.ch_open->clt_ch_handle;				ch_ins->subscriptions = g_hash_table_new(g_direct_hash,											g_direct_equal);				g_hash_table_insert(evt_ch->channel_instances,					(gpointer)ch_ins,					(gpointer)ch_ins);				evt_ipc = find_ipc(client);				g_hash_table_insert(evt_ipc->channel_instances,						(gpointer)ch_ins,						(gpointer)ch_ins);								/*sleep(1);*/				send_open_channel_reply(client, 					msg->private.ch_open, 					ch_ins,					SA_OK);			}else{				ch_open_req = add_pending_ch_open_request(client, channel_name,					       	msg);				broadcast_ch_open_req(channel_name,						ch_open_req);			}			if(msg->private.ch_open->channel_name != NULL){				g_free(msg->private.ch_open->channel_name);			}			g_free(msg->private.ch_open);			break;		case EVT_CLOSE_EVENT_CHANNEL:			evt_ch = find_channel_by_name(channel_name);			if(evt_ch == NULL){				/*error msg*/				return 1;			}			ch_ins = g_hash_table_lookup(evt_ch->channel_instances,				msg->private.ch_close->ch_ins);			if(ch_ins == NULL){				/*error msg*/				return 1;			}			if(ch_ins->subscriptions != NULL){				g_hash_table_foreach(ch_ins->subscriptions,						free_subscriptions,						ch_ins->subscriptions);				g_hash_table_destroy(ch_ins->subscriptions);			}			g_hash_table_remove(evt_ch->channel_instances,						msg->private.ch_close->ch_ins);			evt_ipc = (struct ipc *)g_hash_table_lookup(						hash_table_for_ipc,						(gpointer)client);			g_hash_table_remove(evt_ipc->channel_instances,					(gpointer)msg->private.ch_close->ch_ins);			g_free(ch_ins);			g_free(msg->private.ch_close);			break;		case EVT_PUBLISH:			/*1 forward the event to local subscriber*/			/*2 broadcast the event to cluster*/			event = msg->private.event;			evt_ch = find_channel_by_name(channel_name);			if(evt_ch == NULL){				send_publish_reply_to_client(client, 						event, 						SA_ERR_LIBRARY, 0);				return 1;			}			local_event_id = get_local_event_id();			event_id = get_event_id(local_event_id);			printf("the event_id == %Ld\n", event_id);			send_publish_reply_to_client(client, event,					SA_OK, event_id);			/*sleep(1);*/			event->event_id = event_id;			pattern_array = msg->private.event->pattern_array;			publish_to_local_subscriber(evt_ch,					pattern_array, event);									broadcast_event_msg_to_cluster(msg);			if(event->retention_time != 0){				evt_ch = g_hash_table_lookup(						hash_table_for_channel_name,						channel_name);				g_hash_table_insert(evt_ch->event_cache,						(gpointer)local_event_id,						(gpointer)event);								/*TODO: should start timer in order to remove event when timeout*/				event_timeout = (struct event_timeout_s *)g_malloc(									sizeof(struct event_timeout_s));				event_timeout->event_cache = evt_ch->event_cache;				event_timeout->event = event;				event_timeout->tag = Gmain_timeout_add_full(G_PRIORITY_HIGH,										event->retention_time,										timeout_for_retention_time,										event_timeout, NULL);											}else{				/* free event*/				free_event(event);			}			break;		case EVT_SUBSCRIBE:						/*1 record the subscription*/			/*2 send the events within the retention time to client			*/						append_subscription(channel_name, 					msg->private.subscription);			evt_ch = find_channel_by_name(channel_name);			if(evt_ch->event_cache != NULL){				/*sleep(1);*/				send_cached_events_to_client(channel_name,						msg->private.subscription,						evt_ch->event_cache);			}						broadcast_new_subscription(channel_name,							msg->private.subscription);			break;		case EVT_UNSUBSCRIBE:			evt_ch = find_channel_by_name(channel_name);			if(evt_ch == NULL){				cl_log(LOG_ERR, "SA_ERR_LIBRARY");				g_free(msg->private.subscription);				break;			}			ch_ins = g_hash_table_lookup(evt_ch->channel_instances,						msg->private.subscription->ch_id);			if(ch_ins == NULL){				cl_log(LOG_ERR, "SA_ERR_LIBRARY");				g_free(msg->private.subscription);				break;			}			g_hash_table_remove(ch_ins->subscriptions,				(gpointer)(long)(msg->private.subscription->subscription_id));			g_free(msg->private.subscription);			break;		case EVT_CLEAR_RETENTION_TIME: 			event_id = msg->private.retention_clear->event_id;			node_id =  (event_id >> 32);			if(node_id == node_list.mynode){				reply = clear_retention_time(channel_name,						msg);				send_retention_clear_reply_to_client(client,					channel_name,					msg->private.retention_clear->event_id,					reply);			}else{								send_clear_to_node(channel_name,						event_id, node_id);				clear_req = append_clear_req(client, event_id);				/*TODO: timeout function associated with clear*/			}			g_free(msg->private.retention_clear);			break;		case EVT_CHANNEL_UNLINK:			evt_ch = find_channel_by_name(channel_name);			if(evt_ch != NULL){				evt_ch->unlink = TRUE;			}			broadcast_unlink(channel_name);						break;		default:			break;	}	if(channel_name != NULL){		g_free(channel_name);	}	g_free(msg);	return 0;}void hton_64(const SaUint64T *src_64, SaUint64T *dst_64){	SaUint32T high_value, low_value, *tmp_32;	const SaUint32T *tmp_32_const;	if(byte_order() == 0){		tmp_32_const = (const SaUint32T *)src_64;		high_value = *tmp_32_const;		tmp_32_const++;		low_value = *tmp_32_const;		tmp_32 = (SaUint32T *)dst_64;			*(tmp_32) = htonl(low_value);		tmp_32++;		*(tmp_32) = htonl(high_value);	}	return;}void ntoh_64(const SaUint64T *src_64, SaUint64T *dst_64){	SaUint32T high_value, low_value, *tmp_32;	const SaUint32T *tmp_32_const;	if(byte_order() == 0){		tmp_32_const = (const SaUint32T *)src_64;		low_value = ntohl(*(tmp_32_const));		tmp_32_const++;		high_value = ntohl(*(tmp_32_const));		tmp_32 = (SaUint32T *)dst_64;		*tmp_32 = high_value;		tmp_32++;		*tmp_32 = low_value;			}	return;}static void read_ch_open_reply(const void *bin_msg, struct client_msg *ret){	struct evt_ch_open_reply_remote *ch_open_reply;	const SaUint8T *tmp_char;	SaSizeT str_len;	SaSizeT tmp_size;	SaUint64T tmp_key;	ch_open_reply = (struct evt_ch_open_reply_remote *)g_malloc0(			sizeof(struct evt_ch_open_reply_remote));	ret->private.ch_open_reply_remote = ch_open_reply;	tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	ch_open_reply->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	memcpy(ch_open_reply->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	ch_open_reply->channel_name[str_len] = '\0';	tmp_char += str_len;	memcpy(&tmp_key, tmp_char, sizeof(SaUint64T));	ntoh_64(&tmp_key, &(ch_open_reply->key));	return;}static void read_ch_open_request_remote(const void *bin_msg,							struct client_msg *ret){	struct evt_ch_open_request_remote *ch_open_request;	const SaUint8T *tmp_char;	SaSizeT str_len;	SaSizeT tmp_size;	SaUint64T tmp_key;		ch_open_request = (struct evt_ch_open_request_remote *)g_malloc0(			sizeof(struct evt_ch_open_request_remote));	ret->private.ch_open_request_remote = ch_open_request;	tmp_char = (const SaUint8T *)bin_msg;	tmp_char++;	memcpy(&tmp_size, tmp_char, sizeof(SaSizeT));	str_len = ntohl(tmp_size);	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	ch_open_request->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	memcpy(ch_open_request->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	ch_open_request->channel_name[str_len] = '\0';	tmp_char += str_len;	memcpy(&tmp_key, tmp_char, sizeof(SaUint64T));	ntoh_64(&tmp_key, &(ch_open_request->key));	return;	}static void read_event_msg(const void *bin_msg, struct client_msg *ret){	struct evt_event *event;	SaEvtEventPatternArrayT *pattern_array;	const SaUint8T *tmp_char, *tmp_char_pattern;	SaSizeT str_len, number, i;	SaSizeT tmp_size;	SaEvtEventPatternT *patterns;	SaEvtEventIdT tmp_event_id;	SaTimeT tmp_time;	event = (struct evt_event *)g_malloc(sizeof(struct evt_event));	ret->private.event = event;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -