📄 nea_server.c
字号:
continue; break; /* This */ } if (s) { nea_event_queue_t *evq0 = su_alloc(nes->nes_home, sizeof *evq); if (evq0 == NULL) return -1; *evq0 = *evq, evq = evq0; /* evq should be copy of old head but with changed payload */ assert(evq->evq_next == evv->evv_head->evq_next); evv->evv_head->evq_next = evq; /* insert to the queue */ return 0; } su_free(nes->nes_home, (void *)evq->evq_payload); return 0;}/** Remove old unneeded notifications. */staticint nea_view_dequeue(nea_server_t *nes, nea_event_t *ev){ int i; nea_event_view_t *evv; nea_event_queue_t **prev, *evq;; assert(nes && ev); for (i = 0; ev->ev_views[i]; i++) { for (evv = ev->ev_views[i]; evv; evv = evv->evv_next) { if (!evv->evv_reliable) continue; for (prev = &evv->evv_head->evq_next; *prev; prev = &(*prev)->evq_next) if (ev->ev_throttling >= (*prev)->evq_updated) break; /* Free from evq onwards */ for (evq = *prev; evq; evq = *prev) { *prev = evq->evq_next; su_free(nes->nes_home, evq->evq_payload); su_free(nes->nes_home, evq); } } } return 0;}/* ----------------------------------------------------------------- *//** Notify watchers. * * @return * The function nea_server_notify() returns number of subscribers that the * notification could be sent, or -1 upon an error. */int nea_server_notify(nea_server_t *nes, nea_event_t *ev){ sip_time_t now = sip_now(); nea_sub_t *s; int notified = 0, throttled = nes->nes_throttled; SU_DEBUG_7(("nea_server_notify(%p): %s\n", nes, ev ? ev->ev_event->o_type: "")); ++nes->nes_in_list; nes->nes_throttled = 0; if (ev == NULL) for (ev = nes->nes_events; ev; ev = ev->ev_next) ev->ev_throttling = UINT_MAX; else ev->ev_throttling = UINT_MAX; for (s = nes->nes_subscribers; s; s = s->s_next) { if ((ev == NULL || ev == s->s_event) && s->s_state != nea_terminated) { notified += nea_sub_notify(nes, s, now, TAG_END()); } } if (throttled) { /* Dequeue throttled updates */ if (ev == NULL) for (ev = nes->nes_events; ev; ev = ev->ev_next) { nea_view_dequeue(nes, ev); SU_DEBUG_3(("nea_server(): notified %u, throttling at %u\n", notified, ev->ev_throttling)); } else { SU_DEBUG_3(("nea_server(): notified %u, throttling at %u\n", notified, ev->ev_throttling)); nea_view_dequeue(nes, ev); } } if (--nes->nes_in_list == 0 && nes->nes_pending_flush) nea_server_pending_flush(nes); return notified;}/* ----------------------------------------------------------------- */void nea_server_flush(nea_server_t *nes, nea_event_t *event){ nea_sub_t *s, **ss; sip_time_t now; if (nes == NULL) return; now = sip_now(); for (ss = &nes->nes_subscribers; (s = *ss);) { if ((event == NULL || s->s_event == event) && (s->s_state == nea_terminated || s->s_expires < now)) { /** On first flush, mark as garbage, remove on second flush */ if (!s->s_garbage) s->s_garbage = 1; else if (nes->nes_in_callback || nes->nes_in_list) { nes->nes_pending_flush = 1; (*ss)->s_pending_flush = 1; } else { nea_sub_destroy(*ss); continue; } } ss = &((*ss)->s_next); }}/* ----------------------------------------------------------------- */staticvoid nea_server_pending_flush(nea_server_t *nes){ nea_sub_t **ss; for (ss = &nes->nes_subscribers; *ss;) { if ((*ss)->s_pending_flush && !(*ss)->s_processing) { nea_sub_destroy(*ss); } else { ss = &((*ss)->s_next); } } nes->nes_pending_flush = 0;}/* ----------------------------------------------------------------- */nea_sub_t *nea_sub_create(nea_server_t *nes){ nea_sub_t *s; assert(nes); s = su_zalloc(nes->nes_home, sizeof (*s)); if (s) { s->s_nes = nes; if ((s->s_next = nes->nes_subscribers)) s->s_next->s_prev = &s->s_next; s->s_prev = &nes->nes_subscribers; nes->nes_subscribers = s; /* Copy default values */ s->s_throttle = nes->nes_throttle; } return s;}/* ----------------------------------------------------------------- */nta_incoming_t *nea_subnode_get_incoming(nea_subnode_t *sn){ assert(sn); if (sn->sn_subscriber) { return sn->sn_subscriber->s_irq; } return NULL;}/* ----------------------------------------------------------------- */void nea_sub_remove(nea_sub_t *s){ if (s) { assert(s->s_prev); if ((*s->s_prev = s->s_next)) s->s_next->s_prev = s->s_prev; s->s_prev = NULL; s->s_next = NULL; }}/* ----------------------------------------------------------------- *//**Check if subscriber has been removed from list */static int nea_sub_is_removed(nea_sub_t const *s){ return s->s_prev == NULL;}/* ----------------------------------------------------------------- */void nea_sub_destroy(nea_sub_t *s){ if (s) { nea_sub_t *del = s; su_home_t *home = del->s_nes->nes_home; if (!nea_sub_is_removed(del)) nea_sub_remove(del); del->s_event = NULL; su_free(home, del->s_local), del->s_local = NULL; su_free(home, del->s_remote), del->s_remote = NULL; if (del->s_oreq) nta_outgoing_destroy(del->s_oreq), del->s_oreq = NULL; if (del->s_leg) nta_leg_destroy(del->s_leg), del->s_leg = NULL; if (del->s_from) su_free(home, del->s_from), del->s_from = NULL; su_free(home, del); }}/** Create a new event. * * The function nea_event_create() creates a new event for the event server. */nea_event_t *nea_event_create(nea_server_t *nes, nea_watcher_f *callback, nea_emagic_t *context, char const *name, char const *subname, char const *default_content_type, char const *accept){ return nea_event_tcreate(nes, callback, context, name, subname, SIPTAG_CONTENT_TYPE_STR(default_content_type), SIPTAG_ACCEPT_STR(accept), TAG_END());}/** Create a new event (or subevent) with tags */nea_event_t *nea_event_tcreate(nea_server_t *nes, nea_watcher_f *callback, nea_emagic_t *context, char const *name, char const *subname, tag_type_t tag, tag_value_t value, ...){ nea_event_t *ev, **pev; int len = strlen(name); ta_list ta; /* Find a matching event */ if (subname == NULL) { for (pev = &nes->nes_events; (ev = *pev); pev = &(*pev)->ev_next) { if (strcmp(ev->ev_event->o_type, name) != 0) continue; SU_DEBUG_5(("nea_event_create(): already event %s\n", name)); return NULL; } } else { for (pev = &nes->nes_events; (ev = *pev); pev = &(*pev)->ev_next) { if (strncmp(ev->ev_event->o_type, name, len) != 0 || ev->ev_event->o_type[len] != '.' || strcmp(subname, ev->ev_event->o_type + len + 1) != 0) continue; SU_DEBUG_5(("nea_event_create(): already event %s.%s\n", name, subname)); return NULL; } } ta_start(ta, tag, value); ev = su_zalloc(nes->nes_home, sizeof (*ev)); if (ev) { int reliable = 0; sip_content_type_t const *ct = NULL; sip_accept_t const *ac = NULL; sip_supported_t const *k = NULL; sip_require_t const *rq = NULL; char const *ct_str = NULL, *ac_str = NULL, *k_str = NULL, *rq_str = NULL; unsigned throttle = nes->nes_throttle, min_throttle = nes->nes_min_throttle; int eventlist = nes->nes_eventlist; tl_gets(ta_args(ta), NEATAG_RELIABLE_REF(reliable), NEATAG_THROTTLE_REF(throttle), NEATAG_MINTHROTTLE_REF(min_throttle), NEATAG_EVENTLIST_REF(eventlist), SIPTAG_CONTENT_TYPE_REF(ct), SIPTAG_CONTENT_TYPE_STR_REF(ct_str), SIPTAG_ACCEPT_REF(ac), SIPTAG_ACCEPT_STR_REF(ac_str), SIPTAG_SUPPORTED_REF(k), SIPTAG_SUPPORTED_STR_REF(k_str), SIPTAG_REQUIRE_REF(rq), SIPTAG_REQUIRE_STR_REF(rq_str), TAG_END()); ev->ev_callback = callback; ev->ev_magic = context; ev->ev_event = sip_event_format(nes->nes_home, "%s%s%s", name, subname ? "." : "", subname ? subname : ""); ev->ev_reliable = reliable != 0; ev->ev_throttle = throttle; ev->ev_min_throttle = min_throttle; ev->ev_eventlist = eventlist; if (eventlist && rq == NULL && rq_str == NULL) rq_str = "eventlist"; if (rq) ev->ev_require = sip_require_dup(nes->nes_home, rq); else if (rq_str) ev->ev_require = sip_require_make(nes->nes_home, rq_str); if (ev->ev_event) {#define sip_allow_events_find(k, i) sip_params_find(k->k_items, i) if (!sip_allow_events_find(nes->nes_allow_events, ev->ev_event->o_type)) sip_allow_events_add(nes->nes_home, nes->nes_allow_events, ev->ev_event->o_type); } if (ct) ev->ev_default = sip_accept_make(nes->nes_home, ct->c_type); else ev->ev_default = sip_accept_make(nes->nes_home, ct_str); if (ac == NULL && ac_str == NULL) ac_str = ct ? ct->c_type : ct_str; if (ac) ev->ev_accept = sip_accept_dup(nes->nes_home, ac); else ev->ev_accept = sip_accept_make(nes->nes_home, ac_str ? ac_str : ""); if (k) ev->ev_supported = sip_supported_dup(nes->nes_home, k); else if (k_str) ev->ev_supported = sip_supported_make(nes->nes_home, k_str); ev->ev_prev = pev; *pev = ev; } ta_end(ta); return ev;}/* ----------------------------------------------------------------- *//** Return magic context bound to nea_event. * * The function returns the magic context bound to the event. * * @param ev pointer to event object * * @return * The function nea_emagic_get() returns the magic context * bound to the event. */nea_emagic_t *nea_emagic_get(nea_event_t *ev){ assert(ev); return ev->ev_magic;}/* ----------------------------------------------------------------- *//** Get named event */nea_event_t *nea_event_get(nea_server_t const *nes, char const *e){ nea_event_t *ev = NULL; for (ev = nes->nes_events; ev; ev = ev->ev_next) if (e == NULL || strcmp(ev->ev_event->o_type, e) == 0) break; return ev;}/* ----------------------------------------------------------------- */nta_incoming_t *nea_sub_get_request(nea_sub_t *sub){ assert(sub); return sub->s_irq;}/** Invoke the event callback. * * The function nes_watcher_callback() calls the callback provided by the * application using the notifier object. * * @param nes pointer to notifier object * @param ev pointer to event view * @param s pointer to subscription object * @param sip pointer to subscribe request * * @return * The function nes_watcher_callback() returns -1 if the notifier object * has been destroyed by the callback function, 0 otherwise. */staticint nes_watcher_callback(nea_server_t *nes, nea_event_t *ev, nea_sub_t *s, sip_t const *sip){ if (!nes->nes_in_callback) { nes->nes_in_callback = 1; if (ev->ev_callback) { nea_subnode_t sn[1]; nea_subnode_init(sn, s, sip_now()); ev->ev_callback(nes, ev->ev_magic, ev, sn, sip); } nes->nes_in_callback = 0; if (nes->nes_in_list) return 0; if (nes->nes_pending_destroy) { nea_server_destroy(nes); return -2; } if (sip == NULL && nes->nes_pending_flush) { int flushed = s->s_pending_flush; nea_server_pending_flush(nes); if (flushed) return -1; } } return 0;}/* ----------------------------------------------------------------- */#if 0/** Process incoming SUBSCRIBE message. * * The function nea_server_add() is called when the notifier receives a * SUBSCRIBE request without existing event dialog. *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -