fdset.c
来自「mms client」· C语言 代码 · 共 463 行
C
463 行
/* * fdset.c - module for managing a large collection of file descriptors */#include <stdlib.h>#include <unistd.h>#include <errno.h>#include "gwlib/gwlib.h"struct FDSet{ /* Thread ID of the set's internal thread, which will spend most * of its time blocking on poll(). This is set when the thread * is created, and not changed after that. It's not protected * by any lock. */ long poll_thread; /* The following fields are for use by the polling thread only. * No-one else may touch them. It's not protected by any lock. */ /* Array for use with poll(). Elements 0 through size-1 are allocated. * Elements 0 through entries-1 are in use. */ struct pollfd *pollinfo; int size; int entries; /* Arrays of callback and data fields. They are kept in sync with * the pollinfo array, and are basically extra fields that we couldn't * put in struct pollfd because that structure is defined externally. */ fdset_callback_t **callbacks; void **datafields; /* The poller function loops over the table after poll() returns, * and calls callback functions that may modify the table that is * being scanned. We can't just copy the table to avoid interference, * because fdset_unregister and fdset_listen guarantee that their * operations are complete when they return -- that does not work * if poller() is scanning an outdated copy of the table. * To solve this, we have a field that marks when the table is * being scanned. If this field is true, fdset_unregister merely * sets the fd to -1 instead of deleting the whole entry. * fdset_listen will takes care to modify revents as well as * events. fdset_register always adds to the end of the table, * so it does not have to do anything special. */ int scanning; /* This field keeps track of how many fds were set to -1 by * fdset_unregister while "scanning" is true. That way we can * efficiently check if we need to scan the table to really * delete those entries. */ int deleted_entries; /* The following fields are for general use, and are of types that * have internal locks. */ /* List of struct action. Used by other threads to make requests * of the polling thread. */ List *actions;};/* Datatype to describe changes to the fdset fields that only the polling * thread may touch. Other threads use this type to submit requests to * change those fields. *//* Action life cycle: Created, then pushed on set->actions list by * action_submit. Poller thread wakes up and takes it from the list, * then calls handle_action, which performs the action and pushes it * on the action's done list. action_submit then takes it back and * destroys it. *//* If no synchronization is needed, action_submit_nosync can be used. * In that case handle_action will destroy the action itself instead * of putting it on any list. */struct action{ enum { REGISTER, LISTEN, UNREGISTER, DESTROY } type; int fd; /* Used by REGISTER, LISTEN, and UNREGISTER */ int mask; /* Used by LISTEN */ int events; /* Used by REGISTER and LISTEN */ fdset_callback_t *callback; /* Used by REGISTER */ void *data; /* Used by REGISTER */ /* When the request has been handled, an element is produced on this * list, so that the submitter can synchronize. Can be left NULL. */ List *done; /* Used by LISTEN, UNREGISTER, and DESTROY */};/* Return a new action structure of the given type, with all fields empty. */static struct action *action_create(int type){ struct action *new; new = gw_malloc(sizeof(*new)); new->type = type; new->fd = -1; new->mask = 0; new->events = 0; new->callback = NULL; new->data = NULL; new->done = NULL; return new;}static void action_destroy(struct action *action){ if (action == NULL) return; list_destroy(action->done, NULL); gw_free(action);}/* For use with list_destroy */static void action_destroy_item(void *action){ action_destroy(action);}/* * Submit an action for this set, and wait for the polling thread to * confirm that it's been done, by pushing the action on its done list. */static void submit_action(FDSet *set, struct action *action){ List *done; void *sync; gw_assert(set != NULL); gw_assert(action != NULL); done = list_create(); list_add_producer(done); action->done = done; list_append(set->actions, action); gwthread_wakeup(set->poll_thread); sync = list_consume(done); gw_assert(sync == action); action_destroy(action);}/* * As above, but don't wait for confirmation. */static void submit_action_nosync(FDSet *set, struct action *action){ list_append(set->actions, action); gwthread_wakeup(set->poll_thread);}/* Do one action for this thread and confirm that it's been done by * appending the action to its done list. May only be called by * the polling thread. Returns 0 normally, and returns -1 if the * action destroyed the set. */static int handle_action(FDSet *set, struct action *action){ int result; gw_assert(set != NULL); gw_assert(set->poll_thread == gwthread_self()); gw_assert(action != NULL); result = 0; switch (action->type) { case REGISTER: fdset_register(set, action->fd, action->events, action->callback, action->data); break; case LISTEN: fdset_listen(set, action->fd, action->mask, action->events); break; case UNREGISTER: fdset_unregister(set, action->fd); break; case DESTROY: fdset_destroy(set); result = -1; break; default: panic(0, "fdset: handle_action got unknown action type %d.", action->type); } if (action->done == NULL) action_destroy(action); else list_produce(action->done, action); return result;}/* Look up the entry number in the pollinfo array for this fd. * Right now it's a linear search, this may have to be improved. */static int find_entry(FDSet *set, int fd){ int i; gw_assert(set != NULL); gw_assert(gwthread_self() == set->poll_thread); for (i = 0; i < set->entries; i++) { if (set->pollinfo[i].fd == fd) return i; } return -1;}static void remove_entry(FDSet *set, int entry){ if (entry != set->entries - 1) { /* We need to keep the array contiguous, so move the last element * to fill in the hole. */ set->pollinfo[entry] = set->pollinfo[set->entries - 1]; set->callbacks[entry] = set->callbacks[set->entries - 1]; set->datafields[entry] = set->datafields[set->entries - 1]; } set->entries--;}static void remove_deleted_entries(FDSet *set){ int i; i = 0; while (i < set->entries && set->deleted_entries > 0) { if (set->pollinfo[i].fd < 0) { remove_entry(set, i); set->deleted_entries--; } else { i++; } }}/* Main function for polling thread. Most its time is spent blocking * in poll(). No-one else is allowed to change the fields it uses, * so other threads just put something on the actions list and wake * up this thread. That's why it checks the actions list every time * it goes through the loop. */static void poller(void *arg){ FDSet *set = arg; struct action *action; int ret; int i; gw_assert(set != NULL); for (;;) { while ((action = list_extract_first(set->actions)) != NULL) { /* handle_action returns -1 if the set was destroyed. */ if (handle_action(set, action) < 0) return; } /* Block indefinitely, waiting for activity */ ret = gwthread_poll(set->pollinfo, set->entries, -1.0); if (ret < 0) { if (errno != EINTR) { error(0, "Poller: can't handle error; sleeping 1 second."); gwthread_sleep(1.0); } continue; } /* Callbacks may modify the table while we scan it, so be careful. */ set->scanning = 1; for (i = 0; i < set->entries; i++) { if (set->pollinfo[i].revents != 0) set->callbacks[i](set->pollinfo[i].fd, set->pollinfo[i].revents, set->datafields[i]); } set->scanning = 0; if (set->deleted_entries > 0) remove_deleted_entries(set); }}FDSet *fdset_create(void){ FDSet *new; new = gw_malloc(sizeof(*new)); /* Start off with space for one element because we can't malloc 0 bytes * and we don't want to worry about these pointers being NULL. */ new->size = 1; new->entries = 0; new->pollinfo = gw_malloc(sizeof(new->pollinfo[0]) * new->size); new->callbacks = gw_malloc(sizeof(new->callbacks[0]) * new->size); new->datafields = gw_malloc(sizeof(new->datafields[0]) * new->size); new->scanning = 0; new->deleted_entries = 0; new->actions = list_create(); new->poll_thread = gwthread_create(poller, new); if (new->poll_thread < 0) { error(0, "Could not start internal thread for fdset."); fdset_destroy(new); return NULL; } return new;}void fdset_destroy(FDSet *set){ if (set == NULL) return; if (set->poll_thread < 0 || gwthread_self() == set->poll_thread) { if (set->entries > 0) { warning(0, "Destroying fdset with %d active entries.", set->entries); } gw_free(set->pollinfo); gw_free(set->callbacks); gw_free(set->datafields); if (list_len(set->actions) > 0) { error(0, "Destroying fdset with %ld pending actions.", list_len(set->actions)); } list_destroy(set->actions, action_destroy_item); gw_free(set); } else { long thread = set->poll_thread; submit_action(set, action_create(DESTROY)); gwthread_join(thread); }}void fdset_register(FDSet *set, int fd, int events, fdset_callback_t callback, void *data){ int new; gw_assert(set != NULL); if (gwthread_self() != set->poll_thread) { struct action *action; action = action_create(REGISTER); action->fd = fd; action->events = events; action->callback = callback; action->data = data; submit_action_nosync(set, action); return; } gw_assert(set->entries <= set->size); if (set->entries >= set->size) { int newsize = set->entries + 1; set->pollinfo = gw_realloc(set->pollinfo, sizeof(set->pollinfo[0]) * newsize); set->callbacks = gw_realloc(set->callbacks, sizeof(set->callbacks[0]) * newsize); set->datafields = gw_realloc(set->datafields, sizeof(set->datafields[0]) * newsize); set->size = newsize; } /* We don't check set->scanning. Adding new entries is not harmful * because their revents fields are 0. */ new = set->entries++; set->pollinfo[new].fd = fd; set->pollinfo[new].events = events; set->pollinfo[new].revents = 0; set->callbacks[new] = callback; set->datafields[new] = data;}void fdset_listen(FDSet *set, int fd, int mask, int events){ int entry; gw_assert(set != NULL); if (gwthread_self() != set->poll_thread) { struct action *action; action = action_create(LISTEN); action->fd = fd; action->mask = mask; action->events = events; submit_action(set, action); return; } entry = find_entry(set, fd); if (entry < 0) { warning(0, "fdset_listen called on unregistered fd %d.", fd); return; } /* Copy the bits from events specified by the mask, and preserve the * bits not specified by the mask. */ set->pollinfo[entry].events = (set->pollinfo[entry].events & ~mask) | (events & mask); /* If poller is currently scanning the array, then change the * revents field so that the callback function will not be called * for events we should no longer listen for. The idea is the * same as for the events field, except that we only turn bits off. */ if (set->scanning) { set->pollinfo[entry].revents = set->pollinfo[entry].revents & (events | ~mask); }}void fdset_unregister(FDSet *set, int fd){ int entry; gw_assert(set != NULL); if (gwthread_self() != set->poll_thread) { struct action *action; action = action_create(UNREGISTER); action->fd = fd; submit_action(set, action); return; } /* Remove the entry from the pollinfo array */ entry = find_entry(set, fd); if (entry < 0) { warning(0, "fdset_listen called on unregistered fd %d.", fd); return; } if (entry == set->entries - 1) { /* It's the last entry. We can safely remove it even while * the array is being scanned, because the scan checks set->entries. */ set->entries--; } else if (set->scanning) { /* We can't remove entries because the array is being * scanned. Mark it as deleted. */ set->pollinfo[entry].fd = -1; set->deleted_entries++; } else { remove_entry(set, entry); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?