📄 sip_transport_tcp.c
字号:
/* Called by transport manager to destroy transport */static pj_status_t tcp_destroy_transport(pjsip_transport *transport);/* Utility to destroy transport */static pj_status_t tcp_destroy(pjsip_transport *transport, pj_status_t reason);/* Callback from ioqueue on incoming packet */static void on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read);/* Callback from ioqueue when packet is sent */static void on_write_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent);/* Callback from ioqueue when connect completes */static void on_connect_complete(pj_ioqueue_key_t *key, pj_status_t status);/* * Common function to create TCP transport, called when pending accept() and * pending connect() complete. */static pj_status_t tcp_create( struct tcp_listener *listener, pj_pool_t *pool, pj_sock_t sock, pj_bool_t is_server, const pj_sockaddr_in *local, const pj_sockaddr_in *remote, struct tcp_transport **p_tcp){ struct tcp_transport *tcp; pj_ioqueue_t *ioqueue; pj_ioqueue_callback tcp_callback; pj_status_t status; PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL); if (pool == NULL) { pool = pjsip_endpt_create_pool(listener->endpt, "tcp", POOL_TP_INIT, POOL_TP_INC); PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM); } /* * Create and initialize basic transport structure. */ tcp = pj_pool_zalloc(pool, sizeof(*tcp)); tcp->sock = sock; tcp->is_server = is_server; tcp->listener = listener; pj_list_init(&tcp->delayed_list); tcp->base.pool = pool; pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, (is_server ? 
"tcps%p" :"tcpc%p"), tcp); status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt); if (status != PJ_SUCCESS) { goto on_error; } status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock); if (status != PJ_SUCCESS) { goto on_error; } tcp->base.key.type = PJSIP_TRANSPORT_TCP; pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in)); tcp->base.type_name = "tcp"; tcp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); tcp->base.info = pj_pool_alloc(pool, 64); pj_ansi_snprintf(tcp->base.info, 64, "TCP to %s:%d", pj_inet_ntoa(remote->sin_addr), (int)pj_ntohs(remote->sin_port)); tcp->base.addr_len = sizeof(pj_sockaddr_in); pj_memcpy(&tcp->base.local_addr, local, sizeof(pj_sockaddr_in)); sockaddr_to_host_port(pool, &tcp->base.local_name, local); sockaddr_to_host_port(pool, &tcp->base.remote_name, remote); tcp->base.endpt = listener->endpt; tcp->base.tpmgr = listener->tpmgr; tcp->base.send_msg = &tcp_send_msg; tcp->base.do_shutdown = &tcp_shutdown; tcp->base.destroy = &tcp_destroy_transport; /* Register socket to ioqueue */ pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback)); tcp_callback.on_read_complete = &on_read_complete; tcp_callback.on_write_complete = &on_write_complete; tcp_callback.on_connect_complete = &on_connect_complete; ioqueue = pjsip_endpt_get_ioqueue(listener->endpt); status = pj_ioqueue_register_sock(pool, ioqueue, sock, tcp, &tcp_callback, &tcp->key); if (status != PJ_SUCCESS) { goto on_error; } /* Register transport to transport manager */ status = pjsip_transport_register(listener->tpmgr, &tcp->base); if (status != PJ_SUCCESS) { goto on_error; } tcp->is_registered = PJ_TRUE; /* Done setting up basic transport. */ *p_tcp = tcp; PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created", (tcp->is_server ? "server" : "client"))); return PJ_SUCCESS;on_error: tcp_destroy(&tcp->base, status); return status;}/* Flush all delayed transmision once the socket is connected. 
*/static void tcp_flush_pending_tx(struct tcp_transport *tcp){ pj_lock_acquire(tcp->base.lock); while (!pj_list_empty(&tcp->delayed_list)) { struct delayed_tdata *pending_tx; pjsip_tx_data *tdata; pj_ioqueue_op_key_t *op_key; pj_ssize_t size; pj_status_t status; pending_tx = tcp->delayed_list.next; pj_list_erase(pending_tx); tdata = pending_tx->tdata_op_key->tdata; op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; /* send to ioqueue! */ size = tdata->buf.cur - tdata->buf.start; status = pj_ioqueue_send(tcp->key, op_key, tdata->buf.start, &size, 0); if (status != PJ_EPENDING) { on_write_complete(tcp->key, op_key, size); } } pj_lock_release(tcp->base.lock);}/* Called by transport manager to destroy transport */static pj_status_t tcp_destroy_transport(pjsip_transport *transport){ struct tcp_transport *tcp = (struct tcp_transport*)transport; /* Transport would have been unregistered by now since this callback * is called by transport manager. */ tcp->is_registered = PJ_FALSE; return tcp_destroy(transport, tcp->close_reason);}/* Destroy TCP transport */static pj_status_t tcp_destroy(pjsip_transport *transport, pj_status_t reason){ struct tcp_transport *tcp = (struct tcp_transport*)transport; if (tcp->close_reason == 0) tcp->close_reason = reason; if (tcp->is_registered) { tcp->is_registered = PJ_FALSE; pjsip_transport_destroy(transport); /* pjsip_transport_destroy will recursively call this function * again. 
*/ return PJ_SUCCESS; } /* Mark transport as closing */ tcp->is_closing = PJ_TRUE; /* Cancel all delayed transmits */ while (!pj_list_empty(&tcp->delayed_list)) { struct delayed_tdata *pending_tx; pj_ioqueue_op_key_t *op_key; pending_tx = tcp->delayed_list.next; pj_list_erase(pending_tx); op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; on_write_complete(tcp->key, op_key, -reason); } if (tcp->rdata.tp_info.pool) { pj_pool_release(tcp->rdata.tp_info.pool); tcp->rdata.tp_info.pool = NULL; } if (tcp->key) { pj_ioqueue_unregister(tcp->key); tcp->key = NULL; tcp->sock = PJ_INVALID_SOCKET; } if (tcp->sock != PJ_INVALID_SOCKET) { pj_sock_close(tcp->sock); tcp->sock = PJ_INVALID_SOCKET; } if (tcp->base.lock) { pj_lock_destroy(tcp->base.lock); tcp->base.lock = NULL; } if (tcp->base.ref_cnt) { pj_atomic_destroy(tcp->base.ref_cnt); tcp->base.ref_cnt = NULL; } if (tcp->base.pool) { pj_pool_t *pool; if (reason != PJ_SUCCESS) { char errmsg[PJ_ERR_MSG_SIZE]; pj_strerror(reason, errmsg, sizeof(errmsg)); PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed with reason %d: %s", reason, errmsg)); } else { PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed normally")); } pool = tcp->base.pool; tcp->base.pool = NULL; pj_pool_release(pool); } return PJ_SUCCESS;}/* * This utility function creates receive data buffers and start * asynchronous recv() operations from the socket. It is called after * accept() or connect() operation complete. 
*/
/*
 * tcp_start_read(): prepare tcp->rdata and post the first asynchronous
 * receive on the transport's ioqueue key.
 *
 * Returns PJ_SUCCESS when the receive was started (pj_ioqueue_recv() returned
 * PJ_SUCCESS or PJ_EPENDING), PJ_ENOMEM when the rdata pool could not be
 * created, or the error status returned by pj_ioqueue_recv().
 */
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
{
    pj_sockaddr_in *peer;
    pj_pool_t *rx_pool;
    pj_ssize_t buf_len;
    pj_status_t rc;

    /* Dedicated pool for the receive data. */
    rx_pool = pjsip_endpt_create_pool(tcp->listener->endpt, "rtd%p",
                                      PJSIP_POOL_RDATA_LEN,
                                      PJSIP_POOL_RDATA_INC);
    if (rx_pool == NULL) {
        tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
        return PJ_ENOMEM;
    }

    /* Transport-related part of the rdata. */
    tcp->rdata.tp_info.pool = rx_pool;
    tcp->rdata.tp_info.transport = &tcp->base;
    tcp->rdata.tp_info.tp_data = tcp;
    tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
    pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key,
                           sizeof(pj_ioqueue_op_key_t));

    /* Packet source: always the transport's remote address. */
    peer = (pj_sockaddr_in*) &tcp->base.key.rem_addr;
    tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
    tcp->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
    pj_ansi_strcpy(tcp->rdata.pkt_info.src_name,
                   pj_inet_ntoa(peer->sin_addr));
    tcp->rdata.pkt_info.src_port = pj_ntohs(peer->sin_port);

    /* Post the receive; completion is always delivered via the callback. */
    buf_len = sizeof(tcp->rdata.pkt_info.packet);
    rc = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key,
                         tcp->rdata.pkt_info.packet, &buf_len,
                         PJ_IOQUEUE_ALWAYS_ASYNC);
    if (rc == PJ_SUCCESS || rc == PJ_EPENDING) {
        return PJ_SUCCESS;
    }

    PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d", rc));
    return rc;
}


/*
 * This callback is called by transport manager for the TCP factory
 * to create outgoing transport to the specified destination.
*/static pj_status_t lis_create_transport(pjsip_tpfactory *factory, pjsip_tpmgr *mgr, pjsip_endpoint *endpt, const pj_sockaddr *rem_addr, int addr_len, pjsip_transport **p_transport){ struct tcp_listener *listener; struct tcp_transport *tcp; pj_sock_t sock; pj_sockaddr_in local_addr; pj_status_t status; /* Sanity checks */ PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr && addr_len && p_transport, PJ_EINVAL); /* Check that address is a sockaddr_in */ PJ_ASSERT_RETURN(rem_addr->sa_family == PJ_AF_INET && addr_len == sizeof(pj_sockaddr_in), PJ_EINVAL); listener = (struct tcp_listener*)factory; /* Create socket */ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); if (status != PJ_SUCCESS) return status; /* Bind to any port */ status = pj_sock_bind_in(sock, 0, 0); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } /* Get the local port */ addr_len = sizeof(pj_sockaddr_in); status = pj_sock_getsockname(sock, &local_addr, &addr_len); if (status != PJ_SUCCESS) { pj_sock_close(sock); return status; } /* Initially set the address from the listener's address */ local_addr.sin_addr.s_addr = ((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr; /* Create the transport descriptor */ status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr, (pj_sockaddr_in*)rem_addr, &tcp); if (status != PJ_SUCCESS) return status; /* Start asynchronous connect() operation */ tcp->has_pending_connect = PJ_TRUE; status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); if (status == PJ_SUCCESS) { tcp->has_pending_connect = PJ_FALSE; } else if (status != PJ_EPENDING) { tcp_destroy(&tcp->base, status); return status; } /* Update (again) local address, just in case local address currently * set is different now that asynchronous connect() is started. 
*/ addr_len = sizeof(pj_sockaddr_in); if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) { pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; /* Some systems (like old Win32 perhaps) may not set local address * properly before socket is fully connected. */ if (tp_addr->sin_addr.s_addr != local_addr.sin_addr.s_addr && local_addr.sin_addr.s_addr != 0) { tp_addr->sin_addr.s_addr = local_addr.sin_addr.s_addr; tp_addr->sin_port = local_addr.sin_port; sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name, &local_addr); } } if (tcp->has_pending_connect) { PJ_LOG(4,(tcp->base.obj_name, "TCP transport %.*s:%d is connecting to %.*s:%d...", (int)tcp->base.local_name.host.slen, tcp->base.local_name.host.ptr, tcp->base.local_name.port, (int)tcp->base.remote_name.host.slen, tcp->base.remote_name.host.ptr, tcp->base.remote_name.port)); } /* Done */ *p_transport = &tcp->base; return PJ_SUCCESS;}/* * This callback is called by ioqueue when pending accept() operation has * completed. */static void on_accept_complete( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t sock, pj_status_t status){ struct tcp_listener *listener; struct tcp_transport *tcp; struct pending_accept *accept_op; int err_cnt = 0; listener = pj_ioqueue_get_user_data(key); accept_op = (struct pending_accept*) op_key; /* * Loop while there is immediate connection or when there is error. */ do { if (status == PJ_EPENDING) { /* * This can only happen when this function is called during * initialization to kick off asynchronous accept(). */ } else if (status != PJ_SUCCESS) { /* * Error in accept(). */ tcp_perror(listener->factory.obj_name, "Error in accept()", status); /* * Prevent endless accept() error loop by limiting the * number of consecutive errors. Once the number of errors * is equal to maximum, we treat this as permanent error, and
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -