📄 sip_transport_udp.c
字号:
/* $Id: sip_transport_udp.c 974 2007-02-19 01:13:53Z bennylp $ */
/*
* Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <pjsip/sip_transport.h>
#include <pjsip/sip_endpoint.h>
#include <pjsip/sip_errno.h>
#include <pj/addr_resolv.h>
#include <pj/assert.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/os.h>
#include <pj/pool.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/string.h>
#define THIS_FILE "sip_transport_udp.c"
/**
* These are the target values for socket send and receive buffer sizes,
* respectively. They will be applied to UDP socket with setsockopt().
* When the transport fails to set these sizes, it will decrease them until
* a sufficiently large value has been successfully set.
*
* The buffer size is important, especially in WinXP/2000 machines.
* Basically, the lower the size, the more packets will be lost (dropped?)
* when we're sending (receiving?) packets in large volumes.
*
* The figure here is taken based on my experiment on WinXP/2000 machine,
* and with this value, the rate of dropped packet is about 8% when
* sending 1800 requests simultaneously (percentage taken as average
* after 50K requests or so).
*
* More experiments are needed probably.
*/
/* Target size, in bytes, of the socket send buffer. The #ifndef guard lets
 * a build override the value by defining the macro before this point.
 */
#ifndef PJSIP_UDP_SO_SNDBUF_SIZE
# define PJSIP_UDP_SO_SNDBUF_SIZE (24*1024*1024)
#endif
/* Target size, in bytes, of the socket receive buffer. Overridable in the
 * same way as the send buffer size.
 */
#ifndef PJSIP_UDP_SO_RCVBUF_SIZE
# define PJSIP_UDP_SO_RCVBUF_SIZE (24*1024*1024)
#endif
/* Struct udp_transport "inherits" struct pjsip_transport */
struct udp_transport
{
/* Base transport. The code in this file casts between pjsip_transport*
 * and struct udp_transport*, so this member has to stay first.
 */
pjsip_transport base;
/* Socket handle (presumably the UDP socket registered to the ioqueue;
 * it is not used directly in the code visible here -- confirm).
 */
pj_sock_t sock;
/* Ioqueue key through which send operations are submitted. */
pj_ioqueue_key_t *key;
/* Number of entries in the rdata array (assumed from its name and from
 * the loop bound in udp_destroy() -- confirm against the creation code).
 */
int rdata_cnt;
/* Array of receive data buffers, indexed by the rdata index that
 * init_rdata() stores in each buffer's tp_info.tp_data.
 */
pjsip_rx_data **rdata;
/* Non-zero once udp_destroy() has started. The read callback increments
 * it and returns without processing when invoked while it is set.
 */
int is_closing;
};
/*
 * init_rdata()
 *
 * Carve a fresh, zero-filled pjsip_rx_data out of 'pool' and register it
 * in slot 'rdata_index' of the transport's receive buffer table.
 *
 * The pool is NOT reset here; the caller is expected to have done so.
 * When 'p_rdata' is not NULL, the new buffer is also returned through it.
 */
static void init_rdata(struct udp_transport *tp, unsigned rdata_index,
                       pj_pool_t *pool, pjsip_rx_data **p_rdata)
{
    pjsip_rx_data *rx = pj_pool_zalloc(pool, sizeof(pjsip_rx_data));

    /* The operation key points back to its owner, so the read callback
     * can recover the rx buffer from the op_key it is handed.
     */
    pj_ioqueue_op_key_init(&rx->tp_info.op_key.op_key,
                           sizeof(pj_ioqueue_op_key_t));
    rx->tp_info.op_key.rdata = rx;

    /* Transport info: owning pool, owning transport, and the slot index
     * (stashed in the opaque tp_data pointer).
     */
    rx->tp_info.tp_data = (void*)(long)rdata_index;
    rx->tp_info.transport = &tp->base;
    rx->tp_info.pool = pool;

    /* Publish the buffer in the transport's table. */
    tp->rdata[rdata_index] = rx;

    if (p_rdata != NULL) {
        *p_rdata = rx;
    }
}
/*
 * udp_on_read_complete()
 *
 * This is callback notification from ioqueue that a pending recvfrom()
 * operation has completed.
 *
 * Parameters:
 *  key         The ioqueue key on which the next recvfrom() is issued.
 *  op_key      The operation key of the completed read. It is the op_key
 *              embedded in a pjsip_rx_data (see init_rdata()), which is
 *              how the receive buffer is recovered.
 *  bytes_read  > 0: number of bytes received;
 *              0: nothing to report;
 *              < 0: the negated status code of the failed operation.
 *
 * After handling the completed read, the function re-initializes the
 * receive buffer and immediately issues the next recvfrom(). If that read
 * completes synchronously, the loop processes it right away.
 */
static void udp_on_read_complete( pj_ioqueue_key_t *key,
                                  pj_ioqueue_op_key_t *op_key,
                                  pj_ssize_t bytes_read)
{
    enum { MAX_IMMEDIATE_PACKET = 10 };
    pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
    pjsip_rx_data *rdata = rdata_op_key->rdata;
    struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport;
    int i;
    pj_status_t status;

    /* Don't do anything if transport is closing. The counter is bumped so
     * that the number of callbacks seen while closing is recorded in
     * is_closing.
     */
    if (tp->is_closing) {
        tp->is_closing++;
        return;
    }

    /*
     * The idea of the loop is to process immediate data received by
     * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
     * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to
     * complete asynchronously, to allow other sockets to get their data.
     */
    for (i=0;; ++i) {
        pj_uint32_t flags;

        /* Report the packet to transport manager. */
        if (bytes_read > 0) {
            /* Must be a signed type: pjsip_tpmgr_receive_packet() reports
             * failure with a negative value, and an unsigned variable
             * would make the check below always false.
             */
            pj_ssize_t size_eaten;
            const pj_sockaddr_in *src_addr =
                (pj_sockaddr_in*)&rdata->pkt_info.src_addr;

            /* Init pkt_info part. */
            rdata->pkt_info.len = bytes_read;
            rdata->pkt_info.zero = 0;
            pj_gettimeofday(&rdata->pkt_info.timestamp);
            pj_ansi_strcpy(rdata->pkt_info.src_name,
                           pj_inet_ntoa(src_addr->sin_addr));
            rdata->pkt_info.src_port = pj_ntohs(src_addr->sin_port);

            size_eaten =
                pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
                                           rdata);

            if (size_eaten < 0) {
                pj_assert(!"It shouldn't happen!");
                size_eaten = rdata->pkt_info.len;
            }

            /* Since this is UDP, the whole buffer is the message. */
            rdata->pkt_info.len = 0;

        } else if (bytes_read == 0) {
            /* TODO: */
        } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
        {
            /* Report error to endpoint. The three status codes excluded
             * above are silently ignored.
             */
            PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
                                   rdata->tp_info.transport->obj_name,
                                   -bytes_read,
                                   "Warning: pj_ioqueue_recvfrom()"
                                   " callback error"));
        }

        if (i >= MAX_IMMEDIATE_PACKET) {
            /* Force ioqueue_recvfrom() to return PJ_EPENDING */
            flags = PJ_IOQUEUE_ALWAYS_ASYNC;
        } else {
            flags = 0;
        }

        /* Reset pool.
         * Need to copy rdata fields to temp variable because they will
         * be invalid after pj_pool_reset().
         */
        {
            pj_pool_t *rdata_pool = rdata->tp_info.pool;
            struct udp_transport *rdata_tp ;
            unsigned rdata_index;

            rdata_tp = (struct udp_transport*)rdata->tp_info.transport;
            rdata_index = (unsigned)(unsigned long)rdata->tp_info.tp_data;

            pj_pool_reset(rdata_pool);
            init_rdata(rdata_tp, rdata_index, rdata_pool, &rdata);

            /* Change some vars to point to new location after
             * pool reset.
             */
            op_key = &rdata->tp_info.op_key.op_key;
        }

        /* Read next packet. On input bytes_read carries the buffer
         * capacity; it is passed by address so the call can update it.
         */
        bytes_read = sizeof(rdata->pkt_info.packet);
        rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
        status = pj_ioqueue_recvfrom(key, op_key,
                                     rdata->pkt_info.packet,
                                     &bytes_read, flags,
                                     &rdata->pkt_info.src_addr,
                                     &rdata->pkt_info.src_addr_len);

        if (status == PJ_SUCCESS) {
            /* Continue loop: the read completed immediately and is
             * handled by the next iteration.
             */
            pj_assert(i < MAX_IMMEDIATE_PACKET);

        } else if (status == PJ_EPENDING) {
            /* The read is pending; this callback will be invoked again
             * when it completes.
             */
            break;

        } else {

            if (i < MAX_IMMEDIATE_PACKET) {

                /* Report error to endpoint if this is not EWOULDBLOCK error.*/
                if (status != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
                    status != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
                    status != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
                {
                    PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
                                           rdata->tp_info.transport->obj_name,
                                           status,
                                           "Warning: pj_ioqueue_recvfrom"));
                }

                /* Continue loop. Zero makes the next iteration skip
                 * packet reporting and simply retry the read.
                 */
                bytes_read = 0;

            } else {
                /* This is fatal error.
                 * Ioqueue operation will stop for this transport!
                 */
                PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
                                       rdata->tp_info.transport->obj_name,
                                       status,
                                       "FATAL: pj_ioqueue_recvfrom() error, "
                                       "UDP transport stopping! Error"));
                break;
            }
        }
    }
}
/*
 * udp_on_write_complete()
 *
 * Ioqueue callback invoked when a pending sendto() operation has
 * finished. It clears the "send in progress" marker on the operation key
 * and then forwards the result to the callback registered for this send,
 * if there is one.
 *
 * Parameters:
 *  key         Ioqueue key; its user data is the UDP transport.
 *  op_key      Operation key of the completed send; it is actually a
 *              pjsip_tx_data_op_key.
 *  bytes_sent  Result of the send, passed through to the callback.
 */
static void udp_on_write_complete( pj_ioqueue_key_t *key,
                                   pj_ioqueue_op_key_t *op_key,
                                   pj_ssize_t bytes_sent)
{
    struct udp_transport *udp = pj_ioqueue_get_user_data(key);
    pjsip_tx_data_op_key *tx_key = (pjsip_tx_data_op_key*)op_key;

    /* Clear the marker before notifying, so the tdata no longer counts
     * as pending by the time the callback runs.
     */
    tx_key->tdata = NULL;

    if (tx_key->callback == NULL) {
        return;
    }

    tx_key->callback(&udp->base, tx_key->token, bytes_sent);
}
/*
 * udp_send_msg()
 *
 * Called by the transport manager (through transport->send_msg()) to
 * transmit an outgoing message over this UDP transport.
 *
 * Parameters:
 *  transport   The transport; really a struct udp_transport.
 *  tdata       Transmit buffer; bytes from buf.start up to buf.cur are sent.
 *  rem_addr    Destination address.
 *  addr_len    Length of rem_addr.
 *  token       Opaque value handed back to the callback.
 *  callback    Stored in the operation key; udp_on_write_complete()
 *              invokes it when a pending send finishes.
 *
 * Returns the status of pj_ioqueue_sendto(). The argument checks yield
 * PJ_EINVAL for a NULL transport/tdata and PJSIP_EPENDINGTX when tdata
 * still has a send in progress.
 */
static pj_status_t udp_send_msg( pjsip_transport *transport,
                                 pjsip_tx_data *tdata,
                                 const pj_sockaddr_t *rem_addr,
                                 int addr_len,
                                 void *token,
                                 void (*callback)(pjsip_transport*,
                                                  void *token,
                                                  pj_ssize_t))
{
    struct udp_transport *udp = (struct udp_transport*)transport;
    pj_ioqueue_op_key_t *send_key;
    pj_ssize_t msg_len;
    pj_status_t rc;

    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);

    /* Prepare the operation key. A non-NULL op_key.tdata marks this
     * buffer as having a send in progress.
     */
    tdata->op_key.callback = callback;
    tdata->op_key.token = token;
    tdata->op_key.tdata = tdata;

    /* Hand the message to the ioqueue. */
    send_key = (pj_ioqueue_op_key_t*)&tdata->op_key;
    msg_len = tdata->buf.cur - tdata->buf.start;
    rc = pj_ioqueue_sendto(udp->key, send_key,
                           tdata->buf.start, &msg_len, 0,
                           rem_addr, addr_len);

    /* A pending send keeps its marker until the write-complete callback
     * clears it.
     */
    if (rc == PJ_EPENDING) {
        return rc;
    }

    /* Finished (or failed) immediately: nothing is outstanding. */
    tdata->op_key.tdata = NULL;
    return rc;
}
/*
* udp_destroy()
*
* This function is called by transport manager (by transport->destroy()).
*/
static pj_status_t udp_destroy( pjsip_transport *transport )
{
struct udp_transport *tp = (struct udp_transport*)transport;
int i;
/* Mark this transport as closing. */
tp->is_closing = 1;
/* Cancel all pending operations. */
/* blp: NO NO NO...
* No need to post queued completion as we poll the ioqueue until
* we've got events anyway. Posting completion will only cause
* callback to be called twice with IOCP: one for the post completion
* and another one for closing the socket.
*
for (i=0; i<tp->rdata_cnt; ++i) {
pj_ioqueue_post_completion(tp->key,
&tp->rdata[i]->tp_info.op_key.op_key, -1);
}
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -