📄 sip_transport_tcp.c
字号:
* we stop the accept() operation.
*/
++err_cnt;
if (err_cnt >= 10) {
PJ_LOG(1, (listener->factory.obj_name,
"Too many errors, listener stopping"));
}
} else {
pj_pool_t *pool;
struct pending_accept *new_op;
if (sock == PJ_INVALID_SOCKET) {
sock = accept_op->new_sock;
}
if (sock == PJ_INVALID_SOCKET) {
pj_assert(!"Should not happen. status should be error");
goto next_accept;
}
PJ_LOG(4,(listener->factory.obj_name,
"TCP listener %.*s:%d: got incoming TCP connection "
"from %s:%d, sock=%d",
(int)listener->factory.addr_name.host.slen,
listener->factory.addr_name.host.ptr,
listener->factory.addr_name.port,
pj_inet_ntoa(accept_op->remote_addr.sin_addr),
pj_ntohs(accept_op->remote_addr.sin_port),
sock));
/* Create new accept_opt */
pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p",
POOL_TP_INIT, POOL_TP_INC);
new_op = pj_pool_zalloc(pool, sizeof(struct pending_accept));
new_op->pool = pool;
new_op->listener = listener;
new_op->index = accept_op->index;
pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key));
listener->accept_op[accept_op->index] = new_op;
/*
* Incoming connections!
* Create TCP transport for the new socket.
*/
status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE,
&accept_op->local_addr,
&accept_op->remote_addr, &tcp);
if (status == PJ_SUCCESS) {
status = tcp_start_read(tcp);
if (status != PJ_SUCCESS) {
PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
tcp_destroy(&tcp->base, status);
}
}
accept_op = new_op;
}
next_accept:
/*
* Start the next asynchronous accept() operation.
*/
accept_op->addr_len = sizeof(pj_sockaddr_in);
accept_op->new_sock = PJ_INVALID_SOCKET;
status = pj_ioqueue_accept(listener->key,
&accept_op->op_key,
&accept_op->new_sock,
&accept_op->local_addr,
&accept_op->remote_addr,
&accept_op->addr_len);
/*
* Loop while we have immediate connection or when there is error.
*/
} while (status != PJ_EPENDING);
}
/*
 * Callback from ioqueue when packet is sent.
 *
 * key         ioqueue key; its user data is the TCP transport.
 * op_key      operation key of the completed send. It is treated as the
 *             pjsip_tx_data_op_key that was submitted with the send.
 * bytes_sent  number of bytes sent. Zero is treated as connection closure,
 *             a negative value as a negated status code.
 *
 * On closure/error the close reason is recorded (only the first one is
 * kept) and the transport is asked to shut down. The completion callback
 * stored in the op key, if any, is then invoked with the unmodified
 * bytes_sent value.
 */
static void on_write_complete(pj_ioqueue_key_t *key,
                              pj_ioqueue_op_key_t *op_key,
                              pj_ssize_t bytes_sent)
{
    struct tcp_transport *tcp = pj_ioqueue_get_user_data(key);
    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;

    /* Clear the tdata back-reference; tcp_send_msg() checks this field
     * to detect a transmit that is still pending.
     */
    tdata_op_key->tdata = NULL;

    /* Check for error/closure */
    if (bytes_sent <= 0) {
        pj_status_t status;

        /* bytes_sent is pj_ssize_t while "%d" consumes an int, so cast
         * explicitly to keep the variadic argument well-typed.
         */
        PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
                  (int)bytes_sent));

        status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
                                     (pj_status_t)-bytes_sent;

        /* Keep only the first recorded reason for closing. */
        if (tcp->close_reason == PJ_SUCCESS) {
            tcp->close_reason = status;
        }
        pjsip_transport_shutdown(&tcp->base);
    }

    if (tdata_op_key->callback) {
        /*
         * Notify sip_transport.c that packet has been sent.
         */
        tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
    }
}
/*
 * This callback is called by transport manager to send SIP message.
 *
 * transport  the TCP transport to send through.
 * tdata      transmit data; the bytes between buf.start and buf.cur are
 *            sent. It must not already have a pending send.
 * rem_addr   remote address; must be non-NULL.
 * addr_len   must equal sizeof(pj_sockaddr_in).
 * token      opaque value stored in the op key for the callback.
 * callback   completion callback stored in the op key.
 *
 * Returns PJ_EPENDING when the data was queued on the delayed list
 * (connect() still in progress) or when the ioqueue send has not completed
 * yet. Otherwise returns the status of pj_ioqueue_send(); if that call
 * reports success with a non-positive sent size, an ENOTCONN OS error is
 * returned instead and the transport is asked to shut down. Argument
 * checks are done with PJ_ASSERT_RETURN using PJ_EINVAL / PJSIP_EPENDINGTX.
 */
static pj_status_t tcp_send_msg(pjsip_transport *transport,
                                pjsip_tx_data *tdata,
                                const pj_sockaddr_t *rem_addr,
                                int addr_len,
                                void *token,
                                void (*callback)(pjsip_transport *transport,
                                                 void *token,
                                                 pj_ssize_t sent_bytes))
{
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
    pj_ssize_t size;
    pj_bool_t delayed = PJ_FALSE;
    pj_status_t status = PJ_SUCCESS;

    /* Sanity check */
    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);

    /* Check that there's no pending operation associated with the tdata */
    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);

    /* Check the address is supported */
    PJ_ASSERT_RETURN(rem_addr && addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL);

    /* Init op key. */
    tdata->op_key.tdata = tdata;
    tdata->op_key.token = token;
    tdata->op_key.callback = callback;

    /* If asynchronous connect() has not completed yet, just put the
     * transmit data in the pending transmission list since we can not
     * use the socket yet.
     */
    if (tcp->has_pending_connect) {

        /*
         * Looks like connect() is still in progress. Check again (this time
         * with holding the lock) to be sure.
         */
        pj_lock_acquire(tcp->base.lock);

        if (tcp->has_pending_connect) {
            struct delayed_tdata *delayed_tdata;

            /*
             * connect() is still in progress. Put the transmit data to
             * the delayed list. The list node is allocated from the
             * tdata's own pool.
             */
            delayed_tdata = pj_pool_alloc(tdata->pool,
                                          sizeof(*delayed_tdata));
            delayed_tdata->tdata_op_key = &tdata->op_key;

            pj_list_push_back(&tcp->delayed_list, delayed_tdata);
            status = PJ_EPENDING;

            /* Prevent pj_ioqueue_send() to be called below */
            delayed = PJ_TRUE;
        }

        pj_lock_release(tcp->base.lock);
    }

    if (!delayed) {
        /*
         * Transport is ready to go. Send the packet to ioqueue to be
         * sent asynchronously.
         */
        size = tdata->buf.cur - tdata->buf.start;
        status = pj_ioqueue_send(tcp->key,
                                 (pj_ioqueue_op_key_t*)&tdata->op_key,
                                 tdata->buf.start, &size, 0);

        if (status != PJ_EPENDING) {
            /* Not pending (could be immediate success or error) */
            tdata->op_key.tdata = NULL;

            /* Shutdown transport on closure/errors */
            if (size <= 0) {

                /* size is pj_ssize_t while "%d" consumes an int, so cast
                 * explicitly to keep the variadic argument well-typed.
                 */
                PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
                          (int)size));

                if (status == PJ_SUCCESS) {
                    status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
                }

                /* Keep only the first recorded reason for closing. */
                if (tcp->close_reason == PJ_SUCCESS) {
                    tcp->close_reason = status;
                }
                pjsip_transport_shutdown(&tcp->base);
            }
        }
    }

    return status;
}
/*
 * Shutdown callback invoked by the transport manager.
 *
 * According to the original note this hook is normally only used by the
 * UDP transport; the TCP transport has nothing to do here and always
 * reports success.
 */
static pj_status_t tcp_shutdown(pjsip_transport *transport)
{
    /* The argument is intentionally ignored. */
    PJ_UNUSED_ARG(transport);

    /* Nothing to do for TCP */
    return PJ_SUCCESS;
}
/*
 * ioqueue callback: a read on the TCP socket has completed.
 *
 * key         ioqueue key of the socket.
 * op_key      operation key of the read; it is treated as the
 *             pjsip_rx_data_op_key that leads back to the rdata.
 * bytes_read  > 0: number of bytes appended to the receive buffer,
 *             0:   the connection was closed,
 *             < 0: negated status code.
 *
 * Received bytes are handed to the transport manager, any unparsed
 * remainder is moved to the front of the buffer, and the next read is
 * issued. Reads that complete immediately are processed in the same loop.
 */
static void on_read_complete(pj_ioqueue_key_t *key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_read)
{
    enum { MAX_IMMEDIATE_PACKET = 10 };
    pjsip_rx_data *rdata = ((pjsip_rx_data_op_key*) op_key)->rdata;
    struct tcp_transport *tcp =
        (struct tcp_transport*)rdata->tp_info.transport;
    int round;

    /* A closing transport only counts the callback and does no work. */
    if (tcp->is_closing) {
        tcp->is_closing++;
        return;
    }

    /*
     * Keep consuming data for as long as pj_ioqueue_recv() completes
     * immediately. Once MAX_IMMEDIATE_PACKET rounds have been done, the
     * next recv() is forced to complete asynchronously so that other
     * sockets get a chance to be serviced.
     */
    for (round = 0; ; ++round) {
        pj_uint32_t recv_flags;
        pj_status_t rc;

        if (bytes_read > 0) {
            pj_size_t consumed;

            /* Account for the new bytes and fill in the packet info. */
            rdata->pkt_info.len += bytes_read;
            rdata->pkt_info.zero = 0;
            pj_gettimeofday(&rdata->pkt_info.timestamp);

            /* Hand the buffer to the transport manager; it reports how
             * many bytes were processed (as valid SIP message).
             */
            consumed =
                pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
                                           rdata);

            pj_assert(consumed <= (pj_size_t)rdata->pkt_info.len);

            /* Shift whatever was not processed to the buffer start. */
            if (consumed > 0 && consumed < (pj_size_t)rdata->pkt_info.len) {
                pj_memmove(rdata->pkt_info.packet,
                           rdata->pkt_info.packet + consumed,
                           rdata->pkt_info.len - consumed);
            }

            rdata->pkt_info.len -= consumed;

        } else if (bytes_read == 0) {

            /* Transport is closed */
            PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));

            /* Higher-level objects may still reference the transport, so
             * it is not destroyed here; the transport manager is asked to
             * start a graceful shutdown instead.
             */
            if (tcp->close_reason == PJ_SUCCESS) {
                tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
            }
            pjsip_transport_shutdown(&tcp->base);
            return;

        } else {
            const pj_ssize_t err = -bytes_read;

            /* EWOULDBLOCK, EINPROGRESS and ECONNRESET fall through to the
             * next recv(); every other error shuts the transport down.
             */
            if (!(err == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) ||
                  err == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||
                  err == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)))
            {
                /* Socket error. */
                PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));

                /* Same reasoning as above: request a graceful shutdown
                 * rather than destroying the transport.
                 */
                if (tcp->close_reason == PJ_SUCCESS) {
                    tcp->close_reason = err;
                }
                pjsip_transport_shutdown(&tcp->base);
                return;
            }
        }

        /* Receive quota reached: force ioqueue_recv() to return
         * PJ_EPENDING. Otherwise allow immediate completion.
         */
        recv_flags = (round >= MAX_IMMEDIATE_PACKET) ?
                         PJ_IOQUEUE_ALWAYS_ASYNC : 0;

        /* Reset pool. */
        pj_pool_reset(rdata->tp_info.pool);

        /* Read next packet into the free space after the leftover data. */
        bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len;
        rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
        rc = pj_ioqueue_recv(key, op_key,
                             rdata->pkt_info.packet+rdata->pkt_info.len,
                             &bytes_read, recv_flags);

        if (rc == PJ_EPENDING) {
            /* The read will complete later through this callback. */
            break;
        }

        if (rc != PJ_SUCCESS) {
            /* Socket error */
            PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));

            /* Request a graceful shutdown; the transport may still be
             * referenced by higher-level objects.
             */
            if (tcp->close_reason == PJ_SUCCESS) {
                tcp->close_reason = rc;
            }
            pjsip_transport_shutdown(&tcp->base);
            return;
        }

        /* Immediate completion: loop again to process the new data. */
        pj_assert(round < MAX_IMMEDIATE_PACKET);
    }
}
/*
 * ioqueue callback: the asynchronous connect() has finished.
 *
 * key     ioqueue key; its user data is the TCP transport.
 * status  result of the connect() operation.
 *
 * On failure every delayed transmit is completed with the negated status
 * and the transport is asked to shut down. On success the local address
 * is refreshed, reading is started and the delayed transmits are flushed.
 */
static void on_connect_complete(pj_ioqueue_key_t *key,
                                pj_status_t status)
{
    struct tcp_transport *tcp = pj_ioqueue_get_user_data(key);
    pj_sockaddr_in *tp_addr;
    pj_sockaddr_in bound_addr;
    int bound_len = sizeof(pj_sockaddr_in);
    pj_status_t read_status;

    /* The pending connect() operation is over, whatever its outcome. */
    tcp->has_pending_connect = PJ_FALSE;

    if (status != PJ_SUCCESS) {

        tcp_perror(tcp->base.obj_name, "TCP connect() error", status);

        /* Cancel all delayed transmits, reporting the connect error as
         * their send result.
         */
        while (!pj_list_empty(&tcp->delayed_list)) {
            struct delayed_tdata *entry = tcp->delayed_list.next;

            pj_list_erase(entry);
            on_write_complete(tcp->key,
                              (pj_ioqueue_op_key_t*)entry->tdata_op_key,
                              -status);
        }

        /* Higher-level objects may still reference the transport, so it
         * is not destroyed here; the transport manager is asked to start
         * a graceful shutdown instead.
         */
        if (tcp->close_reason == PJ_SUCCESS) {
            tcp->close_reason = status;
        }
        pjsip_transport_shutdown(&tcp->base);
        return;
    }

    PJ_LOG(4,(tcp->base.obj_name,
              "TCP transport %.*s:%d is connected to %.*s:%d",
              (int)tcp->base.local_name.host.slen,
              tcp->base.local_name.host.ptr,
              tcp->base.local_name.port,
              (int)tcp->base.remote_name.host.slen,
              tcp->base.remote_name.host.ptr,
              tcp->base.remote_name.port));

    /* Refresh the local address in case it differs now that the socket
     * is connected (the original note suggests this could happen on some
     * systems, probably old Win32). The stored address, port and name are
     * rewritten only when the IP address itself changed.
     */
    tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr;
    if (pj_sock_getsockname(tcp->sock, &bound_addr, &bound_len)==PJ_SUCCESS &&
        tp_addr->sin_addr.s_addr != bound_addr.sin_addr.s_addr)
    {
        tp_addr->sin_addr.s_addr = bound_addr.sin_addr.s_addr;
        tp_addr->sin_port = bound_addr.sin_port;

        sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
                              tp_addr);
    }

    /* Start pending read */
    read_status = tcp_start_read(tcp);
    if (read_status == PJ_SUCCESS) {
        /* Flush all pending send operations */
        tcp_flush_pending_tx(tcp);
        return;
    }

    /* Reading could not be started: request a graceful shutdown rather
     * than destroying a transport that may still be referenced.
     */
    if (tcp->close_reason == PJ_SUCCESS) {
        tcp->close_reason = read_status;
    }
    pjsip_transport_shutdown(&tcp->base);
}
#endif /* PJ_HAS_TCP */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -