⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sip_transport_tcp.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:
	     * we stop the accept() operation.
	     */
	    ++err_cnt;
	    if (err_cnt >= 10) {
		PJ_LOG(1, (listener->factory.obj_name, 
			   "Too many errors, listener stopping"));
	    }

	} else {
	    pj_pool_t *pool;
	    struct pending_accept *new_op;

	    if (sock == PJ_INVALID_SOCKET) {
		sock = accept_op->new_sock;
	    }

	    if (sock == PJ_INVALID_SOCKET) {
		pj_assert(!"Should not happen. status should be error");
		goto next_accept;
	    }

	    PJ_LOG(4,(listener->factory.obj_name, 
		      "TCP listener %.*s:%d: got incoming TCP connection "
		      "from %s:%d, sock=%d",
		      (int)listener->factory.addr_name.host.slen,
		      listener->factory.addr_name.host.ptr,
		      listener->factory.addr_name.port,
		      pj_inet_ntoa(accept_op->remote_addr.sin_addr),
		      pj_ntohs(accept_op->remote_addr.sin_port),
		      sock));

	    /* Create new accept_opt */
	    pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p", 
					   POOL_TP_INIT, POOL_TP_INC);
	    new_op = pj_pool_zalloc(pool, sizeof(struct pending_accept));
	    new_op->pool = pool;
	    new_op->listener = listener;
	    new_op->index = accept_op->index;
	    pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key));
	    listener->accept_op[accept_op->index] = new_op;

	    /* 
	     * Incoming connections!
	     * Create TCP transport for the new socket.
	     */
	    status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE,
				 &accept_op->local_addr, 
				 &accept_op->remote_addr, &tcp);
	    if (status == PJ_SUCCESS) {
		status = tcp_start_read(tcp);
		if (status != PJ_SUCCESS) {
		    PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
		    tcp_destroy(&tcp->base, status);
		}
	    }

	    accept_op = new_op;
	}

next_accept:
	/*
	 * Start the next asynchronous accept() operation.
	 */
	accept_op->addr_len = sizeof(pj_sockaddr_in);
	accept_op->new_sock = PJ_INVALID_SOCKET;

	status = pj_ioqueue_accept(listener->key, 
				   &accept_op->op_key,
				   &accept_op->new_sock,
				   &accept_op->local_addr,
				   &accept_op->remote_addr,
				   &accept_op->addr_len);

	/*
	 * Loop while we have immediate connection or when there is error.
	 */

    } while (status != PJ_EPENDING);
}


/* 
 * Callback from ioqueue when packet is sent.
 */
static void on_write_complete(pj_ioqueue_key_t *key, 
                              pj_ioqueue_op_key_t *op_key, 
                              pj_ssize_t bytes_sent)
{
    struct tcp_transport *tcp = pj_ioqueue_get_user_data(key);
    pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;

    tdata_op_key->tdata = NULL;

    /* Check for error/closure */
    if (bytes_sent <= 0) {
	pj_status_t status;

	PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 
		  bytes_sent));

	status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
				     -bytes_sent;
	if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
	pjsip_transport_shutdown(&tcp->base);
    }

    if (tdata_op_key->callback) {
	/*
	 * Notify sip_transport.c that packet has been sent.
	 */
	tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
    }
}


/* 
 * This callback is called by transport manager to send SIP message 
 */
static pj_status_t tcp_send_msg(pjsip_transport *transport, 
				pjsip_tx_data *tdata,
				const pj_sockaddr_t *rem_addr,
				int addr_len,
				void *token,
				void (*callback)(pjsip_transport *transport,
						 void *token, 
						 pj_ssize_t sent_bytes))
{
    struct tcp_transport *tcp = (struct tcp_transport*)transport;
    pj_ssize_t size;
    pj_bool_t delayed = PJ_FALSE;
    pj_status_t status = PJ_SUCCESS;

    /* Sanity check */
    PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);

    /* Check that there's no pending operation associated with the tdata */
    PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
    
    /* Check the address is supported */
    PJ_ASSERT_RETURN(rem_addr && addr_len==sizeof(pj_sockaddr_in), PJ_EINVAL);



    /* Init op key. */
    tdata->op_key.tdata = tdata;
    tdata->op_key.token = token;
    tdata->op_key.callback = callback;

    /* If asynchronous connect() has not completed yet, just put the
     * transmit data in the pending transmission list since we can not
     * use the socket yet.
     */
    if (tcp->has_pending_connect) {

	/*
	 * Looks like connect() is still in progress. Check again (this time
	 * with holding the lock) to be sure.
	 */
	pj_lock_acquire(tcp->base.lock);

	if (tcp->has_pending_connect) {
	    struct delayed_tdata *delayed_tdata;

	    /*
	     * connect() is still in progress. Put the transmit data to
	     * the delayed list.
	     */
	    delayed_tdata = pj_pool_alloc(tdata->pool, 
					  sizeof(*delayed_tdata));
	    delayed_tdata->tdata_op_key = &tdata->op_key;

	    pj_list_push_back(&tcp->delayed_list, delayed_tdata);
	    status = PJ_EPENDING;

	    /* Prevent pj_ioqueue_send() to be called below */
	    delayed = PJ_TRUE;
	}

	pj_lock_release(tcp->base.lock);
    } 
    
    if (!delayed) {
	/*
	 * Transport is ready to go. Send the packet to ioqueue to be
	 * sent asynchronously.
	 */
	size = tdata->buf.cur - tdata->buf.start;
	status = pj_ioqueue_send(tcp->key, 
				 (pj_ioqueue_op_key_t*)&tdata->op_key,
				 tdata->buf.start, &size, 0);

	if (status != PJ_EPENDING) {
	    /* Not pending (could be immediate success or error) */
	    tdata->op_key.tdata = NULL;

	    /* Shutdown transport on closure/errors */
	    if (size <= 0) {

		PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", 
			  size));

		if (status == PJ_SUCCESS) 
		    status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
		if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
		pjsip_transport_shutdown(&tcp->base);
	    }
	}
    }

    return status;
}


/* 
 * This callback is called by transport manager to shutdown transport.
 * This normally is only used by UDP transport.
 */
static pj_status_t tcp_shutdown(pjsip_transport *transport)
{

    PJ_UNUSED_ARG(transport);

    /* Nothing to do for TCP */
    return PJ_SUCCESS;
}


/* 
 * Callback from ioqueue that an incoming data is received from the socket.
 */
static void on_read_complete(pj_ioqueue_key_t *key, 
                             pj_ioqueue_op_key_t *op_key, 
                             pj_ssize_t bytes_read)
{
    enum { MAX_IMMEDIATE_PACKET = 10 };
    pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
    pjsip_rx_data *rdata = rdata_op_key->rdata;
    struct tcp_transport *tcp = 
	(struct tcp_transport*)rdata->tp_info.transport;
    int i;
    pj_status_t status;

    /* Don't do anything if transport is closing. */
    if (tcp->is_closing) {
	tcp->is_closing++;
	return;
    }

    /*
     * The idea of the loop is to process immediate data received by
     * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When
     * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to
     * complete asynchronously, to allow other sockets to get their data.
     */
    for (i=0;; ++i) {
	pj_uint32_t flags;

	/* Houston, we have packet! Report the packet to transport manager
	 * to be parsed.
	 */
	if (bytes_read > 0) {
	    pj_size_t size_eaten;

	    /* Init pkt_info part. */
	    rdata->pkt_info.len += bytes_read;
	    rdata->pkt_info.zero = 0;
	    pj_gettimeofday(&rdata->pkt_info.timestamp);

	    /* Report to transport manager.
	     * The transport manager will tell us how many bytes of the packet
	     * have been processed (as valid SIP message).
	     */
	    size_eaten = 
		pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, 
					   rdata);

	    pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);

	    /* Move unprocessed data to the front of the buffer */
	    if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) {
		pj_memmove(rdata->pkt_info.packet,
			   rdata->pkt_info.packet + size_eaten,
			   rdata->pkt_info.len - size_eaten);
	    }
	    
	    rdata->pkt_info.len -= size_eaten;

	} else if (bytes_read == 0) {

	    /* Transport is closed */
	    PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
	    
	    /* We can not destroy the transport since high level objects may
	     * still keep reference to this transport. So we can only 
	     * instruct transport manager to gracefully start the shutdown
	     * procedure for this transport.
	     */
	    if (tcp->close_reason==PJ_SUCCESS) 
		tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
	    pjsip_transport_shutdown(&tcp->base);

	    return;

	//} else if (bytes_read < 0)  {
	} else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
		   -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && 
		   -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) 
	{

	    /* Socket error. */
	    PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));

	    /* We can not destroy the transport since high level objects may
	     * still keep reference to this transport. So we can only 
	     * instruct transport manager to gracefully start the shutdown
	     * procedure for this transport.
	     */
	    if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read;
	    pjsip_transport_shutdown(&tcp->base);

	    return;
	}

	if (i >= MAX_IMMEDIATE_PACKET) {
	    /* Receive quota reached. Force ioqueue_recv() to 
	     * return PJ_EPENDING 
	     */
	    flags = PJ_IOQUEUE_ALWAYS_ASYNC;
	} else {
	    flags = 0;
	}

	/* Reset pool. */
	pj_pool_reset(rdata->tp_info.pool);

	/* Read next packet. */
	bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len;
	rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
	status = pj_ioqueue_recv(key, op_key, 
				 rdata->pkt_info.packet+rdata->pkt_info.len,
				 &bytes_read, flags);

	if (status == PJ_SUCCESS) {

	    /* Continue loop. */
	    pj_assert(i < MAX_IMMEDIATE_PACKET);

	} else if (status == PJ_EPENDING) {
	    break;

	} else {
	    /* Socket error */
	    PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));

	    /* We can not destroy the transport since high level objects may
	     * still keep reference to this transport. So we can only 
	     * instruct transport manager to gracefully start the shutdown
	     * procedure for this transport.
	     */
	    if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
	    pjsip_transport_shutdown(&tcp->base);

	    return;
	}
    }
}


/* 
 * Callback from ioqueue when asynchronous connect() operation completes.
 */
static void on_connect_complete(pj_ioqueue_key_t *key, 
                                pj_status_t status)
{
    struct tcp_transport *tcp;
    pj_sockaddr_in addr;
    int addrlen;

    tcp = pj_ioqueue_get_user_data(key);

    /* Mark that pending connect() operation has completed. */
    tcp->has_pending_connect = PJ_FALSE;

    /* Check connect() status */
    if (status != PJ_SUCCESS) {

	tcp_perror(tcp->base.obj_name, "TCP connect() error", status);

	/* Cancel all delayed transmits */
	while (!pj_list_empty(&tcp->delayed_list)) {
	    struct delayed_tdata *pending_tx;
	    pj_ioqueue_op_key_t *op_key;

	    pending_tx = tcp->delayed_list.next;
	    pj_list_erase(pending_tx);

	    op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;

	    on_write_complete(tcp->key, op_key, -status);
	}

	/* We can not destroy the transport since high level objects may
	 * still keep reference to this transport. So we can only 
	 * instruct transport manager to gracefully start the shutdown
	 * procedure for this transport.
	 */
	if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
	pjsip_transport_shutdown(&tcp->base);
	return;
    }

    PJ_LOG(4,(tcp->base.obj_name, 
	      "TCP transport %.*s:%d is connected to %.*s:%d",
	      (int)tcp->base.local_name.host.slen,
	      tcp->base.local_name.host.ptr,
	      tcp->base.local_name.port,
	      (int)tcp->base.remote_name.host.slen,
	      tcp->base.remote_name.host.ptr,
	      tcp->base.remote_name.port));


    /* Update (again) local address, just in case local address currently
     * set is different now that the socket is connected (could happen
     * on some systems, like old Win32 probably?).
     */
    addrlen = sizeof(pj_sockaddr_in);
    if (pj_sock_getsockname(tcp->sock, &addr, &addrlen)==PJ_SUCCESS) {
	pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr;

	if (tp_addr->sin_addr.s_addr != addr.sin_addr.s_addr) {
	    tp_addr->sin_addr.s_addr = addr.sin_addr.s_addr;
	    tp_addr->sin_port = addr.sin_port;
	    sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
				  tp_addr);
	}
    }

    /* Start pending read */
    status = tcp_start_read(tcp);
    if (status != PJ_SUCCESS) {
	/* We can not destroy the transport since high level objects may
	 * still keep reference to this transport. So we can only 
	 * instruct transport manager to gracefully start the shutdown
	 * procedure for this transport.
	 */
	if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
	pjsip_transport_shutdown(&tcp->base);
	return;
    }

    /* Flush all pending send operations */
    tcp_flush_pending_tx(tcp);
}

#endif	/* PJ_HAS_TCP */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -