📄 sip_transport_tcp.c
字号:
/* $Id: sip_transport_tcp.c 1158 2007-04-06 10:25:23Z bennylp $ */
/*
* Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <pjsip/sip_transport_tcp.h>
#include <pjsip/sip_endpoint.h>
#include <pjsip/sip_errno.h>
#include <pj/compat/socket.h>
#include <pj/addr_resolv.h>
#include <pj/assert.h>
#include <pj/ioqueue.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/os.h>
#include <pj/pool.h>
#include <pj/string.h>
/* Only declare the API if PJ_HAS_TCP is true */
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
#define THIS_FILE "sip_transport_tcp.c"
#define MAX_ASYNC_CNT 16
#define POOL_LIS_INIT 4000
#define POOL_LIS_INC 4001
#define POOL_TP_INIT 4000
#define POOL_TP_INC 4002
struct tcp_listener;
struct tcp_transport;
/*
* This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to
* track pending/asynchronous accept() operation. TCP transport may have
* more than one pending accept() operations, depending on the value of
* async_cnt.
*/
struct pending_accept
{
pj_ioqueue_op_key_t op_key;
struct tcp_listener *listener;
unsigned index;
pj_pool_t *pool;
pj_sock_t new_sock;
int addr_len;
pj_sockaddr_in local_addr;
pj_sockaddr_in remote_addr;
};
/*
* This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
* SIP transport factory).
*/
struct tcp_listener
{
pjsip_tpfactory factory;
pj_bool_t is_registered;
pjsip_endpoint *endpt;
pjsip_tpmgr *tpmgr;
pj_sock_t sock;
pj_ioqueue_key_t *key;
unsigned async_cnt;
struct pending_accept *accept_op[MAX_ASYNC_CNT];
};
/*
* This structure is used to keep delayed transmit operation in a list.
* A delayed transmission occurs when application sends tx_data when
* the TCP connect/establishment is still in progress. These delayed
* transmission will be "flushed" once the socket is connected (either
* successfully or with errors).
*/
struct delayed_tdata
{
PJ_DECL_LIST_MEMBER(struct delayed_tdata);
pjsip_tx_data_op_key *tdata_op_key;
};
/*
* This structure describes the TCP transport, and it's descendant of
* pjsip_transport.
*/
struct tcp_transport
{
pjsip_transport base;
pj_bool_t is_server;
struct tcp_listener *listener;
pj_bool_t is_registered;
pj_bool_t is_closing;
pj_status_t close_reason;
pj_sock_t sock;
pj_ioqueue_key_t *key;
pj_bool_t has_pending_connect;
/* TCP transport can only have one rdata!
* Otherwise chunks of incoming PDU may be received on different
* buffer.
*/
pjsip_rx_data rdata;
/* Pending transmission list. */
struct delayed_tdata delayed_list;
};
/****************************************************************************
* PROTOTYPES
*/
/* This callback is called when pending accept() operation completes. */
static void on_accept_complete( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_sock_t sock,
pj_status_t status);
/* This callback is called by transport manager to destroy listener */
static pj_status_t lis_destroy(pjsip_tpfactory *factory);
/* This callback is called by transport manager to create transport */
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
pjsip_tpmgr *mgr,
pjsip_endpoint *endpt,
const pj_sockaddr *rem_addr,
int addr_len,
pjsip_transport **transport);
/* Common function to create and initialize transport */
static pj_status_t tcp_create(struct tcp_listener *listener,
pj_pool_t *pool,
pj_sock_t sock, pj_bool_t is_server,
const pj_sockaddr_in *local,
const pj_sockaddr_in *remote,
struct tcp_transport **p_tcp);
static void tcp_perror(const char *sender, const char *title,
pj_status_t status)
{
char errmsg[PJ_ERR_MSG_SIZE];
pj_strerror(status, errmsg, sizeof(errmsg));
PJ_LOG(1,(sender, "%s: %s [code=%d]", title, errmsg, status));
}
static void sockaddr_to_host_port( pj_pool_t *pool,
pjsip_host_port *host_port,
const pj_sockaddr_in *addr )
{
enum { M = 48 };
host_port->host.ptr = pj_pool_alloc(pool, M);
host_port->host.slen = pj_ansi_snprintf( host_port->host.ptr, M, "%s",
pj_inet_ntoa(addr->sin_addr));
host_port->port = pj_ntohs(addr->sin_port);
}
/****************************************************************************
* The TCP listener/transport factory.
*/
/*
* This is the public API to create, initialize, register, and start the
* TCP listener.
*/
PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
const pj_sockaddr_in *local,
const pjsip_host_port *a_name,
unsigned async_cnt,
pjsip_tpfactory **p_factory)
{
pj_pool_t *pool;
struct tcp_listener *listener;
pj_ioqueue_callback listener_cb;
pj_sockaddr_in *listener_addr;
int addr_len;
unsigned i;
pj_status_t status;
/* Sanity check */
PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL);
/* Verify that address given in a_name (if any) is valid */
if (a_name && a_name->host.slen) {
pj_sockaddr_in tmp;
status = pj_sockaddr_in_init(&tmp, &a_name->host,
(pj_uint16_t)a_name->port);
if (status != PJ_SUCCESS || tmp.sin_addr.s_addr == PJ_INADDR_ANY ||
tmp.sin_addr.s_addr == PJ_INADDR_NONE)
{
/* Invalid address */
return PJ_EINVAL;
}
}
pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT,
POOL_LIS_INC);
PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener));
listener->factory.pool = pool;
listener->factory.type = PJSIP_TRANSPORT_TCP;
listener->factory.type_name = "tcp";
listener->factory.flag =
pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);
listener->sock = PJ_INVALID_SOCKET;
pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
status = pj_lock_create_recursive_mutex(pool, "tcplis",
&listener->factory.lock);
if (status != PJ_SUCCESS)
goto on_error;
/* Create and bind socket */
status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &listener->sock);
if (status != PJ_SUCCESS)
goto on_error;
listener_addr = (pj_sockaddr_in*)&listener->factory.local_addr;
if (local) {
pj_memcpy(listener_addr, local, sizeof(pj_sockaddr_in));
} else {
pj_sockaddr_in_init(listener_addr, NULL, 0);
}
status = pj_sock_bind(listener->sock, listener_addr,
sizeof(pj_sockaddr_in));
if (status != PJ_SUCCESS)
goto on_error;
/* Retrieve the bound address */
addr_len = sizeof(pj_sockaddr_in);
status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len);
if (status != PJ_SUCCESS)
goto on_error;
/* If published host/IP is specified, then use that address as the
* listener advertised address.
*/
if (a_name && a_name->host.slen) {
/* Copy the address */
listener->factory.addr_name = *a_name;
pj_strdup(listener->factory.pool, &listener->factory.addr_name.host,
&a_name->host);
listener->factory.addr_name.port = a_name->port;
} else {
/* No published address is given, use the bound address */
/* If the address returns 0.0.0.0, use the default
* interface address as the transport's address.
*/
if (listener_addr->sin_addr.s_addr == 0) {
pj_in_addr hostip;
status = pj_gethostip(&hostip);
if (status != PJ_SUCCESS)
goto on_error;
listener_addr->sin_addr = hostip;
}
/* Save the address name */
sockaddr_to_host_port(listener->factory.pool,
&listener->factory.addr_name, listener_addr);
}
/* If port is zero, get the bound port */
if (listener->factory.addr_name.port == 0) {
listener->factory.addr_name.port = pj_ntohs(listener_addr->sin_port);
}
pj_ansi_snprintf(listener->factory.obj_name,
sizeof(listener->factory.obj_name),
"tcplis:%d", listener->factory.addr_name.port);
/* Start listening to the address */
status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG);
if (status != PJ_SUCCESS)
goto on_error;
/* Register socket to ioqeuue */
pj_bzero(&listener_cb, sizeof(listener_cb));
listener_cb.on_accept_complete = &on_accept_complete;
status = pj_ioqueue_register_sock(pool, pjsip_endpt_get_ioqueue(endpt),
listener->sock, listener,
&listener_cb, &listener->key);
if (status != PJ_SUCCESS)
goto on_error;
/* Register to transport manager */
listener->endpt = endpt;
listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
listener->factory.create_transport = lis_create_transport;
listener->factory.destroy = lis_destroy;
listener->is_registered = PJ_TRUE;
status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
&listener->factory);
if (status != PJ_SUCCESS) {
listener->is_registered = PJ_FALSE;
goto on_error;
}
/* Start pending accept() operations */
if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT;
listener->async_cnt = async_cnt;
for (i=0; i<async_cnt; ++i) {
pj_pool_t *pool;
pool = pjsip_endpt_create_pool(endpt, "tcps%p", POOL_TP_INIT,
POOL_TP_INIT);
if (!pool) {
status = PJ_ENOMEM;
goto on_error;
}
listener->accept_op[i] = pj_pool_zalloc(pool,
sizeof(struct pending_accept));
pj_ioqueue_op_key_init(&listener->accept_op[i]->op_key,
sizeof(listener->accept_op[i]->op_key));
listener->accept_op[i]->pool = pool;
listener->accept_op[i]->listener = listener;
listener->accept_op[i]->index = i;
on_accept_complete(listener->key, &listener->accept_op[i]->op_key,
listener->sock, PJ_EPENDING);
}
PJ_LOG(4,(listener->factory.obj_name,
"SIP TCP listener ready for incoming connections at %.*s:%d",
(int)listener->factory.addr_name.host.slen,
listener->factory.addr_name.host.ptr,
listener->factory.addr_name.port));
/* Return the pointer to user */
if (p_factory) *p_factory = &listener->factory;
return PJ_SUCCESS;
on_error:
lis_destroy(&listener->factory);
return status;
}
/*
* This is the public API to create, initialize, register, and start the
* TCP listener.
*/
PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
const pj_sockaddr_in *local,
unsigned async_cnt,
pjsip_tpfactory **p_factory)
{
return pjsip_tcp_transport_start2(endpt, local, NULL, async_cnt, p_factory);
}
/* This callback is called by transport manager to destroy listener */
static pj_status_t lis_destroy(pjsip_tpfactory *factory)
{
struct tcp_listener *listener = (struct tcp_listener *)factory;
unsigned i;
if (listener->is_registered) {
pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
listener->is_registered = PJ_FALSE;
}
if (listener->key) {
pj_ioqueue_unregister(listener->key);
listener->key = NULL;
listener->sock = PJ_INVALID_SOCKET;
}
if (listener->sock != PJ_INVALID_SOCKET) {
pj_sock_close(listener->sock);
listener->sock = PJ_INVALID_SOCKET;
}
if (listener->factory.lock) {
pj_lock_destroy(listener->factory.lock);
listener->factory.lock = NULL;
}
for (i=0; i<PJ_ARRAY_SIZE(listener->accept_op); ++i) {
if (listener->accept_op[i] && listener->accept_op[i]->pool) {
pj_pool_t *pool = listener->accept_op[i]->pool;
listener->accept_op[i]->pool = NULL;
pj_pool_release(pool);
}
}
if (listener->factory.pool) {
pj_pool_t *pool = listener->factory.pool;
PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener destroyed"));
listener->factory.pool = NULL;
pj_pool_release(pool);
}
return PJ_SUCCESS;
}
/***************************************************************************/
/*
* TCP Transport
*/
/*
* Prototypes.
*/
/* Called by transport manager to send message */
static pj_status_t tcp_send_msg(pjsip_transport *transport,
pjsip_tx_data *tdata,
const pj_sockaddr_t *rem_addr,
int addr_len,
void *token,
void (*callback)(pjsip_transport *transport,
void *token,
pj_ssize_t sent_bytes));
/* Called by transport manager to shutdown */
static pj_status_t tcp_shutdown(pjsip_transport *transport);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -