⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sip_transport_tcp.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 3 页
字号:
/* Called by transport manager to destroy transport */static pj_status_t tcp_destroy_transport(pjsip_transport *transport);/* Utility to destroy transport */static pj_status_t tcp_destroy(pjsip_transport *transport,			       pj_status_t reason);/* Callback from ioqueue on incoming packet */static void on_read_complete(pj_ioqueue_key_t *key,                              pj_ioqueue_op_key_t *op_key,                              pj_ssize_t bytes_read);/* Callback from ioqueue when packet is sent */static void on_write_complete(pj_ioqueue_key_t *key,                               pj_ioqueue_op_key_t *op_key,                               pj_ssize_t bytes_sent);/* Callback from ioqueue when connect completes */static void on_connect_complete(pj_ioqueue_key_t *key,                                 pj_status_t status);/* * Common function to create TCP transport, called when pending accept() and * pending connect() complete. */static pj_status_t tcp_create( struct tcp_listener *listener,			       pj_pool_t *pool,			       pj_sock_t sock, pj_bool_t is_server,			       const pj_sockaddr_in *local,			       const pj_sockaddr_in *remote,			       struct tcp_transport **p_tcp){    struct tcp_transport *tcp;    pj_ioqueue_t *ioqueue;    pj_ioqueue_callback tcp_callback;    pj_status_t status;        PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);    if (pool == NULL) {	pool = pjsip_endpt_create_pool(listener->endpt, "tcp",				       POOL_TP_INIT, POOL_TP_INC);	PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);    }        /*     * Create and initialize basic transport structure.     */    tcp = pj_pool_zalloc(pool, sizeof(*tcp));    tcp->sock = sock;    tcp->is_server = is_server;    tcp->listener = listener;    pj_list_init(&tcp->delayed_list);    tcp->base.pool = pool;    pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, 		     (is_server ? "tcps%p" :"tcpc%p"), tcp);    status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);    if (status != PJ_SUCCESS) {	goto on_error;    }    status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);    if (status != PJ_SUCCESS) {	goto on_error;    }    tcp->base.key.type = PJSIP_TRANSPORT_TCP;    pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in));    tcp->base.type_name = "tcp";    tcp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);    tcp->base.info = pj_pool_alloc(pool, 64);    pj_ansi_snprintf(tcp->base.info, 64, "TCP to %s:%d",		     pj_inet_ntoa(remote->sin_addr), 		     (int)pj_ntohs(remote->sin_port));    tcp->base.addr_len = sizeof(pj_sockaddr_in);    pj_memcpy(&tcp->base.local_addr, local, sizeof(pj_sockaddr_in));    sockaddr_to_host_port(pool, &tcp->base.local_name, local);    sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);    tcp->base.endpt = listener->endpt;    tcp->base.tpmgr = listener->tpmgr;    tcp->base.send_msg = &tcp_send_msg;    tcp->base.do_shutdown = &tcp_shutdown;    tcp->base.destroy = &tcp_destroy_transport;    /* Register socket to ioqueue */    pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback));    tcp_callback.on_read_complete = &on_read_complete;    tcp_callback.on_write_complete = &on_write_complete;    tcp_callback.on_connect_complete = &on_connect_complete;    ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);    status = pj_ioqueue_register_sock(pool, ioqueue, sock, 				      tcp, &tcp_callback, &tcp->key);    if (status != PJ_SUCCESS) {	goto on_error;    }    /* Register transport to transport manager */    status = pjsip_transport_register(listener->tpmgr, &tcp->base);    if (status != PJ_SUCCESS) {	goto on_error;    }    tcp->is_registered = PJ_TRUE;    /* Done setting up basic transport. */    *p_tcp = tcp;    PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",	      (tcp->is_server ? "server" : "client")));    return PJ_SUCCESS;on_error:    tcp_destroy(&tcp->base, status);    return status;}/* Flush all delayed transmision once the socket is connected. */static void tcp_flush_pending_tx(struct tcp_transport *tcp){    pj_lock_acquire(tcp->base.lock);    while (!pj_list_empty(&tcp->delayed_list)) {	struct delayed_tdata *pending_tx;	pjsip_tx_data *tdata;	pj_ioqueue_op_key_t *op_key;	pj_ssize_t size;	pj_status_t status;	pending_tx = tcp->delayed_list.next;	pj_list_erase(pending_tx);	tdata = pending_tx->tdata_op_key->tdata;	op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;	/* send to ioqueue! */	size = tdata->buf.cur - tdata->buf.start;	status = pj_ioqueue_send(tcp->key, op_key,				 tdata->buf.start, &size, 0);	if (status != PJ_EPENDING) {	    on_write_complete(tcp->key, op_key, size);	}    }    pj_lock_release(tcp->base.lock);}/* Called by transport manager to destroy transport */static pj_status_t tcp_destroy_transport(pjsip_transport *transport){    struct tcp_transport *tcp = (struct tcp_transport*)transport;    /* Transport would have been unregistered by now since this callback     * is called by transport manager.     */    tcp->is_registered = PJ_FALSE;    return tcp_destroy(transport, tcp->close_reason);}/* Destroy TCP transport */static pj_status_t tcp_destroy(pjsip_transport *transport, 			       pj_status_t reason){    struct tcp_transport *tcp = (struct tcp_transport*)transport;    if (tcp->close_reason == 0)	tcp->close_reason = reason;    if (tcp->is_registered) {	tcp->is_registered = PJ_FALSE;	pjsip_transport_destroy(transport);	/* pjsip_transport_destroy will recursively call this function	 * again.	 */	return PJ_SUCCESS;    }    /* Mark transport as closing */    tcp->is_closing = PJ_TRUE;    /* Cancel all delayed transmits */    while (!pj_list_empty(&tcp->delayed_list)) {	struct delayed_tdata *pending_tx;	pj_ioqueue_op_key_t *op_key;	pending_tx = tcp->delayed_list.next;	pj_list_erase(pending_tx);	op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;	on_write_complete(tcp->key, op_key, -reason);    }    if (tcp->rdata.tp_info.pool) {	pj_pool_release(tcp->rdata.tp_info.pool);	tcp->rdata.tp_info.pool = NULL;    }    if (tcp->key) {	pj_ioqueue_unregister(tcp->key);	tcp->key = NULL;	tcp->sock = PJ_INVALID_SOCKET;    }    if (tcp->sock != PJ_INVALID_SOCKET) {	pj_sock_close(tcp->sock);	tcp->sock = PJ_INVALID_SOCKET;    }    if (tcp->base.lock) {	pj_lock_destroy(tcp->base.lock);	tcp->base.lock = NULL;    }    if (tcp->base.ref_cnt) {	pj_atomic_destroy(tcp->base.ref_cnt);	tcp->base.ref_cnt = NULL;    }    if (tcp->base.pool) {	pj_pool_t *pool;	if (reason != PJ_SUCCESS) {	    char errmsg[PJ_ERR_MSG_SIZE];	    pj_strerror(reason, errmsg, sizeof(errmsg));	    PJ_LOG(4,(tcp->base.obj_name, 		      "TCP transport destroyed with reason %d: %s", 		      reason, errmsg));	} else {	    PJ_LOG(4,(tcp->base.obj_name, 		      "TCP transport destroyed normally"));	}	pool = tcp->base.pool;	tcp->base.pool = NULL;	pj_pool_release(pool);    }    return PJ_SUCCESS;}/* * This utility function creates receive data buffers and start * asynchronous recv() operations from the socket. It is called after * accept() or connect() operation complete. */static pj_status_t tcp_start_read(struct tcp_transport *tcp){    pj_pool_t *pool;    pj_ssize_t size;    pj_sockaddr_in *rem_addr;    pj_status_t status;    /* Init rdata */    pool = pjsip_endpt_create_pool(tcp->listener->endpt,				   "rtd%p",				   PJSIP_POOL_RDATA_LEN,				   PJSIP_POOL_RDATA_INC);    if (!pool) {	tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);	return PJ_ENOMEM;    }    tcp->rdata.tp_info.pool = pool;    tcp->rdata.tp_info.transport = &tcp->base;    tcp->rdata.tp_info.tp_data = tcp;    tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;    pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key, 			   sizeof(pj_ioqueue_op_key_t));    tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;    tcp->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);    rem_addr = (pj_sockaddr_in*) &tcp->base.key.rem_addr;    pj_ansi_strcpy(tcp->rdata.pkt_info.src_name,		   pj_inet_ntoa(rem_addr->sin_addr));    tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port);    size = sizeof(tcp->rdata.pkt_info.packet);    status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key,			     tcp->rdata.pkt_info.packet, &size,			     PJ_IOQUEUE_ALWAYS_ASYNC);    if (status != PJ_SUCCESS && status != PJ_EPENDING) {	PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d", 		   status));	return status;    }    return PJ_SUCCESS;}/* This callback is called by transport manager for the TCP factory * to create outgoing transport to the specified destination. */static pj_status_t lis_create_transport(pjsip_tpfactory *factory,					pjsip_tpmgr *mgr,					pjsip_endpoint *endpt,					const pj_sockaddr *rem_addr,					int addr_len,					pjsip_transport **p_transport){    struct tcp_listener *listener;    struct tcp_transport *tcp;    pj_sock_t sock;    pj_sockaddr_in local_addr;    pj_status_t status;    /* Sanity checks */    PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&		     addr_len && p_transport, PJ_EINVAL);    /* Check that address is a sockaddr_in */    PJ_ASSERT_RETURN(rem_addr->sa_family == PJ_AF_INET &&		     addr_len == sizeof(pj_sockaddr_in), PJ_EINVAL);    listener = (struct tcp_listener*)factory;        /* Create socket */    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock);    if (status != PJ_SUCCESS)	return status;    /* Bind to any port */    status = pj_sock_bind_in(sock, 0, 0);    if (status != PJ_SUCCESS) {	pj_sock_close(sock);	return status;    }    /* Get the local port */    addr_len = sizeof(pj_sockaddr_in);    status = pj_sock_getsockname(sock, &local_addr, &addr_len);    if (status != PJ_SUCCESS) {	pj_sock_close(sock);	return status;    }    /* Initially set the address from the listener's address */    local_addr.sin_addr.s_addr = 	((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr;    /* Create the transport descriptor */    status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr, 			(pj_sockaddr_in*)rem_addr, &tcp);    if (status != PJ_SUCCESS)	return status;    /* Start asynchronous connect() operation */    tcp->has_pending_connect = PJ_TRUE;    status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in));    if (status == PJ_SUCCESS) {	tcp->has_pending_connect = PJ_FALSE;    } else if (status != PJ_EPENDING) {	tcp_destroy(&tcp->base, status);	return status;    }    /* Update (again) local address, just in case local address currently     * set is different now that asynchronous connect() is started.     */    addr_len = sizeof(pj_sockaddr_in);    if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) {	pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr;	/* Some systems (like old Win32 perhaps) may not set local address	 * properly before socket is fully connected.	 */	if (tp_addr->sin_addr.s_addr != local_addr.sin_addr.s_addr &&	    local_addr.sin_addr.s_addr != 0) 	{	    tp_addr->sin_addr.s_addr = local_addr.sin_addr.s_addr;	    tp_addr->sin_port = local_addr.sin_port;	    sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,				  &local_addr);	}    }    if (tcp->has_pending_connect) {	PJ_LOG(4,(tcp->base.obj_name, 		  "TCP transport %.*s:%d is connecting to %.*s:%d...",		  (int)tcp->base.local_name.host.slen,		  tcp->base.local_name.host.ptr,		  tcp->base.local_name.port,		  (int)tcp->base.remote_name.host.slen,		  tcp->base.remote_name.host.ptr,		  tcp->base.remote_name.port));    }    /* Done */    *p_transport = &tcp->base;    return PJ_SUCCESS;}/* * This callback is called by ioqueue when pending accept() operation has * completed. */static void on_accept_complete(	pj_ioqueue_key_t *key, 				pj_ioqueue_op_key_t *op_key, 				pj_sock_t sock, 				pj_status_t status){    struct tcp_listener *listener;    struct tcp_transport *tcp;    struct pending_accept *accept_op;    int err_cnt = 0;    listener = pj_ioqueue_get_user_data(key);    accept_op = (struct pending_accept*) op_key;    /*     * Loop while there is immediate connection or when there is error.     */    do {	if (status == PJ_EPENDING) {	    /*	     * This can only happen when this function is called during	     * initialization to kick off asynchronous accept().	     */	} else if (status != PJ_SUCCESS) {	    /*	     * Error in accept().	     */	    tcp_perror(listener->factory.obj_name, "Error in accept()", 		       status);	    /*	     * Prevent endless accept() error loop by limiting the	     * number of consecutive errors. Once the number of errors	     * is equal to maximum, we treat this as permanent error, and

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -