📄 ioqueue_select.c
字号:
++key->ref_count; pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);}/* Decrement the key's reference counter, and when the counter reach zero, * destroy the key. * * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. */static void decrement_counter(pj_ioqueue_key_t *key){ pj_mutex_lock(key->ioqueue->ref_cnt_mutex); --key->ref_count; if (key->ref_count == 0) { pj_assert(key->closing == 1); pj_gettimeofday(&key->free_time); key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; pj_time_val_normalize(&key->free_time); pj_lock_acquire(key->ioqueue->lock); pj_list_erase(key); pj_list_push_back(&key->ioqueue->closing_list, key); /* Rescan fdset to get max descriptor */ rescan_fdset(key->ioqueue); pj_lock_release(key->ioqueue->lock); } pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);}#endif/* * pj_ioqueue_unregister() * * Unregister handle from ioqueue. */PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key){ pj_ioqueue_t *ioqueue; PJ_ASSERT_RETURN(key, PJ_EINVAL); ioqueue = key->ioqueue; /* Lock the key to make sure no callback is simultaneously modifying * the key. We need to lock the key before ioqueue here to prevent * deadlock. */ pj_mutex_lock(key->mutex); /* Also lock ioqueue */ pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); --ioqueue->count; pj_list_erase(key); PJ_FD_CLR(key->fd, &ioqueue->rfdset); PJ_FD_CLR(key->fd, &ioqueue->wfdset);#if PJ_HAS_TCP PJ_FD_CLR(key->fd, &ioqueue->xfdset);#endif /* Close socket. */ pj_sock_close(key->fd); /* Clear callback */ key->cb.on_accept_complete = NULL; key->cb.on_connect_complete = NULL; key->cb.on_read_complete = NULL; key->cb.on_write_complete = NULL; /* Must release ioqueue lock first before decrementing counter, to * prevent deadlock. */ pj_lock_release(ioqueue->lock);#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Mark key is closing. */ key->closing = 1; /* Decrement counter. */ decrement_counter(key); /* Done. */ pj_mutex_unlock(key->mutex);#else pj_mutex_destroy(key->mutex);#endif return PJ_SUCCESS;}/* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */#if VALIDATE_FD_SETstatic void validate_sets(const pj_ioqueue_t *ioqueue, const pj_fd_set_t *rfdset, const pj_fd_set_t *wfdset, const pj_fd_set_t *xfdset){ pj_ioqueue_key_t *key; /* * This basicly would not work anymore. * We need to lock key before performing the check, but we can't do * so because we're holding ioqueue mutex. If we acquire key's mutex * now, the will cause deadlock. */ pj_assert(0); key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { if (!pj_list_empty(&key->read_list)#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 || !pj_list_empty(&key->accept_list)#endif ) { pj_assert(PJ_FD_ISSET(key->fd, rfdset)); } else { pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0); } if (!pj_list_empty(&key->write_list)#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 || key->connecting#endif ) { pj_assert(PJ_FD_ISSET(key->fd, wfdset)); } else { pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0); }#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 if (key->connecting) { pj_assert(PJ_FD_ISSET(key->fd, xfdset)); } else { pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0); }#endif /* PJ_HAS_TCP */ key = key->next; }}#endif /* VALIDATE_FD_SET *//* ioqueue_remove_from_set() * This function is called from ioqueue_dispatch_event() to instruct * the ioqueue to remove the specified descriptor from ioqueue's descriptor * set for the specified event. */static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *key, enum ioqueue_event_type event_type){ pj_lock_acquire(ioqueue->lock); if (event_type == READABLE_EVENT) PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset); else if (event_type == WRITEABLE_EVENT) PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 else if (event_type == EXCEPTION_EVENT) PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);#endif else pj_assert(0); pj_lock_release(ioqueue->lock);}/* * ioqueue_add_to_set() * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc * to instruct the ioqueue to add the specified handle to ioqueue's descriptor * set for the specified event. */static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *key, enum ioqueue_event_type event_type ){ pj_lock_acquire(ioqueue->lock); if (event_type == READABLE_EVENT) PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset); else if (event_type == WRITEABLE_EVENT) PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 else if (event_type == EXCEPTION_EVENT) PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);#endif else pj_assert(0); pj_lock_release(ioqueue->lock);}#if PJ_IOQUEUE_HAS_SAFE_UNREG/* Scan closing keys to be put to free list again */static void scan_closing_keys(pj_ioqueue_t *ioqueue){ pj_time_val now; pj_ioqueue_key_t *h; pj_gettimeofday(&now); h = ioqueue->closing_list.next; while (h != &ioqueue->closing_list) { pj_ioqueue_key_t *next = h->next; pj_assert(h->closing != 0); if (PJ_TIME_VAL_GTE(now, h->free_time)) { pj_list_erase(h); pj_list_push_back(&ioqueue->free_list, h); } h = next; }}#endif/* * pj_ioqueue_poll() * * Few things worth written: * * - we used to do only one callback called per poll, but it didn't go * very well. The reason is because on some situation, the write * callback gets called all the time, thus doesn't give the read * callback to get called. This happens, for example, when user * submit write operation inside the write callback. * As the result, we changed the behaviour so that now multiple * callbacks are called in a single poll. It should be fast too, * just that we need to be carefull with the ioqueue data structs. * * - to guarantee preemptiveness etc, the poll function must strictly * work on fd_set copy of the ioqueue (not the original one). */PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout){ pj_fd_set_t rfdset, wfdset, xfdset; int count, counter; pj_ioqueue_key_t *h; struct event { pj_ioqueue_key_t *key; enum ioqueue_event_type event_type; } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); /* Lock ioqueue before making fd_set copies */ pj_lock_acquire(ioqueue->lock); /* We will only do select() when there are sockets to be polled. * Otherwise select() will return error. */ if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && PJ_FD_COUNT(&ioqueue->wfdset)==0 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 && PJ_FD_COUNT(&ioqueue->xfdset)==0#endif ) {#if PJ_IOQUEUE_HAS_SAFE_UNREG scan_closing_keys(ioqueue);#endif pj_lock_release(ioqueue->lock); TRACE__((THIS_FILE, " poll: no fd is set")); if (timeout) pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); return 0; } /* Copy ioqueue's pj_fd_set_t to local variables. */ pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));#if PJ_HAS_TCP pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));#else PJ_FD_ZERO(&xfdset);#endif#if VALIDATE_FD_SET validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);#endif /* Unlock ioqueue before select(). */ pj_lock_release(ioqueue->lock); count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset, timeout); if (count <= 0) return -pj_get_netos_error(); else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; /* Scan descriptor sets for event and add the events in the event * array to be processed later in this function. We do this so that * events can be processed in parallel without holding ioqueue lock. */ pj_lock_acquire(ioqueue->lock); counter = 0; /* Scan for writable sockets first to handle piggy-back data * coming with accept(). */ h = ioqueue->active_list.next; for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { if ( (key_has_pending_write(h) || key_has_pending_connect(h)) && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h)) {#if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h);#endif event[counter].key = h; event[counter].event_type = WRITEABLE_EVENT; ++counter; } /* Scan for readable socket. */ if ((key_has_pending_read(h) || key_has_pending_accept(h)) && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h)) {#if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h);#endif event[counter].key = h; event[counter].event_type = READABLE_EVENT; ++counter; }#if PJ_HAS_TCP if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && !IS_CLOSING(h)) {#if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h);#endif event[counter].key = h; event[counter].event_type = EXCEPTION_EVENT; ++counter; }#endif } pj_lock_release(ioqueue->lock); count = counter; /* Now process all events. The dispatch functions will take care * of locking in each of the key */ for (counter=0; counter<count; ++counter) { switch (event[counter].event_type) { case READABLE_EVENT: ioqueue_dispatch_read_event(ioqueue, event[counter].key); break; case WRITEABLE_EVENT: ioqueue_dispatch_write_event(ioqueue, event[counter].key); break; case EXCEPTION_EVENT: ioqueue_dispatch_exception_event(ioqueue, event[counter].key); break; case NO_EVENT: pj_assert(!"Invalid event!"); break; }#if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(event[counter].key);#endif } return count;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -