⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sip_transport_tcp.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:
/* Destroy hook invoked by the transport manager. */
static pj_status_t tcp_destroy_transport(pjsip_transport *transport);

/* Internal teardown helper; `reason` is the status that triggered it. */
static pj_status_t tcp_destroy(pjsip_transport *transport, pj_status_t reason);

/* ioqueue callback: an incoming packet has been read. */
static void on_read_complete(pj_ioqueue_key_t *key,
			     pj_ioqueue_op_key_t *op_key,
			     pj_ssize_t bytes_read);

/* ioqueue callback: an outgoing packet has been sent. */
static void on_write_complete(pj_ioqueue_key_t *key,
			      pj_ioqueue_op_key_t *op_key,
			      pj_ssize_t bytes_sent);

/* ioqueue callback: a connect operation has completed. */
static void on_connect_complete(pj_ioqueue_key_t *key, pj_status_t status);


/*
 * Common function to create a TCP transport, used for both the server
 * (accepted socket) and the client (outgoing connection) side.
 *
 * listener   Listener that owns the new transport; its endpoint and
 *            transport manager are copied into the transport.
 * pool       Pool to allocate the transport from, or NULL to have a
 *            dedicated pool created from the listener's endpoint. The
 *            pool is stored in the transport and released when the
 *            transport is destroyed.
 * sock       Socket to wrap; must not be PJ_INVALID_SOCKET.
 * is_server  PJ_TRUE for a server-side transport, PJ_FALSE for a client.
 * local      Local address of the socket.
 * remote     Remote (peer) address of the socket.
 * p_tcp      Receives the new transport on success.
 *
 * Returns PJ_SUCCESS, PJ_EINVAL for an invalid socket, PJ_ENOMEM when the
 * pool cannot be created, or the status of the step that failed. Once the
 * socket has been validated, every failure path disposes of it: either
 * directly (pool creation failure) or through tcp_destroy(), which also
 * releases the pool.
 */
static pj_status_t tcp_create( struct tcp_listener *listener,
			       pj_pool_t *pool,
			       pj_sock_t sock, pj_bool_t is_server,
			       const pj_sockaddr_in *local,
			       const pj_sockaddr_in *remote,
			       struct tcp_transport **p_tcp)
{
    struct tcp_transport *tcp;
    pj_ioqueue_t *ioqueue;
    pj_ioqueue_callback tcp_callback;
    pj_status_t status;
    

    PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);


    if (pool == NULL) {
	pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
				       POOL_TP_INIT, POOL_TP_INC);
	if (pool == NULL) {
	    /* Running out of memory is a runtime condition, not a
	     * programming error, so it must be handled even when
	     * assertion-style checks are compiled out. Close the socket
	     * here so this path agrees with the other failure paths,
	     * which dispose of it through tcp_destroy().
	     */
	    pj_sock_close(sock);
	    return PJ_ENOMEM;
	}
    }    

    /*
     * Create and initialize basic transport structure.
     */
    tcp = pj_pool_zalloc(pool, sizeof(*tcp));
    tcp->sock = sock;
    tcp->is_server = is_server;
    tcp->listener = listener;
    pj_list_init(&tcp->delayed_list);
    tcp->base.pool = pool;

    pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, 
		     (is_server ? "tcps%p" :"tcpc%p"), tcp);

    status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    /* The transport key is the transport type plus the remote address. */
    tcp->base.key.type = PJSIP_TRANSPORT_TCP;
    pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in));
    tcp->base.type_name = "tcp";
    tcp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);

    /* Human readable description of the transport. */
    tcp->base.info = pj_pool_alloc(pool, 64);
    pj_ansi_snprintf(tcp->base.info, 64, "TCP to %s:%d",
		     pj_inet_ntoa(remote->sin_addr), 
		     (int)pj_ntohs(remote->sin_port));

    tcp->base.addr_len = sizeof(pj_sockaddr_in);
    pj_memcpy(&tcp->base.local_addr, local, sizeof(pj_sockaddr_in));
    sockaddr_to_host_port(pool, &tcp->base.local_name, local);
    sockaddr_to_host_port(pool, &tcp->base.remote_name, remote);

    tcp->base.endpt = listener->endpt;
    tcp->base.tpmgr = listener->tpmgr;
    tcp->base.send_msg = &tcp_send_msg;
    tcp->base.do_shutdown = &tcp_shutdown;
    tcp->base.destroy = &tcp_destroy_transport;


    /* Register socket to ioqueue */
    pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback));
    tcp_callback.on_read_complete = &on_read_complete;
    tcp_callback.on_write_complete = &on_write_complete;
    tcp_callback.on_connect_complete = &on_connect_complete;

    ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
    status = pj_ioqueue_register_sock(pool, ioqueue, sock, 
				      tcp, &tcp_callback, &tcp->key);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    /* Register transport to transport manager */
    status = pjsip_transport_register(listener->tpmgr, &tcp->base);
    if (status != PJ_SUCCESS) {
	goto on_error;
    }

    /* From here on tcp_destroy() must go through the transport manager. */
    tcp->is_registered = PJ_TRUE;

    /* Done setting up basic transport. */
    *p_tcp = tcp;

    PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
	      (tcp->is_server ? "server" : "client")));

    return PJ_SUCCESS;

on_error:
    /* The structure was zero-allocated, so tcp_destroy() only releases
     * what has actually been set up so far.
     */
    tcp_destroy(&tcp->base, status);
    return status;
}


/* Flush all delayed transmision once the socket is connected. */
static void tcp_flush_pending_tx(struct tcp_transport *tcp)
{
    pj_lock_acquire(tcp->base.lock);
    while (!pj_list_empty(&tcp->delayed_list)) {
	struct delayed_tdata *pending_tx;
	pjsip_tx_data *tdata;
	pj_ioqueue_op_key_t *op_key;
	pj_ssize_t size;
	pj_status_t status;

	pending_tx = tcp->delayed_list.next;
	pj_list_erase(pending_tx);

	tdata = pending_tx->tdata_op_key->tdata;
	op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;

	/* send to ioqueue! */
	size = tdata->buf.cur - tdata->buf.start;
	status = pj_ioqueue_send(tcp->key, op_key,
				 tdata->buf.start, &size, 0);

	if (status != PJ_EPENDING) {
	    on_write_complete(tcp->key, op_key, size);
	}

    }
    pj_lock_release(tcp->base.lock);
}


/*
 * Destroy operation installed on the transport; the transport manager
 * calls it to get rid of the transport.
 */
static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
{
    struct tcp_transport *self = (struct tcp_transport*)transport;
    pj_status_t why;

    /* Since the transport manager is the caller, the transport is
     * expected to be unregistered already. Clearing the flag makes
     * tcp_destroy() perform the real cleanup instead of going back to
     * the manager.
     */
    self->is_registered = PJ_FALSE;

    why = self->close_reason;
    return tcp_destroy(transport, why);
}


/*
 * Destroy TCP transport.
 *
 * Teardown happens in two steps. While the transport is still flagged as
 * registered, this function only clears the flag, hands the transport to
 * pjsip_transport_destroy() and returns; the actual cleanup is done by the
 * nested call that comes back into this function with is_registered
 * cleared.
 *
 * transport  The TCP transport, passed through its base structure.
 * reason     Status that caused the teardown. It is stored as the
 *            transport's close_reason if none was recorded yet, passed
 *            (negated) to the completion callback of every cancelled
 *            delayed transmit, and written to the log.
 *
 * Always returns PJ_SUCCESS.
 */
static pj_status_t tcp_destroy(pjsip_transport *transport, 
			       pj_status_t reason)
{
    struct tcp_transport *tcp = (struct tcp_transport*)transport;

    /* Keep the first recorded reason; later calls do not overwrite it. */
    if (tcp->close_reason == 0)
	tcp->close_reason = reason;

    if (tcp->is_registered) {
	/* Clear the flag first so the nested call takes the cleanup path. */
	tcp->is_registered = PJ_FALSE;
	pjsip_transport_destroy(transport);

	/* pjsip_transport_destroy will recursively call this function
	 * again.
	 */
	return PJ_SUCCESS;
    }

    /* Mark transport as closing */
    tcp->is_closing = PJ_TRUE;

    /* Cancel all delayed transmits */
    while (!pj_list_empty(&tcp->delayed_list)) {
	struct delayed_tdata *pending_tx;
	pj_ioqueue_op_key_t *op_key;

	pending_tx = tcp->delayed_list.next;
	pj_list_erase(pending_tx);

	op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;

	/* Report the cancellation as a failed write: the status is passed
	 * negated in place of the byte count.
	 */
	on_write_complete(tcp->key, op_key, -reason);
    }

    /* Release the receive-data pool, if tcp_start_read() created one. */
    if (tcp->rdata.tp_info.pool) {
	pj_pool_release(tcp->rdata.tp_info.pool);
	tcp->rdata.tp_info.pool = NULL;
    }

    /* NOTE(review): the socket is marked invalid here without calling
     * pj_sock_close(); presumably pj_ioqueue_unregister() closes it --
     * confirm against the ioqueue documentation.
     */
    if (tcp->key) {
	pj_ioqueue_unregister(tcp->key);
	tcp->key = NULL;
	tcp->sock = PJ_INVALID_SOCKET;
    }

    /* Socket still valid here means it was never registered to the
     * ioqueue, so it has to be closed explicitly.
     */
    if (tcp->sock != PJ_INVALID_SOCKET) {
	pj_sock_close(tcp->sock);
	tcp->sock = PJ_INVALID_SOCKET;
    }

    if (tcp->base.lock) {
	pj_lock_destroy(tcp->base.lock);
	tcp->base.lock = NULL;
    }

    if (tcp->base.ref_cnt) {
	pj_atomic_destroy(tcp->base.ref_cnt);
	tcp->base.ref_cnt = NULL;
    }

    /* The transport structure itself is allocated from base.pool (see
     * tcp_create()), so the pool is released last and the log message,
     * which reads obj_name, is written before the release. Nothing may
     * touch `tcp` after pj_pool_release().
     */
    if (tcp->base.pool) {
	pj_pool_t *pool;

	if (reason != PJ_SUCCESS) {
	    char errmsg[PJ_ERR_MSG_SIZE];

	    pj_strerror(reason, errmsg, sizeof(errmsg));
	    PJ_LOG(4,(tcp->base.obj_name, 
		      "TCP transport destroyed with reason %d: %s", 
		      reason, errmsg));

	} else {

	    PJ_LOG(4,(tcp->base.obj_name, 
		      "TCP transport destroyed normally"));

	}

	pool = tcp->base.pool;
	tcp->base.pool = NULL;
	pj_pool_release(pool);
    }

    return PJ_SUCCESS;
}


/*
 * This utility function creates receive data buffers and start
 * asynchronous recv() operations from the socket. It is called after
 * accept() or connect() operation complete.
 */
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
{
    pj_pool_t *pool;
    pj_ssize_t size;
    pj_sockaddr_in *rem_addr;
    pj_status_t status;

    /* Init rdata */
    pool = pjsip_endpt_create_pool(tcp->listener->endpt,
				   "rtd%p",
				   PJSIP_POOL_RDATA_LEN,
				   PJSIP_POOL_RDATA_INC);
    if (!pool) {
	tcp_perror(tcp->base.obj_name, "Unable to create pool", PJ_ENOMEM);
	return PJ_ENOMEM;
    }

    tcp->rdata.tp_info.pool = pool;

    tcp->rdata.tp_info.transport = &tcp->base;
    tcp->rdata.tp_info.tp_data = tcp;
    tcp->rdata.tp_info.op_key.rdata = &tcp->rdata;
    pj_ioqueue_op_key_init(&tcp->rdata.tp_info.op_key.op_key, 
			   sizeof(pj_ioqueue_op_key_t));

    tcp->rdata.pkt_info.src_addr = tcp->base.key.rem_addr;
    tcp->rdata.pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
    rem_addr = (pj_sockaddr_in*) &tcp->base.key.rem_addr;
    pj_ansi_strcpy(tcp->rdata.pkt_info.src_name,
		   pj_inet_ntoa(rem_addr->sin_addr));
    tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port);

    size = sizeof(tcp->rdata.pkt_info.packet);
    status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key,
			     tcp->rdata.pkt_info.packet, &size,
			     PJ_IOQUEUE_ALWAYS_ASYNC);
    if (status != PJ_SUCCESS && status != PJ_EPENDING) {
	PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d", 
		   status));
	return status;
    }

    return PJ_SUCCESS;
}


/*
 * This callback is called by transport manager for the TCP factory
 * to create outgoing transport to the specified destination.
 *
 * factory      The TCP listener, passed through its factory structure.
 * mgr          Transport manager (only validated here).
 * endpt        Endpoint (only validated here).
 * rem_addr     Destination; must be an IPv4 (PJ_AF_INET) address.
 * addr_len     Length of rem_addr; must be sizeof(pj_sockaddr_in).
 * p_transport  Receives the new transport on success.
 *
 * The connect is started asynchronously, so the transport may be returned
 * while the connection is still in progress (has_pending_connect set).
 *
 * Returns PJ_SUCCESS, PJ_EINVAL on bad arguments, or the status of the
 * socket/transport operation that failed.
 */
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
					pjsip_tpmgr *mgr,
					pjsip_endpoint *endpt,
					const pj_sockaddr *rem_addr,
					int addr_len,
					pjsip_transport **p_transport)
{
    struct tcp_listener *listener;
    struct tcp_transport *tcp;
    pj_sock_t sock;
    pj_sockaddr_in local_addr;
    pj_status_t status;

    /* Sanity checks */
    PJ_ASSERT_RETURN(factory && mgr && endpt && rem_addr &&
		     addr_len && p_transport, PJ_EINVAL);

    /* Check that address is a sockaddr_in */
    PJ_ASSERT_RETURN(rem_addr->sa_family == PJ_AF_INET &&
		     addr_len == sizeof(pj_sockaddr_in), PJ_EINVAL);


    listener = (struct tcp_listener*)factory;

    
    /* Create socket */
    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock);
    if (status != PJ_SUCCESS)
	return status;

    /* Bind to any port */
    status = pj_sock_bind_in(sock, 0, 0);
    if (status != PJ_SUCCESS) {
	pj_sock_close(sock);
	return status;
    }

    /* Get the local port */
    addr_len = sizeof(pj_sockaddr_in);
    status = pj_sock_getsockname(sock, &local_addr, &addr_len);
    if (status != PJ_SUCCESS) {
	pj_sock_close(sock);
	return status;
    }

    /* Initially set the address from the listener's address */
    local_addr.sin_addr.s_addr = 
	((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr;

    /* Create the transport descriptor. On failure tcp_create() has
     * already disposed of the socket, so it must not be closed here.
     */
    status = tcp_create(listener, NULL, sock, PJ_FALSE, &local_addr, 
			(pj_sockaddr_in*)rem_addr, &tcp);
    if (status != PJ_SUCCESS)
	return status;


    /* Start asynchronous connect() operation */
    tcp->has_pending_connect = PJ_TRUE;
    status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in));
    if (status == PJ_SUCCESS) {
	/* Connected immediately. The ioqueue only invokes the completion
	 * callback for operations that went pending, so nothing else
	 * would start the receive operation for this transport; do it
	 * here.
	 */
	tcp->has_pending_connect = PJ_FALSE;
	status = tcp_start_read(tcp);
	if (status != PJ_SUCCESS) {
	    tcp_destroy(&tcp->base, status);
	    return status;
	}
    } else if (status != PJ_EPENDING) {
	tcp_destroy(&tcp->base, status);
	return status;
    }

    /* Update (again) local address, just in case local address currently
     * set is different now that asynchronous connect() is started.
     */
    addr_len = sizeof(pj_sockaddr_in);
    if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) {
	pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr;

	/* Some systems (like old Win32 perhaps) may not set local address
	 * properly before socket is fully connected.
	 */
	if (tp_addr->sin_addr.s_addr != local_addr.sin_addr.s_addr &&
	    local_addr.sin_addr.s_addr != 0) 
	{
	    tp_addr->sin_addr.s_addr = local_addr.sin_addr.s_addr;
	    tp_addr->sin_port = local_addr.sin_port;
	    sockaddr_to_host_port(tcp->base.pool, &tcp->base.local_name,
				  &local_addr);
	}
    }

    if (tcp->has_pending_connect) {
	PJ_LOG(4,(tcp->base.obj_name, 
		  "TCP transport %.*s:%d is connecting to %.*s:%d...",
		  (int)tcp->base.local_name.host.slen,
		  tcp->base.local_name.host.ptr,
		  tcp->base.local_name.port,
		  (int)tcp->base.remote_name.host.slen,
		  tcp->base.remote_name.host.ptr,
		  tcp->base.remote_name.port));
    }

    /* Done */
    *p_transport = &tcp->base;

    return PJ_SUCCESS;
}


/*
 * This callback is called by ioqueue when pending accept() operation has
 * completed.
 */
static void on_accept_complete(	pj_ioqueue_key_t *key, 
				pj_ioqueue_op_key_t *op_key, 
				pj_sock_t sock, 
				pj_status_t status)
{
    struct tcp_listener *listener;
    struct tcp_transport *tcp;
    struct pending_accept *accept_op;
    int err_cnt = 0;

    listener = pj_ioqueue_get_user_data(key);
    accept_op = (struct pending_accept*) op_key;

    /*
     * Loop while there is immediate connection or when there is error.
     */
    do {
	if (status == PJ_EPENDING) {
	    /*
	     * This can only happen when this function is called during
	     * initialization to kick off asynchronous accept().
	     */

	} else if (status != PJ_SUCCESS) {

	    /*
	     * Error in accept().
	     */
	    tcp_perror(listener->factory.obj_name, "Error in accept()", 
		       status);

	    /*
	     * Prevent endless accept() error loop by limiting the
	     * number of consecutive errors. Once the number of errors
	     * is equal to maximum, we treat this as permanent error, and

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -