⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_select.c

📁 一个开源的sip源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter. When the counter reaches zero,
 * stamp the key with the time at which it may be recycled (now plus
 * PJ_IOQUEUE_KEY_FREE_DELAY msec), move it to the ioqueue's closing list
 * and rescan the descriptor sets.
 *
 * Locking: the ioqueue lock is taken BEFORE ref_cnt_mutex. This is the
 * same order used by pj_ioqueue_poll(), which calls increment_counter()
 * (locking ref_cnt_mutex) while it already holds the ioqueue lock. Taking
 * the two locks in the opposite order here would allow a deadlock between
 * a thread polling and a thread dropping a reference.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    /* Cache the owner so the unlock path does not depend on the key
     * after it has been moved to the closing list.
     */
    pj_ioqueue_t *ioqueue = key->ioqueue;

    pj_lock_acquire(ioqueue->lock);
    pj_mutex_lock(ioqueue->ref_cnt_mutex);

    --key->ref_count;
    if (key->ref_count == 0) {

        /* Only a key that is being unregistered may drop to zero. */
        pj_assert(key->closing == 1);

        /* Earliest time at which the key may go back to the free list. */
        pj_gettimeofday(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_list_erase(key);
        pj_list_push_back(&ioqueue->closing_list, key);

        /* Rescan fdset to get max descriptor */
        rescan_fdset(ioqueue);
    }

    pj_mutex_unlock(ioqueue->ref_cnt_mutex);
    pj_lock_release(ioqueue->lock);
}
#endif


/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 *
 * Under the key's mutex and the ioqueue lock this function decrements the
 * ioqueue's key count, unlinks the key from the list it is on, clears the
 * key's descriptor from the read/write (and, with PJ_HAS_TCP, exception)
 * sets, closes the socket and clears all four completion callbacks.
 *
 * With PJ_IOQUEUE_HAS_SAFE_UNREG the key is then marked as closing and its
 * reference is dropped through decrement_counter(); otherwise the key's
 * mutex is destroyed immediately.
 *
 * Parameters:
 *  key   The key to unregister; checked with PJ_ASSERT_RETURN (PJ_EINVAL).
 *
 * Returns PJ_SUCCESS once the steps above have been carried out.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before ioqueue here to prevent
     * deadlock.
     */
    pj_mutex_lock(key->mutex);

    /* Also lock ioqueue */
    pj_lock_acquire(ioqueue->lock);

    /* Remove the key from the ioqueue's bookkeeping: key count, list
     * membership and every descriptor set it may be present in.
     */
    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. The result of pj_sock_close() is not checked. */
    pj_sock_close(key->fd);

    /* Clear callback */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release ioqueue lock first before decrementing counter, to
     * prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark key is closing. This is done while still holding the key's
     * mutex but after the ioqueue lock has been released.
     */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    /* NOTE(review): key->mutex is still locked by this thread at this
     * point (there is no unlock on this path). Whether pj_mutex_destroy()
     * tolerates a locked mutex cannot be told from this file - confirm.
     */
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}


/* Debugging aid: check that the supplied descriptor sets agree with the
 * operations currently pending on every key in the active list.
 * Only compiled when VALIDATE_FD_SET is enabled.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This check is no longer workable: each key would have to be locked
     * before it is inspected, but the caller already holds the ioqueue
     * mutex, and acquiring a key's mutex at this point would deadlock.
     * Hence the unconditional assertion.
     */
    pj_assert(0);

    for (key = ioqueue->active_list.next;
         key != &ioqueue->active_list;
         key = key->next)
    {
        /* Read set: must contain the fd exactly when a read (or, with
         * TCP, an accept) is pending.
         */
        if (pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            && pj_list_empty(&key->accept_list)
#endif
           )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }

        /* Write set: must contain the fd exactly when a write (or, with
         * TCP, a connect) is pending.
         */
        if (pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            && !key->connecting
#endif
           )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        /* Exception set: must contain the fd exactly while connecting. */
        if (!key->connecting)
        {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
#endif /* PJ_HAS_TCP */
    }
}
#endif  /* VALIDATE_FD_SET */


/* ioqueue_remove_from_set()
 * Clear the key's descriptor from the ioqueue descriptor set that
 * corresponds to the given event type (read, write or, when TCP support
 * is compiled in, exception). The ioqueue lock is held for the duration
 * of the update. Any other event type trips an assertion.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
    pj_lock_acquire(ioqueue->lock);

    switch (event_type) {
    case READABLE_EVENT:
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
        break;

    case WRITEABLE_EVENT:
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
        break;

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    case EXCEPTION_EVENT:
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
        break;
#endif

    default:
        /* Unknown (or, without TCP, unsupported) event type. */
        pj_assert(0);
        break;
    }

    pj_lock_release(ioqueue->lock);
}

/*
 * ioqueue_add_to_set()
 * Add the key's descriptor to the ioqueue descriptor set that corresponds
 * to the given event type (read, write or, when TCP support is compiled
 * in, exception). The ioqueue lock is held for the duration of the
 * update. Any other event type trips an assertion.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    pj_lock_acquire(ioqueue->lock);

    switch (event_type) {
    case READABLE_EVENT:
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
        break;

    case WRITEABLE_EVENT:
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
        break;

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    case EXCEPTION_EVENT:
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
        break;
#endif

    default:
        /* Unknown (or, without TCP, unsupported) event type. */
        pj_assert(0);
        break;
    }

    pj_lock_release(ioqueue->lock);
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Walk the closing list and move every key whose free_time has been
 * reached (compared against the current time) to the free list.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;
    pj_ioqueue_key_t *following;

    pj_gettimeofday(&now);

    for (h = ioqueue->closing_list.next;
         h != &ioqueue->closing_list;
         h = following)
    {
        /* Remember the successor first: h may be relinked below. */
        following = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            pj_list_push_back(&ioqueue->free_list, h);
        }
    }
}
#endif


/*
 * pj_ioqueue_poll()
 *
 * Wait for readiness on the descriptors registered in the ioqueue and
 * dispatch the resulting events.
 *
 * Parameters:
 *  ioqueue   The ioqueue to poll; checked with PJ_ASSERT_RETURN
 *            (-PJ_EINVAL).
 *  timeout   Passed to pj_sock_select(). When no descriptor is present in
 *            any set and timeout is non-NULL, the calling thread sleeps
 *            for that duration instead.
 *
 * Returns the number of events dispatched, zero when nothing was ready
 * (select timed out, or there was no descriptor to poll), or the negated
 * network OS error when pj_sock_select() fails.
 *
 * Design notes:
 *
 *  - Only one callback used to be invoked per poll, but that did not work
 *    well: in some situations the write callback was invoked all the
 *    time and starved the read callback, for example when the user
 *    submits a write operation from inside the write callback.
 *    Multiple callbacks are therefore invoked in a single poll, up to
 *    PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL. This requires care with the
 *    ioqueue data structures.
 *
 *  - To guarantee preemptiveness, the poll function must strictly work
 *    on a copy of the ioqueue's fd_sets (not the originals).
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count, counter;
    pj_ioqueue_key_t *h;
    struct event
    {
        pj_ioqueue_key_t        *key;
        enum ioqueue_event_type  event_type;
    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
        && PJ_FD_COUNT(&ioqueue->xfdset)==0
#endif
        )
    {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        scan_closing_keys(ioqueue);
#endif
        pj_lock_release(ioqueue->lock);
        TRACE__((THIS_FILE, "     poll: no fd is set"));
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
                           timeout);

    /* A result of zero means the wait timed out with nothing ready. That
     * is not a failure, and the OS error value is not meaningful in that
     * case, so it must not be reported as an error code.
     */
    if (count == 0)
        return 0;
    if (count < 0)
        return -pj_get_netos_error();
    if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

    /* Scan descriptor sets for event and add the events in the event
     * array to be processed later in this function. We do this so that
     * events can be processed in parallel without holding ioqueue lock.
     */
    pj_lock_acquire(ioqueue->lock);

    counter = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     *
     * A single key can contribute up to three events, so the capacity
     * check (counter < count) is repeated before every insertion; the
     * loop condition alone only protects the first one. Events that do
     * not fit are simply left for the next poll.
     */
    h = ioqueue->active_list.next;
    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {

        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
             && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            /* Keep the key alive until its event has been dispatched. */
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = WRITEABLE_EVENT;
            ++counter;
        }

        /* Scan for readable socket. */
        if (counter < count
            && (key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = READABLE_EVENT;
            ++counter;
        }

#if PJ_HAS_TCP
        /* Scan for a pending connect reported through the exception set. */
        if (counter < count
            && key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)
            && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = EXCEPTION_EVENT;
            ++counter;
        }
#endif
    }

    pj_lock_release(ioqueue->lock);

    count = counter;

    /* Now process all events. The dispatch functions will take care
     * of locking in each of the key
     */
    for (counter=0; counter<count; ++counter) {
        switch (event[counter].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        /* Release the reference taken while the event was collected. */
        decrement_counter(event[counter].key);
#endif
    }


    return count;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -