📄 tport_threadpool.c
字号:
/*
 * This file is part of the Sofia-SIP package
 *
 * Copyright (C) 2006 Nokia Corporation.
 *
 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 */

/**@CFILE tport_threadpool.c Multithreading transport
 *
 * See tport.docs for more detailed description of tport interface.
 *
 * @author Pekka Pessi <Pekka.Pessi@nokia.com>
 * @author Martti Mela <Martti.Mela@nokia.com>
 *
 * @date Created: Fri Mar 24 08:45:49 EET 2006 ppessi
 */

#include "config.h"

#undef HAVE_SIGCOMP

/* NOTE(review): the clone callbacks below take a threadpool_t
 * (struct threadpool); confirm that "struct tport_threadpool" is the
 * intended magic type. */
#define SU_ROOT_MAGIC_T struct tport_threadpool
#define SU_WAKEUP_ARG_T struct tport_s
#define SU_MSG_ARG_T union tport_su_msg_arg

#include "tport_internal.h"

#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>

#if HAVE_FUNC
#elif HAVE_FUNCTION
#define __func__ __FUNCTION__
#else
static char const __func__[] = "tport_threadpool";
#endif

/* ==== Thread pools =================================================== */

typedef struct threadpool threadpool_t;

/** Primary transport extended with a pool of worker threads.
 *
 * The tport_primary_t is the first member: the init/deinit functions
 * below cast a tport_primary_t pointer to tport_threadpool_t pointer.
 */
typedef struct {
  tport_primary_t tptp_primary;
  threadpool_t   *tptp_pool;	/**< Worker threads */
  unsigned        tptp_poolsize; /**< Number of entries in tptp_pool */
} tport_threadpool_t;

/** State of a single worker in the pool. */
struct threadpool
{
  /* Shared */
  su_clone_r thrp_clone;	/* Clone handle used with su_clone_start()/su_clone_wait() */
  tport_threadpool_t *thrp_tport; /* Owning transport */

  int        thrp_killing;	/* Threadpool is being killed */

  /* Private variables */
  su_root_t *thrp_root;		/* Root given to thrp_udp_init() */
  int        thrp_reg;		/* Value returned by su_root_register() */
  struct sigcomp_compartment *thrp_compartment;
  su_msg_r   thrp_rmsg;		/* Message carrying the next thrp_udp_deliver_t */

  /* Slave thread counters */
  int        thrp_r_sent;
  int        thrp_s_recv;

  unsigned   thrp_rcvd_msgs;
  unsigned   thrp_rcvd_bytes;

  /* Master thread counters */
  int        thrp_s_sent;
  int        thrp_r_recv;

  int        thrp_yield;	/* Set by thrp_yield(), cleared by thrp_gain() */
};

/** Payload of the message a worker builds for one received datagram
 *  or one socket error. */
typedef struct
{
  threadpool_t *tpd_thrp;	/* Worker that filled in this record */
  int  tpd_errorcode;		/* Nonzero when reporting an error */
  msg_t *tpd_msg;		/* Received message, if any */
  su_time_t tpd_when;		/* Set from su_now() just before sending */
  unsigned tpd_mtu;
#if HAVE_SIGCOMP
  struct sigcomp_compartment *tpd_cc;
#endif
  struct sigcomp_udvm *tpd_udvm;
  socklen_t tpd_namelen;
  su_sockaddr_t tpd_name[1];
} thrp_udp_deliver_t;

/** Argument carried by su messages in this file (see SU_MSG_ARG_T). */
union tport_su_msg_arg
{
  threadpool_t *thrp;
  thrp_udp_deliver_t thrp_udp_deliver[1];
};

int tport_threadpool_init_primary(tport_primary_t *,
				  tp_name_t tpn[1],
				  su_addrinfo_t *,
				  tagi_t const *,
				  char const **return_culprit);
static void tport_threadpool_deinit_primary(tport_primary_t *pri);

static int tport_thread_send(tport_t *tp,
			     msg_t *msg,
			     tp_name_t const *tpn,
			     struct sigcomp_compartment *cc,
			     unsigned mtu);

/* Positional initializer; the slot meanings come from the declaration of
 * tport_vtable_t, which is not in this file. */
tport_vtable_t const tport_threadpool_vtable =
{
  "udp", tport_type_local,
  sizeof (tport_threadpool_t),
  tport_threadpool_init_primary,
  tport_threadpool_deinit_primary,
  NULL,
  NULL,
  0,				/* No secondary transports! */
  NULL,
  NULL,
  NULL,
  NULL,
  NULL,
  tport_recv_dgram,
  tport_send_dgram,
  NULL,
  tport_thread_send
};

static int thrp_udp_init(su_root_t *, threadpool_t *);
static void thrp_udp_deinit(su_root_t *, threadpool_t *);
static int thrp_udp_event(threadpool_t *thrp,
			  su_wait_t *w,
			  tport_t *_tp);
static int thrp_udp_recv_deliver(threadpool_t *thrp,
				 tport_t const *tp,
				 thrp_udp_deliver_t *tpd,
				 int events);
static int thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd);
#if HAVE_SIGCOMP
static int thrp_udvm_decompress(threadpool_t *thrp,
				thrp_udp_deliver_t *tpd);
#endif
static void thrp_udp_deliver(threadpool_t *thrp,
			     su_msg_r msg,
			     union tport_su_msg_arg *arg);
static void thrp_udp_deliver_report(threadpool_t *thrp,
				    su_msg_r m,
				    union tport_su_msg_arg *arg);
static void thrp_udp_send(threadpool_t *thrp,
			  su_msg_r msg,
			  union tport_su_msg_arg *arg);
static void thrp_udp_send_report(threadpool_t *thrp,
				 su_msg_r msg,
				 union tport_su_msg_arg *arg);

/** Launch threads in the tport pool.
 *
 * Initializes the primary with tport_udp_init_primary() and then, when
 * the configured pool size (tpp_thrpsize) is nonzero, allocates the pool
 * from the transport's home, makes the socket non-blocking and starts
 * one clone per pool entry with thrp_udp_init()/thrp_udp_deinit() as its
 * init and deinit callbacks.
 *
 * @param pri            primary transport; must really be a
 *                       tport_threadpool_t
 * @param tpn            transport name, passed to tport_udp_init_primary()
 * @param ai             address info; must describe a datagram socket
 * @param tags           tag list, passed to tport_udp_init_primary()
 * @param return_culprit passed to tport_udp_init_primary()
 *
 * @retval 0  success (including the case of an empty pool)
 * @retval -1 UDP initialization, allocation or clone start failed
 */
int tport_threadpool_init_primary(tport_primary_t *pri,
				  tp_name_t tpn[1],
				  su_addrinfo_t *ai,
				  tagi_t const *tags,
				  char const **return_culprit)
{
  tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
  tport_t *tp = pri->pri_primary;
  threadpool_t *thrp;
  int i, N = tp->tp_params->tpp_thrpsize;

  assert(ai->ai_socktype == SOCK_DGRAM);

  if (tport_udp_init_primary(pri, tpn, ai, tags, return_culprit) < 0)
    return -1;

  if (N == 0)
    return 0;

  thrp = su_zalloc(tp->tp_home, (sizeof *thrp) * N);
  if (!thrp)
    return -1;

  su_setblocking(tp->tp_socket, 0);

  tptp->tptp_pool = thrp;
  tptp->tptp_poolsize = N;

  for (i = 0; i < N; i++) {
#if HAVE_SIGCOMP
    if (tport_has_sigcomp(tp))
      thrp[i].thrp_compartment = tport_primary_compartment(tp->tp_master);
#endif
    thrp[i].thrp_tport = tptp;

    if (su_clone_start(pri->pri_master->mr_root,
		       thrp[i].thrp_clone,
		       thrp + i,
		       thrp_udp_init,
		       thrp_udp_deinit) < 0)
      goto error;
  }

  /* The workers register the socket themselves in thrp_udp_init(). */
  tp->tp_events = 0;

  return 0;

 error:
  assert(!"tport_launch_threadpool");
  return -1;
}

/** Kill threads in the tport pool.
 *
 * Marks every worker as being killed, waits for each clone with
 * su_clone_wait() and finally releases the pool array allocated by
 * tport_threadpool_init_primary(). Does nothing if there is no pool.
 *
 * @param pri primary transport; must really be a tport_threadpool_t
 *
 * @note Executed by stack thread only.
 */
static void tport_threadpool_deinit_primary(tport_primary_t *pri)
{
  tport_threadpool_t *tptp = (tport_threadpool_t *)pri;
  threadpool_t *thrp = tptp->tptp_pool;
  /* The pool size lives in the threadpool extension, not in the primary. */
  int i, N = tptp->tptp_poolsize;

  if (!thrp)
    return;

  /* Prevent application from using these. */
  for (i = 0; i < N; i++)
    thrp[i].thrp_killing = 1;

  /* Stop every task in the threadpool. */
  for (i = 0; i < N; i++)
    su_clone_wait(pri->pri_master->mr_root, thrp[i].thrp_clone);

  /* Release the pool array (not the transport itself, which is still in
   * use by the caller), using the home it was allocated from. */
  tptp->tptp_pool = NULL;
  tptp->tptp_poolsize = 0;
  su_free(pri->pri_primary->tp_home, thrp);

  SU_DEBUG_3(("%s(%p): zapped threadpool\n", __func__, pri));
}

/** Clone init callback: start listening on the primary's socket.
 *
 * Stores @a root in the worker, creates a wait object for input and
 * error events on the transport socket and registers thrp_udp_event()
 * for it.
 *
 * @retval 0  success
 * @retval -1 wait creation or registration failed
 */
static int thrp_udp_init(su_root_t *root, threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary.pri_primary;
  su_wait_t wait[1];

  assert(tp);

  thrp->thrp_root = root;

  if (su_wait_create(wait, tp->tp_socket, SU_WAIT_IN | SU_WAIT_ERR) < 0)
    return -1;

  thrp->thrp_reg = su_root_register(root, wait, thrp_udp_event, tp, 0);
  if (thrp->thrp_reg == -1)
    return -1;

  return 0;
}

/** Clone deinit callback: undo thrp_udp_init().
 *
 * Deregisters the socket if a registration is recorded and destroys the
 * pending delivery message.
 */
static void thrp_udp_deinit(su_root_t *root, threadpool_t *thrp)
{
  if (thrp->thrp_reg)
    su_root_deregister(root, thrp->thrp_reg), thrp->thrp_reg = 0;
  su_msg_destroy(thrp->thrp_rmsg);
}

/** Stop receiving: clear the worker's event mask on the socket and
 *  mark the worker as yielded. */
su_inline void
thrp_yield(threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary.pri_primary;
  su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, 0);
  thrp->thrp_yield = 1;
}

/** Resume receiving: restore the input/error event mask on the socket
 *  and clear the yielded mark. */
su_inline void
thrp_gain(threadpool_t *thrp)
{
  tport_t *tp = thrp->thrp_tport->tptp_primary.pri_primary;
  int events = SU_WAIT_IN | SU_WAIT_ERR;
  su_root_eventmask(thrp->thrp_root, thrp->thrp_reg, tp->tp_socket, events);
  thrp->thrp_yield = 0;
}

/** Wakeup callback registered by thrp_udp_init().
 *
 * Repeatedly makes sure a delivery message exists, fetches the pending
 * events for the socket and hands them to thrp_udp_recv_deliver(). The
 * loop ends when the message cannot be created, when no events are
 * pending, when the message handle is still set after the call ("no msg
 * sent"), when the worker has yielded, or when thrp_s_sent is ahead of
 * thrp_s_recv.
 *
 * @return Always 0.
 */
static int thrp_udp_event(threadpool_t *thrp,
			  su_wait_t *w,
			  tport_t *tp)
{
#if HAVE_POLL
  assert(w->fd == tp->tp_socket);
#endif

  for (;;) {
    thrp_udp_deliver_t *tpd;
    int events;

    if (!*thrp->thrp_rmsg) {
      if (su_msg_create(thrp->thrp_rmsg,
			su_root_parent(thrp->thrp_root),
			su_root_task(thrp->thrp_root),
			thrp_udp_deliver,
			sizeof (*tpd)) == -1) {
	SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp,
		    strerror(errno)));
	return 0;
      }
    }

    tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver;
    assert(tpd);
    tpd->tpd_thrp = thrp;

    events = su_wait_events(w, tp->tp_socket);
    if (!events)
      return 0;

    thrp_udp_recv_deliver(thrp, tp, tpd, events);

    /* Handle still set: nothing was sent, so reset the payload for reuse. */
    if (*thrp->thrp_rmsg) {
      SU_DEBUG_7(("thrp_udp_event(%p): no msg sent\n", thrp));
      tpd = su_msg_data(thrp->thrp_rmsg)->thrp_udp_deliver;
      memset(tpd, 0, sizeof *tpd);
      return 0;
    }

    if (thrp->thrp_yield || (thrp->thrp_s_sent - thrp->thrp_s_recv) > 0)
      return 0;

    /* Refresh the wait object (timeout argument 0) before the next round. */
    su_wait(w, 1, 0);
  }
}

/* Receive one datagram or socket error and send it on in thrp_rmsg.
 * NOTE(review): this definition continues past the end of the text
 * available for review; the visible part is kept unchanged. */
static int thrp_udp_recv_deliver(threadpool_t *thrp,
				 tport_t const *tp,
				 thrp_udp_deliver_t *tpd,
				 int events)
{
  unsigned qlen = thrp->thrp_r_sent - thrp->thrp_r_recv;

  SU_DEBUG_7(("thrp_udp_event(%p): events%s%s%s%s for %p\n", thrp,
	      events & SU_WAIT_IN ? " IN" : "",
	      events & SU_WAIT_HUP ? " HUP" : "",
	      events & SU_WAIT_OUT ? " OUT" : "",
	      events & SU_WAIT_ERR ? " ERR" : "",
	      tpd));

  if (events & SU_WAIT_ERR) {
    tpd->tpd_errorcode = tport_udp_error(tp, tpd->tpd_name);
    if (tpd->tpd_errorcode) {
      if (thrp->thrp_yield)
	su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
      tpd->tpd_when = su_now();
      su_msg_send(thrp->thrp_rmsg);
      thrp->thrp_r_sent++;
      return 0;
    }
  }

  if (events & SU_WAIT_IN) {
    if (thrp_udp_recv(thrp, tpd) < 0) {
      tpd->tpd_errorcode = su_errno();
      assert(tpd->tpd_errorcode);
      if (su_is_blocking(tpd->tpd_errorcode))
	return 0;
    }
    else if (tpd->tpd_msg) {
      int n = msg_extract(tpd->tpd_msg); (void)n;

      thrp->thrp_rcvd_msgs++;
      thrp->thrp_rcvd_bytes += msg_size(tpd->tpd_msg);
    }

#if HAVE_SIGCOMP
    if (tpd->tpd_udvm && !tpd->tpd_msg)
      sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;
#endif

    assert(!tpd->tpd_msg || !tpd->tpd_errorcode);

    if (tpd->tpd_msg || tpd->tpd_errorcode) {
      if (qlen >= tp->tp_params->tpp_thrprqsize) {
	SU_DEBUG_7(("tport recv queue %i: %u\n",
		    (int)(thrp - tp->tp_pri->tptp_pool), qlen));
	thrp_yield(thrp);
      }

      if (qlen >= tp->tp_params->tpp_thrprqsize / 2)
	su_msg_report(thrp->thrp_rmsg, thrp_udp_deliver_report);
      tpd->tpd_when = su_now();
      su_msg_send(thrp->thrp_rmsg);
      thrp->thrp_r_sent++;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -