⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 eventd.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
/* $Id: eventd.c,v 1.8 2004/11/22 20:06:42 gshi Exp $ */
/* 
 * eventd.c: source file for event daemon
 *
 * Copyright (C) 2004 Forrest,Zhao <forrest.zhao@intel.com>
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 * 
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

#include <clplumbing/cl_signal.h>
#include "event.h"

int global_debug=0;
int global_verbose=0;

static struct sa_handle_database event_id_database;
GHashTable *hash_table_for_ipc;
GHashTable *hash_table_for_channel_name;

typedef struct {
	SaEvtEventFilterT *filters;
	SaSizeT filtersNumber;
}evt_filter_array;

/* An event as decoded from a publish request (see read_publish()). */
struct evt_event {
	SaEvtEventIdT event_id; /* globally unique */
	SaTimeT retention_time;
	SaNameT publisherName;
	SaTimeT publish_time;
	SaUint8T priority;
	SaEvtEventPatternArrayT *pattern_array;
	SaSizeT pattern_size;
	void *event_data;
	SaSizeT data_size;
	SaEvtEventHandleT clt_event_hd;
};

struct evt_subscription {
	SaSizeT filters_size;
	SaEvtEventFilterArrayT *filters;
	IPC_Channel *client;
	SaEvtChannelHandleT clt_ch_handle;
	SaEvtSubscriptionIdT subscription_id;
	void *ch_id; /* at daemon side */
};

struct evt_new_subscription {
	char *channel_name;
	SaEvtEventFilterArrayT *filters;
	char *orig;
	/* at daemon side */
	/*
	 * In fact ch_id is a pointer; it is declared 64 bits wide so that
	 * it is portable between 32-bit and 64-bit platforms.
	 */
	SaUint64T ch_id;
	SaEvtSubscriptionIdT subscription_id;
};

struct evt_new_subscription_reply {
	char *channel_name;
	SaUint64T ch_id; /* at daemon side */
	SaEvtSubscriptionIdT subscription_id;
	struct evt_event *event;
};

struct channel_instance {
	GHashTable *subscriptions;
	SaEvtChannelHandleT clt_ch_handle;
	char *ch_name;
};

struct evt_channel {
	char *channel_name;
	int unlink;
	GHashTable *channel_instances;
	unsigned int use_count;
	GHashTable *event_cache;
};

struct ipc {
	IPC_Channel *client;
	GHashTable *channel_instances;
};

struct evt_ch_open_request {
	char *channel_name;
	SaEvtChannelHandleT clt_ch_handle;
	SaTimeT time_out;
	IPC_Channel *client;
};

/* Decoded channel-open request (see read_open_channel()). */
struct evt_ch_open {
	char *channel_name;
	SaEvtChannelHandleT clt_ch_handle;
	SaEvtChannelOpenFlagsT ch_open_flags;
	SaTimeT time_out;
};

/* Decoded channel-close request (see read_close_channel()). */
struct evt_ch_close {
	char *channel_name;
	void *ch_ins;
};

/* Decoded retention-clear request (see read_clear_retention_time()). */
struct evt_retention_clear {
	SaEvtEventIdT event_id;
};

struct evt_retention_clear_reply {
	SaEvtEventIdT event_id;
	SaErrorT ret_code;
};

struct evt_ch_open_request_remote{
	char *channel_name;
	SaUint64T key;
};

struct evt_ch_open_reply_remote{
	char *channel_name;
	SaUint64T key;
};

/*
 * A decoded message: the message type, the channel name it refers to and
 * a type-specific payload.
 */
struct client_msg{
	enum evt_type msg_type;
	char *channel_name;
	union {
		struct evt_event *event;  /* publish */
		struct evt_subscription *subscription;
		struct evt_ch_open *ch_open;
		struct evt_ch_close *ch_close;
		struct evt_retention_clear *retention_clear;
		struct evt_retention_clear_reply *retention_clear_reply;
		struct evt_new_subscription *new_subscription;
		struct evt_new_subscription_reply *new_sub_reply;
		struct evt_ch_open_request_remote *ch_open_request_remote;
		struct evt_ch_open_reply_remote *ch_open_reply_remote;
	} private;
};

struct node_element {
	uint  NodeUuid;
	char NodeID[NODEIDSIZE];
	char Status[STATUSSIZE];
};

/* Node table kept sorted by NodeID (see add_node_l()). */
struct node_list_s {
	uint	   node_count;
	uint	   mynode;
	struct node_element nodes[MAXNODE];
};

struct node_list_s node_list;

#define EVT_SERVICE "event_service"
#define BIN_CONTENT "bin_content"

void hton_64(const SaUint64T *src_64, SaUint64T *dst_64);
void ntoh_64(const SaUint64T *src_64, SaUint64T *dst_64);

/*
 * Insert a node into the global node_list, keeping the table sorted by
 * NodeID (strncmp order over NODEIDSIZE bytes).
 *
 *   node    name of the node to add; must not already be in the list
 *           (a duplicate trips the assert below)
 *   status  status string stored alongside the node
 *   mynode  name of the local node; when it equals `node`,
 *           node_list.mynode is set to the new slot
 *
 * Entries at or after the insertion point are shifted up by one and their
 * NodeUuid is rewritten to match their new index.  If the table already
 * holds MAXNODE entries the node is logged and dropped instead of being
 * written past the end of the array.
 *
 * NOTE: NodeID/Status are filled with strncpy(), so a name of NODEIDSIZE
 * (or STATUSSIZE) bytes or more is stored without a terminating NUL.
 */
static void
add_node_l(const char *node,
		const char *status, const char *mynode)
{
	int nodecount, mynode_idx, i, j;
	int cmp;	/* strncmp() returns int; a char could flip its sign */

	nodecount = node_list.node_count;
	if (nodecount >= MAXNODE) {
		cl_log(LOG_ERR, "Cannot add node %s: node list is full", node);
		return;
	}

	if (nodecount == 0) {
		mynode_idx = -1;
	} else {
		mynode_idx = node_list.mynode;
	}

	/* find the first entry that sorts after the new node */
	for ( i = 0 ; i < nodecount ; i++ ) {
		cmp = strncmp(node_list.nodes[i].NodeID, node, NODEIDSIZE);
		assert(cmp != 0);
		if (cmp > 0) {
			break;
		}
	}

	/* open a gap at index i */
	for ( j = nodecount; j > i; j-- ) {
		node_list.nodes[j] = node_list.nodes[j-1];
		node_list.nodes[j].NodeUuid = j;
	}

	strncpy(node_list.nodes[i].NodeID, node, NODEIDSIZE);
	strncpy(node_list.nodes[i].Status, status, STATUSSIZE);
	node_list.nodes[i].NodeUuid = i;
	node_list.node_count++;

	if (strncmp(mynode, node, NODEIDSIZE) == 0) {
		node_list.mynode = i;
	} else if (mynode_idx != -1 && i <= mynode_idx) {
		/* the local node's entry was shifted up by the insertion */
		node_list.mynode = mynode_idx+1;
	}
	return;
}

/*
 * Interface-status callback registered with heartbeat in
 * evt_daemon_initialize(); it only logs the new link status.
 */
static void
LinkStatus(const char * node, const char * lnk, const char * status ,
				void * private)
{
	cl_log(LOG_INFO, "Link Status update: Link %s/%s "
		"now has status %s", node, lnk, status);
	return;
}

struct evt_info {
	ll_cluster_t *hb;
	struct evt_channel *evt_channel_head;
	GHashTable *evt_pending_ch_open_requests;
	GHashTable *evt_pending_clear_requests;
};

struct evt_info *info;

void *evt_daemon_initialize(void);

/*
 * Sign on to heartbeat, build node_list from the cluster's "normal"
 * nodes and allocate the global `info` bookkeeping structure.
 *
 * Returns the heartbeat handle (ll_cluster_t *) on success, or NULL after
 * logging the reason when any heartbeat call fails or when the local node
 * is not a "normal" node.
 */
void *evt_daemon_initialize(void)
{
	ll_cluster_t*	hb_fd;
	const char *	node;
	const char *	hname;
	const char *	status;
	const char *	ntype;
	unsigned	fmask;

	hb_fd = ll_cluster_new("heartbeat");
	if (hb_fd == NULL) {
		cl_log(LOG_ERR, "Cannot create heartbeat cluster handle");
		return NULL;
	}

	if (hb_fd->llc_ops->signon(hb_fd, "ccm")!= HA_OK) {
		cl_log(LOG_ERR, "Cannot sign on with heartbeat");
		cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd));
		return NULL;
	}

	if((hname = hb_fd->llc_ops->get_mynodeid(hb_fd)) == NULL) {
		cl_log(LOG_ERR, "get_mynodeid() failed");
		return NULL;
	}

	if (hb_fd->llc_ops->set_ifstatus_callback(hb_fd, LinkStatus, NULL)
								!=HA_OK){
		cl_log(LOG_ERR, "Cannot set if status callback");
		cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd));
		return NULL;
	}

	fmask = LLC_FILTER_DEFAULT;
	if (hb_fd->llc_ops->setfmode(hb_fd, fmask) != HA_OK) {
		cl_log(LOG_ERR, "Cannot set filter mode");
		cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd));
		return NULL;
	}

	if (hb_fd->llc_ops->init_nodewalk(hb_fd) != HA_OK) {
		cl_log(LOG_ERR, "Cannot start node walk");
		cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd));
		return NULL;
	}

	while((node = hb_fd->llc_ops->nextnode(hb_fd))!= NULL) {
		/* a failed lookup must not reach strcmp() as a NULL pointer */
		ntype = hb_fd->llc_ops->node_type(hb_fd, node);
		if (ntype == NULL) {
			cl_log(LOG_ERR, "Cannot get type of node %s", node);
			cl_log(LOG_ERR, "REASON: %s",
					hb_fd->llc_ops->errmsg(hb_fd));
			return NULL;
		}

		/* ignore non normal nodes */
		if(strcmp(ntype, "normal") != 0) {
			if(strcmp(node,hname) == 0) {
				cl_log(LOG_ERR, "This cluster node: %s: " "is a ping node",
								node);
				return NULL;
			}
			continue;
		}

		status =  hb_fd->llc_ops->node_status(hb_fd, node);
		if (status == NULL) {
			cl_log(LOG_ERR, "Cannot get status of node %s", node);
			cl_log(LOG_ERR, "REASON: %s",
					hb_fd->llc_ops->errmsg(hb_fd));
			return NULL;
		}
		if(global_debug) {
			cl_log(LOG_DEBUG, "Cluster node: %s: status: %s", node,	status);
		}

		/* add the node to the node list */
		add_node_l(node, status, hname);
	}

	if (hb_fd->llc_ops->end_nodewalk(hb_fd) != HA_OK) {
		cl_log(LOG_ERR, "Cannot end node walk");
		cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd));
		return NULL;
	}

	info = (struct evt_info *)g_malloc0(sizeof(struct evt_info));
	info->evt_pending_ch_open_requests = g_hash_table_new(g_direct_hash,
				g_direct_equal);
	info->evt_pending_clear_requests = g_hash_table_new(g_direct_hash,
				g_direct_equal);
	event_id_database.handle_count = 0;
	return hb_fd;
}

typedef struct hb_usrdata_s {
	ll_cluster_t	*hb_fd;
	GMainLoop	*mainloop;
} hb_usrdata_t;

/*
 * Decode the header shared by the client requests parsed below:
 * one leading byte that is skipped (presumably the message type --
 * confirm against the sender), a SaSizeT name length, then that many
 * bytes of channel name.
 *
 * A freshly allocated, NUL-terminated copy of the name is stored in
 * ret->channel_name, the name length is written to *name_len, and the
 * return value points at the first byte after the name.
 *
 * NOTE(review): the buffer length is not known here, so the length field
 * is trusted as sent.
 */
static SaUint8T *
read_msg_channel_name(void *msg, struct client_msg *ret, SaSizeT *name_len)
{
	SaUint8T *cursor = (SaUint8T *)msg;

	cursor++;
	memcpy(name_len, cursor, sizeof(SaSizeT));
	cursor += sizeof(SaSizeT);
	ret->channel_name = g_malloc(*name_len + 1);
	memcpy(ret->channel_name, cursor, *name_len);
	ret->channel_name[*name_len] = '\0';
	return cursor + *name_len;
}

/*
 * Decode a publish request into a newly allocated struct evt_event stored
 * in ret->private.event (channel name goes to ret->channel_name).
 *
 * Layout after the common header, in order: client event handle,
 * priority byte, retention time, publisher-name length and bytes,
 * publish time, pattern_size, pattern count, one SaSizeT size per
 * pattern, the pattern bytes back to back, data size, data bytes.
 *
 * The publisher name is copied into the fixed-size SaNameT buffer, so an
 * over-long name is truncated to the buffer size (and logged) while the
 * read cursor still advances by the length given in the message.
 * Fields not present in the message (e.g. event_id) are left zeroed.
 */
static void read_publish(void *msg, struct client_msg *ret)
{
	struct evt_event *event;
	SaUint8T *tmp_char, *tmp_char_pattern;
	SaSizeT str_len, name_len, number, i;
	SaEvtEventPatternArrayT *pattern_array;
	SaEvtEventPatternT *patterns;

	event = (struct evt_event *)g_malloc0(sizeof(struct evt_event));
	ret->private.event = event;
	pattern_array = (SaEvtEventPatternArrayT *)g_malloc0(
			sizeof(SaEvtEventPatternArrayT));
	event->pattern_array = pattern_array;

	tmp_char = read_msg_channel_name(msg, ret, &str_len);

	memcpy(&(event->clt_event_hd), tmp_char, sizeof(SaEvtEventHandleT));
	tmp_char += sizeof(SaEvtEventHandleT);
	event->priority = *(tmp_char);
	tmp_char++;
	memcpy(&(event->retention_time), tmp_char, sizeof(SaTimeT));
	tmp_char += sizeof(SaTimeT);

	/* publisher name: never write past the SaNameT value buffer */
	memcpy(&str_len, tmp_char, sizeof(SaSizeT));
	tmp_char += sizeof(SaSizeT);
	name_len = str_len;
	if (name_len > sizeof(event->publisherName.value)) {
		cl_log(LOG_ERR, "read_publish: publisher name too long, "
			"truncating");
		name_len = sizeof(event->publisherName.value);
	}
	event->publisherName.length = name_len;
	memcpy(event->publisherName.value, tmp_char, name_len);
	tmp_char += str_len;

	memcpy(&(event->publish_time), tmp_char, sizeof(SaTimeT));
	tmp_char += sizeof(SaTimeT);
	memcpy(&(event->pattern_size), tmp_char, sizeof(SaSizeT));
	tmp_char += sizeof(SaSizeT);
	memcpy(&number, tmp_char, sizeof(SaSizeT));
	tmp_char += sizeof(SaSizeT);
	pattern_array->patternsNumber = number;
	patterns = (SaEvtEventPatternT *)g_malloc(
			sizeof(SaEvtEventPatternT)*number);
	pattern_array->patterns = patterns;

	/*
	 * The `number` pattern sizes come first, followed by the pattern
	 * bytes; walk both regions in parallel.
	 */
	tmp_char_pattern = tmp_char + number*sizeof(SaSizeT);
	for(i=0; i<number; i++){
		memcpy(&(patterns[i].patternSize), tmp_char, sizeof(SaSizeT));
		tmp_char += sizeof(SaSizeT);
		patterns[i].pattern = g_malloc(patterns[i].patternSize);
		memcpy(patterns[i].pattern, tmp_char_pattern, patterns[i].patternSize);
		tmp_char_pattern += patterns[i].patternSize;
	}

	tmp_char = tmp_char_pattern;
	memcpy(&(event->data_size), tmp_char, sizeof(SaSizeT));
	tmp_char += sizeof(SaSizeT);
	event->event_data = g_malloc(event->data_size);
	memcpy(event->event_data, tmp_char, event->data_size);
	return;
}

/*
 * Decode an unsubscribe request into a newly allocated
 * struct evt_subscription stored in ret->private.subscription.
 * After the common header the message carries the daemon-side channel id
 * (sizeof(void *) bytes) and the subscription id; all other fields of the
 * subscription are left zeroed.
 */
static void read_unsubscribe(void *msg, struct client_msg *ret)
{
	struct evt_subscription *subscription;
	SaUint8T *tmp_char;
	SaSizeT str_len;

	subscription = (struct evt_subscription *)g_malloc0(
			sizeof(struct evt_subscription));
	ret->private.subscription = subscription;

	tmp_char = read_msg_channel_name(msg, ret, &str_len);

	memcpy(&(subscription->ch_id), tmp_char, sizeof(void *));
	tmp_char += sizeof(void *);
	memcpy(&(subscription->subscription_id), tmp_char, sizeof(SaEvtSubscriptionIdT));
	return;
}

/*
 * Decode a channel-open request into a newly allocated struct evt_ch_open
 * stored in ret->private.ch_open.  The channel name is duplicated into
 * both ret->channel_name and ch_open->channel_name; the client channel
 * handle and open flags follow the name.  time_out is not part of the
 * message and is left zeroed.
 */
static void read_open_channel(void *msg, struct client_msg *ret)
{
	SaUint8T *tmp_char;
	SaSizeT str_len;
	struct evt_ch_open *ch_open;

	ch_open = (struct evt_ch_open *)g_malloc0(sizeof(struct evt_ch_open));
	ret->private.ch_open = ch_open;

	tmp_char = read_msg_channel_name(msg, ret, &str_len);

	/* second, independently owned copy of the name */
	ch_open->channel_name = g_malloc(str_len+1);
	memcpy(ch_open->channel_name, ret->channel_name, str_len);
	ch_open->channel_name[str_len] = '\0';

	memcpy(&(ch_open->clt_ch_handle), tmp_char, sizeof(SaEvtChannelHandleT));
	tmp_char += sizeof(SaEvtChannelHandleT);
	memcpy(&(ch_open->ch_open_flags), tmp_char, sizeof(SaEvtChannelOpenFlagsT));
	return;
}

/*
 * Decode a channel-close request into a newly allocated
 * struct evt_ch_close stored in ret->private.ch_close.  After the common
 * header the message carries the channel-instance pointer
 * (sizeof(void *) bytes).  ch_close->channel_name is left NULL.
 */
static void read_close_channel(void *msg, struct client_msg *ret)
{
	SaUint8T *tmp_char;
	SaSizeT str_len;
	struct evt_ch_close *ch_close;

	ch_close = (struct evt_ch_close *)g_malloc0(
			sizeof(struct evt_ch_close));
	ret->private.ch_close = ch_close;

	tmp_char = read_msg_channel_name(msg, ret, &str_len);

	memcpy(&(ch_close->ch_ins), tmp_char, sizeof(void *));
	return;
}

/*
 * Decode a retention-time-clear request into a newly allocated
 * struct evt_retention_clear stored in ret->private.retention_clear.
 * After the common header the message carries the event id.
 */
static void read_clear_retention_time(void *msg, struct client_msg *ret)
{
	struct evt_retention_clear *retention_clear;
	SaUint8T *tmp_char;
	SaSizeT str_len;

	retention_clear = (struct evt_retention_clear *)g_malloc0(
			sizeof(struct evt_retention_clear));
	ret->private.retention_clear = retention_clear;

	tmp_char = read_msg_channel_name(msg, ret, &str_len);

	memcpy(&(retention_clear->event_id), tmp_char, sizeof(SaEvtEventIdT));
	return;
}

static void read_subscribe(IPC_Channel *ch, void *msg, struct client_msg *ret){	struct evt_subscription *subscription;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -