⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tport_threadpool.c

📁 Sofia SIP is an open-source SIP User-Agent library, compliant with the IETF RFC3261 specification.
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
 * This file is part of the Sofia-SIP package
 *
 * Copyright (C) 2006 Nokia Corporation.
 *
 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 */

/**@CFILE tport_threadpool.c Multithreading transport
 *
 * See tport.docs for more detailed description of tport interface.
 *
 * @author Pekka Pessi <Pekka.Pessi@nokia.com>
 * @author Martti Mela <Martti.Mela@nokia.com>
 *
 * @date Created: Fri Mar 24 08:45:49 EET 2006 ppessi
 */

#include "config.h"

#undef HAVE_SIGCOMP

/*
 * NOTE(review): no `struct tport_threadpool` tag is defined in this file;
 * the callbacks below take a threadpool_t (struct threadpool) as their
 * first argument. Confirm which type su_root_magic_t is meant to be.
 */
#define SU_ROOT_MAGIC_T         struct tport_threadpool
#define SU_WAKEUP_ARG_T         struct tport_s
#define SU_MSG_ARG_T            union tport_su_msg_arg

#include "tport_internal.h"

#include <stdlib.h>
#include <string.h>             /* strerror(), memset() */
#include <time.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>

#if HAVE_FUNC
#elif HAVE_FUNCTION
#define __func__ __FUNCTION__
#else
static char const __func__[] = "tport_threadpool";
#endif

/* ==== Thread pools =================================================== */

typedef struct threadpool threadpool_t;

/**
 * Primary transport extended with a pool of worker threads.
 *
 * The primary transport is the first member, so a tport_primary_t pointer
 * handed to the vtable callbacks can be cast to tport_threadpool_t.  It is
 * declared as a one-element array so that it can be used with `->`, which
 * is how every user in this file accesses it.
 */
typedef struct {
  tport_primary_t tptp_primary[1];
  threadpool_t   *tptp_pool;   /**< Worker threads */
  unsigned        tptp_poolsize; /**< Number of entries in tptp_pool */
} tport_threadpool_t;

/** State of a single worker in the pool. */
struct threadpool
{
  /* Shared */
  su_clone_r thrp_clone;
  tport_threadpool_t *thrp_tport;

  int        thrp_killing; /* Threadpool is being killed */

  /* Private variables */
  su_root_t    *thrp_root;
  int           thrp_reg;  /* Index from su_root_register(), 0 if none */
  struct sigcomp_compartment *thrp_compartment;
  su_msg_r   thrp_rmsg;

  /* Slave thread counters */
  int        thrp_r_sent;
  int        thrp_s_recv;

  unsigned   thrp_rcvd_msgs;
  unsigned   thrp_rcvd_bytes;

  /* Master thread counters */
  int        thrp_s_sent;
  int        thrp_r_recv;

  int        thrp_yield;
};

/** Payload of the su message used to hand a received datagram onwards. */
typedef struct {
  threadpool_t *tpd_thrp;
  int  tpd_errorcode;
  msg_t *tpd_msg;
  su_time_t tpd_when;
  unsigned tpd_mtu;
#if HAVE_SIGCOMP
  struct sigcomp_compartment *tpd_cc;
#endif
  struct sigcomp_udvm *tpd_udvm;
  socklen_t tpd_namelen;
  su_sockaddr_t tpd_name[1];
} thrp_udp_deliver_t;

union tport_su_msg_arg
{
  threadpool_t   *thrp;
  thrp_udp_deliver_t thrp_udp_deliver[1];
};

int tport_threadpool_init_primary(tport_primary_t *,
                                  tp_name_t tpn[1],
                                  su_addrinfo_t *,
                                  tagi_t const *,
                                  char const **return_culprit);
static void tport_threadpool_deinit_primary(tport_primary_t *pri);

static int tport_thread_send(tport_t *tp,
                             msg_t *msg,
                             tp_name_t const *tpn,
                             struct sigcomp_compartment *cc,
                             unsigned mtu);

tport_vtable_t const tport_threadpool_vtable =
{
  "udp", tport_type_local,
  sizeof (tport_threadpool_t),
  tport_threadpool_init_primary,
  tport_threadpool_deinit_primary,
  NULL,
  NULL,
  0,                            /* No secondary transports! */
  NULL,
  NULL,
  NULL,
  NULL,
  NULL,
  tport_recv_dgram,
  tport_send_dgram,
  NULL,
  tport_thread_send
};

static int thrp_udp_init(su_root_t *, threadpool_t *);
static void thrp_udp_deinit(su_root_t *, threadpool_t *);

static int thrp_udp_event(threadpool_t *thrp,
                          su_wait_t *w,
                          tport_t *_tp);

static int thrp_udp_recv_deliver(threadpool_t *thrp,
                                 tport_t const *tp,
                                 thrp_udp_deliver_t *tpd,
                                 int events);

static int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd);

#if HAVE_SIGCOMP
static int thrp_udvm_decompress(threadpool_t *thrp,
                                thrp_udp_deliver_t *tpd);
#endif

static void thrp_udp_deliver(threadpool_t *thrp,
                             su_msg_r msg,
                             union tport_su_msg_arg *arg);

static void thrp_udp_deliver_report(threadpool_t *thrp,
                                    su_msg_r m,
                                    union tport_su_msg_arg *arg);

static void thrp_udp_send(threadpool_t *thrp,
                          su_msg_r msg,
                          union tport_su_msg_arg *arg);

static void thrp_udp_send_report(threadpool_t *thrp,
                                 su_msg_r msg,
                                 union tport_su_msg_arg *arg);

/** Launch threads in the tport pool.
 *
 * Initializes the primary transport with tport_udp_init_primary() and, if
 * the configured pool size (tpp_thrpsize) is nonzero, allocates that many
 * threadpool entries and starts one clone for each of them.
 *
 * @param pri            primary transport (really a tport_threadpool_t)
 * @param tpn            transport name, passed on to the UDP initializer
 * @param ai             address info; must describe a SOCK_DGRAM socket
 * @param tags           tag list, passed on to the UDP initializer
 * @param return_culprit passed on to the UDP initializer
 *
 * @retval 0  when successful (including the case of an empty pool)
 * @retval -1 upon an error
 */
int tport_threadpool_init_primary(tport_primary_t *pri,
                                  tp_name_t tpn[1],
                                  su_addrinfo_t *ai,
                                  tagi_t const *tags,
                                  char const **return_culprit)
{
  tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
  tport_t *tp = pri->pri_primary;
  threadpool_t *thrp;
  int i, N = tp->tp_params->tpp_thrpsize;

  assert(ai->ai_socktype == SOCK_DGRAM);

  if (tport_udp_init_primary(pri, tpn, ai, tags, return_culprit) < 0)
    return -1;

  if (N == 0)
    return 0;

  thrp = su_zalloc(tp->tp_home, (sizeof *thrp) * N);
  if (!thrp)
    return -1;

  su_setblocking(tp->tp_socket, 0);

  tptp->tptp_pool = thrp;
  tptp->tptp_poolsize = N;

  for (i = 0; i < N; i++) {
#if HAVE_SIGCOMP
    if (tport_has_sigcomp(tp))
      thrp[i].thrp_compartment = tport_primary_compartment(tp->tp_master);
#endif
    thrp[i].thrp_tport = tptp;
    if (su_clone_start(pri->pri_master->mr_root,
                       thrp[i].thrp_clone,
                       thrp + i,
                       thrp_udp_init,
                       thrp_udp_deinit) < 0)
      goto error;
  }

  /* The socket is waited on by the clones (see thrp_udp_init()). */
  tp->tp_events = 0;

  return 0;

 error:
  /* NOTE(review): clones started before the failure are not stopped here. */
  assert(!"tport_launch_threadpool");
  return -1;
}

/** Kill threads in the tport pool.
 *
 * Marks every pool entry as being killed, waits for each clone to finish
 * and releases the pool array.
 *
 * @param pri primary transport (really a tport_threadpool_t)
 *
 * @note Executed by stack thread only.
 */
static void tport_threadpool_deinit_primary(tport_primary_t *pri)
{
  tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
  threadpool_t *thrp = tptp->tptp_pool;
  /* The pool size lives in the threadpool extension, not in the primary. */
  int i, N = tptp->tptp_poolsize;

  if (!thrp)
    return;

  /* Prevent application from using these. */
  for (i = 0; i < N; i++)
    thrp[i].thrp_killing = 1;

  /* Stop every task in the threadpool. */
  for (i = 0; i < N; i++)
    su_clone_wait(pri->pri_master->mr_root, thrp[i].thrp_clone);

  /*
   * Release the worker array, not the transport itself: tptp is the
   * primary transport object and is still in use by the caller.  The array
   * is returned to the home it was allocated from in
   * tport_threadpool_init_primary().
   */
  tptp->tptp_pool = NULL;
  tptp->tptp_poolsize = 0;
  su_free(pri->pri_primary->tp_home, thrp);

  SU_DEBUG_3(("%s(%p): zapped threadpool\n", __func__, pri));
}

/** Clone initializer: start waiting for input on the transport socket.
 *
 * Stores the clone root and registers a wait object for the socket of the
 * primary transport with thrp_udp_event() as the callback.
 *
 * @retval 0  when successful
 * @retval -1 if the wait object could not be created or registered
 */
static int thrp_udp_init(su_root_t *root, threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
  su_wait_t wait[1];
  int reg;

  assert(tp);

  thrp->thrp_root = root;

  if (su_wait_create(wait, tp->tp_socket, SU_WAIT_IN | SU_WAIT_ERR) < 0)
    return -1;

  reg = su_root_register(root, wait, thrp_udp_event, tp, 0);
  if (reg == -1) {
    /* Do not leak the wait object, and leave thrp_reg at 0 so that
       thrp_udp_deinit() does not try to deregister a bogus index. */
    su_wait_destroy(wait);
    return -1;
  }

  thrp->thrp_reg = reg;

  return 0;
}

/** Clone deinitializer: remove the socket registration and pending message. */
static void thrp_udp_deinit(su_root_t *root, threadpool_t *thrp)
{
  if (thrp->thrp_reg)
    su_root_deregister(root, thrp->thrp_reg), thrp->thrp_reg = 0;
  su_msg_destroy(thrp->thrp_rmsg);
}

/** Stop receiving: clear the event mask of the socket and mark as yielded. */
su_inline void
thrp_yield(threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
  su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, 0);
  thrp->thrp_yield = 1;
}

/** Resume receiving: restore the IN/ERR event mask and clear the yield mark. */
su_inline void
thrp_gain(threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary->pri_primary;
  int events = SU_WAIT_IN | SU_WAIT_ERR;
  su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, events);
  thrp->thrp_yield = 0;
}

/** Wakeup callback for the transport socket.
 *
 * Keeps a delivery message prepared in thrp_rmsg, passes the pending
 * events to thrp_udp_recv_deliver() and repeats for as long as a message
 * was actually sent, the worker has not yielded and there are no
 * outstanding sends (thrp_s_sent - thrp_s_recv).
 *
 * @return Always 0.
 */
static int thrp_udp_event(threadpool_t *thrp,
                          su_wait_t *w,
                          tport_t *tp)
{
#if HAVE_POLL
  assert(w->fd == tp->tp_socket);
#endif

  for (;;) {
    thrp_udp_deliver_t *tpd;
    int events;

    /* (Re)create the delivery message if the previous one was sent. */
    if (!*thrp->thrp_rmsg) {
      if (su_msg_create(thrp->thrp_rmsg,
                        su_root_parent(thrp->thrp_root),
                        su_root_task(thrp->thrp_root),
                        thrp_udp_deliver,
                        sizeof (*tpd)) == -1) {
        SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp,
                    strerror(errno)));
        return 0;
      }
    }

    tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver; assert(tpd);
    tpd->tpd_thrp = thrp;

    events = su_wait_events(w, tp->tp_socket);
    if (!events)
      return 0;

    thrp_udp_recv_deliver(thrp, tp, tpd, events);

    /* The message is still ours: nothing was sent, so reset it for reuse. */
    if (*thrp->thrp_rmsg) {
      SU_DEBUG_7(("thrp_udp_event(%p): no msg sent\n", thrp));
      tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver;
      memset(tpd, 0, sizeof *tpd);
      return 0;
    }

    if (thrp->thrp_yield || (thrp->thrp_s_sent - thrp->thrp_s_recv) > 0)
      return 0;

    su_wait(w, 1, 0);
  }
}

static int thrp_udp_recv_deliver(threadpool_t *thrp,
                                 tport_t const *tp,
                                 thrp_udp_deliver_t *tpd,
                                 int events)
{
  unsigned qlen = thrp->thrp_r_sent - thrp->thrp_r_recv;

  SU_DEBUG_7(("thrp_udp_event(%p): events%s%s%s%s for %p\n", thrp,
              events & SU_WAIT_IN ? " IN" : "",
              events & SU_WAIT_HUP ? " HUP" : "",
              events & SU_WAIT_OUT ? " OUT" : "",
              events & SU_WAIT_ERR ? " ERR" : "",
              tpd));

  if (events & SU_WAIT_ERR) {
    tpd->tpd_errorcode = tport_udp_error(tp, tpd->tpd_name);
    if (tpd->tpd_errorcode) {
      if (thrp->thrp_yield)
        su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
      tpd->tpd_when = su_now();
      su_msg_send(thrp->thrp_rmsg);
      thrp->thrp_r_sent++;
      return 0;
    }
  }

  if (events & SU_WAIT_IN) {
    if (thrp_udp_recv(thrp, tpd) < 0) {
      tpd->tpd_errorcode = su_errno();
      assert(tpd->tpd_errorcode);
      if (su_is_blocking(tpd->tpd_errorcode))
        return 0;
    }
    else if (tpd->tpd_msg) {
      int n = msg_extract(tpd->tpd_msg); (void)n;

      thrp->thrp_rcvd_msgs++;
      thrp->thrp_rcvd_bytes += msg_size(tpd->tpd_msg);
    }

#if HAVE_SIGCOMP
    if (tpd->tpd_udvm && !tpd->tpd_msg)
      sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
#endif

    assert(!tpd->tpd_msg || !tpd->tpd_errorcode);

    if (tpd->tpd_msg || tpd->tpd_errorcode) {
      if (qlen >= tp->tp_params->tpp_thrprqsize) {
        SU_DEBUG_7(("tport recv queue %i: %u\n",
                    (int)(thrp - tp->tp_pri->tptp_pool), qlen));
        thrp_yield(thrp);
      }

      if (qlen >= tp->tp_params->tpp_thrprqsize / 2)
        su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
      tpd->tpd_when = su_now();
      su_msg_send(thrp->thrp_rmsg);
      thrp->thrp_r_sent++;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -