📄 sip_transport_tcp.c
字号:
/* Called by transport manager to destroy transport */
static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
/* Utility to destroy transport */
static pj_status_t tcp_destroy(pjsip_transport *transport,
pj_status_t reason);
/* Callback from ioqueue on incoming packet */
static void on_read_complete(pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_read);
/* Callback from ioqueue when packet is sent */
static void on_write_complete(pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_sent);
/* Callback from ioqueue when connect completes */
static void on_connect_complete(pj_ioqueue_key_t *key,
pj_status_t status);
/*
* Common function to create TCP transport, called when pending accept() and
* pending connect() complete.
*/
static pj_status_t tcp_create( struct tcp_listener *listener,
pj_pool_t *pool,
pj_sock_t sock, pj_bool_t is_server,
const pj_sockaddr_in *local,
const pj_sockaddr_in *remote,
struct tcp_transport **p_tcp)
{
struct tcp_transport *tcp;
pj_ioqueue_t *ioqueue;
pj_ioqueue_callback tcp_callback;
pj_status_t status;
PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
if (pool == NULL) {
pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
POOL_TP_INIT, POOL_TP_INC);
PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
}
/*
* Create and initialize basic transport structure.
*/
tcp = pj_pool_zalloc(pool, sizeof(*tcp));
tcp->sock = sock;
tcp->is_server = is_server;
tcp->listener = listener;
pj_list_init(&tcp->delayed_list);
tcp->base.pool = pool;
pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,
(is_server ? "tcps%p" :"tcpc%p"), tcp);
status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
if (status != PJ_SUCCESS) {
goto on_error;
}
status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
if (status != PJ_SUCCESS) {
goto on_error;
}
tcp->base.key.type = PJSIP_TRANSPORT_TCP;
pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in));
tcp->base.type_name = "tcp";
tcp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);
tcp->base.info = pj_pool_alloc(pool, 64);
pj_ansi_snprintf(tcp->base.info, 64, "TCP to %s:%d",
pj_inet_ntoa(remote->sin_addr),
(int)pj_ntohs(remote->sin_port));
tcp->base.addr_len = sizeof(pj_sockaddr_in);
pj_memcpy(&tcp->base.local_addr, local, sizeof(pj_sockaddr_in));
sockaddr_to_host_port(pool, &tcp->base.local_name, local);
sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);
tcp->base.endpt = listener->endpt;
tcp->base.tpmgr = listener->tpmgr;
tcp->base.send_msg = &tcp_send_msg;
tcp->base.do_shutdown = &tcp_shutdown;
tcp->base.destroy = &tcp_destroy_transport;
/* Register socket to ioqueue */
pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback));
tcp_callback.on_read_complete = &on_read_complete;
tcp_callback.on_write_complete = &on_write_complete;
tcp_callback.on_connect_complete = &on_connect_complete;
ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
status = pj_ioqueue_register_sock(pool, ioqueue, sock,
tcp, &tcp_callback, &tcp->key);
if (status != PJ_SUCCESS) {
goto on_error;
}
/* Register transport to transport manager */
status = pjsip_transport_register(listener->tpmgr, &tcp->base);
if (status != PJ_SUCCESS) {
goto on_error;
}
tcp->is_registered = PJ_TRUE;
/* Done setting up basic transport. */
*p_tcp = tcp;
PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
(tcp->is_server ? "server" : "client")));
return PJ_SUCCESS;
on_error:
tcp_destroy(&tcp->base, status);
return status;
}
/* Flush all delayed transmision once the socket is connected. */
static void tcp_flush_pending_tx(struct tcp_transport *tcp)
{
pj_lock_acquire(tcp->base.lock);
while (!pj_list_empty(&tcp->delayed_list)) {
struct delayed_tdata *pending_tx;
pjsip_tx_data *tdata;
pj_ioqueue_op_key_t *op_key;
pj_ssize_t size;
pj_status_t status;
pending_tx = tcp->delayed_list.next;
pj_list_erase(pending_tx);
tdata = pending_tx->tdata_op_key->tdata;
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
/* send to ioqueue! */
size = tdata->buf.cur - tdata->buf.start;
status = pj_ioqueue_send(tcp->key, op_key,
tdata->buf.start, &size, 0);
if (status != PJ_EPENDING) {
on_write_complete(tcp->key, op_key, size);
}
}
pj_lock_release(tcp->base.lock);
}
/* Called by transport manager to destroy transport */
static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
{
struct tcp_transport *tcp = (struct tcp_transport*)transport;
/* Transport would have been unregistered by now since this callback
* is called by transport manager.
*/
tcp->is_registered = PJ_FALSE;
return tcp_destroy(transport, tcp->close_reason);
}
/* Destroy TCP transport */
static pj_status_t tcp_destroy(pjsip_transport *transport,
pj_status_t reason)
{
struct tcp_transport *tcp = (struct tcp_transport*)transport;
if (tcp->close_reason == 0)
tcp->close_reason = reason;
if (tcp->is_registered) {
tcp->is_registered = PJ_FALSE;
pjsip_transport_destroy(transport);
/* pjsip_transport_destroy will recursively call this function
* again.
*/
return PJ_SUCCESS;
}
/* Mark transport as closing */
tcp->is_closing = PJ_TRUE;
/* Cancel all delayed transmits */
while (!pj_list_empty(&tcp->delayed_list)) {
struct delayed_tdata *pending_tx;
pj_ioqueue_op_key_t *op_key;
pending_tx = tcp->delayed_list.next;
pj_list_erase(pending_tx);
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
on_write_complete(tcp->key, op_key, -reason);
}
if (tcp->rdata.tp_info.pool) {
pj_pool_release(tcp->rdata.tp_info.pool);
tcp->rdata.tp_info.pool = NULL;
}
if (tcp->key) {
pj_ioqueue_unregister(tcp->key);
tcp->key = NULL;
tcp->sock = PJ_INVALID_SOCKET;
}
if (tcp->sock != PJ_INVALID_SOCKET) {
pj_sock_close(tcp->sock);
tcp->sock = PJ_INVALID_SOCKET;
}
if (tcp->base.lock) {
pj_lock_destroy(tcp->base.lock);
tcp->base.lock = NULL;
}
if (tcp->base.ref_cnt) {
pj_atomic_destroy(tcp->base.ref_cnt);
tcp->base.ref_cnt = NULL;
}
if (tcp->base.pool) {
pj_pool_t *pool;
if (reason != PJ_SUCCESS) {
char errmsg[PJ_ERR_MSG_SIZE];
pj_strerror(reason, errmsg, sizeof(errmsg));
PJ_LOG(4,(tcp->base.obj_name,
"TCP transport destroyed with reason %d: %s",
reason, errmsg));
} else {
PJ_LOG(4,(tcp->base.obj_name,
"TCP transport destroyed normally"));
}
pool = tcp->base.pool;
tcp->base.pool = NULL;
pj_pool_release(pool);
}
return PJ_SUCCESS;
}
/*
* This utility function creates receive data buffers and start
* asynchronous recv() operations from the socket. It is called after
* accept() or connect() operation complete.
*/
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
{
pj_pool_t *pool;
pj_ssize_t size;
pj_sockaddr_in *rem_addr;
pj_status_t status;
/* Init rdata */
pool = pjsip_endpt_create_pool(tcp->listener->endpt,
"rtd%p",
PJSIP_POOL_RDATA_LEN,
PJSIP_POOL_RDATA_INC);
if (!pool) {
tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
return PJ_ENOMEM;
}
tcp->rdata.tp_info.pool = pool;
tcp->rdata.tp_info.transport = &tcp->base;
tcp->rdata.tp_info.tp_data = tcp;
tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key,
sizeof(pj_ioqueue_op_key_t));
tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
tcp->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
rem_addr = (pj_sockaddr_in*) &tcp->base.key.rem_addr;
pj_ansi_strcpy(tcp->rdata.pkt_info.src_name,
pj_inet_ntoa(rem_addr->sin_addr));
tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port);
size = sizeof(tcp->rdata.pkt_info.packet);
status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key,
tcp->rdata.pkt_info.packet, &size,
PJ_IOQUEUE_ALWAYS_ASYNC);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d",
status));
return status;
}
return PJ_SUCCESS;
}
/* This callback is called by transport manager for the TCP factory
* to create outgoing transport to the specified destination.
*/
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
pjsip_tpmgr *mgr,
pjsip_endpoint *endpt,
const pj_sockaddr *rem_addr,
int addr_len,
pjsip_transport **p_transport)
{
struct tcp_listener *listener;
struct tcp_transport *tcp;
pj_sock_t sock;
pj_sockaddr_in local_addr;
pj_status_t status;
/* Sanity checks */
PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
addr_len && p_transport, PJ_EINVAL);
/* Check that address is a sockaddr_in */
PJ_ASSERT_RETURN(rem_addr->sa_family == PJ_AF_INET &&
addr_len == sizeof(pj_sockaddr_in), PJ_EINVAL);
listener = (struct tcp_listener*)factory;
/* Create socket */
status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock);
if (status != PJ_SUCCESS)
return status;
/* Bind to any port */
status = pj_sock_bind_in(sock, 0, 0);
if (status != PJ_SUCCESS) {
pj_sock_close(sock);
return status;
}
/* Get the local port */
addr_len = sizeof(pj_sockaddr_in);
status = pj_sock_getsockname(sock, &local_addr, &addr_len);
if (status != PJ_SUCCESS) {
pj_sock_close(sock);
return status;
}
/* Initially set the address from the listener's address */
local_addr.sin_addr.s_addr =
((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr;
/* Create the transport descriptor */
status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr,
(pj_sockaddr_in*)rem_addr, &tcp);
if (status != PJ_SUCCESS)
return status;
/* Start asynchronous connect() operation */
tcp->has_pending_connect = PJ_TRUE;
status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in));
if (status == PJ_SUCCESS) {
tcp->has_pending_connect = PJ_FALSE;
} else if (status != PJ_EPENDING) {
tcp_destroy(&tcp->base, status);
return status;
}
/* Update (again) local address, just in case local address currently
* set is different now that asynchronous connect() is started.
*/
addr_len = sizeof(pj_sockaddr_in);
if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) {
pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr;
/* Some systems (like old Win32 perhaps) may not set local address
* properly before socket is fully connected.
*/
if (tp_addr->sin_addr.s_addr != local_addr.sin_addr.s_addr &&
local_addr.sin_addr.s_addr != 0)
{
tp_addr->sin_addr.s_addr = local_addr.sin_addr.s_addr;
tp_addr->sin_port = local_addr.sin_port;
sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
&local_addr);
}
}
if (tcp->has_pending_connect) {
PJ_LOG(4,(tcp->base.obj_name,
"TCP transport %.*s:%d is connecting to %.*s:%d...",
(int)tcp->base.local_name.host.slen,
tcp->base.local_name.host.ptr,
tcp->base.local_name.port,
(int)tcp->base.remote_name.host.slen,
tcp->base.remote_name.host.ptr,
tcp->base.remote_name.port));
}
/* Done */
*p_transport = &tcp->base;
return PJ_SUCCESS;
}
/*
* This callback is called by ioqueue when pending accept() operation has
* completed.
*/
static void on_accept_complete( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_sock_t sock,
pj_status_t status)
{
struct tcp_listener *listener;
struct tcp_transport *tcp;
struct pending_accept *accept_op;
int err_cnt = 0;
listener = pj_ioqueue_get_user_data(key);
accept_op = (struct pending_accept*) op_key;
/*
* Loop while there is immediate connection or when there is error.
*/
do {
if (status == PJ_EPENDING) {
/*
* This can only happen when this function is called during
* initialization to kick off asynchronous accept().
*/
} else if (status != PJ_SUCCESS) {
/*
* Error in accept().
*/
tcp_perror(listener->factory.obj_name, "Error in accept()",
status);
/*
* Prevent endless accept() error loop by limiting the
* number of consecutive errors. Once the number of errors
* is equal to maximum, we treat this as permanent error, and
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -