⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sip_transport_tcp.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 3 页
字号:
	     * we stop the accept() operation.	     */	    ++err_cnt;	    if (err_cnt >= 10) {		PJ_LOG(1, (listener->factory.obj_name, 			   "Too many errors, listener stopping"));	    }	} else {	    pj_pool_t *pool;	    struct pending_accept *new_op;	    if (sock == PJ_INVALID_SOCKET) {		sock = accept_op->new_sock;	    }	    if (sock == PJ_INVALID_SOCKET) {		pj_assert(!"Should not happen. status should be error");		goto next_accept;	    }	    PJ_LOG(4,(listener->factory.obj_name, 		      "TCP listener %.*s:%d: got incoming TCP connection "		      "from %s:%d, sock=%d",		      (int)listener->factory.addr_name.host.slen,		      listener->factory.addr_name.host.ptr,		      listener->factory.addr_name.port,		      pj_inet_ntoa(accept_op->remote_addr.sin_addr),		      pj_ntohs(accept_op->remote_addr.sin_port),		      sock));	    /* Create new accept_opt */	    pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p", 					   POOL_TP_INIT, POOL_TP_INC);	    new_op = pj_pool_zalloc(pool, sizeof(struct pending_accept));	    new_op->pool = pool;	    new_op->listener = listener;	    new_op->index = accept_op->index;	    pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key));	    listener->accept_op[accept_op->index] = new_op;	    /* 	     * Incoming connections!	     * Create TCP transport for the new socket.	     */	    status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE,				 &accept_op->local_addr, 				 &accept_op->remote_addr, &tcp);	    if (status == PJ_SUCCESS) {		status = tcp_start_read(tcp);		if (status != PJ_SUCCESS) {		    PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));		    tcp_destroy(&tcp->base, status);		}	    }	    accept_op = new_op;	}next_accept:	/*	 * Start the next asynchronous accept() operation.	 */	accept_op->addr_len = sizeof(pj_sockaddr_in);	accept_op->new_sock = PJ_INVALID_SOCKET;	status = pj_ioqueue_accept(listener->key, 				   &accept_op->op_key,				   &accept_op->new_sock,				   &accept_op->local_addr,				   &accept_op->remote_addr,				   &accept_op->addr_len);	/*	 * Loop while we have immediate connection or when there is error.	 */    } while (status != PJ_EPENDING);}/*  * Callback from ioqueue when packet is sent. */static void on_write_complete(pj_ioqueue_key_t *key,                               pj_ioqueue_op_key_t *op_key,                               pj_ssize_t bytes_sent){    struct tcp_transport *tcp = pj_ioqueue_get_user_data(key);    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;    tdata_op_key->tdata = NULL;    /* Check for error/closure */    if (bytes_sent <= 0) {	pj_status_t status;	PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 		  bytes_sent));	status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :				     -bytes_sent;	if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;	pjsip_transport_shutdown(&tcp->base);    }    if (tdata_op_key->callback) {	/*	 * Notify sip_transport.c that packet has been sent.	 */	tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);    }}/*  * This callback is called by transport manager to send SIP message  */static pj_status_t tcp_send_msg(pjsip_transport *transport, 				pjsip_tx_data *tdata,				const pj_sockaddr_t *rem_addr,				int addr_len,				void *token,				void (*callback)(pjsip_transport *transport,						 void *token, 						 pj_ssize_t sent_bytes)){    struct tcp_transport *tcp = (struct tcp_transport*)transport;    pj_ssize_t size;    pj_bool_t delayed = PJ_FALSE;    pj_status_t status = PJ_SUCCESS;    /* Sanity check */    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);    /* Check that there's no pending operation associated with the tdata */    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);        /* Check the address is supported */    PJ_ASSERT_RETURN(rem_addr && addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL);    /* Init op key. */    tdata->op_key.tdata = tdata;    tdata->op_key.token = token;    tdata->op_key.callback = callback;    /* If asynchronous connect() has not completed yet, just put the     * transmit data in the pending transmission list since we can not     * use the socket yet.     */    if (tcp->has_pending_connect) {	/*	 * Looks like connect() is still in progress. Check again (this time	 * with holding the lock) to be sure.	 */	pj_lock_acquire(tcp->base.lock);	if (tcp->has_pending_connect) {	    struct delayed_tdata *delayed_tdata;	    /*	     * connect() is still in progress. Put the transmit data to	     * the delayed list.	     */	    delayed_tdata = pj_pool_alloc(tdata->pool, 					  sizeof(*delayed_tdata));	    delayed_tdata->tdata_op_key = &tdata->op_key;	    pj_list_push_back(&tcp->delayed_list, delayed_tdata);	    status = PJ_EPENDING;	    /* Prevent pj_ioqueue_send() to be called below */	    delayed = PJ_TRUE;	}	pj_lock_release(tcp->base.lock);    }         if (!delayed) {	/*	 * Transport is ready to go. Send the packet to ioqueue to be	 * sent asynchronously.	 */	size = tdata->buf.cur - tdata->buf.start;	status = pj_ioqueue_send(tcp->key, 				 (pj_ioqueue_op_key_t*)&tdata->op_key,				 tdata->buf.start, &size, 0);	if (status != PJ_EPENDING) {	    /* Not pending (could be immediate success or error) */	    tdata->op_key.tdata = NULL;	    /* Shutdown transport on closure/errors */	    if (size <= 0) {		PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 			  size));		if (status == PJ_SUCCESS) 		    status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);		if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;		pjsip_transport_shutdown(&tcp->base);	    }	}    }    return status;}/*  * This callback is called by transport manager to shutdown transport. * This normally is only used by UDP transport. */static pj_status_t tcp_shutdown(pjsip_transport *transport){    PJ_UNUSED_ARG(transport);    /* Nothing to do for TCP */    return PJ_SUCCESS;}/*  * Callback from ioqueue that an incoming data is received from the socket. */static void on_read_complete(pj_ioqueue_key_t *key,                              pj_ioqueue_op_key_t *op_key,                              pj_ssize_t bytes_read){    enum { MAX_IMMEDIATE_PACKET = 10 };    pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;    pjsip_rx_data *rdata = rdata_op_key->rdata;    struct tcp_transport *tcp = 	(struct tcp_transport*)rdata->tp_info.transport;    int i;    pj_status_t status;    /* Don't do anything if transport is closing. */    if (tcp->is_closing) {	tcp->is_closing++;	return;    }    /*     * The idea of the loop is to process immediate data received by     * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When     * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to     * complete asynchronously, to allow other sockets to get their data.     */    for (i=0;; ++i) {	pj_uint32_t flags;	/* Houston, we have packet! Report the packet to transport manager	 * to be parsed.	 */	if (bytes_read > 0) {	    pj_size_t size_eaten;	    /* Init pkt_info part. */	    rdata->pkt_info.len += bytes_read;	    rdata->pkt_info.zero = 0;	    pj_gettimeofday(&rdata->pkt_info.timestamp);	    /* Report to transport manager.	     * The transport manager will tell us how many bytes of the packet	     * have been processed (as valid SIP message).	     */	    size_eaten = 		pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 					   rdata);	    pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);	    /* Move unprocessed data to the front of the buffer */	    if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) {		pj_memmove(rdata->pkt_info.packet,			   rdata->pkt_info.packet + size_eaten,			   rdata->pkt_info.len - size_eaten);	    }	    	    rdata->pkt_info.len -= size_eaten;	} else if (bytes_read == 0) {	    /* Transport is closed */	    PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));	    	    /* We can not destroy the transport since high level objects may	     * still keep reference to this transport. So we can only 	     * instruct transport manager to gracefully start the shutdown	     * procedure for this transport.	     */	    if (tcp->close_reason==PJ_SUCCESS) 		tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);	    pjsip_transport_shutdown(&tcp->base);	    return;	//} else if (bytes_read < 0)  {	} else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&		   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 		   -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) 	{	    /* Socket error. */	    PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));	    /* We can not destroy the transport since high level objects may	     * still keep reference to this transport. So we can only 	     * instruct transport manager to gracefully start the shutdown	     * procedure for this transport.	     */	    if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read;	    pjsip_transport_shutdown(&tcp->base);	    return;	}	if (i >= MAX_IMMEDIATE_PACKET) {	    /* Receive quota reached. Force ioqueue_recv() to 	     * return PJ_EPENDING 	     */	    flags = PJ_IOQUEUE_ALWAYS_ASYNC;	} else {	    flags = 0;	}	/* Reset pool. */	pj_pool_reset(rdata->tp_info.pool);	/* Read next packet. */	bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len;	rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);	status = pj_ioqueue_recv(key, op_key, 				 rdata->pkt_info.packet+rdata->pkt_info.len,				 &bytes_read, flags);	if (status == PJ_SUCCESS) {	    /* Continue loop. */	    pj_assert(i < MAX_IMMEDIATE_PACKET);	} else if (status == PJ_EPENDING) {	    break;	} else {	    /* Socket error */	    PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));	    /* We can not destroy the transport since high level objects may	     * still keep reference to this transport. So we can only 	     * instruct transport manager to gracefully start the shutdown	     * procedure for this transport.	     */	    if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;	    pjsip_transport_shutdown(&tcp->base);	    return;	}    }}/*  * Callback from ioqueue when asynchronous connect() operation completes. */static void on_connect_complete(pj_ioqueue_key_t *key,                                 pj_status_t status){    struct tcp_transport *tcp;    pj_sockaddr_in addr;    int addrlen;    tcp = pj_ioqueue_get_user_data(key);    /* Mark that pending connect() operation has completed. */    tcp->has_pending_connect = PJ_FALSE;    /* Check connect() status */    if (status != PJ_SUCCESS) {	tcp_perror(tcp->base.obj_name, "TCP connect() error", status);	/* Cancel all delayed transmits */	while (!pj_list_empty(&tcp->delayed_list)) {	    struct delayed_tdata *pending_tx;	    pj_ioqueue_op_key_t *op_key;	    pending_tx = tcp->delayed_list.next;	    pj_list_erase(pending_tx);	    op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;	    on_write_complete(tcp->key, op_key, -status);	}	/* We can not destroy the transport since high level objects may	 * still keep reference to this transport. So we can only 	 * instruct transport manager to gracefully start the shutdown	 * procedure for this transport.	 */	if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;	pjsip_transport_shutdown(&tcp->base);	return;    }    PJ_LOG(4,(tcp->base.obj_name, 	      "TCP transport %.*s:%d is connected to %.*s:%d",	      (int)tcp->base.local_name.host.slen,	      tcp->base.local_name.host.ptr,	      tcp->base.local_name.port,	      (int)tcp->base.remote_name.host.slen,	      tcp->base.remote_name.host.ptr,	      tcp->base.remote_name.port));    /* Update (again) local address, just in case local address currently     * set is different now that the socket is connected (could happen     * on some systems, like old Win32 probably?).     */    addrlen = sizeof(pj_sockaddr_in);    if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {	pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr;	if (tp_addr->sin_addr.s_addr != addr.sin_addr.s_addr) {	    tp_addr->sin_addr.s_addr = addr.sin_addr.s_addr;	    tp_addr->sin_port = addr.sin_port;	    sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,				  tp_addr);	}    }    /* Start pending read */    status = tcp_start_read(tcp);    if (status != PJ_SUCCESS) {	/* We can not destroy the transport since high level objects may	 * still keep reference to this transport. So we can only 	 * instruct transport manager to gracefully start the shutdown	 * procedure for this transport.	 */	if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;	pjsip_transport_shutdown(&tcp->base);	return;    }    /* Flush all pending send operations */    tcp_flush_pending_tx(tcp);}#endif	/* PJ_HAS_TCP */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -