⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_lib.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 3 页
字号:
		FD_SET(fd, &rset);		if(select(fd + 1, &rset, NULL,NULL,NULL) == -1){			/*perror("select"); */			return(1);		}		msg_reply = read_from_ipc(ch);		if(msg_reply->msg_type == EVT_PUBLISH_REPLY){			if(eventHandle == 				msg_reply->private.pub_reply->eventHandle){				if(msg_reply->private.pub_reply->ret_code						== SA_OK){					break;				}else{					return msg_reply->private.pub_reply->ret_code;				}			}else{				/*update timeout, continue */			}		}else if(msg_reply->msg_type == EVT_NORMAL_EVENT)		{			/*TODO: update timeout, continue */			event_hd = msg_reply->private.event;			append_to_event_queue(evt_hd->event_queue, event_hd);		}else if(msg_reply->msg_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON)		{			open_ch_reply = msg_reply->private.open_ch_reply;			append_to_reply_queue(evt_hd->event_queue,					open_ch_reply);		}else		{			/*update timeout, continue */			/*error msg */		}		/*if msg_type is normal event, buffer it and continue */		/*if msg_type is publish_reply, break */	}	*(eventId) = (SaEvtEventIdT)msg_reply->private.pub_reply->event_id;	return SA_OK;	}SaErrorT saEvtEventSubscribe(const SaEvtChannelHandleT channelHandle,                    const SaEvtEventFilterArrayT *filters,                    SaEvtSubscriptionIdT subscriptionId){	evt_channel_handle *evt_channel_hd;	struct IPC_CHANNEL *ch;	GHashTable *subscription_hash;	SaUint8T *tmp_char;	SaSizeT msg_size;	SaSizeT str_len, number, i, filter_size=0;	void *msg;	SaEvtEventFilterT *filter;	void *tmp_pointer;		evt_channel_hd = (evt_channel_handle *)g_hash_table_lookup(			evt_channel_hash, 			(gpointer)channelHandle);	if(evt_channel_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	if((evt_channel_hd->open_flags & SA_EVT_CHANNEL_SUBSCRIBER)			!= SA_EVT_CHANNEL_SUBSCRIBER){		return SA_ERR_INVALID_PARAM;	}	ch = evt_channel_hd->ch;	subscription_hash = evt_channel_hd->subscription_hash;	tmp_pointer = g_hash_table_lookup(subscription_global,				(gpointer)subscriptionId);	if(tmp_pointer != NULL){		return SA_ERR_EXIST;	}	 	g_hash_table_insert(subscription_hash, 			(gpointer)subscriptionId, 			(gpointer)subscriptionId);	g_hash_table_insert(subscription_global, 			(gpointer)subscriptionId, 			(gpointer)subscriptionId);		/*msg_size= msg_type+ch_name_len+ch_name+channel_id+sub_id+filter_len+filters_number+pattern */	str_len = evt_channel_hd->channelName.length;	/* calculate filter length, then copy it to msg */	number = filters->filtersNumber;	filter = filters->filters;	for(i=0; i<number; i++){		filter_size += filter[i].filter.patternSize;		}	filter_size = sizeof(SaSizeT)+(number*sizeof(SaSizeT))+		(number*sizeof(SaEvtEventFilterTypeT))+filter_size;	msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(void *)+		sizeof(subscriptionId)+sizeof(SaSizeT)+		filter_size+sizeof(SaEvtChannelHandleT);	msg = g_malloc(msg_size);	if(msg == NULL){		return SA_ERR_NO_MEMORY;	}	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_SUBSCRIBE;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, evt_channel_hd->channelName.value, str_len);	tmp_char += str_len;	memcpy(tmp_char, &(evt_channel_hd->ch_instance), sizeof(void *));	tmp_char += sizeof(void *);	memcpy(tmp_char, &subscriptionId, sizeof(SaEvtSubscriptionIdT));	tmp_char += sizeof(SaEvtSubscriptionIdT);	memcpy(tmp_char, &filter_size, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, &number, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);		for(i=0; i<number; i++){		memcpy(tmp_char, &(filter[i].filterType), sizeof(SaEvtEventFilterTypeT));		tmp_char += sizeof(SaEvtEventFilterTypeT);		memcpy(tmp_char, &(filter[i].filter.patternSize), sizeof(SaSizeT));		tmp_char += sizeof(SaSizeT);		memcpy(tmp_char, filter[i].filter.pattern,				filter[i].filter.patternSize);		tmp_char += filter[i].filter.patternSize;	}		memcpy(tmp_char, &channelHandle, sizeof(SaEvtChannelHandleT));	send_to_evt_daemon(ch, msg, msg_size);	return SA_OK;}SaErrorT saEvtEventUnsubscribe(	SaEvtChannelHandleT channelHandle,	SaEvtSubscriptionIdT subscriptionId	){	evt_channel_handle *evt_channel_hd;	GHashTable *subscription_hash;		SaSizeT msg_size;	SaSizeT str_len;	void *msg;	char *tmp_char;	struct IPC_CHANNEL *ch;			evt_channel_hd = (evt_channel_handle *)g_hash_table_lookup(			evt_channel_hash, 			(gpointer)channelHandle);	if(evt_channel_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	ch = evt_channel_hd->ch;	subscription_hash = evt_channel_hd->subscription_hash;	if(g_hash_table_lookup(subscription_hash,(gpointer)subscriptionId)					 == NULL){		return SA_ERR_NAME_NOT_FOUND;	}	g_hash_table_remove(subscription_hash, (gpointer)subscriptionId);	/* construct the msg and send to daemon */	str_len = evt_channel_hd->channelName.length;	msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(void *)+		sizeof(SaEvtSubscriptionIdT);	msg = g_malloc(msg_size);	if(msg == NULL){		return SA_ERR_LIBRARY;	}	tmp_char = msg;	*(tmp_char) = EVT_UNSUBSCRIBE;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	strncpy(tmp_char, evt_channel_hd->channelName.value, str_len);	tmp_char += str_len;	memcpy(tmp_char, &(evt_channel_hd->ch_instance), sizeof(void *));	tmp_char += sizeof(void *);	memcpy(tmp_char, &subscriptionId, sizeof(SaEvtSubscriptionIdT));	send_to_evt_daemon(ch, msg, msg_size);	return SA_OK;}SaErrorT saEvtSelectionObjectGet(const SaEvtHandleT evtHandle,                        SaSelectionObjectT *selectionObject){	evt_handle *evt_hd;		if(selectionObject == NULL){		return SA_ERR_INVALID_PARAM;	}	evt_hd = g_hash_table_lookup(evt_handle_hash, (gpointer)evtHandle);	if( evt_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	*selectionObject = evt_hd->selectionObject;	return SA_OK;}static void *read_from_event_queue(struct event_queue_s *event_queue,		SaUint8T *type){	SaSizeT i;	struct queue_head *queue;	evt_event_handle *event_tmp = NULL;	struct open_channel_reply *reply_tmp = NULL;	void *ret = NULL;	if(event_queue->event_number != 0){				for(i=SA_EVT_HIGHEST_PRIORITY;i<=SA_EVT_LOWEST_PRIORITY;i++){			queue = &(event_queue->queue[i]);			if(queue->head != NULL){				event_tmp = queue->head;				queue->head = queue->head->next;				if(queue->head == NULL){					queue->tail = NULL;				}				event_tmp->next = NULL;				event_queue->event_number--;				*(type) = EVT_NORMAL_EVENT;				ret = (void *)event_tmp;				break;			}else{				continue;			}				}	}else if(event_queue->reply_number != 0){		reply_tmp = event_queue->open_ch_reply_queue.head;		event_queue->open_ch_reply_queue.head = 			event_queue->open_ch_reply_queue.head->next;		if(event_queue->open_ch_reply_queue.head == NULL){			event_queue->open_ch_reply_queue.tail = NULL;		}		reply_tmp->next = NULL;		event_queue->reply_number--;		*(type) = EVT_CH_OPEN_REPLY_FROM_DAEMON;		ret = (void *)reply_tmp;	}		return ret;}SaErrorT saEvtDispatch(const SaEvtHandleT evtHandle,				SaDispatchFlagsT dispatchFlags){	evt_handle *evt_hd;	struct IPC_CHANNEL *ch;	evt_event_handle *event_hd;	evt_channel_handle *evt_channel_hd;	struct daemon_msg *msg;	SaEvtEventHandleT event_handle;	SaUint8T event_type;	void *tmp_void;	struct open_channel_reply *open_ch_reply;	int fd;	fd_set rset;	struct timeval time_out;	int ret;		evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash,			(gpointer)evtHandle);	if( evt_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	if((dispatchFlags < 1) || (dispatchFlags > 3)){		return SA_ERR_BAD_FLAGS;	}		/*can read two types of message from IPC: */	/*1 EVT_NORMAL_EVENT */	/*2 EVT_CH_OPEN_REPLY_FROM_DAEMON: for channel_open_async */			switch(dispatchFlags){		case SA_DISPATCH_ONE:			tmp_void = read_from_event_queue(evt_hd->event_queue,						&event_type);			if(tmp_void == NULL){				ch = evt_hd->ch;				msg = read_from_ipc(ch);				if(msg == NULL){					ch->ops->destroy(ch);					return SA_ERR_LIBRARY;				}				event_type = msg->msg_type;				tmp_void = (void *)msg->private.event;			}			if(event_type == EVT_NORMAL_EVENT){				event_hd = (evt_event_handle *)tmp_void;				evt_channel_hd = g_hash_table_lookup(						evt_channel_hash,						(gpointer)event_hd->channelId);				if(evt_channel_hd == NULL){					return SA_ERR_LIBRARY;				}				if(get_handle(&event_handle_database,							&event_handle) != SA_OK){					return SA_ERR_LIBRARY;				}				g_hash_table_insert(evt_event_hash,					(gpointer)event_handle,					event_hd);				g_hash_table_insert(					evt_channel_hd->event_handle_hash, 					(gpointer)event_handle,					event_hd);				evt_hd->callbacks.saEvtEventDeliverCallback(					event_hd->subscription_id,					event_handle,					event_hd->eventDataSize);			}else if(event_type ==					EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON){				open_ch_reply = (struct open_channel_reply *)					tmp_void;				evt_channel_hd = g_hash_table_lookup(						evt_channel_hash, 						(gpointer)open_ch_reply->clt_ch_handle);				if(evt_channel_hd == NULL){					/*TODO: free open_ch_reply */					return SA_ERR_LIBRARY;				}				evt_channel_hd->ch_instance = 					open_ch_reply->ch_instance;				evt_hd->callbacks.saEvtChannelOpenCallback(						evt_channel_hd->invocation,						open_ch_reply->clt_ch_handle,						open_ch_reply->ret_code);			}			break;		case SA_DISPATCH_ALL:			fd = evt_hd->selectionObject;			time_out.tv_sec = 0;			for(;;){				tmp_void = read_from_event_queue(						evt_hd->event_queue,						&event_type);				if(tmp_void == NULL){					FD_ZERO(&rset);					FD_SET(fd, &rset);					ret = select(fd + 1, &rset,NULL,NULL,							&time_out);					if( ret == -1){						/*error */						return SA_ERR_LIBRARY;					}else if(ret == 0){						return SA_OK;					}					ch = evt_hd->ch;					msg = read_from_ipc(ch);					if(msg == NULL){						ch->ops->destroy(ch);						return SA_ERR_LIBRARY;					}					event_type = msg->msg_type;					tmp_void = (void *)msg->private.event;					}				if(event_type == EVT_NORMAL_EVENT){					event_hd = (evt_event_handle *)						tmp_void;					evt_channel_hd = g_hash_table_lookup(						evt_channel_hash,						(gpointer)event_hd->channelId);					if(evt_channel_hd == NULL){						return SA_ERR_LIBRARY;					}					if(get_handle(&event_handle_database,							&event_handle) != SA_OK){						return SA_ERR_LIBRARY;					}					g_hash_table_insert(evt_event_hash,						(gpointer)event_handle,						event_hd);					g_hash_table_insert(					  evt_channel_hd->event_handle_hash, 					  (gpointer)event_handle,					  event_hd);					evt_hd->callbacks.						saEvtEventDeliverCallback(						event_hd->subscription_id,						event_handle,						event_hd->eventDataSize);				}else if(event_type == 					EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON){					open_ch_reply = 						(struct open_channel_reply *)						tmp_void;					evt_channel_hd = g_hash_table_lookup(						evt_channel_hash, 						(gpointer)open_ch_reply->clt_ch_handle);					if(evt_channel_hd == NULL){						/*TODO: free open_ch_reply */						return SA_ERR_LIBRARY;					}					evt_channel_hd->ch_instance = 						open_ch_reply->ch_instance;					evt_hd->callbacks.						saEvtChannelOpenCallback(						evt_channel_hd->invocation,						open_ch_reply->clt_ch_handle,						open_ch_reply->ret_code);				}			}			break;		case SA_DISPATCH_BLOCKING:			break;	}	return SA_OK;}SaErrorTsaEvtEventDataGet(SaEvtEventHandleT eventHandle,		void *eventData,		SaSizeT *eventDataSize){	evt_event_handle *event_hd;	SaSizeT data_size;		event_hd = (evt_event_handle *)g_hash_table_lookup(evt_event_hash,			(gpointer)(eventHandle));	if(event_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	data_size = (event_hd->eventDataSize > *eventDataSize)		? *eventDataSize : event_hd->eventDataSize;	*eventDataSize = data_size;	memcpy(eventData, event_hd->eventData, data_size);	return SA_OK;}SaErrorT saEvtChannelUnlink(SaEvtHandleT evtHandle,		const SaNameT *channelName){	void *msg;	char *tmp_char;	SaSizeT str_len, msg_len;	evt_handle *evt_hd;		evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash,					(gpointer)(long)evtHandle);	if(evt_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	str_len = channelName->length;	msg_len = sizeof(char)+sizeof(SaSizeT)+str_len;	msg = g_malloc(msg_len);	if(msg == NULL){		return SA_ERR_NO_MEMORY;	}		tmp_char = (char *)msg;	*(tmp_char) = EVT_CHANNEL_UNLINK;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	strncpy(tmp_char, channelName->value, str_len);	send_to_evt_daemon(evt_hd->ch, msg, msg_len);	return SA_OK;}SaErrorT saEvtEventRetentionTimeClear(	SaEvtChannelHandleT channelHandle,	const SaEvtEventIdT eventId	){	evt_channel_handle *evt_channel_hd;	SaSizeT str_len, msg_size;	void *msg;	char *tmp_char;		struct IPC_CHANNEL *ch;	int fd;	fd_set rset;	struct daemon_msg *msg_reply;	struct clear_retention_time_reply *clear_reply;	evt_event_handle *event_hd;	struct open_channel_reply *open_ch_reply;	evt_handle *evt_hd;		evt_channel_hd = (evt_channel_handle *)g_hash_table_lookup(			evt_channel_hash, 			(gpointer)(long)channelHandle);	if(evt_channel_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	evt_hd = (evt_handle *)g_hash_table_lookup(evt_handle_hash,			(gpointer)(long)(evt_channel_hd->evt_handle));	if(evt_hd == NULL){		return SA_ERR_LIBRARY;	}	/*msg_size=1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT) */	str_len = evt_channel_hd->channelName.length;	msg_size = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventIdT);	msg = g_malloc(msg_size);	tmp_char = (char *)msg;	*(tmp_char) = EVT_CLEAR_RETENTION_TIME;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	strncpy(tmp_char, evt_channel_hd->channelName.value, str_len);	tmp_char += str_len;	memcpy(tmp_char, &eventId, sizeof(SaEvtEventIdT));	send_to_evt_daemon(evt_channel_hd->ch, msg, msg_size);		/*select wait until receiving the reply	 */	ch = evt_channel_hd->ch;	fd = evt_channel_hd->selectionObject;	for(;;){		FD_ZERO(&rset);		FD_SET(fd, &rset);		if(select(fd + 1, &rset, NULL,NULL,NULL) == -1){			/*perror("select"); */			return SA_ERR_LIBRARY;		}		msg_reply = read_from_ipc(ch);		if(msg_reply->msg_type == EVT_CLEAR_RETENTION_TIME_REPLY){			clear_reply = msg_reply->private.clear_retention_reply;			if(clear_reply->event_id == eventId){				return clear_reply->ret_code;			}else{				/*error */				/*continue to wait reply from daemon */				continue;			}		}else if(msg_reply->msg_type == EVT_NORMAL_EVENT){			event_hd = msg_reply->private.event;			append_to_event_queue(evt_hd->event_queue, event_hd);		}else if(msg_reply->msg_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON){			open_ch_reply = msg_reply->private.open_ch_reply;			append_to_reply_queue(evt_hd->event_queue,					open_ch_reply);		}	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -