📄 eventd.c
字号:
/* $Id: eventd.c,v 1.8 2004/11/22 20:06:42 gshi Exp $ *//* * eventd.c: source file for event daemon * * Copyright (C) 2004 Forrest,Zhao <forrest.zhao@intel.com> * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <clplumbing/cl_signal.h>#include "event.h"int global_debug=0;int global_verbose=0;static struct sa_handle_database event_id_database;GHashTable *hash_table_for_ipc;GHashTable *hash_table_for_channel_name;typedef struct { SaEvtEventFilterT *filters; SaSizeT filtersNumber;}evt_filter_array;struct evt_event { SaEvtEventIdT event_id; /*globally unique */ SaTimeT retention_time; SaNameT publisherName; SaTimeT publish_time; SaUint8T priority; SaEvtEventPatternArrayT *pattern_array; SaSizeT pattern_size; void *event_data; SaSizeT data_size; SaEvtEventHandleT clt_event_hd;};struct evt_subscription { SaSizeT filters_size; SaEvtEventFilterArrayT *filters; IPC_Channel *client; SaEvtChannelHandleT clt_ch_handle; SaEvtSubscriptionIdT subscription_id; void *ch_id; /*at daemon side */};struct evt_new_subscription { char *channel_name; SaEvtEventFilterArrayT *filters; char *orig; /*at daemon side */ /*void *ch_id; */ SaUint64T ch_id; /*in fact it's a pointer, in order to portable between 32 bit and 64 bit platform, we define it as 64 bit length */ SaEvtSubscriptionIdT subscription_id;};struct evt_new_subscription_reply { char *channel_name; SaUint64T ch_id; /*at daemon side*/ SaEvtSubscriptionIdT subscription_id; struct evt_event *event;};struct channel_instance { GHashTable *subscriptions; SaEvtChannelHandleT clt_ch_handle; char *ch_name;};struct evt_channel { char *channel_name; int unlink; GHashTable *channel_instances; unsigned int use_count; GHashTable *event_cache;};struct ipc { IPC_Channel *client; GHashTable *channel_instances;};struct evt_ch_open_request { char *channel_name; SaEvtChannelHandleT clt_ch_handle; SaTimeT time_out; IPC_Channel *client; };struct evt_ch_open { char *channel_name; SaEvtChannelHandleT clt_ch_handle; SaEvtChannelOpenFlagsT ch_open_flags; SaTimeT time_out;};struct evt_ch_close { char *channel_name; void *ch_ins;};struct evt_retention_clear { SaEvtEventIdT event_id;};struct evt_retention_clear_reply { SaEvtEventIdT event_id; SaErrorT ret_code;};struct evt_ch_open_request_remote{ char *channel_name; SaUint64T key;};struct evt_ch_open_reply_remote{ char *channel_name; SaUint64T key;};struct client_msg{ enum evt_type msg_type; char *channel_name; union { struct evt_event *event; /*publish*/ struct evt_subscription *subscription; struct evt_ch_open *ch_open; struct evt_ch_close *ch_close; struct evt_retention_clear *retention_clear; struct evt_retention_clear_reply *retention_clear_reply; struct evt_new_subscription *new_subscription; struct evt_new_subscription_reply *new_sub_reply; struct evt_ch_open_request_remote *ch_open_request_remote; struct evt_ch_open_reply_remote *ch_open_reply_remote; } private;};struct node_element { uint NodeUuid; char NodeID[NODEIDSIZE]; char Status[STATUSSIZE]; };struct node_list_s { uint node_count; uint mynode; struct node_element nodes[MAXNODE];};struct node_list_s node_list;#define EVT_SERVICE "event_service"#define BIN_CONTENT "bin_content"void hton_64(const SaUint64T *src_64, SaUint64T *dst_64);void ntoh_64(const SaUint64T *src_64, SaUint64T *dst_64);static void add_node_l(const char *node, const char *status, const char *mynode){ int nodecount, mynode_idx, i, j; char value; nodecount = node_list.node_count; if (nodecount == 0) { mynode_idx = -1; } else { mynode_idx = node_list.mynode; } for ( i = 0 ; i < nodecount ; i++ ) { value = strncmp(node_list.nodes[i].NodeID, node, NODEIDSIZE); assert(value!=0); if(value > 0) { break; } } for ( j = nodecount; j>i; j-- ) { node_list.nodes[j] = node_list.nodes[j-1]; node_list.nodes[j].NodeUuid = j; } strncpy(node_list.nodes[i].NodeID, node, NODEIDSIZE); strncpy(node_list.nodes[i].Status, status, STATUSSIZE); node_list.nodes[i].NodeUuid = i; node_list.node_count++; if (strncmp(mynode, node, NODEIDSIZE) == 0) { node_list.mynode = i; } else if (mynode_idx != -1 && i <= mynode_idx) { node_list.mynode = mynode_idx+1; } return;}static voidLinkStatus(const char * node, const char * lnk, const char * status , void * private){ cl_log(LOG_INFO, "Link Status update: Link %s/%s " "now has status %s", node, lnk, status); return;}struct evt_info { ll_cluster_t *hb; struct evt_channel *evt_channel_head; GHashTable *evt_pending_ch_open_requests; GHashTable *evt_pending_clear_requests;};struct evt_info *info;void *evt_daemon_initialize(void);void *evt_daemon_initialize(){ ll_cluster_t* hb_fd; const char * node; const char * hname; const char * status; unsigned fmask; hb_fd = ll_cluster_new("heartbeat"); if (hb_fd->llc_ops->signon(hb_fd, "ccm")!= HA_OK) { cl_log(LOG_ERR, "Cannot sign on with heartbeat"); cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd)); return NULL; } if((hname = hb_fd->llc_ops->get_mynodeid(hb_fd)) == NULL) { cl_log(LOG_ERR, "get_mynodeid() failed"); return NULL; } if (hb_fd->llc_ops->set_ifstatus_callback(hb_fd, LinkStatus, NULL) !=HA_OK){ cl_log(LOG_ERR, "Cannot set if status callback"); cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd)); return NULL; } fmask = LLC_FILTER_DEFAULT; if (hb_fd->llc_ops->setfmode(hb_fd, fmask) != HA_OK) { cl_log(LOG_ERR, "Cannot set filter mode"); cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd)); return NULL; } if (hb_fd->llc_ops->init_nodewalk(hb_fd) != HA_OK) { cl_log(LOG_ERR, "Cannot start node walk"); cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd)); return NULL; } while((node = hb_fd->llc_ops->nextnode(hb_fd))!= NULL) { /* ignore non normal nodes */ if(strcmp(hb_fd->llc_ops->node_type(hb_fd, node), "normal") != 0) { if(strcmp(node,hname) == 0) { cl_log(LOG_ERR, "This cluster node: %s: " "is a ping node", node); return NULL; } continue; } status = hb_fd->llc_ops->node_status(hb_fd, node); if(global_debug) { cl_log(LOG_DEBUG, "Cluster node: %s: status: %s", node, status); } /* add the node to the node list */ add_node_l(node, status, hname); } if (hb_fd->llc_ops->end_nodewalk(hb_fd) != HA_OK) { cl_log(LOG_ERR, "Cannot end node walk"); cl_log(LOG_ERR, "REASON: %s", hb_fd->llc_ops->errmsg(hb_fd)); return NULL; } info = (struct evt_info *)g_malloc0(sizeof(struct evt_info)); info->evt_pending_ch_open_requests = g_hash_table_new(g_direct_hash, g_direct_equal); info->evt_pending_clear_requests = g_hash_table_new(g_direct_hash, g_direct_equal); event_id_database.handle_count = 0; return hb_fd;}typedef struct hb_usrdata_s { ll_cluster_t *hb_fd; GMainLoop *mainloop;} hb_usrdata_t;static void read_publish(void *msg, struct client_msg *ret){ struct evt_event *event; SaUint8T *tmp_char, *tmp_char_pattern; SaSizeT str_len, number, i; SaEvtEventPatternArrayT *pattern_array; SaEvtEventPatternT *patterns; event = (struct evt_event *)g_malloc(sizeof(struct evt_event)); ret->private.event = event; pattern_array = (SaEvtEventPatternArrayT *)g_malloc( sizeof(SaEvtEventPatternArrayT)); event->pattern_array = pattern_array; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); ret->channel_name = g_malloc(str_len+1); tmp_char += sizeof(SaSizeT); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&(event->clt_event_hd), tmp_char, sizeof(SaEvtEventHandleT)); tmp_char += sizeof(SaEvtEventHandleT); event->priority = *(tmp_char); tmp_char++; memcpy(&(event->retention_time), tmp_char, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); event->publisherName.length = str_len; memcpy(event->publisherName.value, tmp_char, str_len); /*event->publisherName[str_len] = '\0';*/ tmp_char += str_len; memcpy(&(event->publish_time), tmp_char, sizeof(SaTimeT)); tmp_char += sizeof(SaTimeT); memcpy(&(event->pattern_size), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); memcpy(&number, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); pattern_array->patternsNumber = number; patterns = (SaEvtEventPatternT *)g_malloc( sizeof(SaEvtEventPatternT)*number); pattern_array->patterns = patterns; tmp_char_pattern = tmp_char + number*sizeof(SaSizeT); for(i=0; i<number; i++){ memcpy(&(patterns[i].patternSize), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); patterns[i].pattern = g_malloc(patterns[i].patternSize); memcpy(patterns[i].pattern, tmp_char_pattern, patterns[i].patternSize); tmp_char_pattern += patterns[i].patternSize; } tmp_char = tmp_char_pattern; memcpy(&(event->data_size), tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); event->event_data = g_malloc(event->data_size); memcpy(event->event_data, tmp_char, event->data_size); return;}static void read_unsubscribe(void *msg, struct client_msg *ret){ struct evt_subscription *subscription; SaUint8T *tmp_char; SaSizeT str_len; subscription = (struct evt_subscription *)g_malloc( sizeof(struct evt_subscription)); ret->private.subscription = subscription; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&(subscription->ch_id), tmp_char, sizeof(void *)); tmp_char += sizeof(void *); memcpy(&(subscription->subscription_id), tmp_char, sizeof(SaEvtSubscriptionIdT)); return;}static void read_open_channel(void *msg, struct client_msg *ret){ SaUint8T *tmp_char; SaSizeT str_len; struct evt_ch_open *ch_open; ch_open = (struct evt_ch_open *)g_malloc(sizeof(struct evt_ch_open)); ret->private.ch_open = ch_open; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); ch_open->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); memcpy(ch_open->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; ch_open->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&(ch_open->clt_ch_handle), tmp_char, sizeof(SaEvtChannelHandleT)); tmp_char += sizeof(SaEvtChannelHandleT); memcpy(&(ch_open->ch_open_flags), tmp_char, sizeof(SaEvtChannelOpenFlagsT)); return;}static void read_close_channel(void *msg, struct client_msg *ret){ SaUint8T *tmp_char; SaSizeT str_len; struct evt_ch_close *ch_close; ch_close = (struct evt_ch_close *)g_malloc( sizeof(struct evt_ch_close)); ret->private.ch_close = ch_close; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&(ch_close->ch_ins), tmp_char, sizeof(void *)); return;}static void read_clear_retention_time(void *msg, struct client_msg *ret){ struct evt_retention_clear *retention_clear; SaUint8T *tmp_char; SaSizeT str_len; retention_clear = (struct evt_retention_clear *)g_malloc( sizeof(struct evt_retention_clear)); ret->private.retention_clear = retention_clear; tmp_char = (SaUint8T *)msg; tmp_char++; memcpy(&str_len, tmp_char, sizeof(SaSizeT)); tmp_char += sizeof(SaSizeT); ret->channel_name = g_malloc(str_len+1); memcpy(ret->channel_name, tmp_char, str_len); ret->channel_name[str_len] = '\0'; tmp_char += str_len; memcpy(&(retention_clear->event_id), tmp_char, sizeof(SaEvtEventIdT)); return;}static void read_subscribe(IPC_Channel *ch, void *msg, struct client_msg *ret){ struct evt_subscription *subscription;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -