📄 sip_transport_udp.c
字号:
/* $Id: sip_transport_udp.c 974 2007-02-19 01:13:53Z bennylp $ */
/*
 * Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
#include <pjsip/sip_transport.h>
#include <pjsip/sip_endpoint.h>
#include <pjsip/sip_errno.h>
#include <pj/addr_resolv.h>
#include <pj/assert.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/os.h>
#include <pj/pool.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/string.h>


#define THIS_FILE "sip_transport_udp.c"

/**
 * These are the target values for socket send and receive buffer sizes,
 * respectively. They will be applied to UDP socket with setsockopt().
 * When transport failed to set these size, it will decrease it until
 * sufficiently large number has been successfully set.
 *
 * The buffer size is important, especially in WinXP/2000 machines.
 * Basically the lower the size, the more packets will be lost (dropped?)
 * when we're sending (receiving?) packets in large volumes.
 *
 * The figure here is taken based on my experiment on WinXP/2000 machine,
 * and with this value, the rate of dropped packet is about 8% when
 * sending 1800 requests simultaneously (percentage taken as average
 * after 50K requests or so).
 *
 * More experiments are needed probably.
*/#ifndef PJSIP_UDP_SO_SNDBUF_SIZE# define PJSIP_UDP_SO_SNDBUF_SIZE (24*1024*1024)#endif#ifndef PJSIP_UDP_SO_RCVBUF_SIZE# define PJSIP_UDP_SO_RCVBUF_SIZE (24*1024*1024)#endif/* Struct udp_transport "inherits" struct pjsip_transport */struct udp_transport{ pjsip_transport base; pj_sock_t sock; pj_ioqueue_key_t *key; int rdata_cnt; pjsip_rx_data **rdata; int is_closing;};/* * Initialize transport's receive buffer from the specified pool. */static void init_rdata(struct udp_transport *tp, unsigned rdata_index, pj_pool_t *pool, pjsip_rx_data **p_rdata){ pjsip_rx_data *rdata; /* Reset pool. */ //note: already done by caller //pj_pool_reset(pool); rdata = pj_pool_zalloc(pool, sizeof(pjsip_rx_data)); /* Init tp_info part. */ rdata->tp_info.pool = pool; rdata->tp_info.transport = &tp->base; rdata->tp_info.tp_data = (void*)(long)rdata_index; rdata->tp_info.op_key.rdata = rdata; pj_ioqueue_op_key_init(&rdata->tp_info.op_key.op_key, sizeof(pj_ioqueue_op_key_t)); tp->rdata[rdata_index] = rdata; if (p_rdata) *p_rdata = rdata;}/* * udp_on_read_complete() * * This is callback notification from ioqueue that a pending recvfrom() * operation has completed. */static void udp_on_read_complete( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read){ enum { MAX_IMMEDIATE_PACKET = 10 }; pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; pjsip_rx_data *rdata = rdata_op_key->rdata; struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport; int i; pj_status_t status; /* Don't do anything if transport is closing. */ if (tp->is_closing) { tp->is_closing++; return; } /* * The idea of the loop is to process immediate data received by * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to * complete asynchronously, to allow other sockets to get their data. */ for (i=0;; ++i) { pj_uint32_t flags; /* Report the packet to transport manager. 
*/ if (bytes_read > 0) { pj_size_t size_eaten; const pj_sockaddr_in *src_addr = (pj_sockaddr_in*)&rdata->pkt_info.src_addr; /* Init pkt_info part. */ rdata->pkt_info.len = bytes_read; rdata->pkt_info.zero = 0; pj_gettimeofday(&rdata->pkt_info.timestamp); pj_ansi_strcpy(rdata->pkt_info.src_name, pj_inet_ntoa(src_addr->sin_addr)); rdata->pkt_info.src_port = pj_ntohs(src_addr->sin_port); size_eaten = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata); if (size_eaten < 0) { pj_assert(!"It shouldn't happen!"); size_eaten = rdata->pkt_info.len; } /* Since this is UDP, the whole buffer is the message. */ rdata->pkt_info.len = 0; } else if (bytes_read == 0) { /* TODO: */ } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { /* Report error to endpoint. */ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, rdata->tp_info.transport->obj_name, -bytes_read, "Warning: pj_ioqueue_recvfrom()" " callback error")); } if (i >= MAX_IMMEDIATE_PACKET) { /* Force ioqueue_recvfrom() to return PJ_EPENDING */ flags = PJ_IOQUEUE_ALWAYS_ASYNC; } else { flags = 0; } /* Reset pool. * Need to copy rdata fields to temp variable because they will * be invalid after pj_pool_reset(). */ { pj_pool_t *rdata_pool = rdata->tp_info.pool; struct udp_transport *rdata_tp ; unsigned rdata_index; rdata_tp = (struct udp_transport*)rdata->tp_info.transport; rdata_index = (unsigned)(unsigned long)rdata->tp_info.tp_data; pj_pool_reset(rdata_pool); init_rdata(rdata_tp, rdata_index, rdata_pool, &rdata); /* Change some vars to point to new location after * pool reset. */ op_key = &rdata->tp_info.op_key.op_key; } /* Read next packet. 
*/ bytes_read = sizeof(rdata->pkt_info.packet); rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr); status = pj_ioqueue_recvfrom(key, op_key, rdata->pkt_info.packet, &bytes_read, flags, &rdata->pkt_info.src_addr, &rdata->pkt_info.src_addr_len); if (status == PJ_SUCCESS) { /* Continue loop. */ pj_assert(i < MAX_IMMEDIATE_PACKET); } else if (status == PJ_EPENDING) { break; } else { if (i < MAX_IMMEDIATE_PACKET) { /* Report error to endpoint if this is not EWOULDBLOCK error.*/ if (status != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && status != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && status != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, rdata->tp_info.transport->obj_name, status, "Warning: pj_ioqueue_recvfrom")); } /* Continue loop. */ bytes_read = 0; } else { /* This is fatal error. * Ioqueue operation will stop for this transport! */ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, rdata->tp_info.transport->obj_name, status, "FATAL: pj_ioqueue_recvfrom() error, " "UDP transport stopping! Error")); break; } } }}/* * udp_on_write_complete() * * This is callback notification from ioqueue that a pending sendto() * operation has completed. */static void udp_on_write_complete( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent){ struct udp_transport *tp = pj_ioqueue_get_user_data(key); pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; tdata_op_key->tdata = NULL; if (tdata_op_key->callback) { tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); }}/* * udp_send_msg() * * This function is called by transport manager (by transport->send_msg()) * to send outgoing message. 
*/static pj_status_t udp_send_msg( pjsip_transport *transport, pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, int addr_len, void *token, void (*callback)(pjsip_transport*, void *token, pj_ssize_t)){ struct udp_transport *tp = (struct udp_transport*)transport; pj_ssize_t size; pj_status_t status; PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX); /* Init op key. */ tdata->op_key.tdata = tdata; tdata->op_key.token = token; tdata->op_key.callback = callback; /* Send to ioqueue! */ size = tdata->buf.cur - tdata->buf.start; status = pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key, tdata->buf.start, &size, 0, rem_addr, addr_len); if (status != PJ_EPENDING) tdata->op_key.tdata = NULL; return status;}/* * udp_destroy() * * This function is called by transport manager (by transport->destroy()). */static pj_status_t udp_destroy( pjsip_transport *transport ){ struct udp_transport *tp = (struct udp_transport*)transport; int i; /* Mark this transport as closing. */ tp->is_closing = 1; /* Cancel all pending operations. */ /* blp: NO NO NO... * No need to post queued completion as we poll the ioqueue until * we've got events anyway. Posting completion will only cause * callback to be called twice with IOCP: one for the post completion * and another one for closing the socket. * for (i=0; i<tp->rdata_cnt; ++i) { pj_ioqueue_post_completion(tp->key, &tp->rdata[i]->tp_info.op_key.op_key, -1); } */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -