⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 event_lib.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 3 页
字号:
		if(msg_reply == NULL){			printf("received NULL msg from daemon\n");			return 0;		}		if(msg_reply->msg_type == EVT_CH_OPEN_REPLY_FROM_DAEMON){			if(channel_handle == 				msg_reply->private.open_ch_reply->clt_ch_handle){				if(msg_reply->private.open_ch_reply->ret_code						== SA_OK){					break;				}else{					return msg_reply->private.open_ch_reply->ret_code;				}			}else{				/*update timeout, continue */			}		}else if(msg_reply->msg_type == EVT_NORMAL_EVENT) {						event_hd = msg_reply->private.event;						append_to_event_queue(evt_hd->event_queue, event_hd);			/*TODO: update timeout, continue */		}else if(msg_reply->msg_type == EVT_ASYN_CH_OPEN_REPLY_FROM_DAEMON)		{			open_ch_reply = msg_reply->private.open_ch_reply;			append_to_reply_queue(evt_hd->event_queue,					open_ch_reply);		}else 		{			/*update timeout, continue */			/*error msg */		}			}	/*if open evt_channel succeed	 */	evt_channel_hd->channelName.length = str_len;	strncpy(evt_channel_hd->channelName.value, channelName->value, str_len);	evt_channel_hd->evt_handle = evtHandle;	evt_channel_hd->ch = evt_hd->ch;	evt_channel_hd->selectionObject = fd;	evt_channel_hd->open_flags = channelOpenFlags;	evt_channel_hd->ch_instance = 		msg_reply->private.open_ch_reply->ch_instance;	evt_channel_hd->event_handle_hash = g_hash_table_new(g_direct_hash, 				g_direct_equal);	evt_channel_hd->subscription_hash = g_hash_table_new(g_direct_hash, 				g_direct_equal);	g_hash_table_insert(channel_hash, (gpointer)channel_handle,			evt_channel_hd);	g_hash_table_insert(evt_channel_hash, (gpointer)channel_handle,			evt_channel_hd);	*(channelHandle) = channel_handle;	return SA_OK;}static void free_open_ch_reply(struct open_channel_reply *open_ch_reply){		g_free(open_ch_reply);	return;}static void free_event_queue(struct event_queue_s *event_queue){	SaSizeT i;	struct queue_head *queue;	evt_event_handle *event_tmp;	struct open_channel_reply *open_ch_reply;		if(event_queue->event_number != 0){		for(i=SA_EVT_HIGHEST_PRIORITY;i<=SA_EVT_LOWEST_PRIORITY;i++){			queue = &(event_queue->queue[i]);			while(queue->head != NULL){				event_tmp = queue->head;				queue->head = event_tmp->next;				free_event(event_tmp);			}			g_free(queue);		}	}	if(event_queue->reply_number != 0){		while(event_queue->open_ch_reply_queue.head != NULL){			open_ch_reply = event_queue->open_ch_reply_queue.head;			event_queue->open_ch_reply_queue.head = 				open_ch_reply->next;			free_open_ch_reply(open_ch_reply);		}	}	g_free(event_queue);}static void free_event(evt_event_handle *event_hd){	/*free patternArray, publisherName, then free event_hd */	g_free(event_hd->patternArray);		g_free(event_hd->eventData);	g_free(event_hd);	return;}static void free_subscription_resource(gpointer key,		gpointer value,		gpointer user_data){	GHashTable *sub_hash = user_data;	g_hash_table_remove(sub_hash, key);	g_hash_table_remove(subscription_global, key);	return;}static void free_event_resource(gpointer key,				gpointer value, gpointer user_data){	SaEvtEventHandleT event_handle;	evt_event_handle *event_hd;	GHashTable *event_handle_hash;	event_handle = (SaEvtEventHandleT)key;	event_hd = (evt_event_handle *)value;	event_handle_hash = (GHashTable *)user_data;	free_event(event_hd);	g_hash_table_remove(evt_event_hash, key);	return;}static void free_ch_resource(gpointer key,				gpointer value, gpointer user_data){	SaEvtChannelHandleT ch_handle;	evt_channel_handle *ch_hd;	GHashTable *ch_hash, *event_handle_hash, *subscription_hash;	ch_handle = (SaEvtChannelHandleT)key;	ch_hd = (evt_channel_handle *)value;	ch_hash = (GHashTable *)user_data;	g_hash_table_remove(evt_channel_hash, (gpointer)ch_handle);	g_hash_table_remove(ch_hash, (gpointer)ch_handle);		event_handle_hash = ch_hd->event_handle_hash;	if(event_handle_hash != NULL){	g_hash_table_foreach(event_handle_hash, 				free_event_resource, event_handle_hash);	}	subscription_hash = ch_hd->subscription_hash;	if(subscription_hash != NULL){	g_hash_table_foreach(subscription_hash, 				free_subscription_resource, subscription_hash);	}	return;}static SaErrorT send_evt_finalize(struct IPC_CHANNEL *ch,							evt_handle *evt_hd){	SaSizeT msg_len;	void *msg;	SaUint8T *tmp_char;	msg_len = 1;	msg = g_malloc(msg_len);	if(msg == NULL){		return SA_ERR_NO_MEMORY;	}	tmp_char = (SaUint8T *)msg;	*(tmp_char) = EVT_FINALIZE;		send_to_evt_daemon(ch, msg, msg_len);			return SA_OK;}SaErrorT saEvtFinalize(SaEvtHandleT evtHandle){	evt_handle *evt_hd;	struct IPC_CHANNEL *ch;		evt_hd = g_hash_table_lookup(evt_handle_hash, (gpointer)evtHandle);	if( evt_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	/*close the connection */	ch = evt_hd->ch;	send_evt_finalize(ch, evt_hd);		ch->ops->destroy(ch);	/*free up the resources, free the channel, the events */	g_hash_table_remove(evt_handle_hash, (gpointer)evtHandle);	g_hash_table_foreach(evt_hd->evt_channel_hash,			free_ch_resource,			evt_hd->evt_channel_hash);	free_event_queue(evt_hd->event_queue);	g_free(evt_hd);	return SA_OK;}static SaErrorT send_channel_close(IPC_Channel *ch,		evt_channel_handle *evt_channel_hd){	void *msg;	char *tmp_char;	SaSizeT str_len, msg_len;			str_len = evt_channel_hd->channelName.length;	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(void *);	msg = g_malloc(msg_len);	if(msg == NULL){		return SA_ERR_NO_MEMORY;	}	tmp_char = msg;	*tmp_char = EVT_CLOSE_EVENT_CHANNEL;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	strncpy(tmp_char, evt_channel_hd->channelName.value, str_len);	tmp_char = tmp_char + str_len;	memcpy(tmp_char, &(evt_channel_hd->ch_instance), sizeof(void *));	send_to_evt_daemon(ch, msg, msg_len);	return SA_OK;}SaErrorT saEvtChannelClose(SaEvtChannelHandleT channelHandle){	evt_channel_handle *evt_channel_hd;	evt_handle *evt_hd;	GHashTable *event_handle_hash, *subscription_hash;	/*free events, free subscriptions, free item in hash table, free evt_channel_hd */	evt_channel_hd = g_hash_table_lookup(evt_channel_hash,			(gpointer)channelHandle);	if(evt_channel_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	send_channel_close(evt_channel_hd->ch ,evt_channel_hd);		event_handle_hash = evt_channel_hd->event_handle_hash;	if(event_handle_hash != NULL){		g_hash_table_foreach(event_handle_hash, 				free_event_resource, event_handle_hash);		g_hash_table_destroy(event_handle_hash);	}		subscription_hash = evt_channel_hd->subscription_hash;	if(subscription_hash != NULL){	g_hash_table_foreach(subscription_hash, 				free_subscription_resource, subscription_hash);	g_hash_table_destroy(subscription_hash);	}	evt_hd = g_hash_table_lookup(evt_handle_hash, 			(gpointer)evt_channel_hd->evt_handle);	g_hash_table_remove(evt_hd->evt_channel_hash, (gpointer)channelHandle);	g_hash_table_remove(evt_channel_hash, (gpointer)(channelHandle));	g_free(evt_channel_hd);	return SA_OK;}SaErrorT saEvtEventAllocate(const SaEvtChannelHandleT channelHandle,                   SaEvtEventHandleT *eventHandle){	evt_event_handle *event_hd;		evt_channel_handle *evt_channel_hd;	GHashTable *event_handle_hash;		if(eventHandle == NULL){		return SA_ERR_INVALID_PARAM;	}	evt_channel_hd = g_hash_table_lookup(evt_channel_hash,			(gpointer)channelHandle);	if(evt_channel_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	event_hd = (evt_event_handle *)g_malloc0(sizeof(evt_event_handle));	if (!event_hd)		return SA_ERR_NO_MEMORY;	event_handle_hash = evt_channel_hd->event_handle_hash;	event_hd->channelId = channelHandle;	if(get_handle(&event_handle_database, eventHandle) != SA_OK){		g_free(event_hd);		return SA_ERR_LIBRARY;	}	g_hash_table_insert(evt_event_hash, (gpointer)*eventHandle, event_hd);	g_hash_table_insert(evt_channel_hd->event_handle_hash, 			(gpointer)*eventHandle, event_hd);	return SA_OK;}SaErrorT saEvtEventFree(SaEvtEventHandleT eventHandle){	evt_event_handle *event_hd;	evt_channel_handle *evt_channel_hd;		event_hd = g_hash_table_lookup(evt_event_hash, (gpointer)eventHandle);	if(event_hd == NULL){		return SA_ERR_BAD_HANDLE;	}		free_event(event_hd);		evt_channel_hd = g_hash_table_lookup(evt_channel_hash,			(gpointer)event_hd->channelId);	if(evt_channel_hd == NULL){		return SA_ERR_LIBRARY;	}		g_hash_table_remove(evt_channel_hd->event_handle_hash,			(gpointer)eventHandle);	g_hash_table_remove(evt_event_hash,			(gpointer)eventHandle);	put_handle(&event_handle_database, eventHandle);	return SA_OK;}static SaErrorT copy_patternarray(evt_event_handle *event_hd,		const SaEvtEventPatternArrayT *patternArray){	SaSizeT number, i, size=0;	SaEvtEventPatternT *pattern;	SaSizeT *tmp_size;	SaUint8T *tmp_char;	number = patternArray->patternsNumber;	pattern = patternArray->patterns;	/*size = field1(number of patterns)+ field2(length of each pattern)+ field3(patterns) */	for(i=0; i<number; i++){		size = size + (pattern+i)->patternSize;	}	size = size + (number+1)*sizeof(SaSizeT);	event_hd->patternArray = g_malloc(size);	event_hd->event_size = size;	if(event_hd->patternArray == NULL){		return SA_ERR_NO_MEMORY;	}	tmp_size = event_hd->patternArray;	*(tmp_size) = number;	tmp_size++;	for(i=0; i<number; i++){		*(tmp_size) = (pattern+i)->patternSize;		tmp_size++;	}	tmp_char = (SaUint8T *)tmp_size;	for(i=0; i<number; i++){		strncpy(tmp_char, (pattern+i)->pattern,				(pattern+i)->patternSize);		tmp_char = tmp_char + (pattern+i)->patternSize;	}	return SA_OK;}SaErrorT saEvtEventAttributesSet(SaEvtEventHandleT eventHandle,                        const SaEvtEventPatternArrayT *patternArray,                        SaUint8T priority,                        SaTimeT retentionTime,                        const SaNameT *publisherName){	evt_event_handle *event_hd;	if((patternArray == NULL) || (publisherName == NULL)){		return SA_ERR_INVALID_PARAM;	}	if(priority > SA_EVT_LOWEST_PRIORITY){		return SA_ERR_INVALID_PARAM;	}		event_hd = (evt_event_handle *)g_hash_table_lookup(evt_event_hash,			(gpointer)(eventHandle));	if(event_hd == NULL){		return SA_ERR_BAD_HANDLE;	}		copy_patternarray(event_hd, patternArray);		event_hd->priority = priority;	event_hd->retentionTime = retentionTime;	event_hd->publisherName.length = publisherName->length;	memcpy(event_hd->publisherName.value, publisherName->value,			publisherName->length);			event_hd->publishTime = (SaTimeT)time(NULL);		event_hd->set_flag = 1;	return SA_OK;}#define min(A,B) ((A)<(B) ? (A) : (B))SaErrorT saEvtEventAttributesGet(const SaEvtEventHandleT eventHandle,                        SaEvtEventPatternArrayT *patternArray,                        SaUint8T *priority,                        SaTimeT *retentionTime,                        SaNameT *publisherName,                                                SaTimeT *publishTime,                        SaEvtEventIdT *eventId){	evt_event_handle *event_hd;	SaSizeT number, *tmp_size, min_number, i;	SaUint8T *tmp_char;	SaEvtEventPatternT *patterns;		event_hd = (evt_event_handle *)g_hash_table_lookup(			evt_event_hash, (gpointer)eventHandle);	if(event_hd == NULL){		return SA_ERR_BAD_HANDLE;	}		/*TODO: what should be done if patterSize conflicts */	if(patternArray != NULL){		tmp_size = (SaSizeT *)event_hd->patternArray;		number = *(tmp_size);		tmp_size++;		tmp_char = (SaUint8T *)(tmp_size+number);		min_number = min(patternArray->patternsNumber, number);		patternArray->patternsNumber = min_number;		patterns = patternArray->patterns;		for(i=0; i<min_number; i++){			patterns[i].patternSize = min(patterns[i].patternSize,					*(tmp_size));			memcpy(patterns[i].pattern, tmp_char, 					patterns[i].patternSize);			tmp_char += *(tmp_size);			tmp_size++;		}	}	if(priority != NULL){		*(priority) = event_hd->priority;	}	if(retentionTime != NULL){		*(retentionTime) = event_hd->retentionTime;	}	if(publisherName != NULL){		publisherName->length = event_hd->publisherName.length;		memcpy(publisherName->value, event_hd->publisherName.value,			publisherName->length);	}	if(publishTime != NULL){		*(publishTime) = event_hd->publishTime;	}	if(eventId != NULL){		*(eventId) = event_hd->eventId;	}	return SA_OK;}static SaErrorT send_publish(IPC_Channel *ch,		SaNameT *channel_name,		SaEvtEventHandleT eventHandle,		evt_event_handle *event_hd,		const void *eventData,		SaSizeT eventDataSize){	void *msg;	char *tmp_char;	SaSizeT str_len, publisher_len, msg_len;		str_len = channel_name->length;	publisher_len = event_hd->publisherName.length;	msg_len = 1+sizeof(SaSizeT)+str_len+sizeof(SaEvtEventHandleT)+		sizeof(SaUint8T)+sizeof(SaTimeT)+sizeof(SaSizeT)+		publisher_len+sizeof(SaTimeT)+sizeof(SaSizeT)+		event_hd->event_size+sizeof(SaSizeT)+eventDataSize;			msg = g_malloc(msg_len);	if(msg == NULL){		return SA_ERR_NO_MEMORY;	}	tmp_char = (char *)msg;	*(tmp_char) = EVT_PUBLISH;	tmp_char++;	memcpy(tmp_char, &str_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, channel_name->value, str_len);	tmp_char += str_len;	memcpy(tmp_char, &eventHandle, sizeof(SaEvtEventHandleT));	tmp_char += sizeof(SaEvtEventHandleT);	memcpy(tmp_char, &(event_hd->priority), sizeof(SaUint8T));	tmp_char += sizeof(SaUint8T);	memcpy(tmp_char, &(event_hd->retentionTime), sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	memcpy(tmp_char, &publisher_len, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event_hd->publisherName.value, publisher_len);	tmp_char += publisher_len;		memcpy(tmp_char, &(event_hd->publishTime), sizeof(SaTimeT));	tmp_char += sizeof(SaTimeT);	memcpy(tmp_char, &(event_hd->event_size), sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, event_hd->patternArray, event_hd->event_size);	tmp_char += event_hd->event_size;	memcpy(tmp_char, &eventDataSize, sizeof(SaSizeT));	tmp_char += sizeof(SaSizeT);	memcpy(tmp_char, eventData, eventDataSize);	tmp_char += eventDataSize;	send_to_evt_daemon(ch, msg, msg_len);	return SA_OK;}SaErrorT saEvtEventPublish(const SaEvtEventHandleT eventHandle,                  const void *eventData,                  SaSizeT eventDataSize,		  SaEvtEventIdT *eventId){	evt_channel_handle *evt_channel_hd;	evt_event_handle *event_hd;	evt_handle *evt_hd;	struct IPC_CHANNEL *ch;	SaNameT *channel_name;		int fd;	fd_set rset;	struct daemon_msg *msg_reply;	struct open_channel_reply *open_ch_reply;			event_hd = g_hash_table_lookup(evt_event_hash, (gpointer)eventHandle);	if(event_hd == NULL){		return SA_ERR_BAD_HANDLE;	}	if(event_hd->set_flag == 0){		return SA_ERR_INVALID_PARAM;	}	evt_channel_hd = g_hash_table_lookup(evt_channel_hash,			(gpointer)event_hd->channelId);	if(evt_channel_hd == NULL){		return SA_ERR_LIBRARY;	}	evt_hd = g_hash_table_lookup(evt_handle_hash,			(gpointer)(evt_channel_hd->evt_handle));	if(evt_hd == NULL){		return SA_ERR_LIBRARY;	}	ch = evt_channel_hd->ch;	channel_name = &(evt_channel_hd->channelName);	send_publish(ch, channel_name, eventHandle, 			event_hd, eventData, eventDataSize);	sleep(1);	fd = evt_channel_hd->selectionObject;		for(;;){		FD_ZERO(&rset);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -