⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_common_abs.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:
/* $Id: ioqueue_common_abs.c 1158 2007-04-06 10:25:23Z bennylp $ */
/* 
 * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 */

/*
 * ioqueue_common_abs.c
 *
 * This file contains common functionality for emulating the proactor pattern
 * on top of various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file will be included by the appropriate ioqueue implementation.
 * This file is NOT supposed to be compiled as stand-alone source.
 */

/* NOTE(review): PENDING_RETRY is not referenced in the visible part of this
 * file; presumably it bounds the number of retries of a pending operation in
 * code further down -- confirm against its users.
 */
#define PENDING_RETRY	2

/*
 * ioqueue_init()
 *
 * Put the common part of an ioqueue into its initial state: no lock is
 * attached, and the ioqueue does not own (auto-delete) any lock.
 */
static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->auto_delete_lock = 0;
    ioqueue->lock = NULL;
}

/*
 * ioqueue_destroy()
 *
 * Dispose of the lock attached to the ioqueue, but only when the ioqueue
 * owns it (auto_delete_lock is set and a lock is attached). In that case
 * the lock is released, then destroyed, and the result of
 * pj_lock_destroy() is returned. In every other case nothing is touched
 * and PJ_SUCCESS is returned.
 */
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    /* Nothing to do unless we own a lock. */
    if (!ioqueue->auto_delete_lock || !ioqueue->lock)
	return PJ_SUCCESS;

    /* NOTE(review): the release presumably pairs with an acquire done by
     * the caller before invoking this function -- confirm.
     */
    pj_lock_release(ioqueue->lock);

    return pj_lock_destroy(ioqueue->lock);
}

/*
 * pj_ioqueue_set_lock()
 *
 * Attach a lock object to the ioqueue, replacing the current one.
 *
 * ioqueue      The ioqueue; must not be NULL.
 * lock         The lock to attach; must not be NULL.
 * auto_delete  Non-zero to make the ioqueue the owner of the lock, i.e.
 *              the ioqueue destroys it when it is replaced or when the
 *              ioqueue itself is destroyed.
 *
 * Returns PJ_SUCCESS, or PJ_EINVAL when an argument is NULL.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, 
					 pj_lock_t *lock,
					 pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    /* Dispose of the lock we currently own. Skip this when the caller is
     * re-installing that very same lock (e.g. only to change the
     * auto_delete flag): destroying it here would leave ioqueue->lock
     * pointing at a destroyed object.
     */
    if (ioqueue->auto_delete_lock && ioqueue->lock &&
	ioqueue->lock != lock)
    {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}

/*
 * ioqueue_init_key()
 *
 * Initialize the common part of an ioqueue key for a socket.
 *
 * pool       Pool used to create the key's mutex. It is only used when
 *            PJ_IOQUEUE_HAS_SAFE_UNREG is disabled.
 * ioqueue    The ioqueue that the key belongs to.
 * key        The key to initialize. When PJ_IOQUEUE_HAS_SAFE_UNREG is
 *            enabled its ref_count is expected to be zero on entry.
 * sock       The socket handle to associate with the key.
 * user_data  Opaque application data stored in the key.
 * cb         Callbacks; the structure is copied into the key.
 *
 * Returns PJ_SUCCESS, or the error from creating the key's mutex (only
 * when PJ_IOQUEUE_HAS_SAFE_UNREG is disabled). Failure to query the
 * socket type is not an error: the key is then treated as a stream
 * socket.
 */
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    PJ_UNUSED_ARG(pool);

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
    key->connecting = 0;
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Set initial reference count to 1 */
    pj_assert(key->ref_count == 0);
    ++key->ref_count;

    key->closing = 0;
#endif

    /* Get socket type. When socket type is datagram, some optimization
     * will be performed during send to allow parallel send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS) {
	/* Not fatal: fall back to the conservative stream behaviour, and
	 * do not let this handled failure leak out as the return value.
	 */
        key->fd_type = PJ_SOCK_STREAM;
	rc = PJ_SUCCESS;
    }

    /* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
#endif

    return rc;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Return the application data currently stored in the key, or NULL when
 * key is NULL.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    void *data;

    PJ_ASSERT_RETURN(key != NULL, NULL);

    data = key->user_data;
    return data;
}

/*
 * pj_ioqueue_set_user_data()
 *
 * Replace the application data stored in the key.
 *
 * key        The key; must not be NULL.
 * user_data  The new value to store.
 * old_data   Optional; when not NULL it receives the value that was
 *            stored before the replacement.
 *
 * Returns PJ_SUCCESS, or PJ_EINVAL when key is NULL.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    void *previous;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* Hand back the old value before overwriting it. */
    previous = key->user_data;
    if (old_data != NULL) {
        *old_data = previous;
    }

    key->user_data = user_data;

    return PJ_SUCCESS;
}

/* Return 1 when the key's write list holds at least one operation, 0 when
 * the list is empty.
 */
PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return pj_list_empty(&key->write_list) ? 0 : 1;
}

/* Return 1 when the key's read list holds at least one operation, 0 when
 * the list is empty.
 */
PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return pj_list_empty(&key->read_list) ? 0 : 1;
}

/* Return 1 when the key's accept list holds at least one operation, 0 when
 * the list is empty. Always 0 when TCP support (PJ_HAS_TCP) is disabled.
 */
PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return pj_list_empty(&key->accept_list) ? 0 : 1;
#else
    PJ_UNUSED_ARG(key);
    return 0;
#endif
}

/* Return the key's "connecting" flag: non-zero while a connect operation
 * is outstanding on the key, 0 otherwise.
 */
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    int is_connecting = key->connecting;

    return is_connecting;
}


/*
 * IS_CLOSING(key)
 *
 * Evaluates to the key's "closing" flag when safe unregistration
 * (PJ_IOQUEUE_HAS_SAFE_UNREG) is enabled, and to constant 0 otherwise.
 * The argument is parenthesized so that any pointer expression can be
 * passed, not just a plain identifier.
 */
#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  ((key)->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif


/*
 * ioqueue_dispatch_write_event()
 *
 * Report the occurrence of a writable event on key h so that it can be
 * processed by the framework.
 *
 * With the key's mutex held, one of the following is done:
 *  - if a connect() is pending on the key (TCP builds only), the key is
 *    removed from the writable and exception sets, the result of the
 *    connect is determined and on_connect_complete() is called;
 *  - otherwise, if the write list is not empty, the first queued write
 *    operation is sent and, once it is finished (send error, whole buffer
 *    written, or the socket is a datagram socket), on_write_complete() is
 *    called;
 *  - otherwise nothing is done.
 *
 * Nothing is done either when the key is closing. The mutex is always
 * released before a callback is invoked and before returning.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    /* Ignore the event if the key is being closed. */
    if (IS_CLOSING(h)) {
	pj_mutex_unlock(h->mutex);
	return;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
	/* Completion of connect() operation */
	pj_ssize_t bytes_transfered;

	/* Clear operation. */
	h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);


#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
	/* from connect(2): 
	 * On Linux, use getsockopt to read the SO_ERROR option at
	 * level SOL_SOCKET to determine whether connect() completed
	 * successfully (if SO_ERROR is zero).
	 */
	{
	  int value;
	  int vallen = sizeof(value);
	  int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 
					 &value, &vallen);
	  if (gs_rc != 0) {
	    /* Argh!! What to do now??? 
	     * Just indicate that the socket is connected. The
	     * application will get error as soon as it tries to use
	     * the socket to send/receive.
	     */
	    bytes_transfered = 0;
	  } else {
	    /* NOTE(review): the raw SO_ERROR value is handed to
	     * on_connect_complete() unchanged; confirm that callers
	     * expect an OS error code here rather than a pj_status_t.
	     */
            bytes_transfered = value;
	  }
 	}
#elif defined(PJ_WIN32) && PJ_WIN32!=0
	bytes_transfered = 0; /* success */
#else
	/* Excellent information in D.J. Bernstein page:
	 * http://cr.yp.to/docs/connect.html
	 *
	 * Seems like the most portable way of detecting connect()
	 * failure is to call getpeername(). If socket is connected,
	 * getpeername() will return 0. If the socket is not connected,
	 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
	 * the right errno through error slippage. This is a combination
	 * of suggestions from Douglas C. Schmidt and Ken Keys.
	 */
	{
	    int gp_rc;
	    struct sockaddr_in addr;
	    socklen_t addrlen = sizeof(addr);

	    gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
	    /* The result is never positive: zero when getpeername()
	     * returned zero, negative otherwise.
	     */
	    bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
	}
#endif

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

	/* Call callback. */
	/* The closing flag is re-read here without the mutex held. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
	    (*h->cb.on_connect_complete)(h, bytes_transfered);

        /* Done. */

    } else 
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
	/* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == PJ_SOCK_DGRAM) {
            pj_list_erase(write_op);

            /* Stop watching for writability once nothing is queued. */
            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data. 
         * Unfortunately we must do this while holding key's mutex, thus
         * preventing parallel write on a single key.. :-((
         */
        /* On input "sent" is the number of bytes still to be written;
         * presumably the send call updates it to the number of bytes
         * actually sent -- see how it is added to "written" below.
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
	    /* Can't do this. We only clear "op" after we're finished sending
	     * the whole buffer.
	     */
	    //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd, 
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr, 
                                     write_op->rmt_addrlen);
	    /* Can't do this. We only clear "op" after we're finished sending
	     * the whole buffer.
	     */
	    //write_op->op = 0;
        } else {
            /* Unknown operation: treat it as a failed send. */
            pj_assert(!"Invalid operation type!");
	    write_op->op = 0;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            /* On failure "written" carries the negated status code; this
             * is the value that on_write_complete() receives below.
             */
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        /* A datagram operation is always finished after one send attempt;
         * a stream operation is finished on error or when the whole
         * buffer has been written.
         */
        if (send_rc!=PJ_SUCCESS || 
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == PJ_SOCK_DGRAM) 
        {

	    write_op->op = 0;

            /* Datagram operations were already unlinked before sending. */
            if (h->fd_type != PJ_SOCK_DGRAM) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

            /* No need to hold mutex anymore */
            pj_mutex_unlock(h->mutex);

	    /* Call callback. */
	    /* The closing flag is re-read here without the mutex held. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
	        (*h->cb.on_write_complete)(h, 
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

        } else {
            /* Partial stream write: the operation stays at the head of
             * the write list, to be continued on a later writable event.
             */
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}

void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    if (IS_CLOSING(h)) {
	pj_mutex_unlock(h->mutex);
	return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -