📄 subscr.c
字号:
if (!found) return; /* Cancel subscription timer (if used), then delete subscription */ if (sub->timeout != TIPC_WAIT_FOREVER) { sub->timeout = TIPC_WAIT_FOREVER; spin_unlock_bh(subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); spin_lock_bh(subscriber->lock); } dbg("Cancel: removing sub %u,%u,%u from subscriber %x list\n", sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); subscr_del(sub);}/** * subscr_subscribe - create subscription for subscriber * * Called with subscriber locked */static void subscr_subscribe(struct tipc_subscr *s, struct subscriber *subscriber){ struct subscription *sub; /* Determine/update subscriber's endianness */ if (s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE)) subscriber->swap = 0; else subscriber->swap = 1; /* Detect & process a subscription cancellation request */ if (s->filter & htohl(TIPC_SUB_CANCEL, subscriber->swap)) { s->filter &= ~htohl(TIPC_SUB_CANCEL, subscriber->swap); subscr_cancel(s, subscriber); return; } /* Refuse subscription if global limit exceeded */ if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) { warn("Subscription rejected, subscription limit reached (%u)\n", tipc_max_subscriptions); subscr_terminate(subscriber); return; } /* Allocate subscription object */ sub = kzalloc(sizeof(*sub), GFP_ATOMIC); if (!sub) { warn("Subscription rejected, no memory\n"); subscr_terminate(subscriber); return; } /* Initialize subscription object */ sub->seq.type = htohl(s->seq.type, subscriber->swap); sub->seq.lower = htohl(s->seq.lower, subscriber->swap); sub->seq.upper = htohl(s->seq.upper, subscriber->swap); sub->timeout = htohl(s->timeout, subscriber->swap); sub->filter = htohl(s->filter, subscriber->swap); if ((!(sub->filter & TIPC_SUB_PORTS) == !(sub->filter & TIPC_SUB_SERVICE)) || (sub->seq.lower > sub->seq.upper)) { warn("Subscription rejected, illegal request\n"); kfree(sub); subscr_terminate(subscriber); return; } memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); INIT_LIST_HEAD(&sub->subscription_list); INIT_LIST_HEAD(&sub->nameseq_list); list_add(&sub->subscription_list, &subscriber->subscription_list); atomic_inc(&topsrv.subscription_count); if (sub->timeout != TIPC_WAIT_FOREVER) { k_init_timer(&sub->timer, (Handler)subscr_timeout, (unsigned long)sub); k_start_timer(&sub->timer, sub->timeout); } sub->owner = subscriber; tipc_nametbl_subscribe(sub);}/** * subscr_conn_shutdown_event - handle termination request from subscriber */static void subscr_conn_shutdown_event(void *usr_handle, u32 portref, struct sk_buff **buf, unsigned char const *data, unsigned int size, int reason){ struct subscriber *subscriber; spinlock_t *subscriber_lock; subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); if (subscriber == NULL) return; subscriber_lock = subscriber->lock; subscr_terminate(subscriber); spin_unlock_bh(subscriber_lock);}/** * subscr_conn_msg_event - handle new subscription request from subscriber */static void subscr_conn_msg_event(void *usr_handle, u32 port_ref, struct sk_buff **buf, const unchar *data, u32 size){ struct subscriber *subscriber; spinlock_t *subscriber_lock; subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); if (subscriber == NULL) return; subscriber_lock = subscriber->lock; if (size != sizeof(struct tipc_subscr)) subscr_terminate(subscriber); else subscr_subscribe((struct tipc_subscr *)data, subscriber); spin_unlock_bh(subscriber_lock);}/** * subscr_named_msg_event - handle request to establish a new subscriber */static void subscr_named_msg_event(void *usr_handle, u32 port_ref, struct sk_buff **buf, const unchar *data, u32 size, u32 importance, struct tipc_portid const *orig, struct tipc_name_seq const *dest){ struct subscriber *subscriber; struct iovec msg_sect = {NULL, 0}; spinlock_t *subscriber_lock; dbg("subscr_named_msg_event: orig = %x own = %x,\n", orig->node, tipc_own_addr); if (size && (size != sizeof(struct tipc_subscr))) { warn("Subscriber rejected, invalid subscription size\n"); return; } /* Create subscriber object */ subscriber = kzalloc(sizeof(struct subscriber), GFP_ATOMIC); if (subscriber == NULL) { warn("Subscriber rejected, no memory\n"); return; } INIT_LIST_HEAD(&subscriber->subscription_list); INIT_LIST_HEAD(&subscriber->subscriber_list); subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock); if (subscriber->ref == 0) { warn("Subscriber rejected, reference table exhausted\n"); kfree(subscriber); return; } /* Establish a connection to subscriber */ tipc_createport(topsrv.user_ref, (void *)(unsigned long)subscriber->ref, importance, NULL, NULL, subscr_conn_shutdown_event, NULL, NULL, subscr_conn_msg_event, NULL, &subscriber->port_ref); if (subscriber->port_ref == 0) { warn("Subscriber rejected, unable to create port\n"); tipc_ref_discard(subscriber->ref); kfree(subscriber); return; } tipc_connect2port(subscriber->port_ref, orig); /* Add subscriber to topology server's subscriber list */ tipc_ref_lock(subscriber->ref); spin_lock_bh(&topsrv.lock); list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); spin_unlock_bh(&topsrv.lock); /* * Subscribe now if message contains a subscription, * otherwise send an empty response to complete connection handshaking */ subscriber_lock = subscriber->lock; if (size) subscr_subscribe((struct tipc_subscr *)data, subscriber); else tipc_send(subscriber->port_ref, 1, &msg_sect); spin_unlock_bh(subscriber_lock);}int tipc_subscr_start(void){ struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; int res = -1; memset(&topsrv, 0, sizeof (topsrv)); spin_lock_init(&topsrv.lock); INIT_LIST_HEAD(&topsrv.subscriber_list); spin_lock_bh(&topsrv.lock); res = tipc_attach(&topsrv.user_ref, NULL, NULL); if (res) { spin_unlock_bh(&topsrv.lock); return res; } res = tipc_createport(topsrv.user_ref, NULL, TIPC_CRITICAL_IMPORTANCE, NULL, NULL, NULL, NULL, subscr_named_msg_event, NULL, NULL, &topsrv.setup_port); if (res) goto failed; res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); if (res) goto failed; spin_unlock_bh(&topsrv.lock); return 0;failed: err("Failed to create subscription service\n"); tipc_detach(topsrv.user_ref); topsrv.user_ref = 0; spin_unlock_bh(&topsrv.lock); return res;}void tipc_subscr_stop(void){ struct subscriber *subscriber; struct subscriber *subscriber_temp; spinlock_t *subscriber_lock; if (topsrv.user_ref) { tipc_deleteport(topsrv.setup_port); list_for_each_entry_safe(subscriber, subscriber_temp, &topsrv.subscriber_list, subscriber_list) { tipc_ref_lock(subscriber->ref); subscriber_lock = subscriber->lock; subscr_terminate(subscriber); spin_unlock_bh(subscriber_lock); } tipc_detach(topsrv.user_ref); topsrv.user_ref = 0; }}int tipc_ispublished(struct tipc_name const *name){ u32 domain = 0; return(tipc_nametbl_translate(name->type, name->instance,&domain) != 0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -