📄 nea_server.c
字号:
SIPTAG_REQUIRE(ev->ev_require), SIPTAG_SUPPORTED(ev->ev_supported), SIPTAG_EXPIRES(expires), SIPTAG_CONTACT(s->s_local), TAG_END()); nta_incoming_destroy(irq), s->s_irq = irq = NULL; } /* Callback for checking subscriber authorization */ if (nes_watcher_callback(nes, ev, s, sip) < 0) { if (irq) { nta_incoming_treply(irq, SIP_503_SERVICE_UNAVAILABLE, TAG_END()); nta_incoming_destroy(irq); } return -1; } evv = s->s_view; /* Callback can change event view */ if (s->s_state == nea_embryonic) nea_sub_auth(s, nea_pending, NEATAG_FAKE(1), TAG_END()); if (s->s_updated != evv->evv_updated && !(irq && s->s_rejected)) nea_sub_notify(nes, s, now, TAG_END()); if (irq) { if (s->s_rejected) nta_incoming_treply(irq, SIP_403_FORBIDDEN, SIPTAG_SERVER_STR(nes->nes_server), TAG_END()); else if (s->s_state == nea_active) nta_incoming_treply(irq, SIP_200_OK, SIPTAG_REQUIRE(ev->ev_require), SIPTAG_SUPPORTED(ev->ev_supported), SIPTAG_EXPIRES(expires), SIPTAG_SERVER_STR(nes->nes_server), SIPTAG_CONTACT(s->s_local), SIPTAG_ALLOW_EVENTS(nes->nes_allow_events), SIPTAG_ALLOW(nes->nes_allow_methods), TAG_END()); else nta_incoming_treply(irq, SIP_202_ACCEPTED, SIPTAG_REQUIRE(ev->ev_require), SIPTAG_SUPPORTED(ev->ev_supported), SIPTAG_EXPIRES(expires), SIPTAG_SERVER_STR(nes->nes_server), SIPTAG_ALLOW_EVENTS(nes->nes_allow_events), SIPTAG_ALLOW(nes->nes_allow_methods), SIPTAG_CONTACT(s->s_local), TAG_END()); } return 0;}/* ----------------------------------------------------------------- *//**Notify subscriber * * The function nea_sub_notify() sends a notification to the subscriber. The * event type is specified by subscriber event, payload type and payload in * the event view. The responses to the NOTIFY transaction are * processed by response_to_notify(). * * @param nes pointer to the notifier object * @param s pointer to the subscription object * @param now current SIP time (if 0, no body is sent, * but updated Subscription-State header only * @param tag,value,... tag list * */int nea_sub_notify(nea_server_t *nes, nea_sub_t *s, sip_time_t now, tag_type_t tag, tag_value_t value, ...){ int notified = 0; ta_list ta; int subscription_state_change = now == 0; nea_event_t *ev = s->s_event; nea_state_t substate = s->s_state; if (s->s_pending_flush || (s->s_oreq && substate != nea_terminated)) { if (ev && ev->ev_throttling > s->s_updated) ev->ev_throttling = s->s_updated; return 0; } if (s->s_oreq) nta_outgoing_destroy(s->s_oreq), s->s_oreq = NULL; assert(s->s_view); assert(ev); if (!subscription_state_change && s->s_view->evv_updated == s->s_updated) return 0; if (now == 0) now = sip_now(); if (s->s_notified + s->s_throttle > now && /* Do not throttle state termination notification */ substate != nea_terminated && (long)(s->s_expires - now) > 0) { if (ev->ev_throttling > s->s_updated && !s->s_fake) ev->ev_throttling = s->s_updated; nes->nes_throttled++; return 0; } ta_start(ta, tag, value); { sip_subscription_state_t ss[1]; char expires[32]; sip_param_t params[] = { NULL, NULL, NULL }; char const *reason = NULL; int fake = 0; char reason_buf[64]; unsigned retry_after = (unsigned)-1; char retry_after_buf[64]; int i = 0; nta_response_f *callback; nea_event_view_t *evv = s->s_view; nea_event_queue_t *evq, *n_evq; assert(ev); sip_subscription_state_init(ss); tl_gets(ta_args(ta), NEATAG_REASON_REF(reason), NEATAG_FAKE_REF(fake), /* XXX - semantics??? */ NEATAG_RETRY_AFTER_REF(retry_after), TAG_END()); if (substate == nea_terminated) { if (reason) snprintf(reason_buf, sizeof(reason_buf), "reason=%s", reason), params[i++] = reason_buf; if (retry_after != (unsigned)-1) snprintf(retry_after_buf, sizeof(retry_after_buf), "retry-after=%u", retry_after), params[i++] = retry_after_buf; } else if ((long)(s->s_expires - now) <= 0) { substate = nea_terminated; params[i++] = "reason=timeout"; } else { snprintf(expires, sizeof(expires), "expires=%lu", (unsigned long)(s->s_expires - now)); params[i++] = expires; } ss->ss_params = params; switch (substate) { case nea_extended: ss->ss_substate = s->s_extended; break; case nea_pending: ss->ss_substate = "pending"; break; case nea_active: ss->ss_substate = "active"; break; case nea_terminated: ss->ss_substate = "terminated"; break; /* Do not send notifys for embryonic subscriptions */ case nea_embryonic: ta_end(ta); return 0; } callback = substate != nea_terminated ? response_to_notify : NULL; for (evq = evv->evv_head; evq->evq_next; evq = evq->evq_next) { if (evq->evq_next->evq_updated <= s->s_updated) break; } subscription_state_change = (s->s_view->evv_updated == s->s_updated); n_evq = evq->evq_payload ? evq : evv->evv_primary->evv_head; s->s_oreq = nta_outgoing_tcreate(s->s_leg, callback, s, NULL, SIP_METHOD_NOTIFY, NULL, SIPTAG_SUBSCRIPTION_STATE(ss), SIPTAG_REQUIRE(ev->ev_require), SIPTAG_SUPPORTED(ev->ev_supported), SIPTAG_USER_AGENT_STR(nes->nes_server), SIPTAG_CONTACT(s->s_local), SIPTAG_EVENT(s->s_id), TAG_IF(!subscription_state_change, SIPTAG_CONTENT_TYPE(n_evq->evq_content_type)), TAG_IF(!subscription_state_change, SIPTAG_PAYLOAD(n_evq->evq_payload)), ta_tags(ta)); notified = s->s_oreq != 0; if (notified) { s->s_notified = now; s->s_state = substate; /* XXX - we need state for "waiting" */ s->s_latest = evq->evq_version; /* Application version */ s->s_updated = evq->evq_updated; /* Internal version */ if (ev->ev_throttling > s->s_updated) ev->ev_throttling = s->s_updated; } if (callback == NULL) { nta_outgoing_destroy(s->s_oreq), s->s_oreq = NULL; /* Inform the application of a subscriber leaving the subscription. */ nes_watcher_callback(nes, ev, s, NULL); } } ta_end(ta); return notified;}/* ----------------------------------------------------------------- *//**Process responses to the NOTIFY. * * The response_to_notify() processes the responses to the NOTIFY request. * If there was an error with delivering the NOTIFY, the subscription is * considered terminated. * * @param s pointer to subscription object */int response_to_notify(nea_sub_t *s, nta_outgoing_t *oreq, sip_t const *sip){ nea_server_t *nes = s->s_nes; int status = sip->sip_status->st_status; if (status < 200) return 0; nta_outgoing_destroy(s->s_oreq), s->s_oreq = NULL; if (status < 300) { if (s->s_view->evv_updated != s->s_updated) { sip_time_t now = sip_now(); if (s->s_notified + s->s_throttle <= now) nea_sub_notify(nes, s, now, TAG_END()); else nes->nes_throttled++; } } if (s->s_state == nea_terminated || status >= 300) { SU_DEBUG_5(("nea_server: removing subscriber " URL_PRINT_FORMAT "\n", URL_PRINT_ARGS(s->s_from->a_url))); /* Inform the application of a subscriber leaving the subscription. */ nes_watcher_callback(nes, s->s_event, s, NULL); } return 0;}/* ----------------------------------------------------------------- *//** Get number of active subscribers. * * The function nea_server_active() counts the number of active subscribers * watching the specified view. If the view is not specified (@a ev is @c * NULL), it counts the number of all subscribers. * * @param nes notifier * @param ev event * * The function nea_server_active() returns number of active subscribers. */int nea_server_active(nea_server_t *nes, nea_event_t const *ev){ int n = 0; nea_sub_t *s = NULL; /* Count the number of subscribers watching this event */ for (s = nes->nes_subscribers; s ; s = s->s_next) if (!s->s_pending_flush && s->s_state == nea_active && (ev == NULL || ev == s->s_event)) n++; return n;}/** Get number of non-embryonic subscribers. * * The function nea_server_non_embryonic() counts the number of pending, * active or terminated subscribers watching the specified view. If the view * is not specified (@a ev is @c NULL), it counts the number of all * subscribers. * * @param nes notifier * @param ev event view * * The function nea_server_active() returns number of active subscribers. */int nea_server_non_embryonic(nea_server_t *nes, nea_event_t const *ev){ int n = 0; nea_sub_t *s = NULL; /* Count the number of subscribers watching this event */ for (s = nes->nes_subscribers; s ; s = s->s_next) if (!s->s_pending_flush && s->s_state != nea_embryonic && (ev == NULL || ev == s->s_event)) n++; return n;}/** Set application version number */int nea_sub_version(nea_sub_t *s, unsigned version){ if (s) return s->s_version = version; return 0;}/** Authorize a subscription. * * Application can modify the subscription state and authorize the user. * The subscription state has following simple state diagram: * * @code * +---------------+ +------------------+ * | | | | * +-----------+ | +---------+ V | +------------+ V +------------+ * | embryonic |-+->| pending |--+-+->| authorized |--+->| terminated | * +-----------+ +---------+ +------------+ +------------+ * * @endcode * * @TAGS * IF NEATAG_VIEW(view) is included in tagged arguments, @a view is assigned * to the subscriber and the content from the view is delivered to the * subscriber. * * If NEATAG_FAKE(1) is included in tags, content marked as 'fake' is * delivered to the subscriber. * * @retval 0 if successful * @retval -1 upon an error */int nea_sub_auth(nea_sub_t *s, nea_state_t state, tag_type_t tag, tag_value_t value, ...){ ta_list ta; int retval, embryonic, rejected = 0; int fake = 0; char const *reason = NULL; nea_event_view_t *evv = NULL; if (s == NULL) return -1; if (state == nea_embryonic) return -1; if (state < s->s_state) return -1; ta_start(ta, tag, value); embryonic = s->s_state == nea_embryonic; s->s_state = state; if (tl_gets(ta_args(ta), NEATAG_VIEW_REF(evv), TAG_END()) && evv) { nea_sub_assign_view(s, evv); } else { if (tl_gets(ta_args(ta), NEATAG_FAKE_REF(fake), TAG_END())) s->s_fake = fake; if (s->s_view && s->s_view->evv_fake != s->s_fake) { for (evv = s->s_view->evv_primary; evv; evv = evv->evv_next) if (!evv->evv_private && evv->evv_fake == s->s_fake) { nea_sub_assign_view(s, evv); break; } } } tl_gets(ta_args(ta), NEATAG_REASON_REF(reason), TAG_END()); rejected = reason && strcasecmp(reason, "rejected") == 0; if (state == nea_terminated && embryonic && rejected && s->s_irq) retval = 0, s->s_rejected = 1; else retval = nea_sub_notify(s->s_nes, s, 0, ta_tags(ta)); ta_end(ta); return retval;}/** Obtain a list of subscribers */nea_subnode_t const **nea_server_get_subscribers(nea_server_t *nes, nea_event_t const *ev){ nea_sub_t *s; nea_subnode_t **sn_list, *sn; int i, n; sip_time_t now = sip_now(); n = nea_server_non_embryonic(nes, ev); if (n == 0) return NULL; sn_list = su_zalloc(nes->nes_home, (n + 1) * sizeof(sn) + n * sizeof(*sn)); if (sn_list) { sn = (nea_subnode_t *)(sn_list + n + 1); for (i = 0, s = nes->nes_subscribers; s; s = s->s_next) { if (!s->s_pending_flush && s->s_state != nea_embryonic && (ev == NULL || ev == s->s_event)) { assert(i < n); nea_subnode_init(sn, s, now); sn_list[i++] = sn++; } } nes->nes_in_list++; sn_list[i] = NULL; } return (nea_subnode_t const **)sn_list;}/** Free a list of subscriptions. */void nea_server_free_subscribers(nea_server_t *nes, nea_subnode_t const **sn_list){ if (sn_list) { su_free(nes->nes_home, (void *)sn_list); if (--nes->nes_in_list == 0 && nes->nes_pending_flush) nea_server_pending_flush(nes); }}/* ----------------------------------------------------------------- */void nes_event_timer(nea_server_t *srvr, su_timer_t *timer, su_timer_arg_t *arg){ nea_server_t *nes = (nea_server_t *) arg; sip_time_t now = sip_now(); nea_sub_t *s = NULL, *s_next = NULL; su_timer_set(timer, nes_event_timer, nes); nes->nes_in_list++; /* Notify and terminate expired subscriptions */ for (s = nes->nes_subscribers; s; s = s_next) { s_next = s->s_next; if (s->s_state == nea_terminated) continue; if ((int)(now - s->s_expires) >= 0) nea_sub_notify(nes, s, now, TAG_END()); } if (--nes->nes_in_list == 0 && nes->nes_pending_flush) nea_server_pending_flush(nes); if (nes->nes_throttled) nea_server_notify(nes, NULL); return;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -