fdset.c

来自「mms client」· C语言 代码 · 共 463 行

C
463
字号
/* * fdset.c - module for managing a large collection of file descriptors */#include <stdlib.h>#include <unistd.h>#include <errno.h>#include "gwlib/gwlib.h"struct FDSet{    /* Thread ID of the set's internal thread, which will spend most     * of its time blocking on poll().  This is set when the thread     * is created, and not changed after that.  It's not protected     * by any lock. */    long poll_thread;    /* The following fields are for use by the polling thread only.     * No-one else may touch them.  It's not protected by any lock. */    /* Array for use with poll().  Elements 0 through size-1 are allocated.     * Elements 0 through entries-1 are in use. */    struct pollfd *pollinfo;    int size;    int entries;    /* Arrays of callback and data fields.  They are kept in sync with     * the pollinfo array, and are basically extra fields that we couldn't     * put in struct pollfd because that structure is defined externally. */    fdset_callback_t **callbacks;    void **datafields;    /* The poller function loops over the table after poll() returns,     * and calls callback functions that may modify the table that is     * being scanned.  We can't just copy the table to avoid interference,     * because fdset_unregister and fdset_listen guarantee that their     * operations are complete when they return -- that does not work     * if poller() is scanning an outdated copy of the table.     * To solve this, we have a field that marks when the table is     * being scanned.  If this field is true, fdset_unregister merely     * sets the fd to -1 instead of deleting the whole entry.     * fdset_listen will takes care to modify revents as well as     * events. fdset_register always adds to the end of the table,     * so it does not have to do anything special.     */    int scanning;    /* This field keeps track of how many fds were set to -1 by     * fdset_unregister while "scanning" is true.  That way we can     * efficiently check if we need to scan the table to really      * delete those entries. */    int deleted_entries;        /* The following fields are for general use, and are of types that     * have internal locks. */    /* List of struct action.  Used by other threads to make requests     * of the polling thread. */    List *actions;};/* Datatype to describe changes to the fdset fields that only the polling * thread may touch.  Other threads use this type to submit requests to * change those fields. *//* Action life cycle: Created, then pushed on set->actions list by * action_submit.  Poller thread wakes up and takes it from the list, * then calls handle_action, which performs the action and pushes it * on the action's done list.  action_submit then takes it back and * destroys it. *//* If no synchronization is needed, action_submit_nosync can be used. * In that case handle_action will destroy the action itself instead * of putting it on any list. */struct action{    enum { REGISTER, LISTEN, UNREGISTER, DESTROY } type;    int fd;                     /* Used by REGISTER, LISTEN, and UNREGISTER */    int mask;                   /* Used by LISTEN */    int events;                 /* Used by REGISTER and LISTEN */    fdset_callback_t *callback; /* Used by REGISTER */    void *data;                 /* Used by REGISTER */    /* When the request has been handled, an element is produced on this     * list, so that the submitter can synchronize.  Can be left NULL. */    List *done;                 /* Used by LISTEN, UNREGISTER, and DESTROY */};/* Return a new action structure of the given type, with all fields empty. */static struct action *action_create(int type){    struct action *new;    new = gw_malloc(sizeof(*new));    new->type = type;    new->fd = -1;    new->mask = 0;    new->events = 0;    new->callback = NULL;    new->data = NULL;    new->done = NULL;    return new;}static void action_destroy(struct action *action){    if (action == NULL)        return;    list_destroy(action->done, NULL);    gw_free(action);}/* For use with list_destroy */static void action_destroy_item(void *action){    action_destroy(action);}/* * Submit an action for this set, and wait for the polling thread to * confirm that it's been done, by pushing the action on its done list. */static void submit_action(FDSet *set, struct action *action){    List *done;    void *sync;    gw_assert(set != NULL);    gw_assert(action != NULL);    done = list_create();    list_add_producer(done);    action->done = done;    list_append(set->actions, action);    gwthread_wakeup(set->poll_thread);    sync = list_consume(done);    gw_assert(sync == action);    action_destroy(action);}/*  * As above, but don't wait for confirmation. */static void submit_action_nosync(FDSet *set, struct action *action){    list_append(set->actions, action);    gwthread_wakeup(set->poll_thread);}/* Do one action for this thread and confirm that it's been done by * appending the action to its done list.  May only be called by * the polling thread.  Returns 0 normally, and returns -1 if the * action destroyed the set. */static int handle_action(FDSet *set, struct action *action){    int result;    gw_assert(set != NULL);    gw_assert(set->poll_thread == gwthread_self());    gw_assert(action != NULL);    result = 0;    switch (action->type) {    case REGISTER:        fdset_register(set, action->fd, action->events,                       action->callback, action->data);        break;    case LISTEN:        fdset_listen(set, action->fd, action->mask, action->events);        break;    case UNREGISTER:        fdset_unregister(set, action->fd);        break;    case DESTROY:        fdset_destroy(set);        result = -1;        break;    default:        panic(0, "fdset: handle_action got unknown action type %d.",              action->type);    }    if (action->done == NULL)	action_destroy(action);    else        list_produce(action->done, action);    return result;}/* Look up the entry number in the pollinfo array for this fd. * Right now it's a linear search, this may have to be improved. */static int find_entry(FDSet *set, int fd){    int i;    gw_assert(set != NULL);    gw_assert(gwthread_self() == set->poll_thread);    for (i = 0; i < set->entries; i++) {        if (set->pollinfo[i].fd == fd)            return i;    }    return -1;}static void remove_entry(FDSet *set, int entry){    if (entry != set->entries - 1) {        /* We need to keep the array contiguous, so move the last element         * to fill in the hole. */        set->pollinfo[entry] = set->pollinfo[set->entries - 1];        set->callbacks[entry] = set->callbacks[set->entries - 1];        set->datafields[entry] = set->datafields[set->entries - 1];    }    set->entries--;}static void remove_deleted_entries(FDSet *set){    int i;    i = 0;    while (i < set->entries && set->deleted_entries > 0) {        if (set->pollinfo[i].fd < 0) {            remove_entry(set, i);	    set->deleted_entries--;	} else {	    i++;        }    }}/* Main function for polling thread.  Most its time is spent blocking * in poll().  No-one else is allowed to change the fields it uses, * so other threads just put something on the actions list and wake * up this thread.  That's why it checks the actions list every time * it goes through the loop. */static void poller(void *arg){    FDSet *set = arg;    struct action *action;    int ret;    int i;    gw_assert(set != NULL);    for (;;) {        while ((action = list_extract_first(set->actions)) != NULL) {            /* handle_action returns -1 if the set was destroyed. */            if (handle_action(set, action) < 0)                return;        }        /* Block indefinitely, waiting for activity */        ret = gwthread_poll(set->pollinfo, set->entries, -1.0);        if (ret < 0) {	    if (errno != EINTR) {                error(0, "Poller: can't handle error; sleeping 1 second.");                gwthread_sleep(1.0);            }            continue;        }	/* Callbacks may modify the table while we scan it, so be careful. */	set->scanning = 1;        for (i = 0; i < set->entries; i++) {            if (set->pollinfo[i].revents != 0)                set->callbacks[i](set->pollinfo[i].fd,                                  set->pollinfo[i].revents,                                  set->datafields[i]);        }	set->scanning = 0;	if (set->deleted_entries > 0)	    remove_deleted_entries(set);    }}FDSet *fdset_create(void){    FDSet *new;    new = gw_malloc(sizeof(*new));    /* Start off with space for one element because we can't malloc 0 bytes     * and we don't want to worry about these pointers being NULL. */    new->size = 1;    new->entries = 0;    new->pollinfo = gw_malloc(sizeof(new->pollinfo[0]) * new->size);    new->callbacks = gw_malloc(sizeof(new->callbacks[0]) * new->size);    new->datafields = gw_malloc(sizeof(new->datafields[0]) * new->size);    new->scanning = 0;    new->deleted_entries = 0;    new->actions = list_create();    new->poll_thread = gwthread_create(poller, new);    if (new->poll_thread < 0) {        error(0, "Could not start internal thread for fdset.");        fdset_destroy(new);        return NULL;    }    return new;}void fdset_destroy(FDSet *set){    if (set == NULL)        return;    if (set->poll_thread < 0 || gwthread_self() == set->poll_thread) {        if (set->entries > 0) {            warning(0, "Destroying fdset with %d active entries.",                    set->entries);        }        gw_free(set->pollinfo);        gw_free(set->callbacks);        gw_free(set->datafields);        if (list_len(set->actions) > 0) {            error(0, "Destroying fdset with %ld pending actions.",                  list_len(set->actions));        }        list_destroy(set->actions, action_destroy_item);        gw_free(set);    } else {        long thread = set->poll_thread;        submit_action(set, action_create(DESTROY));	gwthread_join(thread);    }}void fdset_register(FDSet *set, int fd, int events,                    fdset_callback_t callback, void *data){    int new;    gw_assert(set != NULL);    if (gwthread_self() != set->poll_thread) {        struct action *action;        action = action_create(REGISTER);        action->fd = fd;        action->events = events;        action->callback = callback;        action->data = data;	submit_action_nosync(set, action);        return;    }    gw_assert(set->entries <= set->size);    if (set->entries >= set->size) {        int newsize = set->entries + 1;        set->pollinfo = gw_realloc(set->pollinfo,                                   sizeof(set->pollinfo[0]) * newsize);        set->callbacks = gw_realloc(set->callbacks,                                   sizeof(set->callbacks[0]) * newsize);        set->datafields = gw_realloc(set->datafields,                                   sizeof(set->datafields[0]) * newsize);        set->size = newsize;    }    /* We don't check set->scanning.  Adding new entries is not harmful     * because their revents fields are 0. */    new = set->entries++;    set->pollinfo[new].fd = fd;    set->pollinfo[new].events = events;    set->pollinfo[new].revents = 0;    set->callbacks[new] = callback;    set->datafields[new] = data;}void fdset_listen(FDSet *set, int fd, int mask, int events){    int entry;    gw_assert(set != NULL);    if (gwthread_self() != set->poll_thread) {        struct action *action;        action = action_create(LISTEN);        action->fd = fd;	action->mask = mask;        action->events = events;        submit_action(set, action);        return;    }    entry = find_entry(set, fd);       if (entry < 0) {        warning(0, "fdset_listen called on unregistered fd %d.", fd);        return;    }    /* Copy the bits from events specified by the mask, and preserve the     * bits not specified by the mask. */    set->pollinfo[entry].events =	(set->pollinfo[entry].events & ~mask) | (events & mask);    /* If poller is currently scanning the array, then change the     * revents field so that the callback function will not be called     * for events we should no longer listen for.  The idea is the     * same as for the events field, except that we only turn bits off. */    if (set->scanning) {        set->pollinfo[entry].revents =            set->pollinfo[entry].revents & (events | ~mask);    }}void fdset_unregister(FDSet *set, int fd){    int entry;    gw_assert(set != NULL);    if (gwthread_self() != set->poll_thread) {        struct action *action;        action = action_create(UNREGISTER);        action->fd = fd;        submit_action(set, action);        return;    }    /* Remove the entry from the pollinfo array */    entry = find_entry(set, fd);    if (entry < 0) {        warning(0, "fdset_listen called on unregistered fd %d.", fd);        return;    }    if (entry == set->entries - 1) {        /* It's the last entry.  We can safely remove it even while         * the array is being scanned, because the scan checks set->entries. */        set->entries--;    } else if (set->scanning) {        /* We can't remove entries because the array is being         * scanned.  Mark it as deleted.  */        set->pollinfo[entry].fd = -1;        set->deleted_entries++;    } else {        remove_entry(set, entry);    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?