📄 su_source.c
字号:
/* * This file is part of the Sofia-SIP package * * Copyright (C) 2005 Nokia Corporation. * * Contact: Pekka Pessi <pekka.pessi@nokia.com> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA * *//** * @file su_source.c * @brief Wrapper for glib GSource. * * Refs: * - http://sofia-sip.sourceforge.net/refdocs/su/group__su__wait.html * - http://developer.gnome.org/doc/API/glib/glib-the-main-event-loop.html * * @author Pekka Pessi <Pekka.Pessi@nokia.com>. * * @date Created: Thu Mar 4 15:15:15 2004 ppessi * */#include "config.h"#include <glib.h>#if HAVE_OPEN_C#include <glib/gthread.h>#include <glib_global.h>#endif#define SU_PORT_IMPLEMENTATION 1#define SU_MSG_ARG_T union { char anoymous[4]; }#define su_port_s su_source_s#include "sofia-sip/su_source.h"#include "sofia-sip/su_glib.h"#include "sofia-sip/su.h"#include "su_port.h"#include "sofia-sip/su_alloc.h"#include <stdlib.h>#include <assert.h>#include <stdio.h>#include <string.h>#include <limits.h>#if 1#define PORT_LOCK_DEBUG(x) ((void)0)#else#define PORT_LOCK_DEBUG(x) printf x#endifstatic su_port_t *su_source_port_create(void) __attribute__((__malloc__));static gboolean su_source_prepare(GSource *gs, gint *return_tout);static gboolean su_source_check(GSource *gs);static gboolean su_source_dispatch(GSource *gs, GSourceFunc callback, gpointer user_data);static void su_source_finalize(GSource *source);staticGSourceFuncs su_source_funcs[1] = {{ su_source_prepare, su_source_check, su_source_dispatch, su_source_finalize, NULL, NULL }};static int su_source_port_init(su_port_t *self, su_port_vtable_t const *vtable);static void su_source_port_deinit(su_port_t *self);static void su_source_lock(su_port_t *self, char const *who);static void su_source_unlock(su_port_t *self, char const *who);static void su_source_incref(su_port_t *self, char const *who);static void su_source_decref(su_port_t *self, int blocking, char const *who);static struct _GSource *su_source_gsource(su_port_t *port);static int su_source_send(su_port_t *self, su_msg_r rmsg);static int su_source_register(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg, int priority);static int su_source_unregister(su_port_t *port, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg);static int su_source_deregister(su_port_t *self, int i);static int su_source_unregister_all(su_port_t *self, su_root_t *root);static int su_source_eventmask(su_port_t *self, int index, int socket, int events);static void su_source_run(su_port_t *self);static void su_source_break(su_port_t *self);static su_duration_t su_source_step(su_port_t *self, su_duration_t tout);static int su_source_thread(su_port_t *self, enum su_port_thread_op op);static int su_source_add_prepoll(su_port_t *port, su_root_t *root, su_prepoll_f *, su_prepoll_magic_t *);static int su_source_remove_prepoll(su_port_t *port, su_root_t *root);static int su_source_multishot(su_port_t *self, int multishot);static char const *su_source_name(su_port_t const *self);static su_port_vtable_t const su_source_port_vtable[1] = {{ /* su_vtable_size: */ sizeof su_source_port_vtable, su_source_lock, su_source_unlock, su_source_incref, su_source_decref, su_source_gsource, su_source_send, su_source_register, su_source_unregister, su_source_deregister, su_source_unregister_all, su_source_eventmask, su_source_run, su_source_break, su_source_step, su_source_thread, su_source_add_prepoll, su_source_remove_prepoll, su_base_port_timers, su_source_multishot, /*su_source_wait_events*/ NULL, su_base_port_getmsgs, su_base_port_getmsgs_from, su_source_name, su_base_port_start_shared, su_base_port_wait, NULL, }};static char const *su_source_name(su_port_t const *self){ return "GSource";}/** * Port is a per-thread reactor. * * Multiple root objects executed by single thread share a su_port_t object. */struct su_source_s { su_base_port_t sup_base[1]; GThread *sup_tid; GStaticMutex sup_obtained[1]; GStaticMutex sup_mutex[1]; GSource *sup_source; /**< Backpointer to source */ GMainLoop *sup_main_loop; /**< Reference to mainloop while running */ /* Waits */ unsigned sup_registers; /** Counter incremented by su_port_register() or su_port_unregister() */ unsigned sup_n_waits; unsigned sup_size_waits; unsigned sup_max_index; unsigned *sup_indices; su_wait_t *sup_waits; su_wakeup_f *sup_wait_cbs; su_wakeup_arg_t**sup_wait_args; su_root_t **sup_wait_roots; };typedef struct _SuSource{ GSource ss_source[1]; su_port_t ss_port[1];} SuSource;#define SU_SOURCE_OWN_THREAD(p) ((p)->sup_tid == g_thread_self())#if 1#define SU_SOURCE_INCREF(p, f) (g_source_ref(p->sup_source))#define SU_SOURCE_DECREF(p, f) (g_source_unref(p->sup_source))#else/* Debugging versions */#define SU_SOURCE_INCREF(p, f) (g_source_ref(p->sup_source), printf("incref(%p) by %s\n", (p), f))#define SU_SOURCE_DECREF(p, f) do { printf("decref(%p) by %s\n", (p), f), \ g_source_unref(p->sup_source); } while(0)#endif#if HAVE_FUNC#define enter (void)SU_DEBUG_9(("%s: entering\n", __func__))#elif HAVE_FUNCTION#define enter (void)SU_DEBUG_9(("%s: entering\n", __FUNCTION__))#else#define enter (void)0#endif/*=============== Public function definitions ===============*//** Create a root that uses GSource as reactor */su_root_t *su_glib_root_create(su_root_magic_t *magic){ return su_root_create_with_port(magic, su_source_port_create());}/** Deprecated */su_root_t *su_root_source_create(su_root_magic_t *magic){ return su_glib_root_create(magic);}/** * Returns a GSource object for the root * * Note that you need to unref the GSource with g_source_unref() * before destroying the root object. * * @return NULL on error (for instance if root was not created with * su_glib_root_create()) */GSource *su_glib_root_gsource(su_root_t *root){ g_assert(root); return su_root_gsource(root);}/*=============== Private function definitions ===============*//** Initialize source port */static int su_source_port_init(su_port_t *self, su_port_vtable_t const *vtable){ GSource *gs = (GSource *)((char *)self - offsetof(SuSource, ss_port)); self->sup_source = gs; g_static_mutex_init(self->sup_obtained); g_static_mutex_init(self->sup_mutex); return su_base_port_init(self, vtable);}static void su_source_port_deinit(su_port_t *self){ su_base_port_deinit(self); g_static_mutex_free(self->sup_mutex); g_static_mutex_free(self->sup_obtained); if (self->sup_indices) free (self->sup_indices), self->sup_indices = NULL; if (self->sup_waits) free (self->sup_waits), self->sup_waits = NULL; if (self->sup_wait_cbs) free (self->sup_wait_cbs), self->sup_wait_cbs = NULL; if (self->sup_wait_args) free (self->sup_wait_args), self->sup_wait_args = NULL; if (self->sup_wait_roots) free (self->sup_wait_roots), self->sup_wait_roots = NULL; su_home_deinit(self->sup_base->sup_home);}/** @internal Destroy a port. */static void su_source_finalize(GSource *gs){ SuSource *ss = (SuSource *)gs; assert(gs); SU_DEBUG_9(("su_source_finalize() called\n")); su_source_port_deinit(ss->ss_port);}void su_source_port_lock(su_port_t *self, char const *who){ PORT_LOCK_DEBUG(("%p at %s locking(%p)...", (void *)g_thread_self(), who, self)); g_static_mutex_lock(self->sup_mutex); PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...", (void *)g_thread_self(), who, self));}void su_source_port_unlock(su_port_t *self, char const *who){ g_static_mutex_unlock(self->sup_mutex); PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n", (void *)g_thread_self(), who, self));}/** @internal Send a message to the port. */int su_source_send(su_port_t *self, su_msg_r rmsg){ int wakeup = su_base_port_send(self, rmsg); GMainContext *gmc; if (wakeup < 0) return -1; if (wakeup == 0) return 0; gmc = g_source_get_context(self->sup_source); if (gmc) g_main_context_wakeup(gmc); return 0;}/** @internal * * Change or query ownership of the port object. * * @param self pointer to a port object * @param op operation * * @ERRORS * @ERROR EALREADY port already has an owner (or has no owner) */static int su_source_thread(su_port_t *self, enum su_port_thread_op op){ GThread *me = g_thread_self(); switch (op) { case su_port_thread_op_is_obtained: if (self->sup_tid == me) return 2; else if (self->sup_tid) return 1; else return 0; case su_port_thread_op_release: if (self->sup_tid != me) return errno = EALREADY, -1; self->sup_tid = NULL; g_static_mutex_unlock(self->sup_obtained); return 0; case su_port_thread_op_obtain: if (su_home_threadsafe(su_port_home(self)) == -1) return -1; g_static_mutex_lock(self->sup_obtained); self->sup_tid = me; return 0; default: return errno = ENOSYS, -1; }}/* -- Registering and unregistering ------------------------------------- *//* Seconds from 1.1.1900 to 1.1.1970 */#define NTP_EPOCH 2208988800UL /** Prepare to wait - calculate time to next timer */staticgboolean su_source_prepare(GSource *gs, gint *return_tout){ SuSource *ss = (SuSource *)gs; su_port_t *self = ss->ss_port; enter; if (self->sup_base->sup_head) { *return_tout = 0; return TRUE; } if (self->sup_base->sup_timers) { su_time_t now; GTimeVal gtimeval; su_duration_t tout; g_source_get_current_time(gs, >imeval); now.tv_sec = gtimeval.tv_sec + 2208988800UL; now.tv_usec = gtimeval.tv_usec; tout = su_timer_next_expires(self->sup_base->sup_timers, now); *return_tout = (tout < 0 || tout > (su_duration_t)G_MAXINT)? -1 : (gint)tout; return (tout == 0); } return FALSE;}staticgboolean su_source_check(GSource *gs){ SuSource *ss = (SuSource *)gs; su_port_t *self = ss->ss_port; gint tout; unsigned i, I; enter; I = self->sup_n_waits;#if SU_HAVE_POLL for (i = 0; i < I; i++) { if (self->sup_waits[i].revents) return TRUE; }#endif return su_source_prepare(gs, &tout);}static gboolean su_source_dispatch(GSource *gs, GSourceFunc callback, gpointer user_data){ SuSource *ss = (SuSource *)gs; su_port_t *self = ss->ss_port; enter; if (self->sup_base->sup_head) su_base_port_getmsgs(self); if (self->sup_base->sup_timers) { su_time_t now; GTimeVal gtimeval; su_duration_t tout; int timers = 0; tout = SU_DURATION_MAX; g_source_get_current_time(gs, >imeval); now.tv_sec = gtimeval.tv_sec + 2208988800UL; now.tv_usec = gtimeval.tv_usec; timers = su_timer_expire(&self->sup_base->sup_timers, &tout, now); }#if SU_HAVE_POLL { su_root_t *root; su_wait_t *waits = self->sup_waits; unsigned i, n = self->sup_n_waits; unsigned version = self->sup_registers; for (i = 0; i < n; i++) { if (waits[i].revents) { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &waits[i], self->sup_wait_args[i]); /* Callback used su_register()/su_unregister() */ if (version != self->sup_registers) break; } } }#endif if (!callback) return TRUE; return callback(user_data);}static void su_source_lock(su_port_t *self, char const *who){ PORT_LOCK_DEBUG(("%p at %s locking(%p)...", (void *)g_thread_self(), who, self)); g_static_mutex_lock(self->sup_mutex); PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...", (void *)g_thread_self(), who, self));}static void su_source_unlock(su_port_t *self, char const *who){ g_static_mutex_unlock(self->sup_mutex); PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n", (void *)g_thread_self(), who, self));}static void su_source_incref(su_port_t *self, char const *who){ SU_SOURCE_INCREF(self, who);}static void su_source_decref(su_port_t *self, int blocking, char const *who){ /* XXX - blocking? */ SU_SOURCE_DECREF(self, who);}GSource *su_source_gsource(su_port_t *self){ return self->sup_source;}/** @internal * * Register a @c su_wait_t object. The wait object, a callback function and * a argument pointer is stored in the port object. The callback function * will be called when the wait object is signaled. * * Please note if identical wait objects are inserted, only first one is * ever signalled. * * @param self pointer to port * @param root pointer to root object * @param waits pointer to wait object * @param callback callback function pointer * @param arg argument given to callback function when it is invoked * @param priority relative priority of the wait object * (0 is normal, 1 important, 2 realtime)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -