📄 tport_threadpool.c
字号:
return 0; } } return 0;}#include <pthread.h>/** Mutex for reading from socket */static pthread_mutex_t mutex[1] = { PTHREAD_MUTEX_INITIALIZER };/** Receive a UDP packet by threadpool. */staticint thrp_udp_recv(threadpool_t *thrp, thrp_udp_deliver_t *tpd){ tport_t const *tp = thrp->thrp_tport->pri_primary; unsigned char sample[2]; int N; int s = tp->tp_socket; pthread_mutex_lock(mutex); /* Simulate packet loss */ if (tp->tp_params->tpp_drop && su_randint(0, 1000) < tp->tp_params->tpp_drop) { recv(s, sample, 1, 0); pthread_mutex_unlock(mutex); SU_DEBUG_3(("tport(%p): simulated packet loss!\n", tp)); return 0; } /* Peek for first two bytes in message: determine if this is stun, sigcomp or sip */ N = recv(s, sample, sizeof sample, MSG_PEEK | MSG_TRUNC); if (N < 0) { if (su_is_blocking(su_errno())) N = 0; } else if (N <= 1) { SU_DEBUG_1(("%s(%p): runt of %u bytes\n", "thrp_udp_recv", thrp, N)); recv(s, sample, sizeof sample, 0); N = 0; }#if !HAVE_MSG_TRUNC else if ((N = su_getmsgsize(tp->tp_socket)) < 0) ;#endif else if ((sample[0] & 0xf8) == 0xf8) {#if HAVE_SIGCOMP if (thrp->thrp_compartment) { struct sigcomp_buffer *input; void *data; int dlen; tpd->tpd_udvm = sigcomp_udvm_create_for_compartment(thrp->thrp_compartment); input = sigcomp_udvm_input_buffer(tpd->tpd_udvm, N); assert(input); data = input->b_data + input->b_avail; dlen = input->b_size - input->b_avail; if (dlen < N) dlen = 0; tpd->tpd_namelen = sizeof(tpd->tpd_name); dlen = recvfrom(tp->tp_socket, data, dlen, 0, &tpd->tpd_name->su_sa, &tpd->tpd_namelen); SU_CANONIZE_SOCKADDR(tpd->tpd_name); if (dlen < N) { su_seterrno(EMSGSIZE); /* Protocol error */ N = -1; } else if (dlen == -1) N = -1; else { input->b_avail += dlen; input->b_complete = 1; pthread_mutex_unlock(mutex); N = thrp_udvm_decompress(thrp, tpd); if (N == -1) /* Do not report decompression errors as ICMP errors */ memset(tpd->tpd_name, 0, tpd->tpd_namelen); return N; } pthread_mutex_unlock(mutex); return N; }#endif recv(s, sample, 1, 0); 
pthread_mutex_unlock(mutex); /* XXX - send NACK ? */ su_seterrno(EBADMSG); N = -1; } else { /* receive as usual */ N = tport_recv_dgram_r(tp, &tpd->tpd_msg, N); } pthread_mutex_unlock(mutex); return N;}#if HAVE_SIGCOMPstaticint thrp_udvm_decompress(threadpool_t *thrp, thrp_udp_deliver_t *tpd){ struct sigcomp_udvm *udvm = tpd->tpd_udvm; struct sigcomp_buffer *output; msg_iovec_t iovec[msg_n_fragments] = {{ 0 }}; su_addrinfo_t *ai; tport_t *tp = thrp->thrp_tport->pri_primary; size_t n, m, i, dlen; int eos; void *data; ssize_t veclen; output = sigcomp_udvm_output_buffer(udvm, -1); if (sigcomp_udvm_decompress(udvm, output, NULL) < 0) { int error = sigcomp_udvm_errno(udvm); SU_DEBUG_3(("%s: UDVM error %d: %s\n", __func__, error, sigcomp_udvm_strerror(udvm))); su_seterrno(EREMOTEIO); return -1; } data = output->b_data + output->b_used; dlen = output->b_avail - output->b_used; /* XXX - if a message is larger than default output size... */ eos = output->b_complete; assert(output->b_complete); veclen = tport_recv_iovec(tp, &tpd->tpd_msg, iovec, dlen, eos); if (veclen <= 0) { n = -1; } else { for (i = 0, n = 0; i < veclen; i++) { m = iovec[i].mv_len; assert(dlen >= n + m); memcpy(iovec[i].mv_base, data + n, m); n += m; } assert(dlen == n); msg_recv_commit(tpd->tpd_msg, dlen, eos); /* Mark buffer as used */ /* Message address */ ai = msg_addrinfo(tpd->tpd_msg); ai->ai_flags |= TP_AI_COMPRESSED; ai->ai_family = tpd->tpd_name->su_sa.sa_family; ai->ai_socktype = SOCK_DGRAM; ai->ai_protocol = IPPROTO_UDP; memcpy(ai->ai_addr, tpd->tpd_name, ai->ai_addrlen = tpd->tpd_namelen); SU_DEBUG_9(("%s(%p): sigcomp msg sz = %d\n", __func__, tp, n)); } return n;}#endif/** Deliver message from threadpool to the stack * * @note Executed by stack thread only. 
*/static void thrp_udp_deliver(su_root_magic_t *magic, su_msg_r m, union tport_su_msg_arg *arg){ thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver; threadpool_t *thrp = tpd->tpd_thrp; tport_t *tp = thrp->thrp_tport->pri_primary; su_time_t now = su_now(); assert(magic != thrp); thrp->thrp_r_recv++; if (thrp->thrp_killing) {#if HAVE_SIGCOMP sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL;#endif msg_destroy(tpd->tpd_msg); return; } SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, 1000 * su_time_diff(now, tpd->tpd_when))); if (tpd->tpd_errorcode) tport_error_report(tp, tpd->tpd_errorcode, tpd->tpd_name); else if (tpd->tpd_msg) { tport_deliver(tp, tpd->tpd_msg, NULL, &tpd->tpd_udvm, tpd->tpd_when); tp->tp_rlogged = NULL; }#if HAVE_SIGCOMP if (tpd->tpd_udvm) { sigcomp_udvm_free(tpd->tpd_udvm), tpd->tpd_udvm = NULL; }#endif}static void thrp_udp_deliver_report(threadpool_t *thrp, su_msg_r m, union tport_su_msg_arg *arg){ if (thrp->thrp_yield) { int qlen = thrp->thrp_r_sent - thrp->thrp_r_recv; int qsize = thrp->thrp_tport->pri_params->tpp_thrprqsize; if (qlen == 0 || qlen < qsize / 2) thrp_gain(thrp); }}/** Send a message to network using threadpool. * * @note Executed by stack thread only. 
*/
/* Without a pool the message is sent directly with
 * tport_prepare_and_send() and its result is returned. Otherwise the pool
 * thread with the shortest send queue (thrp_s_sent - thrp_s_recv) is
 * chosen and a su_msg carrying a new reference to the message is posted
 * to it, with thrp_udp_send() as handler and thrp_udp_send_report() as
 * report function.
 *
 * Returns 0 when the message was queued, -1 on failure (errno is set to
 * ECHILD when the pool is being killed).
 */
static int tport_thread_send(tport_t *tp,
                             msg_t *msg,
                             tp_name_t const *tpn,
                             struct sigcomp_compartment *cc,
                             unsigned mtu)
{
  threadpool_t *thrp = tp->tp_pri->tptp_pool;
  thrp_udp_deliver_t *tpd;
  int idx, N = tp->tp_pri->tptp_poolsize;
  su_msg_r m;
  unsigned totalqlen = 0;
  unsigned qlen;

  if (!tp->tp_pri->tptp_pool)
    return tport_prepare_and_send(tp, msg, tpn, cc, mtu);

  SU_DEBUG_9(("tport_thread_send()\n"));

  if (thrp->thrp_killing) {
    su_seterrno(ECHILD);
    return -1;
  }

  /* Start from the first pool thread ... */
  totalqlen = thrp->thrp_s_sent - thrp->thrp_s_recv;
  qlen = totalqlen;

  /* ... and select thread with shortest queue */
  for (idx = 1; idx < N; idx++) {
    threadpool_t *candidate = tp->tp_pri->tptp_pool + idx;
    unsigned len = candidate->thrp_s_sent - candidate->thrp_s_recv;

    if (len < qlen ||
        (len == qlen &&
         (candidate->thrp_s_sent - thrp->thrp_s_sent) < 0)) {
      thrp = candidate;
      qlen = len;
    }

    totalqlen += len;
  }

  if (totalqlen >= N * tp->tp_params->tpp_qsize) {
    SU_DEBUG_3(("tport send queue: %u (shortest %u)\n", totalqlen, qlen));
  }

  if (su_msg_create(m,
                    su_clone_task(thrp->thrp_clone),
                    su_root_task(tp->tp_master->mr_root),
                    thrp_udp_send,
                    sizeof *tpd) != su_success) {
    SU_DEBUG_1(("thrp_udp_event(%p): su_msg_create(): %s\n", thrp,
                strerror(errno)));
    return -1;
  }

  /* Fill in the request handed over to the pool thread */
  tpd = su_msg_data(m)->thrp_udp_deliver;
  tpd->tpd_thrp = thrp;
  tpd->tpd_when = su_now();
  tpd->tpd_mtu = mtu;
  tpd->tpd_msg = msg_ref_create(msg);

#if HAVE_SIGCOMP
  tpd->tpd_cc = cc;
#endif

  su_msg_report(m, thrp_udp_send_report);

  if (su_msg_send(m) != su_success) {
    /* Not queued: give back the reference taken above */
    msg_ref_destroy(msg);
    return -1;
  }

  thrp->thrp_s_sent++;
  return 0;
}

/** thrp_udp_send() is run by threadpool to send the message.
*/static void thrp_udp_send(threadpool_t *thrp, su_msg_r m, union tport_su_msg_arg *arg){ thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver; tport_t *tp = thrp->thrp_tport->pri_primary; msg_t *msg = tpd->tpd_msg; msg_iovec_t *iov, auto_iov[40], *iov0 = NULL; int iovlen, iovused, n; assert(thrp == tpd->tpd_thrp); thrp->thrp_s_recv++; { double delay = 1000 * su_time_diff(su_now(), tpd->tpd_when); if (delay > 100) SU_DEBUG_3(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay)); else SU_DEBUG_7(("thrp_udp_deliver(%p): got %p delay %f\n", thrp, tpd, delay)); } if (!msg) { tpd->tpd_errorcode = EINVAL; return; } /* Prepare message for sending - i.e., encode it */ if (msg_prepare(msg) < 0) { tpd->tpd_errorcode = errno; return; } if (tpd->tpd_mtu != 0 && msg_size(msg) > tpd->tpd_mtu) { tpd->tpd_errorcode = EMSGSIZE; return; } /* Use initially the I/O vector from stack */ iov = auto_iov, iovlen = sizeof(auto_iov)/sizeof(auto_iov[0]); /* Get a iovec for message contents */ for (;;) { iovused = msg_iovec(msg, iov, iovlen); if (iovused <= iovlen) break; iov = iov0 = realloc(iov0, sizeof(*iov) * iovused); iovlen = iovused; if (iov0 == NULL) { tpd->tpd_errorcode = errno; return; } } assert(iovused > 0); tpd->tpd_when = su_now(); if (0) ;#if HAVE_SIGCOMP else if (tpd->tpd_cc) { tport_sigcomp_t sc[1] = {{ NULL }}; n = tport_sigcomp_vsend(tp, msg, iov, iovused, tpd->tpd_cc, sc); } #endif else n = tport_send_dgram(tp, msg, iov, iovused); if (n == -1) tpd->tpd_errorcode = su_errno(); if (iov0) free(iov0);}static void thrp_udp_send_report(su_root_magic_t *magic, su_msg_r msg, union tport_su_msg_arg *arg){ thrp_udp_deliver_t *tpd = arg->thrp_udp_deliver; threadpool_t *thrp = tpd->tpd_thrp; tport_t *tp = thrp->thrp_tport->pri_primary; assert(magic != thrp); SU_DEBUG_7(("thrp_udp_send_report(%p): got %p delay %f\n", thrp, tpd, 1000 * su_time_diff(su_now(), tpd->tpd_when))); if (tp->tp_master->mr_log) tport_log_msg(tp, tpd->tpd_msg, "sent", "to", tpd->tpd_when); if 
(tpd->tpd_errorcode) tport_error_report(tp, tpd->tpd_errorcode, msg_addr(tpd->tpd_msg)); msg_ref_destroy(tpd->tpd_msg);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -