⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 memcached.c

📁 memcached是一个高性能的分布式的内存对象缓存系统
💻 C
📖 第 1 页 / 共 5 页
字号:
            if (!update_event(c, EV_WRITE | EV_PERSIST)) {                if (settings.verbose > 0)                    fprintf(stderr, "Couldn't update event\n");                conn_set_state(c, conn_closing);                return TRANSMIT_HARD_ERROR;            }            return TRANSMIT_SOFT_ERROR;        }        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,           we have a real error, on which we close the connection */        if (settings.verbose > 0)            perror("Failed to write, and not due to blocking");        if (c->udp)            conn_set_state(c, conn_read);        else            conn_set_state(c, conn_closing);        return TRANSMIT_HARD_ERROR;    } else {        return TRANSMIT_COMPLETE;    }}static void drive_machine(conn *c) {    bool stop = false;    int sfd, flags = 1;    socklen_t addrlen;    struct sockaddr addr;    int res;    assert(c != NULL);    while (!stop) {        switch(c->state) {        case conn_listening:            addrlen = sizeof(addr);            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {                if (errno == EAGAIN || errno == EWOULDBLOCK) {                    /* these are transient, so don't log anything */                    stop = true;                } else if (errno == EMFILE) {                    if (settings.verbose > 0)                        fprintf(stderr, "Too many open connections\n");                    accept_new_conns(false);                    stop = true;                } else {                    perror("accept()");                    stop = true;                }                break;            }            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {                perror("setting O_NONBLOCK");                close(sfd);                break;            }            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,                                     DATA_BUFFER_SIZE, false);            break;        case conn_read:            if (try_read_command(c) != 0) {                continue;            }            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {                continue;            }            /* we have no command line and no data to read from network */            if (!update_event(c, EV_READ | EV_PERSIST)) {                if (settings.verbose > 0)                    fprintf(stderr, "Couldn't update event\n");                conn_set_state(c, conn_closing);                break;            }            stop = true;            break;        case conn_nread:            /* we are reading rlbytes into ritem; */            if (c->rlbytes == 0) {                complete_nread(c);                break;            }            /* first check if we have leftovers in the conn_read buffer */            if (c->rbytes > 0) {                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;                memcpy(c->ritem, c->rcurr, tocopy);                c->ritem += tocopy;                c->rlbytes -= tocopy;                c->rcurr += tocopy;                c->rbytes -= tocopy;                break;            }            /*  now try reading from the socket */            res = read(c->sfd, c->ritem, c->rlbytes);            if (res > 0) {                STATS_LOCK();                stats.bytes_read += res;                STATS_UNLOCK();                c->ritem += res;                c->rlbytes -= res;                break;            }            if (res == 0) { /* end of stream */                conn_set_state(c, conn_closing);                break;            }            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {                if (!update_event(c, EV_READ | EV_PERSIST)) {                    if (settings.verbose > 0)                        fprintf(stderr, "Couldn't update event\n");                    conn_set_state(c, conn_closing);                    break;                }                stop = true;                break;            }            /* otherwise we have a real error, on which we close the connection */            if (settings.verbose > 0)                fprintf(stderr, "Failed to read, and not due to blocking\n");            conn_set_state(c, conn_closing);            break;        case conn_swallow:            /* we are reading sbytes and throwing them away */            if (c->sbytes == 0) {                conn_set_state(c, conn_read);                break;            }            /* first check if we have leftovers in the conn_read buffer */            if (c->rbytes > 0) {                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;                c->sbytes -= tocopy;                c->rcurr += tocopy;                c->rbytes -= tocopy;                break;            }            /*  now try reading from the socket */            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);            if (res > 0) {                STATS_LOCK();                stats.bytes_read += res;                STATS_UNLOCK();                c->sbytes -= res;                break;            }            if (res == 0) { /* end of stream */                conn_set_state(c, conn_closing);                break;            }            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {                if (!update_event(c, EV_READ | EV_PERSIST)) {                    if (settings.verbose > 0)                        fprintf(stderr, "Couldn't update event\n");                    conn_set_state(c, conn_closing);                    break;                }                stop = true;                break;            }            /* otherwise we have a real error, on which we close the connection */            if (settings.verbose > 0)                fprintf(stderr, "Failed to read, and not due to blocking\n");            conn_set_state(c, conn_closing);            break;        case conn_write:            /*             * We want to write out a simple response. If we haven't already,             * assemble it into a msgbuf list (this will be a single-entry             * list for TCP or a two-entry list for UDP).             */            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||                    (c->udp && build_udp_headers(c) != 0)) {                    if (settings.verbose > 0)                        fprintf(stderr, "Couldn't build response\n");                    conn_set_state(c, conn_closing);                    break;                }            }            /* fall through... */        case conn_mwrite:            switch (transmit(c)) {            case TRANSMIT_COMPLETE:                if (c->state == conn_mwrite) {                    while (c->ileft > 0) {                        item *it = *(c->icurr);                        assert((it->it_flags & ITEM_SLABBED) == 0);                        item_remove(it);                        c->icurr++;                        c->ileft--;                    }                    conn_set_state(c, conn_read);                } else if (c->state == conn_write) {                    if (c->write_and_free) {                        free(c->write_and_free);                        c->write_and_free = 0;                    }                    conn_set_state(c, c->write_and_go);                } else {                    if (settings.verbose > 0)                        fprintf(stderr, "Unexpected state %d\n", c->state);                    conn_set_state(c, conn_closing);                }                break;            case TRANSMIT_INCOMPLETE:            case TRANSMIT_HARD_ERROR:                break;                   /* Continue in state machine. */            case TRANSMIT_SOFT_ERROR:                stop = true;                break;            }            break;        case conn_closing:            if (c->udp)                conn_cleanup(c);            else                conn_close(c);            stop = true;            break;        }    }    return;}void event_handler(const int fd, const short which, void *arg) {    conn *c;    c = (conn *)arg;    assert(c != NULL);    c->which = which;    /* sanity */    if (fd != c->sfd) {        if (settings.verbose > 0)            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");        conn_close(c);        return;    }    drive_machine(c);    /* wait for next event */    return;}static int new_socket(const bool is_udp) {    int sfd;    int flags;    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {        perror("socket()");        return -1;    }    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {        perror("setting O_NONBLOCK");        close(sfd);        return -1;    }    return sfd;}/* * Sets a socket's send buffer size to the maximum allowed by the system. */static void maximize_sndbuf(const int sfd) {    socklen_t intsize = sizeof(int);    int last_good = 0;    int min, max, avg;    int old_size;    /* Start with the default size. */    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {        if (settings.verbose > 0)            perror("getsockopt(SO_SNDBUF)");        return;    }    /* Binary-search for the real maximum. */    min = old_size;    max = MAX_SENDBUF_SIZE;    while (min <= max) {        avg = ((unsigned int)(min + max)) / 2;        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {            last_good = avg;            min = avg + 1;        } else {            max = avg - 1;        }    }    if (settings.verbose > 1)        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);}static int server_socket(const int port, const bool is_udp) {    int sfd;    struct linger ling = {0, 0};    struct sockaddr_in addr;    int flags =1;    if ((sfd = new_socket(is_udp)) == -1) {        return -1;    }    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));    if (is_udp) {        maximize_sndbuf(sfd);    } else {        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));    }    /*     * the memset call clears nonstandard fields in some impementations     * that otherwise mess things up.     */    memset(&addr, 0, sizeof(addr));    addr.sin_family = AF_INET;    addr.sin_port = htons(port);    addr.sin_addr = settings.interf;    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {        perror("bind()");        close(sfd);        return -1;    }    if (!is_udp && listen(sfd, 1024) == -1) {        perror("listen()");        close(sfd);        return -1;    }    return sfd;}static int new_socket_unix(void) {    int sfd;    int flags;    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {        perror("socket()");        return -1;    }    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {        perror("setting O_NONBLOCK");        close(sfd);        return -1;    }    return sfd;}static int server_socket_unix(const char *path) {    int sfd;    struct linger ling = {0, 0};    struct sockaddr_un addr;    struct stat tstat;    int flags =1;    if (!path) {        return -1;    }    if ((sfd = new_socket_unix()) == -1) {        return -1;    }    /*     * Clean up a previous socket file if we left it around     */    if (lstat(path, &tstat) == 0) {        if (S_ISSOCK(tstat.st_mode))            unlink(path);    }    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));    /*     * the memset call clears nonstandard fields in some impementations     * that otherwise mess things up.     */    memset(&addr, 0, sizeof(addr));    addr.sun_family = AF_UNIX;    strcpy(addr.sun_path, path);    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {        perror("bind()");        close(sfd);        return -1;    }    if (listen(sfd, 1024) == -1) {        perror("listen()");        close(sfd);        return -1;    }    return sfd;}/* listening socket */static int l_socket = 0;/* udp socket */static int u_socket = -1;/* invoke right before gdb is called, on assert */void pre_gdb(void) {    int i;    if (l_socket > -1) close(l_socket);    if (u_socket > -1) close(u_socket);    for (i = 3; i <= 500; i++) close(i); /* so lame */    kill(getpid(), SIGABRT);}/* * We keep the current time of day in a global variable that's updated by a * timer event. This saves us a bunch of time() system calls (we really only * need to get the time once a second, whereas there can be tens of thousands * of requests a second) and allows us to use server-start-relative timestamps * rather than absolute UNIX timestamps, a space savings on systems where * sizeof(time_t) > sizeof(unsigned int). */volatile rel_time_t current_time;static struct event clockevent;/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */static void set_current_time(void) {    current_time = (rel_time_t) (time(0) - stats.started);}static void clock_handler(const int fd, const short which, void *arg) {    struct timeval t = {.tv_sec = 1, .tv

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -