⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 su_source.c

📁 this is simple sip stack.
💻 C
📖 第 1 页 / 共 2 页
字号:
/* * This file is part of the Sofia-SIP package * * Copyright (C) 2005 Nokia Corporation. * * Contact: Pekka Pessi <pekka.pessi@nokia.com> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA * *//** * @file su_source.c * @brief Wrapper for glib GSource. *   * @author Pekka Pessi <Pekka.Pessi@nokia.com>. *  * @date Created: Thu Mar  4 15:15:15 2004 ppessi *  */#if HAVE_CONFIG_H#include "config.h"#endif/* Use Posix stuff */#define _XOPEN_SOURCE  (500)#include <stdlib.h>#include <assert.h>#include <stdio.h>#include <string.h>#include <limits.h>#include <glib.h>#define SU_PORT_IMPLEMENTATION 1#define SU_MSG_ARG_T union { char anoymous[4]; }#define su_port_s su_source_s#include "sofia-sip/su_source.h"#include "sofia-sip/su.h"#include "su_port.h"#include "sofia-sip/su_alloc.h"static su_port_t *su_source_create(void) __attribute__((__malloc__));static gboolean su_source_prepare(GSource *gs, gint *return_tout);static gboolean su_source_check(GSource *gs);static gboolean su_source_dispatch(GSource *gs,			    GSourceFunc callback,			    gpointer user_data);static void su_source_finalize(GSource *source);static int su_source_getmsgs(su_port_t *self);staticGSourceFuncs su_source_funcs[1] = {{    su_source_prepare,    su_source_check,    su_source_dispatch,    su_source_finalize,    NULL,    NULL  }};static void su_source_lock(su_port_t *self, char const *who);static void su_source_unlock(su_port_t *self, char const *who);static void su_source_incref(su_port_t *self, char const *who);static void su_source_decref(su_port_t *self, int blocking, char const *who);static struct _GSource *su_source_gsource(su_port_t *port);static int su_source_send(su_port_t *self, su_msg_r rmsg);static int su_source_register(su_port_t *self,			    su_root_t *root, 			    su_wait_t *wait, 			    su_wakeup_f callback,			    su_wakeup_arg_t *arg,			    int priority);static int su_source_unregister(su_port_t *port,			      su_root_t *root, 			      su_wait_t *wait,				      su_wakeup_f callback, 			      su_wakeup_arg_t *arg);static int su_source_deregister(su_port_t *self, int i);static int su_source_unregister_all(su_port_t *self,				  su_root_t *root);static int su_source_eventmask(su_port_t *self, 			     int index, int socket, int events);static void su_source_run(su_port_t *self);static void su_source_break(su_port_t *self);static su_duration_t su_source_step(su_port_t *self, su_duration_t tout);static int su_source_own_thread(su_port_t const *port);static int su_source_add_prepoll(su_port_t *port,			       su_root_t *root, 			       su_prepoll_f *, 			       su_prepoll_magic_t *);static int su_source_remove_prepoll(su_port_t *port,				  su_root_t *root);static su_timer_t **su_source_timers(su_port_t *port);static int su_source_multishot(su_port_t *self, int multishot);static int su_source_threadsafe(su_port_t *port);staticsu_port_vtable_t const su_source_vtable[1] =  {{      /* su_vtable_size: */ sizeof su_source_vtable,      su_source_lock,      su_source_unlock,      su_source_incref,      su_source_decref,      su_source_gsource,      su_source_send,      su_source_register,      su_source_unregister,      su_source_deregister,      su_source_unregister_all,      su_source_eventmask,      su_source_run,      su_source_break,      su_source_step,      su_source_own_thread,      su_source_add_prepoll,      su_source_remove_prepoll,      su_source_timers,      su_source_multishot,      su_source_threadsafe    }};/**  * Port is a per-thread reactor.   * * Multiple root objects executed by single thread share a su_port_t object.  */struct su_source_s {  su_home_t        sup_home[1];  su_port_vtable_t const *sup_vtable;    GThread         *sup_tid;  GStaticMutex     sup_mutex[1];  GStaticRWLock    sup_ref[1];  GSource         *sup_source;  GMainLoop       *sup_main_loop;    /* Message list - this is protected by lock  */  su_msg_t        *sup_head;  su_msg_t       **sup_tail;  /* Waits */  unsigned         sup_registers; /** Counter incremented by 				      su_port_register() or 				      su_port_unregister()				  */  unsigned         sup_n_waits;   unsigned         sup_size_waits;   unsigned         sup_max_index;  unsigned        *sup_indices;   su_wait_t       *sup_waits;   su_wakeup_f     *sup_wait_cbs;   su_wakeup_arg_t**sup_wait_args;   su_root_t      **sup_wait_roots;   /* Timer list */  su_timer_t      *sup_timers;};typedef struct _SuSource{  GSource    ss_source[1];  su_port_t  ss_port[1];} SuSource;#define SU_SOURCE_OWN_THREAD(p)   ((p)->sup_tid == g_thread_self())#if 1#define SU_SOURCE_INCREF(p, f)    (g_source_ref(p->sup_source))#define SU_SOURCE_DECREF(p, f)    (g_source_unref(p->sup_source))#define SU_SOURCE_INITLOCK(p)     (g_static_mutex_init((p)->sup_mutex))#define SU_SOURCE_LOCK(p, f)      (g_static_mutex_lock((p)->sup_mutex))#define SU_SOURCE_UNLOCK(p, f)    (g_static_mutex_unlock((p)->sup_mutex))#else/* Debugging versions */#define SU_SOURCE_INCREF(p, f)    (g_source_ref(p->sup_source), printf("incref(%p) by %s\n", (p), f))#define SU_SOURCE_DECREF(p, f)    do { printf("decref(%p) by %s\n", (p), f), \  g_source_unref(p->sup_source); } while(0)#define SU_SOURCE_INITLOCK(p) \   (g_static_mutex_init((p)->sup_mutex), printf("init_lock(%p)\n", p))#define SU_SOURCE_LOCK(p, f)    \   (printf("%ld at %s locking(%p)...", g_thread_self(), f, p), g_static_mutex_lock((p)->sup_mutex), printf(" ...%ld at %s locked(%p)...", g_thread_self(), f, p))#define SU_SOURCE_UNLOCK(p, f)  \  (g_static_mutex_unlock((p)->sup_mutex), printf(" ...%ld at %s unlocked(%p)\n", g_thread_self(), f, p))#endif#if HAVE_FUNC#define enter (void)SU_DEBUG_9(("%s: entering\n", __func__))#elif HAVE_FUNCTION#define enter (void)SU_DEBUG_9(("%s: entering\n", __FUNCTION__))#else#define enter (void)0#endif/** Create a root that uses GSource as reactor */su_root_t *su_root_source_create(su_root_magic_t *magic){  return su_root_create_with_port(magic, su_source_create());}/**@internal * * Allocates and initializes a reactor and message port object. * * @return *   If successful a pointer to the new message port is returned, otherwise *   NULL is returned.   */su_port_t *su_source_create(void){  SuSource *ss;  SU_DEBUG_9(("su_source_create() called\n"));  ss = (SuSource *)g_source_new(su_source_funcs, (sizeof *ss));  if (ss) {    su_port_t *self = ss->ss_port;    self->sup_vtable = su_source_vtable;    self->sup_source = ss->ss_source;        SU_SOURCE_INITLOCK(self);    self->sup_tail = &self->sup_head;    self->sup_tid = g_thread_self();    SU_DEBUG_9(("su_source_with_main_context() returns %p\n", self));    return self;  } else {    su_perror("su_source_with_main_context(): su_home_clone");    SU_DEBUG_9(("su_source_with_main_context() fails\n"));    return NULL;  }}/** @internal Destroy a port. */static void su_source_finalize(GSource *gs){  SuSource *ss = (SuSource *)gs;  su_port_t *self = ss->ss_port;  assert(gs);  SU_DEBUG_9(("su_source_finalize() called\n"));  if (self->sup_waits)     free(self->sup_waits), self->sup_waits = NULL;  if (self->sup_wait_cbs)    free(self->sup_wait_cbs), self->sup_wait_cbs = NULL;  if (self->sup_wait_args)    free(self->sup_wait_args), self->sup_wait_args = NULL;  if (self->sup_wait_roots)    free(self->sup_wait_roots), self->sup_wait_roots = NULL;  if (self->sup_indices)    free(self->sup_indices), self->sup_indices = NULL;  su_home_deinit(self->sup_home);}/* Seconds from 1.1.1900 to 1.1.1970 */#define NTP_EPOCH 2208988800UL staticgboolean su_source_prepare(GSource *gs, gint *return_tout){  SuSource *ss = (SuSource *)gs;  su_port_t *self = ss->ss_port;  enter;    if (self->sup_head)    return TRUE;  *return_tout = -1;  if (self->sup_timers) {    su_time_t now;    GTimeVal  gtimeval;    su_duration_t tout;    tout = SU_DURATION_MAX;    g_source_get_current_time(gs, &gtimeval);    now.tv_sec = gtimeval.tv_sec + 2208988800UL;    now.tv_usec = gtimeval.tv_usec;    tout = su_timer_next_expires(self->sup_timers, now);    if (tout == 0)      return TRUE;    if ((gint)tout < 0 || tout > (su_duration_t)G_MAXINT)      tout = -1;    *return_tout = (gint)tout;  }    return FALSE;}staticgboolean su_source_check(GSource *gs){  SuSource *ss = (SuSource *)gs;  su_port_t *self = ss->ss_port;  gint tout;  unsigned i, I;  enter;  I = self->sup_n_waits;#if SU_HAVE_POLL  for (i = 0; i < I; i++) {    if (self->sup_waits[i].revents)      return TRUE;  }#endif  return su_source_prepare(gs, &tout);}static gboolean su_source_dispatch(GSource *gs,			    GSourceFunc callback,			    gpointer user_data){  SuSource *ss = (SuSource *)gs;  su_port_t *self = ss->ss_port;  enter;  if (self->sup_head)    su_source_getmsgs(self);  if (self->sup_timers) {    su_time_t now;    GTimeVal  gtimeval;    su_duration_t tout;    int timers = 0;    tout = SU_DURATION_MAX;    g_source_get_current_time(gs, &gtimeval);    now.tv_sec = gtimeval.tv_sec + 2208988800UL;    now.tv_usec = gtimeval.tv_usec;    timers = su_timer_expire(&self->sup_timers, &tout, now);  }#if SU_HAVE_POLL  {    su_root_t *root;    su_wait_t *waits = self->sup_waits;    unsigned i, n = self->sup_n_waits;    unsigned version = self->sup_registers;    for (i = 0; i < n; i++) {      if (waits[i].revents) {	root = self->sup_wait_roots[i];	self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, 			      &waits[i], 			      self->sup_wait_args[i]);	/* Callback used su_register()/su_unregister() */	if (version != self->sup_registers)	  break;      }    }  }#endif  if (!callback)    return TRUE;  return callback(user_data);}static void su_source_lock(su_port_t *self, char const *who){  SU_SOURCE_LOCK(self, who);}static void su_source_unlock(su_port_t *self, char const *who){  SU_SOURCE_UNLOCK(self, who);}static void su_source_incref(su_port_t *self, char const *who){  SU_SOURCE_INCREF(self, who);}static void su_source_decref(su_port_t *self, int blocking, char const *who){  /* XXX - blocking? */  SU_SOURCE_DECREF(self, who);}GSource *su_source_gsource(su_port_t *self){  return self->sup_source;}/** @internal Send a message to the port. */int su_source_send(su_port_t *self, su_msg_r rmsg){  enter;    if (self) {    su_msg_t *msg;    GMainContext *gmc;    SU_SOURCE_LOCK(self, "su_source_send");        msg = rmsg[0]; rmsg[0] = NULL;    *self->sup_tail = msg;    self->sup_tail = &msg->sum_next;    SU_SOURCE_UNLOCK(self, "su_source_send");    gmc = g_source_get_context(self->sup_source);    if (gmc)      g_main_context_wakeup(gmc);    return 0;  }  else {    su_msg_destroy(rmsg);    return -1;  }}/** @internal * Execute the messages in the incoming queue until the queue is empty.. * * @param self - pointer to a port object * * @retval 0 if there was a signal to handle,  * @retval -1 otherwise. */staticint su_source_getmsgs(su_port_t *self){  enter;    if (self && self->sup_head) {    su_root_t *root;    su_msg_f f;    SU_SOURCE_INCREF(self, "su_source_getmsgs");    SU_SOURCE_LOCK(self, "su_source_getmsgs");    while (self->sup_head) {      su_msg_t *msg = self->sup_head;      self->sup_head = msg->sum_next;      if (!self->sup_head) {	assert(self->sup_tail == &msg->sum_next);	self->sup_tail = &self->sup_head;      }      root = msg->sum_to->sut_root;      f = msg->sum_func;      SU_SOURCE_UNLOCK(self, "su_source_getmsgs");      if (f) 	f(su_root_magic(root), &msg, msg->sum_data);      if (msg && msg->sum_report)	su_msg_delivery_report(&msg);      else	su_msg_destroy(&msg);      SU_SOURCE_LOCK(self, "su_source_getmsgs");    }    SU_SOURCE_UNLOCK(self, "su_source_getmsgs");    SU_SOURCE_DECREF(self, "su_source_getmsgs");    return 0;  }  else    return -1;}/** @internal * *  Register a @c su_wait_t object. The wait object, a callback function and *  a argument pointer is stored in the port object.  The callback function *  will be called when the wait object is signaled. * *  Please note if identical wait objects are inserted, only first one is *  ever signalled. *  * @param self	     pointer to port * @param root	     pointer to root object * @param waits	     pointer to wait object * @param callback   callback function pointer * @param arg	     argument given to callback function when it is invoked * @param priority   relative priority of the wait object  *              (0 is normal, 1 important, 2 realtime) *  * @return *   The function @su_source_register returns nonzero index of the wait object,  *   or -1 upon an error.  */int su_source_register(su_port_t *self,		       su_root_t *root, 		       su_wait_t *wait, 		       su_wakeup_f callback,		       su_wakeup_arg_t *arg,		       int priority){  unsigned i, j, I;  unsigned n;  enter;    assert(SU_SOURCE_OWN_THREAD(self));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -