⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tport_threadpool.c

📁 Sofia SIP is an open-source SIP User-Agent library, compliant with the IETF RFC3261 specification.
💻 C
📖 第 1 页 / 共 2 页
字号:
      return 0;    }  }  return 0;}#include <pthread.h>/** Mutex for reading from socket */static pthread_mutex_t mutex[1] = { PTHREAD_MUTEX_INITIALIZER };/** Receive a UDP packet by threadpool. */staticint thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd){  tport_t const *tp = thrp->thrp_tport->pri_primary;  unsigned char sample[2];  int N;  int s = tp->tp_socket;  pthread_mutex_lock(mutex);  /* Simulate packet loss */  if (tp->tp_params->tpp_drop &&       su_randint(0, 1000) < tp->tp_params->tpp_drop) {    recv(s, sample, 1, 0);    pthread_mutex_unlock(mutex);    SU_DEBUG_3(("tport(%p): simulated packet loss!\n", tp));    return 0;  }  /* Peek for first two bytes in message:     determine if this is stun, sigcomp or sip  */  N = recv(s, sample, sizeof sample, MSG_PEEK | MSG_TRUNC);  if (N < 0) {    if (su_is_blocking(su_errno()))      N = 0;  }  else if (N <= 1) {    SU_DEBUG_1(("%s(%p): runt of %u bytes\n", "thrp_udp_recv", thrp, N));    recv(s, sample, sizeof sample, 0);    N = 0;  }#if !HAVE_MSG_TRUNC  else if ((N = su_getmsgsize(tp->tp_socket)) < 0)    ;#endif  else if ((sample[0] & 0xf8) == 0xf8) {#if HAVE_SIGCOMP    if (thrp->thrp_compartment) {      struct sigcomp_buffer *input;      void *data;      int dlen;      tpd->tpd_udvm = 	sigcomp_udvm_create_for_compartment(thrp->thrp_compartment);      input = sigcomp_udvm_input_buffer(tpd->tpd_udvm, N); assert(input);      data = input->b_data + input->b_avail;      dlen = input->b_size - input->b_avail;      if (dlen < N)	dlen = 0;      tpd->tpd_namelen = sizeof(tpd->tpd_name);          dlen = recvfrom(tp->tp_socket, data, dlen, 0, 		      &tpd->tpd_name->su_sa, &tpd->tpd_namelen);      SU_CANONIZE_SOCKADDR(tpd->tpd_name);            if (dlen < N) {	su_seterrno(EMSGSIZE);		/* Protocol error */	N = -1;      } else if (dlen == -1) 	N = -1;      else {	input->b_avail += dlen; 	input->b_complete = 1;		pthread_mutex_unlock(mutex);      	N = thrp_udvm_decompress(thrp, tpd);	if (N == -1)	  /* Do not report decompression errors as ICMP errors */	  memset(tpd->tpd_name, 0, tpd->tpd_namelen);	return N;      }      pthread_mutex_unlock(mutex);      return N;    }#endif    recv(s, sample, 1, 0);    pthread_mutex_unlock(mutex);    /* XXX - send NACK ? */    su_seterrno(EBADMSG);    N = -1;  }  else {    /* receive as usual */    N = tport_recv_dgram_r(tp, &tpd->tpd_msg, N);  }     pthread_mutex_unlock(mutex);  return N;}#if HAVE_SIGCOMPstaticint thrp_udvm_decompress(threadpool_t *thrp, thrp_udp_deliver_t *tpd){  struct sigcomp_udvm *udvm = tpd->tpd_udvm;  struct sigcomp_buffer *output;  msg_iovec_t iovec[msg_n_fragments] = {{ 0 }};  su_addrinfo_t *ai;  tport_t *tp = thrp->thrp_tport->pri_primary;  size_t n, m, i, dlen;  int eos;  void *data;  ssize_t veclen;  output = sigcomp_udvm_output_buffer(udvm, -1);    if (sigcomp_udvm_decompress(udvm, output, NULL) < 0) {    int error = sigcomp_udvm_errno(udvm);    SU_DEBUG_3(("%s: UDVM error %d: %s\n", __func__,		error, sigcomp_udvm_strerror(udvm)));    su_seterrno(EREMOTEIO);    return -1;  }   data = output->b_data + output->b_used;  dlen = output->b_avail - output->b_used;  /* XXX - if a message is larger than default output size... */  eos = output->b_complete; assert(output->b_complete);      veclen = tport_recv_iovec(tp, &tpd->tpd_msg, iovec, dlen, eos);      if (veclen <= 0) {    n = -1;  } else {    for (i = 0, n = 0; i < veclen; i++) {      m = iovec[i].mv_len; assert(dlen >= n + m);      memcpy(iovec[i].mv_base, data + n, m);      n += m;    }    assert(dlen == n);    msg_recv_commit(tpd->tpd_msg, dlen, eos);    /* Mark buffer as used */        /* Message address */    ai = msg_addrinfo(tpd->tpd_msg);    ai->ai_flags |= TP_AI_COMPRESSED;    ai->ai_family = tpd->tpd_name->su_sa.sa_family;    ai->ai_socktype = SOCK_DGRAM;    ai->ai_protocol = IPPROTO_UDP;    memcpy(ai->ai_addr, tpd->tpd_name, ai->ai_addrlen = tpd->tpd_namelen);    SU_DEBUG_9(("%s(%p): sigcomp msg sz = %d\n", __func__, tp, n));  }  return n;}#endif/** Deliver message from threadpool to the stack * * @note Executed by stack thread only. */static void thrp_udp_deliver(su_root_magic_t *magic,		      su_msg_r m,		      union tport_su_msg_arg *arg){  thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;  threadpool_t *thrp = tpd->tpd_thrp;  tport_t *tp = thrp->thrp_tport->pri_primary;  su_time_t now = su_now();  assert(magic != thrp);  thrp->thrp_r_recv++;  if (thrp->thrp_killing) {#if HAVE_SIGCOMP    sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;#endif    msg_destroy(tpd->tpd_msg);    return;  }  SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n", 	      thrp, tpd, 1000 * su_time_diff(now, tpd->tpd_when)));  if (tpd->tpd_errorcode)    tport_error_report(tp, tpd->tpd_errorcode, tpd->tpd_name);  else if (tpd->tpd_msg) {    tport_deliver(tp, tpd->tpd_msg, NULL, &tpd->tpd_udvm, tpd->tpd_when);    tp->tp_rlogged = NULL;  }#if HAVE_SIGCOMP   if (tpd->tpd_udvm) {    sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;  }#endif}static void thrp_udp_deliver_report(threadpool_t *thrp,			     su_msg_r m,			     union tport_su_msg_arg *arg){  if (thrp->thrp_yield) {    int qlen = thrp->thrp_r_sent - thrp->thrp_r_recv;    int qsize = thrp->thrp_tport->pri_params->tpp_thrprqsize;    if (qlen == 0 || qlen < qsize / 2)      thrp_gain(thrp);  }}/** Send a message to network using threadpool. * * @note Executed by stack thread only. */staticint tport_thread_send(tport_t *tp,		      msg_t *msg,		      tp_name_t const *tpn, 		      struct sigcomp_compartment *cc,		      unsigned mtu){    threadpool_t *thrp = tp->tp_pri->tptp_pool;  thrp_udp_deliver_t *tpd;  int i, N = tp->tp_pri->tptp_poolsize;  su_msg_r m;  unsigned totalqlen = 0;  unsigned qlen;  if (!tp->tp_pri->tptp_pool)    return tport_prepare_and_send(tp, msg, tpn, cc, mtu);  SU_DEBUG_9(("tport_thread_send()\n"));  if (thrp->thrp_killing)    return (su_seterrno(ECHILD)), -1;  qlen = totalqlen = thrp->thrp_s_sent - thrp->thrp_s_recv;  /* Select thread with shortest queue */   for (i = 1; i < N; i++) {    threadpool_t *other = tp->tp_pri->tptp_pool + i;    unsigned len = other->thrp_s_sent - other->thrp_s_recv;    if (len < qlen || 	(len == qlen && (other->thrp_s_sent - thrp->thrp_s_sent) < 0))      thrp = other, qlen = len;    totalqlen += len;  }  if (totalqlen >= N * tp->tp_params->tpp_qsize)    SU_DEBUG_3(("tport send queue: %u (shortest %u)\n", totalqlen, qlen));  if (su_msg_create(m,		    su_clone_task(thrp->thrp_clone),		    su_root_task(tp->tp_master->mr_root),		    thrp_udp_send,		    sizeof (*tpd)) != su_success) {    SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp, 		strerror(errno)));    return -1;  }  tpd = su_msg_data(m)->thrp_udp_deliver;  tpd->tpd_thrp = thrp;  tpd->tpd_when = su_now();  tpd->tpd_mtu = mtu;  tpd->tpd_msg = msg_ref_create(msg);#if HAVE_SIGCOMP  tpd->tpd_cc = cc;#endif  su_msg_report(m, thrp_udp_send_report);  if (su_msg_send(m) == su_success) {    thrp->thrp_s_sent++;    return 0;  }  msg_ref_destroy(msg);  return -1;}/** thrp_udp_send() is run by threadpool to send the message. */static void thrp_udp_send(threadpool_t *thrp,		   su_msg_r m,		   union tport_su_msg_arg *arg){  thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;  tport_t *tp = thrp->thrp_tport->pri_primary;  msg_t *msg = tpd->tpd_msg;  msg_iovec_t *iov, auto_iov[40], *iov0 = NULL;  int iovlen, iovused, n;  assert(thrp == tpd->tpd_thrp);  thrp->thrp_s_recv++;  {     double delay = 1000 * su_time_diff(su_now(), tpd->tpd_when);    if (delay > 100)      SU_DEBUG_3(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay));    else      SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay));  }  if (!msg) {    tpd->tpd_errorcode = EINVAL;    return;  }  /* Prepare message for sending - i.e., encode it */  if (msg_prepare(msg) < 0) {    tpd->tpd_errorcode = errno;    return;  }  if (tpd->tpd_mtu != 0 && msg_size(msg) > tpd->tpd_mtu) {    tpd->tpd_errorcode = EMSGSIZE;    return;  }  /* Use initially the I/O vector from stack */  iov = auto_iov, iovlen = sizeof(auto_iov)/sizeof(auto_iov[0]);  /* Get a iovec for message contents */  for (;;) {    iovused = msg_iovec(msg, iov, iovlen);    if (iovused <= iovlen)       break;    iov = iov0 = realloc(iov0, sizeof(*iov) * iovused);    iovlen = iovused;    if (iov0 == NULL) {      tpd->tpd_errorcode = errno;      return;    }  }  assert(iovused > 0);  tpd->tpd_when = su_now();  if (0)    ;#if HAVE_SIGCOMP  else if (tpd->tpd_cc) {    tport_sigcomp_t sc[1] = {{ NULL }};    n = tport_sigcomp_vsend(tp, msg, iov, iovused, tpd->tpd_cc, sc);  } #endif  else     n = tport_send_dgram(tp, msg, iov, iovused);  if (n == -1)    tpd->tpd_errorcode = su_errno();  if (iov0)    free(iov0);}static void thrp_udp_send_report(su_root_magic_t *magic,			  su_msg_r msg,			  union tport_su_msg_arg *arg){  thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver;  threadpool_t *thrp = tpd->tpd_thrp;  tport_t *tp = thrp->thrp_tport->pri_primary;  assert(magic != thrp);  SU_DEBUG_7(("thrp_udp_send_report(%p): got %p delay %f\n", 	      thrp, tpd, 1000 * su_time_diff(su_now(), tpd->tpd_when)));  if (tp->tp_master->mr_log)    tport_log_msg(tp, tpd->tpd_msg, "sent", "to", tpd->tpd_when);  if (tpd->tpd_errorcode)    tport_error_report(tp, tpd->tpd_errorcode, msg_addr(tpd->tpd_msg));  msg_ref_destroy(tpd->tpd_msg);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -