⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sip_transport_loop.c

📁 基于sip协议的网络电话源码
💻 C
字号:
/* $Id: sip_transport_loop.c 974 2007-02-19 01:13:53Z bennylp $ *//*  * Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  */#include <pjsip/sip_transport_loop.h>#include <pjsip/sip_endpoint.h>#include <pjsip/sip_errno.h>#include <pj/pool.h>#include <pj/os.h>#include <pj/string.h>#include <pj/lock.h>#include <pj/assert.h>#include <pj/compat/socket.h>#define ADDR_LOOP	"128.0.0.1"#define ADDR_LOOP_DGRAM	"129.0.0.1"/** This structure describes incoming packet. */struct recv_list{    PJ_DECL_LIST_MEMBER(struct recv_list);    pjsip_rx_data  rdata;};/** This structure is used to keep delayed send failure. */struct send_list{    PJ_DECL_LIST_MEMBER(struct send_list);    pj_time_val    sent_time;    pj_ssize_t	   sent;    pjsip_tx_data *tdata;    void	  *token;    void	 (*callback)(pjsip_transport*, void*, pj_ssize_t);};/** This structure describes the loop transport. */struct loop_transport{    pjsip_transport	     base;    pj_pool_t		    *pool;    pj_thread_t		    *thread;    pj_bool_t		     thread_quit_flag;    pj_bool_t		     discard;    int			     fail_mode;    unsigned		     recv_delay;    unsigned		     send_delay;    struct recv_list	     recv_list;    struct send_list	     send_list;};/* Helper function to create "incoming" packet */struct recv_list *create_incoming_packet( struct loop_transport *loop,					  pjsip_tx_data *tdata ){    pj_pool_t *pool;    struct recv_list *pkt;    pool = pjsip_endpt_create_pool(loop->base.endpt, "rdata", 				   PJSIP_POOL_RDATA_LEN, 				   PJSIP_POOL_RDATA_INC+5);    if (!pool)	return NULL;    pkt = pj_pool_zalloc(pool, sizeof(struct recv_list));    /* Initialize rdata. */    pkt->rdata.tp_info.pool = pool;    pkt->rdata.tp_info.transport = &loop->base;        /* Copy the packet. */    pj_memcpy(pkt->rdata.pkt_info.packet, tdata->buf.start,	      tdata->buf.cur - tdata->buf.start);    pkt->rdata.pkt_info.len = tdata->buf.cur - tdata->buf.start;    /* "Source address" info. */    pkt->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);    if (loop->base.key.type == PJSIP_TRANSPORT_LOOP)	pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP);    else	pj_ansi_strcpy(pkt->rdata.pkt_info.src_name, ADDR_LOOP_DGRAM);    pkt->rdata.pkt_info.src_port = loop->base.local_name.port;    /* When do we need to "deliver" this packet. */    pj_gettimeofday(&pkt->rdata.pkt_info.timestamp);    pkt->rdata.pkt_info.timestamp.msec += loop->recv_delay;    pj_time_val_normalize(&pkt->rdata.pkt_info.timestamp);    /* Done. */    return pkt;}/* Helper function to add pending notification callback. */static pj_status_t add_notification( struct loop_transport *loop,				     pjsip_tx_data *tdata,				     pj_ssize_t sent,				     void *token,				     void (*callback)(pjsip_transport*, 						      void*, pj_ssize_t)){    struct send_list *sent_status;    pjsip_tx_data_add_ref(tdata);    pj_lock_acquire(tdata->lock);    sent_status = pj_pool_alloc(tdata->pool, sizeof(struct send_list));    pj_lock_release(tdata->lock);    sent_status->sent = sent;    sent_status->tdata = tdata;    sent_status->token = token;    sent_status->callback = callback;    pj_gettimeofday(&sent_status->sent_time);    sent_status->sent_time.msec += loop->send_delay;    pj_time_val_normalize(&sent_status->sent_time);    pj_lock_acquire(loop->base.lock);    pj_list_push_back(&loop->send_list, sent_status);    pj_lock_release(loop->base.lock);    return PJ_SUCCESS;}/* Handler for sending outgoing message; called by transport manager. */static pj_status_t loop_send_msg( pjsip_transport *tp, 				  pjsip_tx_data *tdata,				  const pj_sockaddr_t *rem_addr,				  int addr_len,				  void *token,				  void (*cb)(pjsip_transport *transport,					     void *token, 					     pj_ssize_t sent_bytes)){    struct loop_transport *loop = (struct loop_transport*)tp;    struct recv_list *recv_pkt;        PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||	             tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);    PJ_UNUSED_ARG(rem_addr);    PJ_UNUSED_ARG(addr_len);    /* Need to send failure? */    if (loop->fail_mode) {	if (loop->send_delay == 0) {	    return PJ_STATUS_FROM_OS(OSERR_ECONNRESET);	} else {	    add_notification(loop, tdata, -PJ_STATUS_FROM_OS(OSERR_ECONNRESET),			     token, cb);	    return PJ_EPENDING;	}    }    /* Discard any packets? */    if (loop->discard)	return PJ_SUCCESS;    /* Create rdata for the "incoming" packet. */    recv_pkt = create_incoming_packet(loop, tdata);    if (!recv_pkt)	return PJ_ENOMEM;    /* If delay is not configured, deliver this packet now! */    if (loop->recv_delay == 0) {	pj_ssize_t size_eaten;	size_eaten = pjsip_tpmgr_receive_packet( loop->base.tpmgr, 						 &recv_pkt->rdata);	pj_assert(size_eaten == recv_pkt->rdata.pkt_info.len);	pjsip_endpt_release_pool(loop->base.endpt, 				 recv_pkt->rdata.tp_info.pool);    } else {	/* Otherwise if delay is configured, add the "packet" to the 	 * receive list to be processed by worker thread.	 */	pj_lock_acquire(loop->base.lock);	pj_list_push_back(&loop->recv_list, recv_pkt);	pj_lock_release(loop->base.lock);    }    if (loop->send_delay != 0) {	add_notification(loop, tdata, tdata->buf.cur - tdata->buf.start,			 token, cb);	return PJ_EPENDING;    } else {	return PJ_SUCCESS;    }}/* Handler to destroy the transport; called by transport manager */static pj_status_t loop_destroy(pjsip_transport *tp){    struct loop_transport *loop = (struct loop_transport*)tp;        PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||	             tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);        loop->thread_quit_flag = 1;    /* Unlock transport mutex before joining thread. */    pj_lock_release(tp->lock);    pj_thread_join(loop->thread);    pj_thread_destroy(loop->thread);    /* Clear pending send notifications. */    while (!pj_list_empty(&loop->send_list)) {	struct send_list *node = loop->send_list.next;	/* Notify callback. */	if (node->callback) {	    (*node->callback)(&loop->base, node->token, -PJSIP_ESHUTDOWN);	}	pj_list_erase(node);	pjsip_tx_data_dec_ref(node->tdata);    }    /* Clear "incoming" packets in the queue. */    while (!pj_list_empty(&loop->recv_list)) {	struct recv_list *node = loop->recv_list.next;	pj_list_erase(node);	pjsip_endpt_release_pool(loop->base.endpt,				 node->rdata.tp_info.pool);    }    /* Self destruct.. heheh.. */    pj_lock_destroy(loop->base.lock);    pj_atomic_destroy(loop->base.ref_cnt);    pjsip_endpt_release_pool(loop->base.endpt, loop->base.pool);    return PJ_SUCCESS;}/* Worker thread for loop transport. */static int loop_transport_worker_thread(void *arg){    struct loop_transport *loop = arg;    struct recv_list r;    struct send_list s;    pj_list_init(&r);    pj_list_init(&s);    while (!loop->thread_quit_flag) {	pj_time_val now;	pj_thread_sleep(1);	pj_gettimeofday(&now);	pj_lock_acquire(loop->base.lock);	/* Move expired send notification to local list. */	while (!pj_list_empty(&loop->send_list)) {	    struct send_list *node = loop->send_list.next;	    /* Break when next node time is greater than now. */	    if (PJ_TIME_VAL_GTE(node->sent_time, now))		break;	    /* Delete this from the list. */	    pj_list_erase(node);	    /* Add to local list. */	    pj_list_push_back(&s, node);	}	/* Move expired "incoming" packet to local list. */	while (!pj_list_empty(&loop->recv_list)) {	    struct recv_list *node = loop->recv_list.next;	    /* Break when next node time is greater than now. */	    if (PJ_TIME_VAL_GTE(node->rdata.pkt_info.timestamp, now))		break;	    /* Delete this from the list. */	    pj_list_erase(node);	    /* Add to local list. */	    pj_list_push_back(&r, node);	}	pj_lock_release(loop->base.lock);	/* Process send notification and incoming packet notification	 * without holding down the loop's mutex.	 */	while (!pj_list_empty(&s)) {	    struct send_list *node = s.next;	    pj_list_erase(node);	    /* Notify callback. */	    if (node->callback) {		(*node->callback)(&loop->base, node->token, node->sent);	    }	    /* Decrement tdata reference counter. */	    pjsip_tx_data_dec_ref(node->tdata);	}	/* Process "incoming" packet. */	while (!pj_list_empty(&r)) {	    struct recv_list *node = r.next;	    pj_ssize_t size_eaten;	    pj_list_erase(node);	    /* Notify transport manager about the "incoming packet" */	    size_eaten = pjsip_tpmgr_receive_packet(loop->base.tpmgr,						    &node->rdata);	    /* Must "eat" all the packets. */	    pj_assert(size_eaten == node->rdata.pkt_info.len);	    /* Done. */	    pjsip_endpt_release_pool(loop->base.endpt,				     node->rdata.tp_info.pool);	}    }    return 0;}/* Start loop transport. */PJ_DEF(pj_status_t) pjsip_loop_start( pjsip_endpoint *endpt,				      pjsip_transport **transport){    pj_pool_t *pool;    struct loop_transport *loop;    pj_status_t status;    /* Create pool. */    pool = pjsip_endpt_create_pool(endpt, "loop", 4000, 4000);    if (!pool)	return PJ_ENOMEM;    /* Create the loop structure. */    loop = pj_pool_zalloc(pool, sizeof(struct loop_transport));        /* Initialize transport properties. */    pj_ansi_snprintf(loop->base.obj_name, sizeof(loop->base.obj_name), 		     "loop%p", loop);    loop->base.pool = pool;    status = pj_atomic_create(pool, 0, &loop->base.ref_cnt);    if (status != PJ_SUCCESS)	goto on_error;    status = pj_lock_create_recursive_mutex(pool, "loop", &loop->base.lock);    if (status != PJ_SUCCESS)	goto on_error;    loop->base.key.type = PJSIP_TRANSPORT_LOOP_DGRAM;    //loop->base.key.rem_addr.sa_family = PJ_AF_INET;    loop->base.type_name = "LOOP-DGRAM";    loop->base.info = "LOOP-DGRAM";    loop->base.flag = PJSIP_TRANSPORT_DATAGRAM;    loop->base.local_name.host = pj_str(ADDR_LOOP_DGRAM);    loop->base.local_name.port = 	pjsip_transport_get_default_port_for_type(loop->base.key.type);    loop->base.addr_len = sizeof(pj_sockaddr_in);    loop->base.endpt = endpt;    loop->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);    loop->base.send_msg = &loop_send_msg;    loop->base.destroy = &loop_destroy;    pj_list_init(&loop->recv_list);    pj_list_init(&loop->send_list);    /* Create worker thread. */    status = pj_thread_create(pool, "loop", 			      &loop_transport_worker_thread, loop, 0, 			      PJ_THREAD_SUSPENDED, &loop->thread);    if (status != PJ_SUCCESS)	goto on_error;    /* Register to transport manager. */    status = pjsip_transport_register( loop->base.tpmgr, &loop->base);    if (status != PJ_SUCCESS)	goto on_error;    /* Start the thread. */    status = pj_thread_resume(loop->thread);    if (status != PJ_SUCCESS)	goto on_error;    /*     * Done.     */    if (transport)	*transport = &loop->base;    return PJ_SUCCESS;on_error:    if (loop->base.lock)	pj_lock_destroy(loop->base.lock);    if (loop->thread)	pj_thread_destroy(loop->thread);    if (loop->base.ref_cnt)	pj_atomic_destroy(loop->base.ref_cnt);    pjsip_endpt_release_pool(endpt, loop->pool);    return status;}PJ_DEF(pj_status_t) pjsip_loop_set_discard( pjsip_transport *tp,					    pj_bool_t discard,					    pj_bool_t *prev_value ){    struct loop_transport *loop = (struct loop_transport*)tp;    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||	             tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);    if (prev_value)	*prev_value = loop->discard;    loop->discard = discard;    return PJ_SUCCESS;}PJ_DEF(pj_status_t) pjsip_loop_set_failure( pjsip_transport *tp,					    int fail_flag,					    int *prev_value ){    struct loop_transport *loop = (struct loop_transport*)tp;    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||	             tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);    if (prev_value)	*prev_value = loop->fail_mode;    loop->fail_mode = fail_flag;    return PJ_SUCCESS;}PJ_DEF(pj_status_t) pjsip_loop_set_recv_delay( pjsip_transport *tp,					       unsigned delay,					       unsigned *prev_value){    struct loop_transport *loop = (struct loop_transport*)tp;    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||	             tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);    if (prev_value)	*prev_value = loop->recv_delay;    loop->recv_delay = delay;    return PJ_SUCCESS;}PJ_DEF(pj_status_t) pjsip_loop_set_send_callback_delay( pjsip_transport *tp,							unsigned delay,							unsigned *prev_value){    struct loop_transport *loop = (struct loop_transport*)tp;    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||	             tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);    if (prev_value)	*prev_value = loop->send_delay;    loop->send_delay = delay;    return PJ_SUCCESS;}PJ_DEF(pj_status_t) pjsip_loop_set_delay( pjsip_transport *tp, unsigned delay ){    struct loop_transport *loop = (struct loop_transport*)tp;    PJ_ASSERT_RETURN(tp && (tp->key.type == PJSIP_TRANSPORT_LOOP ||	             tp->key.type == PJSIP_TRANSPORT_LOOP_DGRAM), PJ_EINVAL);    loop->recv_delay = delay;    loop->send_delay = delay;    return PJ_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -