⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 su_port.c

📁 this is simple sip stack.
💻 C
📖 第 1 页 / 共 3 页
字号:
/*
 * This file is part of the Sofia-SIP package
 *
 * Copyright (C) 2005 Nokia Corporation.
 *
 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 */

/**@ingroup su_wait
 * @CFILE su_port.c
 *
 * OS-Independent Socket Synchronization Interface.
 *
 * This looks like nth reincarnation of "reactor".  It implements the
 * poll/select/WaitForMultipleObjects and message passing functionality.
 *
 * @author Pekka Pessi <Pekka.Pessi@nokia.com>
 * @author Kai Vehmanen <kai.vehmanen@nokia.com>
 *
 * @date Created: Tue Sep 14 15:51:04 1999 ppessi
 */

#include "config.h"

/* React to multiple events per one poll() to make sure
 * that high-priority events can never completely mask other events.
 * Enabled by default on all platforms except WIN32 */
#ifndef WIN32
#define SU_ENABLE_MULTISHOT_POLL 1
#else
#define SU_ENABLE_MULTISHOT_POLL 0
#endif

#include <stdlib.h>
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <errno.h>

#define SU_PORT_IMPLEMENTATION 1

#include "sofia-sip/su.h"
#include "su_port.h"
#include "sofia-sip/su_alloc.h"

#if SU_HAVE_PTHREADS
/* Pthread implementation */
#include <pthread.h>
#define SU_HAVE_MBOX 1
#else
#define SU_HAVE_MBOX 0
#endif

/* MBOX_SEND is the index of the sending end of the mailbox; the mailbox
 * array in struct su_port_s has MBOX_SEND + 1 entries, i.e. two sockets
 * with socketpair() and a single socket otherwise. */
#if HAVE_SOCKETPAIR
#define MBOX_SEND  1
#else
#define MBOX_SEND  0
#endif

/* epoll support is unconditionally disabled in this file: HAVE_EPOLL is
 * undefined right before it is tested, so every "#if HAVE_EPOLL" section
 * below is compiled out. */
#undef HAVE_EPOLL

#if HAVE_EPOLL
#include <sys/epoll.h>
#endif

/* Forward declarations of the methods stored in su_port_vtable. */
static void su_port_lock(su_port_t *self, char const *who);
static void su_port_unlock(su_port_t *self, char const *who);
static void su_port_incref(su_port_t *self, char const *who);
static void su_port_decref(su_port_t *self, int blocking, char const *who);

static struct _GSource *su_port_gsource(su_port_t *port);

static int su_port_send(su_port_t *self, su_msg_r rmsg);

static int su_port_register(su_port_t *self,
                            su_root_t *root,
                            su_wait_t *wait,
                            su_wakeup_f callback,
                            su_wakeup_arg_t *arg,
                            int priority);
static int su_port_unregister(su_port_t *port,
                              su_root_t *root,
                              su_wait_t *wait,
                              su_wakeup_f callback,
                              su_wakeup_arg_t *arg);

static int su_port_deregister(su_port_t *self, int i);

static int su_port_unregister_all(su_port_t *self,
                                  su_root_t *root);
static
int su_port_eventmask(su_port_t *self, int index, int socket, int events);
static
void su_port_run(su_port_t *self);
static
void su_port_break(su_port_t *self);
static
su_duration_t su_port_step(su_port_t *self, su_duration_t tout);

#if 0
unsigned su_port_query(su_port_t *, su_wait_t *, unsigned n_waits);
void su_port_event(su_port_t *, su_wait_t *waitobj);
#endif

static
int su_port_own_thread(su_port_t const *port);
static
int su_port_add_prepoll(su_port_t *port,
                        su_root_t *root,
                        su_prepoll_f *,
                        su_prepoll_magic_t *);
static
int su_port_remove_prepoll(su_port_t *port,
                           su_root_t *root);
static
su_timer_t **su_port_timers(su_port_t *port);
static
int su_port_multishot(su_port_t *port, int multishot);
static
int su_port_threadsafe(su_port_t *port);
static
int su_port_yield(su_port_t *port);

/** Method table of the port.
 *
 * The initializer is positional: the first entry is the size of the table
 * and the remaining entries must stay in the member order of
 * su_port_vtable_t (declared outside this file).
 */
su_port_vtable_t const su_port_vtable[1] =
  {{
      /* su_vtable_size: */ sizeof su_port_vtable,
      su_port_lock,
      su_port_unlock,
      su_port_incref,
      su_port_decref,
      su_port_gsource,
      su_port_send,
      su_port_register,
      su_port_unregister,
      su_port_deregister,
      su_port_unregister_all,
      su_port_eventmask,
      su_port_run,
      su_port_break,
      su_port_step,
      su_port_own_thread,
      su_port_add_prepoll,
      su_port_remove_prepoll,
      su_port_timers,
      su_port_multishot,
      su_port_threadsafe,
      su_port_yield
    }};

static int su_port_wait_events(su_port_t *self, su_duration_t tout);

/**
 * Port is a per-thread reactor.
 *
 * Multiple root objects executed by single thread share a su_port_t object.
 */
struct su_port_s {
  su_home_t        sup_home[1];

  su_port_vtable_t const *sup_vtable;

  unsigned         sup_running;
#if SU_HAVE_PTHREADS
  pthread_t        sup_tid;
  pthread_mutex_t  sup_mutex[1];
#if __CYGWIN__
  pthread_mutex_t  sup_reflock[1];
  int              sup_ref;
#else
  pthread_rwlock_t sup_ref[1];
#endif
#else
  int              sup_ref;
#endif

#if SU_HAVE_MBOX
  su_socket_t      sup_mbox[MBOX_SEND + 1];
  su_wait_t        sup_mbox_wait;
#endif

#if HAVE_EPOLL
  int              sup_epoll;
#endif

  unsigned         sup_multishot; /**< Multishot operation? */

  unsigned         sup_registers; /**< Counter incremented by
                                       su_port_register() or
                                       su_port_unregister()
                                   */
  int              sup_n_waits; /**< Active su_wait_t in su_waits */
  int              sup_size_waits; /**< Size of allocate su_waits */

  int              sup_pri_offset; /**< Offset to prioritized waits */

  int              sup_free_index;   /**< Number of first free index */
  int             *sup_indices; /**< Indices to registrations */
  int             *sup_reverses; /**< Reverse index */

  su_wakeup_f     *sup_wait_cbs;
  su_wakeup_arg_t**sup_wait_args;
  su_root_t      **sup_wait_roots;
  su_wait_t       *sup_waits;

  /* Pre-poll callback */
  su_prepoll_f    *sup_prepoll;
  su_prepoll_magic_t *sup_pp_magic;
  su_root_t       *sup_pp_root;

  /* Timer list */
  su_timer_t      *sup_timers;

  /* Message list - this is protected by lock  */
  su_msg_t        *sup_head;
  su_msg_t       **sup_tail;
};

/*
 * Reference counting and locking primitives.
 *
 * Three pthread variants follow: a Cygwin one that counts references in an
 * int guarded by sup_reflock (with debug printf), the default one that uses
 * read locks on the sup_ref rwlock as references, and a debug-printing copy
 * of the default one that is never selected because of "#elif 1".  Without
 * pthreads the reference count is a plain int and locking is a no-op.
 *
 * All variants parenthesize the port argument so that any pointer
 * expression can be passed.
 */
#if SU_HAVE_PTHREADS
#define SU_PORT_OWN_THREAD(p)   (pthread_equal((p)->sup_tid, pthread_self()))

#if __CYGWIN__

/* Debugging versions */
#define SU_PORT_INITREF(p)      (pthread_mutex_init((p)->sup_reflock, NULL), printf("initref(%p)\n", (p)))
#define SU_PORT_INCREF(p, f)    (pthread_mutex_lock((p)->sup_reflock), (p)->sup_ref++, pthread_mutex_unlock((p)->sup_reflock), printf("incref(%p) by %s\n", (p), f))

/* The decremented count is captured while sup_reflock is still held;
 * re-reading sup_ref after the unlock would let two threads both see 0
 * and both destroy the port. */
#define SU_PORT_DECREF(p, f)    do { \
    int su_port_ref_; \
    pthread_mutex_lock((p)->sup_reflock); \
    su_port_ref_ = --(p)->sup_ref; \
    pthread_mutex_unlock((p)->sup_reflock); \
    if (su_port_ref_ == 0) { \
      printf("decref(%p) to 0 by %s\n", (p), f); su_port_destroy(p); } \
    else { printf("decref(%p) to %d by %s\n", (p), su_port_ref_, f); } \
  } while(0)

#define SU_PORT_ZAPREF(p, f)    do { \
    int su_port_ref_; \
    printf("zapref(%p) by %s\n", (p), f); \
    pthread_mutex_lock((p)->sup_reflock); \
    su_port_ref_ = --(p)->sup_ref; \
    pthread_mutex_unlock((p)->sup_reflock); \
    if (su_port_ref_ != 0) { \
      assert(!"SU_PORT_ZAPREF"); } \
    su_port_destroy(p); } while(0)

#define SU_PORT_INITLOCK(p) \
   (pthread_mutex_init((p)->sup_mutex, NULL), printf("init_lock(%p)\n", p))

#define SU_PORT_LOCK(p, f)    \
   (printf("%ld at %s locking(%p)...", pthread_self(), f, p), pthread_mutex_lock((p)->sup_mutex), printf(" ...%ld at %s locked(%p)...", pthread_self(), f, p))

#define SU_PORT_UNLOCK(p, f)  \
  (pthread_mutex_unlock((p)->sup_mutex), printf(" ...%ld at %s unlocked(%p)\n", pthread_self(), f, p))

#elif 1

/* A reference is a read lock on sup_ref; the port is destroyed by the
 * caller that manages to take the write lock after dropping its own
 * read lock. */
#define SU_PORT_INITREF(p)      (pthread_rwlock_init((p)->sup_ref, NULL))
#define SU_PORT_INCREF(p, f)    (pthread_rwlock_rdlock((p)->sup_ref))
#define SU_PORT_DECREF(p, f)    do { pthread_rwlock_unlock((p)->sup_ref); \
  if (pthread_rwlock_trywrlock((p)->sup_ref) == 0) su_port_destroy(p); } while(0)
#define SU_PORT_ZAPREF(p, f)    do { pthread_rwlock_unlock((p)->sup_ref); \
  if (pthread_rwlock_trywrlock((p)->sup_ref) != 0) { \
    assert(!"SU_PORT_ZAPREF"); pthread_rwlock_wrlock((p)->sup_ref); } \
  su_port_destroy(p); } while(0)

#define SU_PORT_INITLOCK(p)     (pthread_mutex_init((p)->sup_mutex, NULL))
#define SU_PORT_LOCK(p, f)      (pthread_mutex_lock((p)->sup_mutex))
#define SU_PORT_UNLOCK(p, f)    (pthread_mutex_unlock((p)->sup_mutex))

#else

/* Debugging versions */
#define SU_PORT_INITREF(p)      (pthread_rwlock_init((p)->sup_ref, NULL), printf("initref(%p)\n", (p)))
#define SU_PORT_INCREF(p, f)    (pthread_rwlock_rdlock((p)->sup_ref), printf("incref(%p) by %s\n", (p), f))
#define SU_PORT_DECREF(p, f)    do { \
    pthread_rwlock_unlock((p)->sup_ref); \
    if (pthread_rwlock_trywrlock((p)->sup_ref) == 0) { \
      printf("decref(%p) to 0 by %s\n", (p), f); su_port_destroy(p); } \
    else { printf("decref(%p) by %s\n", (p), f); }  } while(0)

#define SU_PORT_ZAPREF(p, f)    do { printf("zapref(%p) by %s\n", (p), f), \
  pthread_rwlock_unlock((p)->sup_ref); \
  if (pthread_rwlock_trywrlock((p)->sup_ref) != 0) { \
    assert(!"SU_PORT_ZAPREF"); pthread_rwlock_wrlock((p)->sup_ref); } \
  su_port_destroy(p); } while(0)

#define SU_PORT_INITLOCK(p) \
   (pthread_mutex_init((p)->sup_mutex, NULL), printf("init_lock(%p)\n", p))

#define SU_PORT_LOCK(p, f)    \
   (printf("%ld at %s locking(%p)...", pthread_self(), f, p), pthread_mutex_lock((p)->sup_mutex), printf(" ...%ld at %s locked(%p)...", pthread_self(), f, p))

#define SU_PORT_UNLOCK(p, f)  \
  (pthread_mutex_unlock((p)->sup_mutex), printf(" ...%ld at %s unlocked(%p)\n", pthread_self(), f, p))

#endif

#else /* !SU_HAVE_PTHREADS */

#define SU_PORT_OWN_THREAD(p)  1
#define SU_PORT_INITLOCK(p)    (void)(p)
#define SU_PORT_LOCK(p, f)      (void)(p)
#define SU_PORT_UNLOCK(p, f)    (void)(p)
#define SU_PORT_ZAPREF(p, f)    ((p)->sup_ref--)

#define SU_PORT_INITREF(p)      ((p)->sup_ref = 1)
#define SU_PORT_INCREF(p, f)    ((p)->sup_ref++)
/* No semicolon after while (0): the caller supplies it, which keeps the
 * macro usable as the body of an if that is followed by else. */
#define SU_PORT_DECREF(p, f) \
do { if (--((p)->sup_ref) == 0) su_port_destroy(p); } while (0)

#endif

#if SU_HAVE_MBOX
static int su_port_wakeup(su_root_magic_t *magic,
                          su_wait_t *w,
                          su_wakeup_arg_t *arg);
#endif

static void su_port_destroy(su_port_t *self);

/**@internal
 *
 * Allocates and initializes a message port.
 *
 * When SU_HAVE_MBOX is set, the port also gets a mailbox that is registered
 * with su_port_wakeup() as its callback.  The mailbox is a non-blocking
 * socketpair() when HAVE_SOCKETPAIR is set; otherwise it is a UDP socket
 * bound to the loopback address and connected to itself.
 *
 * If creating or registering the mailbox fails, the reason is reported with
 * su_perror() and the half-built port is released with su_port_destroy().
 *
 * @return
 *   If successful a pointer to the new message port is returned, otherwise
 *   NULL is returned.
 */
su_port_t *su_port_create(void)
{
  su_port_t *self;

  SU_DEBUG_9(("su_port_create() called\n"));

  self = su_home_clone(NULL, sizeof(*self));

  if (self) {
#if SU_HAVE_MBOX
    int af;
    su_socket_t mb = INVALID_SOCKET;
    char const *why;
#endif

    self->sup_vtable = su_port_vtable;

    SU_PORT_INITREF(self);
    SU_PORT_INITLOCK(self);

    /* Empty message queue: the tail pointer refers to the head slot. */
    self->sup_tail = &self->sup_head;

    self->sup_free_index = -1;

    self->sup_multishot = (SU_ENABLE_MULTISHOT_POLL) != 0;

#if SU_HAVE_PTHREADS
    /* The creating thread is recorded; SU_PORT_OWN_THREAD() compares
     * against it. */
    self->sup_tid = pthread_self();
#endif

#if HAVE_EPOLL
    self->sup_epoll = epoll_create(su_root_size_hint);
    if (self->sup_epoll == -1) {
      why = "su_port_init: epoll_create"; goto error;
    }
#endif

#if SU_HAVE_MBOX
#if HAVE_SOCKETPAIR
#if defined(AF_LOCAL)
    af = AF_LOCAL;
#else
    af = AF_UNIX;
#endif
    if (socketpair(af, SOCK_STREAM, 0, self->sup_mbox) == -1) {
      why = "su_port_init: socketpair"; goto error;
    }

    mb = self->sup_mbox[0];
    su_setblocking(self->sup_mbox[0], 0);
    su_setblocking(self->sup_mbox[1], 0);
#else
    {
      struct sockaddr_in sin = { sizeof(struct sockaddr_in), 0 };
      /* getsockname() takes a socklen_t *; a size_t object has the wrong
       * width wherever size_t is wider than socklen_t. */
      socklen_t sinsize = sizeof sin;
      struct sockaddr *sa = (struct sockaddr *)&sin;

      af = PF_INET;

      self->sup_mbox[0] = mb = socket(af, SOCK_DGRAM, IPPROTO_UDP);
      if (mb == INVALID_SOCKET) {
        why = "su_port_init: socket"; goto error;
      }

      sin.sin_family = AF_INET;
      sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); /* 127.1 */

      /* Get a port for us */
      if (bind(mb, sa, sizeof sin) == -1) {
        why = "su_port_init: bind"; goto error;
      }

      if (getsockname(mb, sa, &sinsize) == -1) {
        why = "su_port_init: getsockname"; goto error;
      }

      /* Connect the socket to its own address. */
      if (connect(mb, sa, sinsize) == -1) {
        why = "su_port_init: connect"; goto error;
      }
    }
#endif

    if (su_wait_create(&self->sup_mbox_wait, mb, SU_WAIT_IN) == -1) {
      why = "su_port_init: su_wait_create"; goto error;
    }

    if (su_port_register(self, NULL, &self->sup_mbox_wait, su_port_wakeup,
                         (su_wakeup_arg_t *)self->sup_mbox, 0)
        == -1) {
      why = "su_port_create: su_port_register"; goto error;
    }

    SU_DEBUG_9(("su_port_create() returns %p\n", self));

    return self;

  error:
    su_perror(why);
    su_port_destroy(self), self = NULL;
#endif
  }

  SU_DEBUG_9(("su_port_create() returns %p\n", self));

  return self;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -