📄 su_source.c
字号:
/* * This file is part of the Sofia-SIP package * * Copyright (C) 2005 Nokia Corporation. * * Contact: Pekka Pessi <pekka.pessi@nokia.com> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA * *//** * @file su_source.c * @brief Wrapper for glib GSource. * * @author Pekka Pessi <Pekka.Pessi@nokia.com>. * * @date Created: Thu Mar 4 15:15:15 2004 ppessi * */#if HAVE_CONFIG_H#include "config.h"#endif/* Use Posix stuff */#define _XOPEN_SOURCE (500)#include <stdlib.h>#include <assert.h>#include <stdio.h>#include <string.h>#include <limits.h>#include <glib.h>#define SU_PORT_IMPLEMENTATION 1#define SU_MSG_ARG_T union { char anoymous[4]; }#define su_port_s su_source_s#include "sofia-sip/su_source.h"#include "sofia-sip/su.h"#include "su_port.h"#include "sofia-sip/su_alloc.h"static su_port_t *su_source_create(void) __attribute__((__malloc__));static gboolean su_source_prepare(GSource *gs, gint *return_tout);static gboolean su_source_check(GSource *gs);static gboolean su_source_dispatch(GSource *gs, GSourceFunc callback, gpointer user_data);static void su_source_finalize(GSource *source);static int su_source_getmsgs(su_port_t *self);staticGSourceFuncs su_source_funcs[1] = {{ su_source_prepare, su_source_check, su_source_dispatch, su_source_finalize, NULL, NULL }};static void su_source_lock(su_port_t *self, char const *who);static void su_source_unlock(su_port_t *self, char const *who);static void su_source_incref(su_port_t *self, char const *who);static void su_source_decref(su_port_t *self, int blocking, char const *who);static struct _GSource *su_source_gsource(su_port_t *port);static int su_source_send(su_port_t *self, su_msg_r rmsg);static int su_source_register(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg, int priority);static int su_source_unregister(su_port_t *port, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg);static int su_source_deregister(su_port_t *self, int i);static int su_source_unregister_all(su_port_t *self, su_root_t *root);static int su_source_eventmask(su_port_t *self, int index, int socket, int events);static void su_source_run(su_port_t *self);static void su_source_break(su_port_t *self);static su_duration_t su_source_step(su_port_t *self, su_duration_t tout);static int su_source_own_thread(su_port_t const *port);static int su_source_add_prepoll(su_port_t *port, su_root_t *root, su_prepoll_f *, su_prepoll_magic_t *);static int su_source_remove_prepoll(su_port_t *port, su_root_t *root);static su_timer_t **su_source_timers(su_port_t *port);static int su_source_multishot(su_port_t *self, int multishot);static int su_source_threadsafe(su_port_t *port);staticsu_port_vtable_t const su_source_vtable[1] = {{ /* su_vtable_size: */ sizeof su_source_vtable, su_source_lock, su_source_unlock, su_source_incref, su_source_decref, su_source_gsource, su_source_send, su_source_register, su_source_unregister, su_source_deregister, su_source_unregister_all, su_source_eventmask, su_source_run, su_source_break, su_source_step, su_source_own_thread, su_source_add_prepoll, su_source_remove_prepoll, su_source_timers, su_source_multishot, su_source_threadsafe }};/** * Port is a per-thread reactor. * * Multiple root objects executed by single thread share a su_port_t object. */struct su_source_s { su_home_t sup_home[1]; su_port_vtable_t const *sup_vtable; GThread *sup_tid; GStaticMutex sup_mutex[1]; GStaticRWLock sup_ref[1]; GSource *sup_source; GMainLoop *sup_main_loop; /* Message list - this is protected by lock */ su_msg_t *sup_head; su_msg_t **sup_tail; /* Waits */ unsigned sup_registers; /** Counter incremented by su_port_register() or su_port_unregister() */ unsigned sup_n_waits; unsigned sup_size_waits; unsigned sup_max_index; unsigned *sup_indices; su_wait_t *sup_waits; su_wakeup_f *sup_wait_cbs; su_wakeup_arg_t**sup_wait_args; su_root_t **sup_wait_roots; /* Timer list */ su_timer_t *sup_timers;};typedef struct _SuSource{ GSource ss_source[1]; su_port_t ss_port[1];} SuSource;#define SU_SOURCE_OWN_THREAD(p) ((p)->sup_tid == g_thread_self())#if 1#define SU_SOURCE_INCREF(p, f) (g_source_ref(p->sup_source))#define SU_SOURCE_DECREF(p, f) (g_source_unref(p->sup_source))#define SU_SOURCE_INITLOCK(p) (g_static_mutex_init((p)->sup_mutex))#define SU_SOURCE_LOCK(p, f) (g_static_mutex_lock((p)->sup_mutex))#define SU_SOURCE_UNLOCK(p, f) (g_static_mutex_unlock((p)->sup_mutex))#else/* Debugging versions */#define SU_SOURCE_INCREF(p, f) (g_source_ref(p->sup_source), printf("incref(%p) by %s\n", (p), f))#define SU_SOURCE_DECREF(p, f) do { printf("decref(%p) by %s\n", (p), f), \ g_source_unref(p->sup_source); } while(0)#define SU_SOURCE_INITLOCK(p) \ (g_static_mutex_init((p)->sup_mutex), printf("init_lock(%p)\n", p))#define SU_SOURCE_LOCK(p, f) \ (printf("%ld at %s locking(%p)...", g_thread_self(), f, p), g_static_mutex_lock((p)->sup_mutex), printf(" ...%ld at %s locked(%p)...", g_thread_self(), f, p))#define SU_SOURCE_UNLOCK(p, f) \ (g_static_mutex_unlock((p)->sup_mutex), printf(" ...%ld at %s unlocked(%p)\n", g_thread_self(), f, p))#endif#if HAVE_FUNC#define enter (void)SU_DEBUG_9(("%s: entering\n", __func__))#elif HAVE_FUNCTION#define enter (void)SU_DEBUG_9(("%s: entering\n", __FUNCTION__))#else#define enter (void)0#endif/** Create a root that uses GSource as reactor */su_root_t *su_root_source_create(su_root_magic_t *magic){ return su_root_create_with_port(magic, su_source_create());}/**@internal * * Allocates and initializes a reactor and message port object. * * @return * If successful a pointer to the new message port is returned, otherwise * NULL is returned. */su_port_t *su_source_create(void){ SuSource *ss; SU_DEBUG_9(("su_source_create() called\n")); ss = (SuSource *)g_source_new(su_source_funcs, (sizeof *ss)); if (ss) { su_port_t *self = ss->ss_port; self->sup_vtable = su_source_vtable; self->sup_source = ss->ss_source; SU_SOURCE_INITLOCK(self); self->sup_tail = &self->sup_head; self->sup_tid = g_thread_self(); SU_DEBUG_9(("su_source_with_main_context() returns %p\n", self)); return self; } else { su_perror("su_source_with_main_context(): su_home_clone"); SU_DEBUG_9(("su_source_with_main_context() fails\n")); return NULL; }}/** @internal Destroy a port. */static void su_source_finalize(GSource *gs){ SuSource *ss = (SuSource *)gs; su_port_t *self = ss->ss_port; assert(gs); SU_DEBUG_9(("su_source_finalize() called\n")); if (self->sup_waits) free(self->sup_waits), self->sup_waits = NULL; if (self->sup_wait_cbs) free(self->sup_wait_cbs), self->sup_wait_cbs = NULL; if (self->sup_wait_args) free(self->sup_wait_args), self->sup_wait_args = NULL; if (self->sup_wait_roots) free(self->sup_wait_roots), self->sup_wait_roots = NULL; if (self->sup_indices) free(self->sup_indices), self->sup_indices = NULL; su_home_deinit(self->sup_home);}/* Seconds from 1.1.1900 to 1.1.1970 */#define NTP_EPOCH 2208988800UL staticgboolean su_source_prepare(GSource *gs, gint *return_tout){ SuSource *ss = (SuSource *)gs; su_port_t *self = ss->ss_port; enter; if (self->sup_head) return TRUE; *return_tout = -1; if (self->sup_timers) { su_time_t now; GTimeVal gtimeval; su_duration_t tout; tout = SU_DURATION_MAX; g_source_get_current_time(gs, >imeval); now.tv_sec = gtimeval.tv_sec + 2208988800UL; now.tv_usec = gtimeval.tv_usec; tout = su_timer_next_expires(self->sup_timers, now); if (tout == 0) return TRUE; if ((gint)tout < 0 || tout > (su_duration_t)G_MAXINT) tout = -1; *return_tout = (gint)tout; } return FALSE;}staticgboolean su_source_check(GSource *gs){ SuSource *ss = (SuSource *)gs; su_port_t *self = ss->ss_port; gint tout; unsigned i, I; enter; I = self->sup_n_waits;#if SU_HAVE_POLL for (i = 0; i < I; i++) { if (self->sup_waits[i].revents) return TRUE; }#endif return su_source_prepare(gs, &tout);}static gboolean su_source_dispatch(GSource *gs, GSourceFunc callback, gpointer user_data){ SuSource *ss = (SuSource *)gs; su_port_t *self = ss->ss_port; enter; if (self->sup_head) su_source_getmsgs(self); if (self->sup_timers) { su_time_t now; GTimeVal gtimeval; su_duration_t tout; int timers = 0; tout = SU_DURATION_MAX; g_source_get_current_time(gs, >imeval); now.tv_sec = gtimeval.tv_sec + 2208988800UL; now.tv_usec = gtimeval.tv_usec; timers = su_timer_expire(&self->sup_timers, &tout, now); }#if SU_HAVE_POLL { su_root_t *root; su_wait_t *waits = self->sup_waits; unsigned i, n = self->sup_n_waits; unsigned version = self->sup_registers; for (i = 0; i < n; i++) { if (waits[i].revents) { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &waits[i], self->sup_wait_args[i]); /* Callback used su_register()/su_unregister() */ if (version != self->sup_registers) break; } } }#endif if (!callback) return TRUE; return callback(user_data);}static void su_source_lock(su_port_t *self, char const *who){ SU_SOURCE_LOCK(self, who);}static void su_source_unlock(su_port_t *self, char const *who){ SU_SOURCE_UNLOCK(self, who);}static void su_source_incref(su_port_t *self, char const *who){ SU_SOURCE_INCREF(self, who);}static void su_source_decref(su_port_t *self, int blocking, char const *who){ /* XXX - blocking? */ SU_SOURCE_DECREF(self, who);}GSource *su_source_gsource(su_port_t *self){ return self->sup_source;}/** @internal Send a message to the port. */int su_source_send(su_port_t *self, su_msg_r rmsg){ enter; if (self) { su_msg_t *msg; GMainContext *gmc; SU_SOURCE_LOCK(self, "su_source_send"); msg = rmsg[0]; rmsg[0] = NULL; *self->sup_tail = msg; self->sup_tail = &msg->sum_next; SU_SOURCE_UNLOCK(self, "su_source_send"); gmc = g_source_get_context(self->sup_source); if (gmc) g_main_context_wakeup(gmc); return 0; } else { su_msg_destroy(rmsg); return -1; }}/** @internal * Execute the messages in the incoming queue until the queue is empty.. * * @param self - pointer to a port object * * @retval 0 if there was a signal to handle, * @retval -1 otherwise. */staticint su_source_getmsgs(su_port_t *self){ enter; if (self && self->sup_head) { su_root_t *root; su_msg_f f; SU_SOURCE_INCREF(self, "su_source_getmsgs"); SU_SOURCE_LOCK(self, "su_source_getmsgs"); while (self->sup_head) { su_msg_t *msg = self->sup_head; self->sup_head = msg->sum_next; if (!self->sup_head) { assert(self->sup_tail == &msg->sum_next); self->sup_tail = &self->sup_head; } root = msg->sum_to->sut_root; f = msg->sum_func; SU_SOURCE_UNLOCK(self, "su_source_getmsgs"); if (f) f(su_root_magic(root), &msg, msg->sum_data); if (msg && msg->sum_report) su_msg_delivery_report(&msg); else su_msg_destroy(&msg); SU_SOURCE_LOCK(self, "su_source_getmsgs"); } SU_SOURCE_UNLOCK(self, "su_source_getmsgs"); SU_SOURCE_DECREF(self, "su_source_getmsgs"); return 0; } else return -1;}/** @internal * * Register a @c su_wait_t object. The wait object, a callback function and * a argument pointer is stored in the port object. The callback function * will be called when the wait object is signaled. * * Please note if identical wait objects are inserted, only first one is * ever signalled. * * @param self pointer to port * @param root pointer to root object * @param waits pointer to wait object * @param callback callback function pointer * @param arg argument given to callback function when it is invoked * @param priority relative priority of the wait object * (0 is normal, 1 important, 2 realtime) * * @return * The function @su_source_register returns nonzero index of the wait object, * or -1 upon an error. */int su_source_register(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg, int priority){ unsigned i, j, I; unsigned n; enter; assert(SU_SOURCE_OWN_THREAD(self));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -