/* ioqueue_select.c */
/* Increment the key's reference counter. */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}
/* Decrement the key's reference counter, and when the counter reaches zero,
 * schedule the key for destruction.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {
        pj_assert(key->closing == 1);
        pj_gettimeofday(&key->free_time);
        key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
        pj_time_val_normalize(&key->free_time);

        pj_lock_acquire(key->ioqueue->lock);
        pj_list_erase(key);
        pj_list_push_back(&key->ioqueue->closing_list, key);

        /* Rescan fdset to get max descriptor */
        rescan_fdset(key->ioqueue);
        pj_lock_release(key->ioqueue->lock);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}
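/* Illustration (not part of the original file): how the free_time stamp
 * computed above behaves. The 500ms delay used here is an assumption for
 * the example; the real value is whatever PJ_IOQUEUE_KEY_FREE_DELAY is
 * configured to.
 */
#if 0
    pj_time_val t;
    pj_gettimeofday(&t);        /* suppose t = {10 sec, 800 msec}          */
    t.msec += 500;              /* delayed reuse time: {10 sec, 1300 msec} */
    pj_time_val_normalize(&t);  /* carry msec into sec: {11 sec, 300 msec} */
    /* scan_closing_keys() below recycles the key only once "now" has
     * reached this normalized timestamp. */
#endif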
#endif  /* PJ_IOQUEUE_HAS_SAFE_UNREG */
/*
* pj_ioqueue_unregister()
*
* Unregister handle from ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before the ioqueue here to prevent
     * deadlock.
     */
    pj_mutex_lock(key->mutex);

    /* Also lock the ioqueue */
    pj_lock_acquire(ioqueue->lock);

    pj_assert(ioqueue->count > 0);
    --ioqueue->count;
    pj_list_erase(key);
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. */
    pj_sock_close(key->fd);

    /* Clear callbacks */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release the ioqueue lock first before decrementing the counter,
     * to prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark the key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    pj_mutex_unlock(key->mutex);
#else
    pj_mutex_destroy(key->mutex);
#endif

    return PJ_SUCCESS;
}
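/* Illustration (not part of the original file): typical application-side
 * usage. The helper name is hypothetical; only the pj_* calls are pjlib
 * API. Because unregistration closes the socket and (with
 * PJ_IOQUEUE_HAS_SAFE_UNREG) only schedules the key for destruction, the
 * application must not touch the key or the descriptor afterwards.
 */
#if 0
static void app_close_connection(pj_ioqueue_key_t *key)
{
    pj_status_t status;

    /* Safe to call even while another thread is inside pj_ioqueue_poll():
     * the key is marked closing here and physically recycled later. */
    status = pj_ioqueue_unregister(key);
    pj_assert(status == PJ_SUCCESS);

    /* Do NOT pj_sock_close() the descriptor; unregister already did. */
}
#endif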
/* This is supposed to check whether the fd_set values are consistent
 * with the operations currently set in each key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
                          const pj_fd_set_t *rfdset,
                          const pj_fd_set_t *wfdset,
                          const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically does not work anymore.
     * We need to lock the key before performing the check, but we can't do
     * so because we're holding the ioqueue mutex. Acquiring the key's mutex
     * now would cause a deadlock.
     */
    pj_assert(0);

    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
        if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || !pj_list_empty(&key->accept_list)
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
        }

        if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
            || key->connecting
#endif
            )
        {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
        }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
        if (key->connecting) {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset));
        }
        else {
            pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
        }
#endif /* PJ_HAS_TCP */

        key = key->next;
    }
}
#endif /* VALIDATE_FD_SET */
/*
 * ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from the ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}
/*
* ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send(), etc.,
 * to instruct the ioqueue to add the specified handle to the ioqueue's
 * descriptor set for the specified event.
*/
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}
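/* Illustration (not in the original source): a rough outline of how an
 * asynchronous operation drives the two helpers above. The actual logic
 * lives in ioqueue_common_abs.c; the operation type and list handling
 * here are simplified.
 */
#if 0
static pj_status_t sketch_async_recv(pj_ioqueue_key_t *key,
                                     struct read_operation *op)
{
    /* Queue the pending operation on the key... */
    pj_mutex_lock(key->mutex);
    pj_list_push_back(&key->read_list, op);
    pj_mutex_unlock(key->mutex);

    /* ...and ask select() to watch the fd for readability. When the event
     * fires, the dispatch function completes the operation and calls
     * ioqueue_remove_from_set() once the pending list becomes empty. */
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    return PJ_EPENDING;
}
#endif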
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan the closing list and move keys whose free-delay has elapsed back
 * to the free list.
 */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettimeofday(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
        pj_ioqueue_key_t *next = h->next;

        pj_assert(h->closing != 0);

        if (PJ_TIME_VAL_GTE(now, h->free_time)) {
            pj_list_erase(h);
            pj_list_push_back(&ioqueue->free_list, h);
        }
        h = next;
    }
}
#endif
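/* Illustration (not in the original source): a rough sketch of how the
 * free list produced above is consumed. In this ioqueue flavour,
 * registering a new handle reuses a recycled key rather than allocating a
 * fresh one; this is an outline, not the actual implementation in
 * pj_ioqueue_register_sock().
 */
#if 0
    pj_ioqueue_key_t *key = NULL;

    pj_lock_acquire(ioqueue->lock);
    if (!pj_list_empty(&ioqueue->free_list)) {
        key = ioqueue->free_list.next;  /* reuse a fully drained key */
        pj_list_erase(key);
    }
    pj_lock_release(ioqueue->lock);
    /* if key is still NULL, the registration fails (e.g. PJ_ETOOMANY) */
#endif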
/*
 * pj_ioqueue_poll()
 *
 * A few things worth noting:
 *
 * - we used to call only one callback per poll, but that didn't work
 *   very well. The reason is that in some situations the write
 *   callback gets called all the time, never giving the read
 *   callback a chance to run. This happens, for example, when the user
 *   submits a write operation from inside the write callback.
 *   As a result, we changed the behaviour so that multiple
 *   callbacks are now called in a single poll. It should be fast too,
 *   we just need to be careful with the ioqueue data structures.
 *
 * - to guarantee preemptiveness etc., the poll function must strictly
 *   work on a copy of the ioqueue's fd_sets (not the originals).
 */
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int count, counter;
    pj_ioqueue_key_t *h;
    struct event
    {
        pj_ioqueue_key_t *key;
        enum ioqueue_event_type event_type;
    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return an error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
        && PJ_FD_COUNT(&ioqueue->xfdset)==0
#endif
        )
    {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
        scan_closing_keys(ioqueue);
#endif
        pj_lock_release(ioqueue->lock);
        TRACE__((THIS_FILE, " poll: no fd is set"));
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

    count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset,
                           timeout);
    if (count <= 0)
        return -pj_get_netos_error();
    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;

    /* Scan the descriptor sets for events and add the events to the event
     * array, to be processed later in this function. We do this so that
     * events can be processed in parallel without holding the ioqueue lock.
     */
    pj_lock_acquire(ioqueue->lock);

    counter = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     */
    h = ioqueue->active_list.next;
    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
        if ( (key_has_pending_write(h) || key_has_pending_connect(h))
             && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = WRITEABLE_EVENT;
            ++counter;
        }

        /* Scan for readable sockets. Re-check counter<count since the
         * writable scan above may already have filled the event array.
         */
        if (counter<count &&
            (key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = READABLE_EVENT;
            ++counter;
        }

#if PJ_HAS_TCP
        if (counter<count &&
            key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
            !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
            increment_counter(h);
#endif
            event[counter].key = h;
            event[counter].event_type = EXCEPTION_EVENT;
            ++counter;
        }
#endif
    }

    pj_lock_release(ioqueue->lock);

    count = counter;

    /* Now process all events. The dispatch functions will take care
     * of locking for each key.
     */
    for (counter=0; counter<count; ++counter) {
        switch (event[counter].event_type) {
        case READABLE_EVENT:
            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
            break;
        case WRITEABLE_EVENT:
            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
            break;
        case EXCEPTION_EVENT:
            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
            break;
        case NO_EVENT:
            pj_assert(!"Invalid event!");
            break;
        }

#if PJ_IOQUEUE_HAS_SAFE_UNREG
        decrement_counter(event[counter].key);
#endif
    }

    return count;
}
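/* Illustration (not part of the original file): a minimal polling thread,
 * the usual way applications drive pj_ioqueue_poll(). The quit flag and
 * thread function are hypothetical; error handling is trimmed.
 */
#if 0
static pj_bool_t quit_flag;

static int worker_thread(void *arg)
{
    pj_ioqueue_t *ioqueue = (pj_ioqueue_t*)arg;

    while (!quit_flag) {
        pj_time_val timeout = { 0, 500 };   /* wake up at least each 500ms */
        int n = pj_ioqueue_poll(ioqueue, &timeout);

        if (n < 0) {
            /* A negative value is -pj_get_netos_error(); sleep briefly so
             * a persistent select() failure does not spin the CPU. */
            pj_thread_sleep(10);
        }
        /* n == 0: nothing dispatched; n > 0: n callbacks were called. */
    }
    return 0;
}
#endif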