⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 su_source.c

📁 Sofia SIP is an open-source SIP User-Agent library, compliant with the IETF RFC3261 specification.
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 * This file is part of the Sofia-SIP package
 *
 * Copyright (C) 2005 Nokia Corporation.
 *
 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 */

/**
 * @file su_source.c
 * @brief Wrapper for glib GSource.
 *
 * Refs:
 *  - http://sofia-sip.sourceforge.net/refdocs/su/group__su__wait.html
 *  - http://developer.gnome.org/doc/API/glib/glib-the-main-event-loop.html
 *
 * @author Pekka Pessi <Pekka.Pessi@nokia.com>.
 *
 * @date Created: Thu Mar  4 15:15:15 2004 ppessi
 *
 */

#include "config.h"

#include <glib.h>

#if HAVE_OPEN_C
#include <glib/gthread.h>
#include <glib_global.h>
#endif

#define SU_PORT_IMPLEMENTATION 1

#define SU_MSG_ARG_T union { char anoymous[4]; }

#define su_port_s su_source_s

#include "sofia-sip/su_source.h"
#include "sofia-sip/su_glib.h"

#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"

#include <stdlib.h>
#include <stddef.h>   /* offsetof() used by su_source_port_init() */
#include <assert.h>
#include <errno.h>    /* errno, EALREADY, ENOSYS used by su_source_thread() */
#include <stdio.h>
#include <string.h>
#include <limits.h>

/* Lock tracing is compiled out; flip the condition to enable printf traces. */
#if 1
#define PORT_LOCK_DEBUG(x)  ((void)0)
#else
#define PORT_LOCK_DEBUG(x)  printf x
#endif

static su_port_t *su_source_port_create(void) __attribute__((__malloc__));

static gboolean su_source_prepare(GSource *gs, gint *return_tout);
static gboolean su_source_check(GSource *gs);
static gboolean su_source_dispatch(GSource *gs,
                                   GSourceFunc callback,
                                   gpointer user_data);
static void su_source_finalize(GSource *source);

/** GSource callbacks implemented by this file. */
static
GSourceFuncs su_source_funcs[1] = {{
    su_source_prepare,
    su_source_check,
    su_source_dispatch,
    su_source_finalize,
    NULL,
    NULL
  }};

static int su_source_port_init(su_port_t *self,
                               su_port_vtable_t const *vtable);
static void su_source_port_deinit(su_port_t *self);

static void su_source_lock(su_port_t *self, char const *who);
static void su_source_unlock(su_port_t *self, char const *who);
static void su_source_incref(su_port_t *self, char const *who);
static void su_source_decref(su_port_t *self, int blocking, char const *who);
static struct _GSource *su_source_gsource(su_port_t *port);

static int su_source_send(su_port_t *self, su_msg_r rmsg);

static int su_source_register(su_port_t *self,
                              su_root_t *root,
                              su_wait_t *wait,
                              su_wakeup_f callback,
                              su_wakeup_arg_t *arg,
                              int priority);
static int su_source_unregister(su_port_t *port,
                                su_root_t *root,
                                su_wait_t *wait,
                                su_wakeup_f callback,
                                su_wakeup_arg_t *arg);
static int su_source_deregister(su_port_t *self, int i);
static int su_source_unregister_all(su_port_t *self,
                                    su_root_t *root);
static int su_source_eventmask(su_port_t *self,
                               int index, int socket, int events);
static void su_source_run(su_port_t *self);
static void su_source_break(su_port_t *self);
static su_duration_t su_source_step(su_port_t *self, su_duration_t tout);
static int su_source_thread(su_port_t *self, enum su_port_thread_op op);
static int su_source_add_prepoll(su_port_t *port,
                                 su_root_t *root,
                                 su_prepoll_f *,
                                 su_prepoll_magic_t *);
static int su_source_remove_prepoll(su_port_t *port,
                                    su_root_t *root);
static int su_source_multishot(su_port_t *self, int multishot);

static char const *su_source_name(su_port_t const *self);

/** Port virtual table binding the su_port interface to this GSource port. */
static
su_port_vtable_t const su_source_port_vtable[1] =
  {{
      /* su_vtable_size: */ sizeof su_source_port_vtable,
      su_source_lock,
      su_source_unlock,

      su_source_incref,
      su_source_decref,

      su_source_gsource,

      su_source_send,
      su_source_register,
      su_source_unregister,
      su_source_deregister,
      su_source_unregister_all,
      su_source_eventmask,
      su_source_run,
      su_source_break,
      su_source_step,
      su_source_thread,
      su_source_add_prepoll,
      su_source_remove_prepoll,
      su_base_port_timers,
      su_source_multishot,
      /*su_source_wait_events*/ NULL,
      su_base_port_getmsgs,
      su_base_port_getmsgs_from,
      su_source_name,
      su_base_port_start_shared,
      su_base_port_wait,
      NULL,
    }};

/** Return the constant name of this port implementation ("GSource"). */
static char const *su_source_name(su_port_t const *self)
{
  return "GSource";
}

/**
 * Port is a per-thread reactor.
 *
 * Multiple root objects executed by single thread share a su_port_t object.
 */
struct su_source_s {
  su_base_port_t   sup_base[1];

  GThread         *sup_tid;        /**< Owning thread, NULL when not obtained */
  GStaticMutex     sup_obtained[1];/**< Held while a thread owns the port */

  GStaticMutex     sup_mutex[1];   /**< Taken by the lock/unlock methods */

  GSource         *sup_source;     /**< Backpointer to source */
  GMainLoop       *sup_main_loop;  /**< Reference to mainloop while running */

  /* Waits */
  unsigned         sup_registers;  /**< Counter incremented by
                                        su_port_register() or
                                        su_port_unregister()
                                   */
  unsigned         sup_n_waits;
  unsigned         sup_size_waits;
  unsigned         sup_max_index;
  unsigned        *sup_indices;

  su_wait_t       *sup_waits;
  su_wakeup_f     *sup_wait_cbs;
  su_wakeup_arg_t**sup_wait_args;
  su_root_t      **sup_wait_roots;
};

/**
 * GSource with the port embedded directly after it.
 *
 * su_source_port_init() recovers the GSource from the port address with
 * offsetof(SuSource, ss_port), so the member order must not change.
 */
typedef struct _SuSource
{
  GSource    ss_source[1];
  su_port_t  ss_port[1];
} SuSource;

#define SU_SOURCE_OWN_THREAD(p)   ((p)->sup_tid == g_thread_self())

/* The port is reference counted through its GSource. */
#if 1
#define SU_SOURCE_INCREF(p, f)    (g_source_ref((p)->sup_source))
#define SU_SOURCE_DECREF(p, f)    (g_source_unref((p)->sup_source))
#else

/* Debugging versions */
#define SU_SOURCE_INCREF(p, f)    (g_source_ref((p)->sup_source), printf("incref(%p) by %s\n", (p), f))
#define SU_SOURCE_DECREF(p, f)    do { printf("decref(%p) by %s\n", (p), f), \
  g_source_unref((p)->sup_source); } while(0)

#endif

#if HAVE_FUNC
#define enter (void)SU_DEBUG_9(("%s: entering\n", __func__))
#elif HAVE_FUNCTION
#define enter (void)SU_DEBUG_9(("%s: entering\n", __FUNCTION__))
#else
#define enter (void)0
#endif

/*=============== Public function definitions ===============*/

/** Create a root that uses GSource as reactor */
su_root_t *su_glib_root_create(su_root_magic_t *magic)
{
  return su_root_create_with_port(magic, su_source_port_create());
}

/** Deprecated */
su_root_t *su_root_source_create(su_root_magic_t *magic)
{
  return su_glib_root_create(magic);
}

/**
 * Returns a GSource object for the root
 *
 * Note that you need to unref the GSource with g_source_unref()
 * before destroying the root object.
 *
 * @note A NULL @a root is rejected with g_assert() rather than
 *       reported through the return value.
 *
 * @return NULL on error (for instance if root was not created with
 *         su_glib_root_create())
 */
GSource *su_glib_root_gsource(su_root_t *root)
{
  g_assert(root);
  return su_root_gsource(root);
}

/*=============== Private function definitions ===============*/

/** Initialize source port
 *
 * Stores the backpointer to the enclosing GSource, initializes both
 * mutexes and then initializes the base port.
 *
 * @param self    port embedded in a SuSource
 * @param vtable  virtual table passed on to su_base_port_init()
 *
 * @return The value returned by su_base_port_init().
 */
static int su_source_port_init(su_port_t *self,
                               su_port_vtable_t const *vtable)
{
  /* The port lives inside a SuSource; step back to the enclosing GSource. */
  GSource *gs = (GSource *)((char *)self - offsetof(SuSource, ss_port));

  self->sup_source = gs;

  g_static_mutex_init(self->sup_obtained);
  g_static_mutex_init(self->sup_mutex);

  return su_base_port_init(self, vtable);
}

/** Release everything owned by the port.
 *
 * Deinitializes the base port, frees both mutexes and the wait tables,
 * and finally deinitializes the memory home of the base port.
 */
static void su_source_port_deinit(su_port_t *self)
{
  su_base_port_deinit(self);

  g_static_mutex_free(self->sup_mutex);
  g_static_mutex_free(self->sup_obtained);

  /* free(NULL) is a no-op, so the tables need no NULL guards. */
  free(self->sup_indices);
  self->sup_indices = NULL;
  free(self->sup_waits);
  self->sup_waits = NULL;
  free(self->sup_wait_cbs);
  self->sup_wait_cbs = NULL;
  free(self->sup_wait_args);
  self->sup_wait_args = NULL;
  free(self->sup_wait_roots);
  self->sup_wait_roots = NULL;

  su_home_deinit(self->sup_base->sup_home);
}

/** @internal Destroy a port. */
static void su_source_finalize(GSource *gs)
{
  SuSource *ss = (SuSource *)gs;
  assert(gs);
  SU_DEBUG_9(("su_source_finalize() called\n"));
  su_source_port_deinit(ss->ss_port);
}

/** Lock the port mutex.
 *
 * @param self  port to lock
 * @param who   caller name, used only by the lock tracing
 */
void su_source_port_lock(su_port_t *self, char const *who)
{
  PORT_LOCK_DEBUG(("%p at %s locking(%p)...",
                   (void *)g_thread_self(), who, self));

  g_static_mutex_lock(self->sup_mutex);

  PORT_LOCK_DEBUG((" ...%p at %s locked(%p)...",
                   (void *)g_thread_self(), who, self));
}

/** Unlock the port mutex.
 *
 * @param self  port to unlock
 * @param who   caller name, used only by the lock tracing
 */
void su_source_port_unlock(su_port_t *self, char const *who)
{
  g_static_mutex_unlock(self->sup_mutex);

  PORT_LOCK_DEBUG((" ...%p at %s unlocked(%p)\n",
                   (void *)g_thread_self(), who, self));
}

/** @internal Send a message to the port.
 *
 * Queues the message with su_base_port_send() and, when that call returns
 * a positive value, wakes up the GMainContext the source is attached to
 * (if any).
 *
 * @retval 0   message was queued
 * @retval -1  su_base_port_send() failed
 */
int su_source_send(su_port_t *self, su_msg_r rmsg)
{
  int wakeup = su_base_port_send(self, rmsg);
  GMainContext *gmc;

  if (wakeup < 0)
    return -1;
  if (wakeup == 0)
    return 0;

  gmc = g_source_get_context(self->sup_source);

  if (gmc)
    g_main_context_wakeup(gmc);

  return 0;
}

/** @internal
 *
 * Change or query ownership of the port object.
 *
 * @param self pointer to a port object
 * @param op operation
 *
 * @return
 * For su_port_thread_op_is_obtained: 2 if the calling thread owns the
 * port, 1 if another thread owns it, 0 if it has no owner.
 * For su_port_thread_op_release and su_port_thread_op_obtain: 0 on
 * success, -1 on error.
 *
 * @ERRORS
 * @ERROR EALREADY port already has an owner (or has no owner)
 * @ERROR ENOSYS   unknown operation
 */
static int su_source_thread(su_port_t *self, enum su_port_thread_op op)
{
  GThread *me = g_thread_self();

  switch (op) {

  case su_port_thread_op_is_obtained:
    if (self->sup_tid == me)
      return 2;
    else if (self->sup_tid)
      return 1;
    else
      return 0;

  case su_port_thread_op_release:
    if (self->sup_tid != me)
      return errno = EALREADY, -1;
    self->sup_tid = NULL;
    g_static_mutex_unlock(self->sup_obtained);
    return 0;

  case su_port_thread_op_obtain:
    if (su_home_threadsafe(su_port_home(self)) == -1)
      return -1;
    /* Blocks until any previous owner has released the port. */
    g_static_mutex_lock(self->sup_obtained);
    self->sup_tid = me;
    return 0;

  default:
    return errno = ENOSYS, -1;
  }
}

/* -- Registering and unregistering ------------------------------------- */

/* Seconds from 1.1.1900 to 1.1.1970 */
#define NTP_EPOCH 2208988800UL

/** Current time of the source as su_time_t.
 *
 * Reads the time with g_source_get_current_time() and shifts the seconds
 * by NTP_EPOCH; microseconds are copied unchanged.
 */
static su_time_t su_source_now(GSource *gs)
{
  GTimeVal gtimeval;
  su_time_t now;

  g_source_get_current_time(gs, &gtimeval);

  now.tv_sec = gtimeval.tv_sec + NTP_EPOCH;
  now.tv_usec = gtimeval.tv_usec;

  return now;
}

/** Prepare to wait - calculate time to next timer
 *
 * @param gs           source embedding the port
 * @param return_tout  receives the poll timeout in milliseconds: 0 when
 *                     messages are queued, the time to the next timer when
 *                     timers are set (-1 if it does not fit in a gint),
 *                     and -1 when the port is idle
 *
 * @return TRUE when the source can be dispatched without polling.
 */
static
gboolean su_source_prepare(GSource *gs, gint *return_tout)
{
  SuSource *ss = (SuSource *)gs;
  su_port_t *self = ss->ss_port;

  enter;

  /* Queued messages: dispatch right away. */
  if (self->sup_base->sup_head) {
    *return_tout = 0;
    return TRUE;
  }

  if (self->sup_base->sup_timers) {
    su_duration_t tout;

    tout = su_timer_next_expires(self->sup_base->sup_timers,
                                 su_source_now(gs));

    *return_tout = (tout < 0 || tout > (su_duration_t)G_MAXINT) ?
      -1 : (gint)tout;

    return (tout == 0);
  }

  /* Nothing pending: never leave the output unset, -1 means "no timeout". */
  *return_tout = -1;

  return FALSE;
}

/** Check whether the source is ready after polling.
 *
 * @return TRUE if any registered wait has events pending (only when
 *         SU_HAVE_POLL is set), otherwise the result of
 *         su_source_prepare().
 */
static
gboolean su_source_check(GSource *gs)
{
  SuSource *ss = (SuSource *)gs;
  su_port_t *self = ss->ss_port;
  gint tout;
  unsigned i, I;

  enter;

  I = self->sup_n_waits;

#if SU_HAVE_POLL
  for (i = 0; i < I; i++) {
    if (self->sup_waits[i].revents)
      return TRUE;
  }
#endif

  /* Only the return value matters here; the timeout is discarded. */
  return su_source_prepare(gs, &tout);
}

/** Dispatch queued messages, expired timers and signaled waits.
 *
 * @param gs         source embedding the port
 * @param callback   optional user callback set on the source
 * @param user_data  argument for @a callback
 *
 * @return TRUE if there is no @a callback, otherwise the value returned
 *         by @a callback.
 */
static
gboolean su_source_dispatch(GSource *gs,
                            GSourceFunc callback,
                            gpointer user_data)
{
  SuSource *ss = (SuSource *)gs;
  su_port_t *self = ss->ss_port;

  enter;

  if (self->sup_base->sup_head)
    su_base_port_getmsgs(self);

  if (self->sup_base->sup_timers) {
    su_duration_t tout = SU_DURATION_MAX;

    /* The number of expired timers and the next timeout are not needed. */
    (void)su_timer_expire(&self->sup_base->sup_timers, &tout,
                          su_source_now(gs));
  }

#if SU_HAVE_POLL
  {
    su_root_t *root;
    su_wait_t *waits = self->sup_waits;
    unsigned i, n = self->sup_n_waits;
    unsigned version = self->sup_registers;

    for (i = 0; i < n; i++) {
      if (waits[i].revents) {
        root = self->sup_wait_roots[i];
        self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
                              &waits[i],
                              self->sup_wait_args[i]);
        /* Callback used su_register()/su_unregister() */
        if (version != self->sup_registers)
          break;
      }
    }
  }
#endif

  if (!callback)
    return TRUE;

  return callback(user_data);
}

/** Port method: lock the port, same as su_source_port_lock(). */
static void su_source_lock(su_port_t *self, char const *who)
{
  su_source_port_lock(self, who);
}

/** Port method: unlock the port, same as su_source_port_unlock(). */
static void su_source_unlock(su_port_t *self, char const *who)
{
  su_source_port_unlock(self, who);
}

/** Port method: add a reference to the GSource backing the port. */
static void su_source_incref(su_port_t *self, char const *who)
{
  SU_SOURCE_INCREF(self, who);
}

/** Port method: drop a reference to the GSource backing the port.
 *
 * @param blocking  currently ignored
 */
static void su_source_decref(su_port_t *self, int blocking, char const *who)
{
  /* XXX - blocking? */
  SU_SOURCE_DECREF(self, who);
}

/** Port method: return the GSource backing the port (no reference added). */
GSource *su_source_gsource(su_port_t *self)
{
  return self->sup_source;
}

/** @internal
 *
 *  Register a @c su_wait_t object. The wait object, a callback function and
 *  a argument pointer is stored in the port object.  The callback function
 *  will be called when the wait object is signaled.
 *
 *  Please note if identical wait objects are inserted, only first one is
 *  ever signalled.
 *
 * @param self	     pointer to port
 * @param root	     pointer to root object
 * @param waits	     pointer to wait object
 * @param callback   callback function pointer
 * @param arg	     argument given to callback function when it is invoked
 * @param priority   relative priority of the wait object
 *              (0 is normal, 1 important, 2 realtime)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -