⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sip_transport_udp.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 2 页
字号:
/* $Id: sip_transport_udp.c 974 2007-02-19 01:13:53Z bennylp $ */
/* 
 * Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */
#include <pjsip/sip_transport.h>
#include <pjsip/sip_endpoint.h>
#include <pjsip/sip_errno.h>
#include <pj/addr_resolv.h>
#include <pj/assert.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/os.h>
#include <pj/pool.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/string.h>


#define THIS_FILE   "sip_transport_udp.c"

/**
 * These are the target values for the socket send and receive buffer sizes,
 * respectively. They will be applied to the UDP socket with setsockopt().
 * If the transport fails to set one of these sizes, it will decrease the
 * value until a sufficiently large number has been successfully set.
 *
 * The buffer size is important, especially on WinXP/2000 machines.
 * Basically, the lower the size, the more packets will be lost (dropped?)
 * when we're sending (receiving?) packets in large volumes.
 * 
 * The figure here is based on the author's experiments on a WinXP/2000
 * machine; with this value, the rate of dropped packets is about 8% when
 * sending 1800 requests simultaneously (percentage taken as an average
 * after 50K requests or so).
 *
 * More experiments are probably needed.
 *
 * Both values are in bytes (24 MB each) and may be overridden by defining
 * the macro before this point, e.g. from the build configuration.
 */
#ifndef PJSIP_UDP_SO_SNDBUF_SIZE
#   define PJSIP_UDP_SO_SNDBUF_SIZE	(24*1024*1024)
#endif

#ifndef PJSIP_UDP_SO_RCVBUF_SIZE
#   define PJSIP_UDP_SO_RCVBUF_SIZE	(24*1024*1024)
#endif


/* Struct udp_transport "inherits" struct pjsip_transport.
 *
 * The functions in this file cast a pjsip_transport pointer directly to
 * struct udp_transport, so `base` must remain the first member.
 */
struct udp_transport
{
    /* Generic transport part; must stay first (see note above). */
    pjsip_transport	base;
    /* Socket handle of this transport. */
    pj_sock_t		sock;
    /* Ioqueue key; passed to pj_ioqueue_sendto() by udp_send_msg(). */
    pj_ioqueue_key_t   *key;
    /* Number of entries in rdata[] (NOTE(review): inferred from the loop
     * bound used in udp_destroy(); confirm against the allocation code). */
    int			rdata_cnt;
    /* Table of receive buffers; init_rdata() stores each buffer at the
     * index it was created for. */
    pjsip_rx_data     **rdata;
    /* Set to 1 by udp_destroy(); while non-zero, udp_on_read_complete()
     * only increments it and returns without processing the packet. */
    int			is_closing;
};


/*
 * init_rdata()
 *
 * Carve a fresh, zero-filled receive buffer out of `pool` and attach it
 * to the transport.
 *
 *  tp           Transport that owns the buffer.
 *  rdata_index  Slot in tp->rdata[] that will point at the new buffer.
 *               The index is also remembered inside the buffer
 *               (tp_info.tp_data) so that it can be recovered later.
 *  pool         Pool to allocate from. It is NOT reset here; a caller that
 *               wants to recycle the pool must reset it before calling.
 *  p_rdata      Optional; when non-NULL it receives the new buffer.
 */
static void init_rdata(struct udp_transport *tp, unsigned rdata_index,
		       pj_pool_t *pool, pjsip_rx_data **p_rdata)
{
    pjsip_rx_data *rx = pj_pool_zalloc(pool, sizeof(pjsip_rx_data));

    /* Transport-info section: who owns this buffer and where it lives. */
    rx->tp_info.transport = &tp->base;
    rx->tp_info.pool = pool;
    rx->tp_info.tp_data = (void*)(long)rdata_index;

    /* The op key carries a back pointer to its buffer, so the read
     * completion callback can get from the op key to the rx data.
     */
    rx->tp_info.op_key.rdata = rx;
    pj_ioqueue_op_key_init(&rx->tp_info.op_key.op_key,
                           sizeof(pj_ioqueue_op_key_t));

    /* Publish the buffer in the transport's slot table. */
    tp->rdata[rdata_index] = rx;

    if (!p_rdata)
        return;

    *p_rdata = rx;
}


/*
 * udp_on_read_complete()
 *
 * This is callback notification from ioqueue that a pending recvfrom()
 * operation has completed.
 *
 *  key         Ioqueue key the read was issued on.
 *  op_key      Operation key of the completed read. It is the op key that
 *              init_rdata() embedded in a pjsip_rx_data, so it is cast to
 *              pjsip_rx_data_op_key to find the receive buffer.
 *  bytes_read  > 0: number of bytes received into the buffer.
 *              = 0: nothing to process.
 *              < 0: negated error status of the failed read.
 *
 * After handling the completed read, the function recycles the receive
 * buffer (pool reset + init_rdata()) and issues the next recvfrom() on the
 * same key. Reads that complete immediately are processed in the same
 * loop, up to MAX_IMMEDIATE_PACKET iterations; after that the read is
 * forced to be asynchronous so the loop terminates.
 *
 * If the transport is marked as closing, the callback only bumps the
 * is_closing counter and does not re-arm the read.
 */
static void udp_on_read_complete( pj_ioqueue_key_t *key, 
                                  pj_ioqueue_op_key_t *op_key, 
                                  pj_ssize_t bytes_read)
{
    enum { MAX_IMMEDIATE_PACKET = 10 };
    pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
    pjsip_rx_data *rdata = rdata_op_key->rdata;
    struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport;
    int i;
    pj_status_t status;

    /* Don't do anything if transport is closing. */
    if (tp->is_closing) {
        tp->is_closing++;
        return;
    }

    /*
     * The idea of the loop is to process immediate data received by
     * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
     * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to
     * complete asynchronously, to allow other sockets to get their data.
     */
    for (i=0;; ++i) {
        pj_uint32_t flags;

        /* Report the packet to transport manager. */
        if (bytes_read > 0) {
            /* Must be a SIGNED type: with an unsigned type the
             * "size_eaten < 0" sanity check below is always false and
             * a negative return value would go unnoticed.
             */
            pj_ssize_t size_eaten;
            const pj_sockaddr_in *src_addr = 
                (pj_sockaddr_in*)&rdata->pkt_info.src_addr;

            /* Init pkt_info part. */
            rdata->pkt_info.len = bytes_read;
            rdata->pkt_info.zero = 0;
            pj_gettimeofday(&rdata->pkt_info.timestamp);
            pj_ansi_strcpy(rdata->pkt_info.src_name,
                           pj_inet_ntoa(src_addr->sin_addr));
            rdata->pkt_info.src_port = pj_ntohs(src_addr->sin_port);

            size_eaten = 
                pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 
                                           rdata);

            if (size_eaten < 0) {
                pj_assert(!"It shouldn't happen!");
                size_eaten = rdata->pkt_info.len;
            }

            /* Since this is UDP, the whole buffer is the message. */
            rdata->pkt_info.len = 0;

        } else if (bytes_read == 0) {

            /* TODO: */

        } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 
                   -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) 
        {
            /* Negative bytes_read carries the negated error status.
             * The three statuses above are deliberately not reported.
             */

            /* Report error to endpoint. */
            PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
                                   rdata->tp_info.transport->obj_name,
                                   -bytes_read, 
                                   "Warning: pj_ioqueue_recvfrom()"
                                   " callback error"));
        }

        if (i >= MAX_IMMEDIATE_PACKET) {
            /* Force ioqueue_recvfrom() to return PJ_EPENDING */
            flags = PJ_IOQUEUE_ALWAYS_ASYNC;
        } else {
            flags = 0;
        }

        /* Reset pool. 
         * Need to copy rdata fields to temp variable because they will
         * be invalid after pj_pool_reset().
         */
        {
            pj_pool_t *rdata_pool = rdata->tp_info.pool;
            struct udp_transport *rdata_tp ;
            unsigned rdata_index;

            rdata_tp = (struct udp_transport*)rdata->tp_info.transport;
            rdata_index = (unsigned)(unsigned long)rdata->tp_info.tp_data;

            pj_pool_reset(rdata_pool);
            init_rdata(rdata_tp, rdata_index, rdata_pool, &rdata);

            /* Change some vars to point to new location after
             * pool reset.
             */
            op_key = &rdata->tp_info.op_key.op_key;
        }

        /* Read next packet. bytes_read is in/out: buffer capacity on
         * entry, and it is treated as the received size when the call
         * succeeds immediately.
         */
        bytes_read = sizeof(rdata->pkt_info.packet);
        rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
        status = pj_ioqueue_recvfrom(key, op_key, 
                                     rdata->pkt_info.packet,
                                     &bytes_read, flags,
                                     &rdata->pkt_info.src_addr, 
                                     &rdata->pkt_info.src_addr_len);

        if (status == PJ_SUCCESS) {
            /* Continue loop: the data is processed at the top of the
             * next iteration. An immediate success is not expected once
             * PJ_IOQUEUE_ALWAYS_ASYNC has been requested.
             */
            pj_assert(i < MAX_IMMEDIATE_PACKET);

        } else if (status == PJ_EPENDING) {
            /* Read is queued; this callback will be invoked again. */
            break;

        } else {

            if (i < MAX_IMMEDIATE_PACKET) {

                /* Report error to endpoint if this is not EWOULDBLOCK error.*/
                if (status != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
                    status != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 
                    status != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) 
                {

                    PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
                                           rdata->tp_info.transport->obj_name,
                                           status, 
                                           "Warning: pj_ioqueue_recvfrom"));
                }

                /* Continue loop with nothing to process, so that the
                 * read gets re-issued.
                 */
                bytes_read = 0;
            } else {
                /* This is fatal error.
                 * Ioqueue operation will stop for this transport!
                 */
                PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
                                       rdata->tp_info.transport->obj_name,
                                       status, 
                                       "FATAL: pj_ioqueue_recvfrom() error, "
                                       "UDP transport stopping! Error"));
                break;
            }
        }
    }
}

/*
 * udp_on_write_complete()
 *
 * Ioqueue callback: a pending sendto() operation has finished.
 *
 *  key         Ioqueue key of the socket; its user data is the
 *              struct udp_transport that issued the send.
 *  op_key      Operation key of the send. It is the op key embedded in
 *              the transmit data (see udp_send_msg()), hence the cast to
 *              pjsip_tx_data_op_key.
 *  bytes_sent  Result of the send, forwarded unchanged to the user
 *              callback.
 *
 * The op key's tdata back pointer is cleared BEFORE the user callback
 * runs; udp_send_msg() asserts that this pointer is NULL before it starts
 * a new send on the same tdata.
 */
static void udp_on_write_complete( pj_ioqueue_key_t *key, 
				   pj_ioqueue_op_key_t *op_key,
				   pj_ssize_t bytes_sent)
{
    struct udp_transport *udp_tp = pj_ioqueue_get_user_data(key);
    pjsip_tx_data_op_key *send_key = (pjsip_tx_data_op_key*)op_key;

    /* Mark the send as no longer pending. */
    send_key->tdata = NULL;

    /* Nothing more to do when the sender did not ask for notification. */
    if (!send_key->callback)
        return;

    send_key->callback(&udp_tp->base, send_key->token, bytes_sent);
}

/*
 * udp_send_msg()
 *
 * Called by the transport manager (through transport->send_msg()) to
 * transmit an outgoing message over this UDP transport.
 *
 *  transport  The transport; must really be a struct udp_transport.
 *  tdata      Transmit data. The bytes between buf.start and buf.cur are
 *             sent. Its op key must not already be in use by a pending
 *             send.
 *  rem_addr   Destination address.
 *  addr_len   Length of rem_addr.
 *  token      Opaque value handed back to `callback`.
 *  callback   Optional completion callback, invoked from
 *             udp_on_write_complete().
 *
 * Returns the status of pj_ioqueue_sendto() unchanged. The argument checks
 * go through PJ_ASSERT_RETURN with PJ_EINVAL (NULL transport or tdata) and
 * PJSIP_EPENDINGTX (tdata still has a send in progress).
 *
 * The op key's tdata pointer doubles as a "send pending" marker: it is set
 * before the send and cleared again here unless the send is still pending.
 */
static pj_status_t udp_send_msg( pjsip_transport *transport,
				 pjsip_tx_data *tdata,
				 const pj_sockaddr_t *rem_addr,
				 int addr_len,
				 void *token,
				 void (*callback)(pjsip_transport*,
						  void *token,
						  pj_ssize_t))
{
    struct udp_transport *udp_tp = (struct udp_transport*)transport;
    pj_status_t rc;
    pj_ssize_t msg_len;

    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);

    /* Arm the op key: mark tdata as busy and remember who to notify. */
    tdata->op_key.tdata = tdata;
    tdata->op_key.callback = callback;
    tdata->op_key.token = token;

    /* Hand the used part of the buffer to the ioqueue. */
    msg_len = tdata->buf.cur - tdata->buf.start;
    rc = pj_ioqueue_sendto(udp_tp->key,
                           (pj_ioqueue_op_key_t*)&tdata->op_key,
                           tdata->buf.start, &msg_len, 0,
                           rem_addr, addr_len);

    /* Still in flight: udp_on_write_complete() clears the busy marker. */
    if (rc == PJ_EPENDING)
        return rc;

    /* Finished (or failed) right away: release the op key ourselves. */
    tdata->op_key.tdata = NULL;
    return rc;
}

/*
 * udp_destroy()
 *
 * This function is called by transport manager (by transport->destroy()).
 */
static pj_status_t udp_destroy( pjsip_transport *transport )
{
    struct udp_transport *tp = (struct udp_transport*)transport;
    int i;

    /* Mark this transport as closing. */
    tp->is_closing = 1;

    /* Cancel all pending operations. */
    /* blp: NO NO NO...
     *      No need to post queued completion as we poll the ioqueue until
     *      we've got events anyway. Posting completion will only cause
     *      callback to be called twice with IOCP: one for the post completion
     *      and another one for closing the socket.
     *
    for (i=0; i<tp->rdata_cnt; ++i) {
	pj_ioqueue_post_completion(tp->key, 
				   &tp->rdata[i]->tp_info.op_key.op_key, -1);
    }
    */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -