📄 sip_transport_tcp.c
字号:
* we stop the accept() operation. */ ++err_cnt; if (err_cnt >= 10) { PJ_LOG(1, (listener->factory.obj_name, "Too many errors, listener stopping")); } } else { pj_pool_t *pool; struct pending_accept *new_op; if (sock == PJ_INVALID_SOCKET) { sock = accept_op->new_sock; } if (sock == PJ_INVALID_SOCKET) { pj_assert(!"Should not happen. status should be error"); goto next_accept; } PJ_LOG(4,(listener->factory.obj_name, "TCP listener %.*s:%d: got incoming TCP connection " "from %s:%d, sock=%d", (int)listener->factory.addr_name.host.slen, listener->factory.addr_name.host.ptr, listener->factory.addr_name.port, pj_inet_ntoa(accept_op->remote_addr.sin_addr), pj_ntohs(accept_op->remote_addr.sin_port), sock)); /* Create new accept_opt */ pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p", POOL_TP_INIT, POOL_TP_INC); new_op = pj_pool_zalloc(pool, sizeof(struct pending_accept)); new_op->pool = pool; new_op->listener = listener; new_op->index = accept_op->index; pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key)); listener->accept_op[accept_op->index] = new_op; /* * Incoming connections! * Create TCP transport for the new socket. */ status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE, &accept_op->local_addr, &accept_op->remote_addr, &tcp); if (status == PJ_SUCCESS) { status = tcp_start_read(tcp); if (status != PJ_SUCCESS) { PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); tcp_destroy(&tcp->base, status); } } accept_op = new_op; }next_accept: /* * Start the next asynchronous accept() operation. */ accept_op->addr_len = sizeof(pj_sockaddr_in); accept_op->new_sock = PJ_INVALID_SOCKET; status = pj_ioqueue_accept(listener->key, &accept_op->op_key, &accept_op->new_sock, &accept_op->local_addr, &accept_op->remote_addr, &accept_op->addr_len); /* * Loop while we have immediate connection or when there is error. */ } while (status != PJ_EPENDING);}/* * Callback from ioqueue when packet is sent. */static void on_write_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent){ struct tcp_transport *tcp = pj_ioqueue_get_user_data(key); pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; tdata_op_key->tdata = NULL; /* Check for error/closure */ if (bytes_sent <= 0) { pj_status_t status; PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", bytes_sent)); status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) : -bytes_sent; if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); } if (tdata_op_key->callback) { /* * Notify sip_transport.c that packet has been sent. */ tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent); }}/* * This callback is called by transport manager to send SIP message */static pj_status_t tcp_send_msg(pjsip_transport *transport, pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, int addr_len, void *token, void (*callback)(pjsip_transport *transport, void *token, pj_ssize_t sent_bytes)){ struct tcp_transport *tcp = (struct tcp_transport*)transport; pj_ssize_t size; pj_bool_t delayed = PJ_FALSE; pj_status_t status = PJ_SUCCESS; /* Sanity check */ PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); /* Check that there's no pending operation associated with the tdata */ PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX); /* Check the address is supported */ PJ_ASSERT_RETURN(rem_addr && addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL); /* Init op key. */ tdata->op_key.tdata = tdata; tdata->op_key.token = token; tdata->op_key.callback = callback; /* If asynchronous connect() has not completed yet, just put the * transmit data in the pending transmission list since we can not * use the socket yet. */ if (tcp->has_pending_connect) { /* * Looks like connect() is still in progress. Check again (this time * with holding the lock) to be sure. */ pj_lock_acquire(tcp->base.lock); if (tcp->has_pending_connect) { struct delayed_tdata *delayed_tdata; /* * connect() is still in progress. Put the transmit data to * the delayed list. */ delayed_tdata = pj_pool_alloc(tdata->pool, sizeof(*delayed_tdata)); delayed_tdata->tdata_op_key = &tdata->op_key; pj_list_push_back(&tcp->delayed_list, delayed_tdata); status = PJ_EPENDING; /* Prevent pj_ioqueue_send() to be called below */ delayed = PJ_TRUE; } pj_lock_release(tcp->base.lock); } if (!delayed) { /* * Transport is ready to go. Send the packet to ioqueue to be * sent asynchronously. */ size = tdata->buf.cur - tdata->buf.start; status = pj_ioqueue_send(tcp->key, (pj_ioqueue_op_key_t*)&tdata->op_key, tdata->buf.start, &size, 0); if (status != PJ_EPENDING) { /* Not pending (could be immediate success or error) */ tdata->op_key.tdata = NULL; /* Shutdown transport on closure/errors */ if (size <= 0) { PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", size)); if (status == PJ_SUCCESS) status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); } } } return status;}/* * This callback is called by transport manager to shutdown transport. * This normally is only used by UDP transport. */static pj_status_t tcp_shutdown(pjsip_transport *transport){ PJ_UNUSED_ARG(transport); /* Nothing to do for TCP */ return PJ_SUCCESS;}/* * Callback from ioqueue that an incoming data is received from the socket. */static void on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read){ enum { MAX_IMMEDIATE_PACKET = 10 }; pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; pjsip_rx_data *rdata = rdata_op_key->rdata; struct tcp_transport *tcp = (struct tcp_transport*)rdata->tp_info.transport; int i; pj_status_t status; /* Don't do anything if transport is closing. */ if (tcp->is_closing) { tcp->is_closing++; return; } /* * The idea of the loop is to process immediate data received by * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to * complete asynchronously, to allow other sockets to get their data. */ for (i=0;; ++i) { pj_uint32_t flags; /* Houston, we have packet! Report the packet to transport manager * to be parsed. */ if (bytes_read > 0) { pj_size_t size_eaten; /* Init pkt_info part. */ rdata->pkt_info.len += bytes_read; rdata->pkt_info.zero = 0; pj_gettimeofday(&rdata->pkt_info.timestamp); /* Report to transport manager. * The transport manager will tell us how many bytes of the packet * have been processed (as valid SIP message). */ size_eaten = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata); pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); /* Move unprocessed data to the front of the buffer */ if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) { pj_memmove(rdata->pkt_info.packet, rdata->pkt_info.packet + size_eaten, rdata->pkt_info.len - size_eaten); } rdata->pkt_info.len -= size_eaten; } else if (bytes_read == 0) { /* Transport is closed */ PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); pjsip_transport_shutdown(&tcp->base); return; //} else if (bytes_read < 0) { } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) { /* Socket error. */ PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read; pjsip_transport_shutdown(&tcp->base); return; } if (i >= MAX_IMMEDIATE_PACKET) { /* Receive quota reached. Force ioqueue_recv() to * return PJ_EPENDING */ flags = PJ_IOQUEUE_ALWAYS_ASYNC; } else { flags = 0; } /* Reset pool. */ pj_pool_reset(rdata->tp_info.pool); /* Read next packet. */ bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len; rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr); status = pj_ioqueue_recv(key, op_key, rdata->pkt_info.packet+rdata->pkt_info.len, &bytes_read, flags); if (status == PJ_SUCCESS) { /* Continue loop. */ pj_assert(i < MAX_IMMEDIATE_PACKET); } else if (status == PJ_EPENDING) { break; } else { /* Socket error */ PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); return; } }}/* * Callback from ioqueue when asynchronous connect() operation completes. */static void on_connect_complete(pj_ioqueue_key_t *key, pj_status_t status){ struct tcp_transport *tcp; pj_sockaddr_in addr; int addrlen; tcp = pj_ioqueue_get_user_data(key); /* Mark that pending connect() operation has completed. */ tcp->has_pending_connect = PJ_FALSE; /* Check connect() status */ if (status != PJ_SUCCESS) { tcp_perror(tcp->base.obj_name, "TCP connect() error", status); /* Cancel all delayed transmits */ while (!pj_list_empty(&tcp->delayed_list)) { struct delayed_tdata *pending_tx; pj_ioqueue_op_key_t *op_key; pending_tx = tcp->delayed_list.next; pj_list_erase(pending_tx); op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; on_write_complete(tcp->key, op_key, -status); } /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); return; } PJ_LOG(4,(tcp->base.obj_name, "TCP transport %.*s:%d is connected to %.*s:%d", (int)tcp->base.local_name.host.slen, tcp->base.local_name.host.ptr, tcp->base.local_name.port, (int)tcp->base.remote_name.host.slen, tcp->base.remote_name.host.ptr, tcp->base.remote_name.port)); /* Update (again) local address, just in case local address currently * set is different now that the socket is connected (could happen * on some systems, like old Win32 probably?). */ addrlen = sizeof(pj_sockaddr_in); if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) { pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; if (tp_addr->sin_addr.s_addr != addr.sin_addr.s_addr) { tp_addr->sin_addr.s_addr = addr.sin_addr.s_addr; tp_addr->sin_port = addr.sin_port; sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, tp_addr); } } /* Start pending read */ status = tcp_start_read(tcp); if (status != PJ_SUCCESS) { /* We can not destroy the transport since high level objects may * still keep reference to this transport. So we can only * instruct transport manager to gracefully start the shutdown * procedure for this transport. */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); return; } /* Flush all pending send operations */ tcp_flush_pending_tx(tcp);}#endif /* PJ_HAS_TCP */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -