⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 eventd.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
	SaUint8T *tmp_char;	SaSizeT str_len, number, i;			SaEvtEventFilterT *filter;		subscription = (struct evt_subscription *)g_malloc(			sizeof(struct evt_subscription));	subscription->filters = (SaEvtEventFilterArrayT *)g_malloc(			sizeof(SaEvtEventFilterArrayT));	subscription->client = ch;	ret->private.subscription = subscription;	tmp_char = (SaUint8T *)msg;	tmp_char++;	memcpy(&str_len, tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	tmp_char += str_len;	memcpy(&(subscription->ch_id), tmp_char, sizeof(void *));	tmp_char += sizeof(void *);	memcpy(&(subscription->subscription_id), tmp_char, sizeof(SaEvtSubscriptionIdT));	tmp_char += sizeof(SaEvtSubscriptionIdT);	memcpy(&(subscription->filters_size), tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(&number, tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	subscription->filters->filtersNumber = number;		filter = (SaEvtEventFilterT *)g_malloc(			sizeof(SaEvtEventFilterT)*number);	subscription->filters->filters = filter;	for(i=0; i<number; i++){		memcpy(&(filter[i].filterType), tmp_char, sizeof(SaEvtEventFilterTypeT));		tmp_char += sizeof(SaEvtEventFilterTypeT);		memcpy(&str_len, tmp_char, sizeof(SaSizeT));		tmp_char += sizeof(SaSizeT);		filter[i].filter.patternSize = str_len;		filter[i].filter.pattern = (SaUint8T *)g_malloc(str_len);		memcpy(filter[i].filter.pattern, tmp_char, str_len);		tmp_char += str_len;	}	memcpy(&(subscription->clt_ch_handle), tmp_char, sizeof(SaEvtChannelHandleT));	return;	}static void read_unlink_client(void *msg, struct client_msg *ret){	SaUint8T *tmp_char;	SaSizeT str_len;	tmp_char = (SaUint8T *)msg;	tmp_char++;	memcpy(&str_len, tmp_char, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	ret->channel_name = g_malloc(str_len+1);	memcpy(ret->channel_name, tmp_char, str_len);	ret->channel_name[str_len] = '\0';	return;}static struct client_msg *evt_read_client_msg(IPC_Channel *ch){	IPC_Message* ipc_msg;	struct client_msg *ret;	char *tmp_char;		ret = (struct client_msg *)g_malloc0(sizeof(struct client_msg));	ch->ops->waitin(ch);	if(ch->ops->recv(ch, &ipc_msg) != IPC_OK){		return NULL;		}	tmp_char = (char *)ipc_msg->msg_body;	ret->msg_type = *(tmp_char);/*	printf("msg_type == %d\n", (char)ret->msg_type);*/	switch(*(tmp_char)){		case EVT_INITIALIZE:			break;		case EVT_FINALIZE:			break;		case EVT_PUBLISH:			read_publish(ipc_msg->msg_body, ret);			break;		case EVT_SUBSCRIBE:			read_subscribe(ch, ipc_msg->msg_body, ret);			break;		case EVT_UNSUBSCRIBE:			read_unsubscribe(ipc_msg->msg_body, ret);			break;		case EVT_OPEN_EVENT_CHANNEL:			read_open_channel(ipc_msg->msg_body, ret);			break;		case EVT_CLOSE_EVENT_CHANNEL:			read_close_channel(ipc_msg->msg_body, ret);			break;		case EVT_CLEAR_RETENTION_TIME:			read_clear_retention_time(ipc_msg->msg_body, ret);			break;		case EVT_CHANNEL_UNLINK:			read_unlink_client(ipc_msg->msg_body, ret);			break;		default:			break;				}	return ret;}static void free_filters(SaEvtEventFilterArrayT *filters){	SaSizeT filters_number, i;	filters_number = filters->filtersNumber;	for(i=0; i<filters_number; i++){		if((filters->filters+i)->filter.pattern != NULL){			g_free((filters->filters+i)->filter.pattern);		}	}	g_free(filters->filters);	g_free(filters);	return;}static void free_subscriptions(gpointer key, gpointer value, gpointer user_data){	struct evt_subscription *sub = (struct evt_subscription *)value;	free_filters(sub->filters);	g_free(sub);	return;}static struct evt_channel *find_channel_by_name(SaUint8T *channel_name){	return (struct evt_channel *)g_hash_table_lookup(			hash_table_for_channel_name, (gpointer)channel_name);}static void free_ch_instance(gpointer key, gpointer value, gpointer user_data){	struct channel_instance *ch_instance;	GHashTable *channel_instances;	struct evt_channel *evt_ch;	ch_instance = (struct channel_instance *)value;	channel_instances = (GHashTable *)user_data;	evt_ch = find_channel_by_name(ch_instance->ch_name);	g_hash_table_remove(evt_ch->channel_instances, key);	if(ch_instance->subscriptions != NULL){	g_hash_table_foreach(ch_instance->subscriptions,			free_subscriptions, ch_instance->subscriptions);	g_hash_table_destroy(ch_instance->subscriptions);	}	g_free(ch_instance->ch_name);	g_free(ch_instance);	return;}static struct evt_ch_open_request *add_pending_ch_open_request(IPC_Channel *client, 		char *ch_name, 		struct client_msg *msg){	struct evt_ch_open_request *ch_open_req;	ch_open_req = (struct evt_ch_open_request *)g_malloc(			sizeof(struct evt_ch_open_request));	/*TODO: copy channel name*/	ch_open_req->clt_ch_handle = msg->private.ch_open->clt_ch_handle;	ch_open_req->client = client;	ch_open_req->time_out = msg->private.ch_open->time_out;	/*TODO: we need compare client+clt_ch_handle+ch_name to determine a key*/	g_hash_table_insert(info->evt_pending_ch_open_requests, 			(gpointer)ch_open_req, (gpointer)ch_open_req);	/*TODO: start a timer*/	return ch_open_req;	}static struct ipc *find_ipc(gpointer key){	struct ipc *ret;	ret = (struct ipc *)g_hash_table_lookup(hash_table_for_ipc, key);	return ret;}static void send_to_client(struct IPC_CHANNEL *client,		void *msg, SaSizeT msg_size){	struct IPC_MESSAGE	Msg;	memset(&Msg, 0, sizeof(Msg));	Msg.msg_body = msg;	Msg.msg_len = msg_size;	Msg.msg_done = NULL;	Msg.msg_private = NULL;	Msg.msg_ch = client;	client->ops->send(client, &Msg);	return;}static int send_open_channel_reply(IPC_Channel *client, 		struct evt_ch_open *ch_open, 		struct channel_instance *ch_ins,		SaErrorT ret_code){	SaSizeT str_len, msg_size;	void *msg;	char *tmp_char;			str_len = strlen(ch_open->channel_name);	msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtChannelHandleT)		+sizeof(void *)+sizeof(SaErrorT);	msg = g_malloc(msg_size);	tmp_char = (char *)msg;	*(tmp_char) = EVT_CH_OPEN_REPLY_FROM_DAEMON;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, ch_open->channel_name, str_len);	tmp_char += str_len;	memcpy(tmp_char, &(ch_open->clt_ch_handle), sizeof(SaEvtChannelHandleT));	tmp_char += sizeof(SaEvtChannelHandleT);	memcpy(tmp_char, &(ch_ins), sizeof(void *));	tmp_char += sizeof(void *);	memcpy(tmp_char, &ret_code, sizeof(SaErrorT));	tmp_char += sizeof(SaErrorT);	send_to_client(client, msg, msg_size);		return 0;}/*determine the byte order of platform*//*0 indicate big-endian; 1 indicate little-endian*/static int byte_order(void){	union{		short s;		char c[2];	} un;	un.s = 0x0102;	if((un.c[0] == 1) && (un.c[1] == 2)){		return 0;	}else if((un.c[0] == 2) && (un.c[1] == 1)){		return 1;	}	return -1;}static void deliver_event_to_local_subscriber(struct evt_subscription *subscription, 		struct evt_event *event){	SaSizeT number, msg_size, i, size=0, publisher_len;	SaEvtEventPatternT *pattern;	SaUint8T *tmp_char;	void *msg;		/*calculate the size of pattern_array*/	number = event->pattern_array->patternsNumber;	pattern = event->pattern_array->patterns;	for(i=0; i<number; i++){		size = size + (pattern+i)->patternSize;	}	size = size + (number+1)*sizeof(SaSizeT);	/*calculate the size of publisher_name*/	publisher_len = event->publisherName.length;	msg_size = 1+sizeof(SaEvtChannelHandleT)+sizeof(SaEvtSubscriptionIdT)+		sizeof(SaSizeT)+size+sizeof(SaUint8T)+sizeof(SaTimeT)+		sizeof(SaSizeT)+publisher_len+sizeof(SaTimeT)+		sizeof(SaEvtEventIdT)+sizeof(SaSizeT)+event->data_size;	msg = g_malloc(msg_size);	/*msg type == EVT_NORMAL_EVENT*/	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_NORMAL_EVENT;	tmp_char++;	memcpy(tmp_char, &(subscription->clt_ch_handle), sizeof(SaEvtChannelHandleT));	tmp_char += sizeof(SaEvtChannelHandleT);	memcpy(tmp_char, &(subscription->subscription_id), sizeof(SaEvtSubscriptionIdT));	tmp_char += sizeof(SaEvtSubscriptionIdT);	memcpy(tmp_char, &size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, &number, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	for(i=0; i<number; i++){		memcpy(tmp_char, &((pattern+i)->patternSize), sizeof(SaSizeT));		tmp_char += sizeof(SaSizeT);	}	for(i=0; i<number; i++){		strncpy(tmp_char, (pattern+i)->pattern, 				(pattern+i)->patternSize);		tmp_char += (pattern+i)->patternSize;	}		memcpy(tmp_char, &(event->priority), 1);	tmp_char++;	memcpy(tmp_char, &(event->retention_time), sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	memcpy(tmp_char, &publisher_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event->publisherName.value, publisher_len);	tmp_char += publisher_len;	memcpy(tmp_char, &(event->publish_time), sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	memcpy(tmp_char, &(event->event_id), sizeof(SaEvtEventIdT));	tmp_char += sizeof(SaEvtEventIdT);	memcpy(tmp_char, &(event->data_size), sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event->event_data, event->data_size);	send_to_client(subscription->client, msg, msg_size);	return;}static int fliter_match(SaEvtEventPatternT *pattern, SaEvtEventFilterT *filter){	SaSizeT filter_len = filter->filter.patternSize;	SaSizeT pattern_len = pattern->patternSize;	SaSizeT i;		switch(filter->filterType){		case SA_EVT_PREFIX_FILTER:			if(pattern_len < filter_len){				return 0;			}			for(i=0; i<filter_len; i++){				if(pattern->pattern[i] == filter->filter.pattern[i]){					continue;				}else{					return 0;				}			}			break;		case SA_EVT_EXACT_FILTER:			if(pattern_len != filter_len){				return 0;			}			for(i=0; i<filter_len; i++){				if(pattern->pattern[i] == filter->filter.pattern[i]){					continue;				}else{					return 0;				}			}			break;		case SA_EVT_SUFFIX_FILTER:			if(pattern_len < filter_len){				return 0;			}			for(i=0; i<filter_len; i++){				if(filter->filter.pattern[filter_len-1-i]					== pattern->pattern[pattern_len-1-i]){					continue;				}else{					return 0;				}			}			break;		case SA_EVT_PASS_ALL_FILTER:						break;		default:			break;			/*error message*/	}	return 1;}static int matched(SaEvtEventPatternArrayT *patterns,					SaEvtEventFilterArrayT *filters){	SaSizeT patterns_num = patterns->patternsNumber;	SaSizeT filters_num = filters->filtersNumber;	int i;	SaSizeT len = (patterns_num < filters_num) ? patterns_num:filters_num;		if(filters_num > patterns_num){		for(i=len; i<filters_num; i++){			if(((filters->filters+i)->filterType == SA_EVT_PASS_ALL_FILTER)||					((filters->filters+i)->filter.patternSize == 0)){				continue;			}else{				return 0;			}					}	}	for(i=0; i<len; i++){		if(fliter_match(patterns->patterns+i, filters->filters+i)){			continue;		}else{			return 0;		}	}	return 1;}static void search_subscription(gpointer key,					gpointer value, gpointer user_data){	struct evt_subscription *subscription;	struct evt_event *event;		subscription = (struct evt_subscription *)value;	event = (struct evt_event *)user_data;	if(matched(event->pattern_array, subscription->filters)){		deliver_event_to_local_subscriber(subscription, event);	}	return;}static void search_ch_instance(gpointer key,					gpointer value, gpointer user_data){	struct channel_instance *ch_ins;	ch_ins = (struct channel_instance *)value;	/*user_data is event to be published*/	g_hash_table_foreach(ch_ins->subscriptions,			search_subscription,			user_data);	return;}static void publish_to_local_subscriber(struct evt_channel *evt_ch, 		SaEvtEventPatternArrayT *pattern_array,		struct evt_event *event){	g_hash_table_foreach(evt_ch->channel_instances, 			search_ch_instance, 			event); 	return;}static SaUint32T get_local_event_id(void){	SaUint32T local_id;		get_handle(&event_id_database, &local_id);	return local_id;}static SaEvtEventIdT get_event_id(SaUint32T local_event_id){	SaUint32T my_node_id;	SaEvtEventIdT event_id;	

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -