⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_select.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 2 页
字号:
    ++key->ref_count;    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);}/* Decrement the key's reference counter, and when the counter reach zero, * destroy the key. * * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. */static void decrement_counter(pj_ioqueue_key_t *key){    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);    --key->ref_count;    if (key->ref_count == 0) {	pj_assert(key->closing == 1);	pj_gettimeofday(&key->free_time);	key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;	pj_time_val_normalize(&key->free_time);	pj_lock_acquire(key->ioqueue->lock);	pj_list_erase(key);	pj_list_push_back(&key->ioqueue->closing_list, key);	/* Rescan fdset to get max descriptor */	rescan_fdset(key->ioqueue);	pj_lock_release(key->ioqueue->lock);    }    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);}#endif/* * pj_ioqueue_unregister() * * Unregister handle from ioqueue. */PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key){    pj_ioqueue_t *ioqueue;    PJ_ASSERT_RETURN(key, PJ_EINVAL);    ioqueue = key->ioqueue;    /* Lock the key to make sure no callback is simultaneously modifying     * the key. We need to lock the key before ioqueue here to prevent     * deadlock.     */    pj_mutex_lock(key->mutex);    /* Also lock ioqueue */    pj_lock_acquire(ioqueue->lock);    pj_assert(ioqueue->count > 0);    --ioqueue->count;    pj_list_erase(key);    PJ_FD_CLR(key->fd, &ioqueue->rfdset);    PJ_FD_CLR(key->fd, &ioqueue->wfdset);#if PJ_HAS_TCP    PJ_FD_CLR(key->fd, &ioqueue->xfdset);#endif    /* Close socket. */    pj_sock_close(key->fd);    /* Clear callback */    key->cb.on_accept_complete = NULL;    key->cb.on_connect_complete = NULL;    key->cb.on_read_complete = NULL;    key->cb.on_write_complete = NULL;    /* Must release ioqueue lock first before decrementing counter, to     * prevent deadlock.     */    pj_lock_release(ioqueue->lock);#if PJ_IOQUEUE_HAS_SAFE_UNREG    /* Mark key is closing. */    key->closing = 1;    /* Decrement counter. */    decrement_counter(key);    /* Done. */    pj_mutex_unlock(key->mutex);#else    pj_mutex_destroy(key->mutex);#endif    return PJ_SUCCESS;}/* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */#if VALIDATE_FD_SETstatic void validate_sets(const pj_ioqueue_t *ioqueue,			  const pj_fd_set_t *rfdset,			  const pj_fd_set_t *wfdset,			  const pj_fd_set_t *xfdset){    pj_ioqueue_key_t *key;    /*     * This basicly would not work anymore.     * We need to lock key before performing the check, but we can't do     * so because we're holding ioqueue mutex. If we acquire key's mutex     * now, the will cause deadlock.     */    pj_assert(0);    key = ioqueue->active_list.next;    while (key != &ioqueue->active_list) {	if (!pj_list_empty(&key->read_list)#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0	    || !pj_list_empty(&key->accept_list)#endif	    ) 	{	    pj_assert(PJ_FD_ISSET(key->fd, rfdset));	} 	else {	    pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);	}	if (!pj_list_empty(&key->write_list)#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0	    || key->connecting#endif	   )	{	    pj_assert(PJ_FD_ISSET(key->fd, wfdset));	}	else {	    pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);	}#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0	if (key->connecting)	{	    pj_assert(PJ_FD_ISSET(key->fd, xfdset));	}	else {	    pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);	}#endif /* PJ_HAS_TCP */	key = key->next;    }}#endif	/* VALIDATE_FD_SET *//* ioqueue_remove_from_set() * This function is called from ioqueue_dispatch_event() to instruct * the ioqueue to remove the specified descriptor from ioqueue's descriptor * set for the specified event. */static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,                                     pj_ioqueue_key_t *key,                                      enum ioqueue_event_type event_type){    pj_lock_acquire(ioqueue->lock);    if (event_type == READABLE_EVENT)        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);    else if (event_type == WRITEABLE_EVENT)        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0    else if (event_type == EXCEPTION_EVENT)        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);#endif    else        pj_assert(0);    pj_lock_release(ioqueue->lock);}/* * ioqueue_add_to_set() * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc * to instruct the ioqueue to add the specified handle to ioqueue's descriptor * set for the specified event. */static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,                                pj_ioqueue_key_t *key,                                enum ioqueue_event_type event_type ){    pj_lock_acquire(ioqueue->lock);    if (event_type == READABLE_EVENT)        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);    else if (event_type == WRITEABLE_EVENT)        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0    else if (event_type == EXCEPTION_EVENT)        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);#endif    else        pj_assert(0);    pj_lock_release(ioqueue->lock);}#if PJ_IOQUEUE_HAS_SAFE_UNREG/* Scan closing keys to be put to free list again */static void scan_closing_keys(pj_ioqueue_t *ioqueue){    pj_time_val now;    pj_ioqueue_key_t *h;    pj_gettimeofday(&now);    h = ioqueue->closing_list.next;    while (h != &ioqueue->closing_list) {	pj_ioqueue_key_t *next = h->next;	pj_assert(h->closing != 0);	if (PJ_TIME_VAL_GTE(now, h->free_time)) {	    pj_list_erase(h);	    pj_list_push_back(&ioqueue->free_list, h);	}	h = next;    }}#endif/* * pj_ioqueue_poll() * * Few things worth written: * *  - we used to do only one callback called per poll, but it didn't go *    very well. The reason is because on some situation, the write  *    callback gets called all the time, thus doesn't give the read *    callback to get called. This happens, for example, when user *    submit write operation inside the write callback. *    As the result, we changed the behaviour so that now multiple *    callbacks are called in a single poll. It should be fast too, *    just that we need to be carefull with the ioqueue data structs. * *  - to guarantee preemptiveness etc, the poll function must strictly *    work on fd_set copy of the ioqueue (not the original one). */PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout){    pj_fd_set_t rfdset, wfdset, xfdset;    int count, counter;    pj_ioqueue_key_t *h;    struct event    {        pj_ioqueue_key_t	*key;        enum ioqueue_event_type  event_type;    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);    /* Lock ioqueue before making fd_set copies */    pj_lock_acquire(ioqueue->lock);    /* We will only do select() when there are sockets to be polled.     * Otherwise select() will return error.     */    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&        PJ_FD_COUNT(&ioqueue->wfdset)==0 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0        && PJ_FD_COUNT(&ioqueue->xfdset)==0#endif	)    {#if PJ_IOQUEUE_HAS_SAFE_UNREG	scan_closing_keys(ioqueue);#endif	pj_lock_release(ioqueue->lock);	TRACE__((THIS_FILE, "     poll: no fd is set"));        if (timeout)            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));        return 0;    }    /* Copy ioqueue's pj_fd_set_t to local variables. */    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));#if PJ_HAS_TCP    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));#else    PJ_FD_ZERO(&xfdset);#endif#if VALIDATE_FD_SET    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);#endif    /* Unlock ioqueue before select(). */    pj_lock_release(ioqueue->lock);    count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset, 			   timeout);        if (count <= 0)	return -pj_get_netos_error();    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;    /* Scan descriptor sets for event and add the events in the event     * array to be processed later in this function. We do this so that     * events can be processed in parallel without holding ioqueue lock.     */    pj_lock_acquire(ioqueue->lock);    counter = 0;    /* Scan for writable sockets first to handle piggy-back data     * coming with accept().     */    h = ioqueue->active_list.next;    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {	if ( (key_has_pending_write(h) || key_has_pending_connect(h))	     && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))        {#if PJ_IOQUEUE_HAS_SAFE_UNREG	    increment_counter(h);#endif            event[counter].key = h;            event[counter].event_type = WRITEABLE_EVENT;            ++counter;        }        /* Scan for readable socket. */	if ((key_has_pending_read(h) || key_has_pending_accept(h))            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h))        {#if PJ_IOQUEUE_HAS_SAFE_UNREG	    increment_counter(h);#endif            event[counter].key = h;            event[counter].event_type = READABLE_EVENT;            ++counter;	}#if PJ_HAS_TCP        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&	    !IS_CLOSING(h)) 	{#if PJ_IOQUEUE_HAS_SAFE_UNREG	    increment_counter(h);#endif            event[counter].key = h;            event[counter].event_type = EXCEPTION_EVENT;            ++counter;        }#endif    }    pj_lock_release(ioqueue->lock);    count = counter;    /* Now process all events. The dispatch functions will take care     * of locking in each of the key     */    for (counter=0; counter<count; ++counter) {        switch (event[counter].event_type) {        case READABLE_EVENT:            ioqueue_dispatch_read_event(ioqueue, event[counter].key);            break;        case WRITEABLE_EVENT:            ioqueue_dispatch_write_event(ioqueue, event[counter].key);            break;        case EXCEPTION_EVENT:            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);            break;        case NO_EVENT:            pj_assert(!"Invalid event!");            break;        }#if PJ_IOQUEUE_HAS_SAFE_UNREG	decrement_counter(event[counter].key);#endif    }    return count;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -