⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 memcached.c

📁 memcached是一个高性能的分布式的内存对象缓存系统
💻 C
📖 第 1 页 / 共 5 页
字号:
    it = item_get(key, nkey);    if (it) {        if (exptime == 0) {            item_unlink(it);            item_remove(it);      /* release our reference */            out_string(c, "DELETED");        } else {            /* our reference will be transfered to the delete queue */            out_string(c, defer_delete(it, exptime));        }    } else {        out_string(c, "NOT_FOUND");    }}/* * Adds an item to the deferred-delete list so it can be reaped later. * * Returns the result to send to the client. */char *do_defer_delete(item *it, time_t exptime){    if (delcurr >= deltotal) {        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);        if (new_delete) {            todelete = new_delete;            deltotal *= 2;        } else {            /*             * can't delete it immediately, user wants a delay,             * but we ran out of memory for the delete queue             */            item_remove(it);    /* release reference */            return "SERVER_ERROR out of memory";        }    }    /* use its expiration time as its deletion time now */    it->exptime = realtime(exptime);    it->it_flags |= ITEM_DELETED;    todelete[delcurr++] = it;    return "DELETED";}static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {    unsigned int level;    assert(c != NULL);    level = strtoul(tokens[1].value, NULL, 10);    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;    out_string(c, "OK");    return;}static void process_command(conn *c, char *command) {    token_t tokens[MAX_TOKENS];    size_t ntokens;    int comm;    assert(c != NULL);    if (settings.verbose > 1)        fprintf(stderr, "<%d %s\n", c->sfd, command);    /*     * for commands set/add/replace, we build an item and read the data     * directly into it, then continue in nread_complete().     
*/    c->msgcurr = 0;    c->msgused = 0;    c->iovused = 0;    if (add_msghdr(c) != 0) {        out_string(c, "SERVER_ERROR out of memory");        return;    }    ntokens = tokenize_command(command, tokens, MAX_TOKENS);    if (ntokens >= 3 &&        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {        process_get_command(c, tokens, ntokens);    } else if (ntokens == 6 &&               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {        process_update_command(c, tokens, ntokens, comm);    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {        process_arithmetic_command(c, tokens, ntokens, 1);    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {        process_arithmetic_command(c, tokens, ntokens, 0);    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {        process_delete_command(c, tokens, ntokens);    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {        unsigned int bucket, gen;        if (!settings.managed) {            out_string(c, "CLIENT_ERROR not a managed instance");            return;        }        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {                out_string(c, "CLIENT_ERROR bucket number out of range");                return;            }            buckets[bucket] = gen;            out_string(c, "OWNED");            return;        } else {            out_string(c, "CLIENT_ERROR bad format");            return;        }    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {        int bucket;        if 
(!settings.managed) {            out_string(c, "CLIENT_ERROR not a managed instance");            return;        }        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {                out_string(c, "CLIENT_ERROR bucket number out of range");                return;            }            buckets[bucket] = 0;            out_string(c, "DISOWNED");            return;        } else {            out_string(c, "CLIENT_ERROR bad format");            return;        }    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {        int bucket, gen;        if (!settings.managed) {            out_string(c, "CLIENT_ERROR not a managed instance");            return;        }        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {            /* we never write anything back, even if input's wrong */            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {                /* do nothing, bad input */            } else {                c->bucket = bucket;                c->gen = gen;            }            conn_set_state(c, conn_read);            return;        } else {            out_string(c, "CLIENT_ERROR bad format");            return;        }    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {        process_stat(c, tokens, ntokens);    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {        time_t exptime = 0;        set_current_time();        if(ntokens == 2) {            settings.oldest_live = current_time - 1;            item_flush_expired();            out_string(c, "OK");            return;        }        exptime = strtol(tokens[1].value, NULL, 10);        if(errno == ERANGE) {            out_string(c, "CLIENT_ERROR bad command line format");            return;        }        settings.oldest_live = realtime(exptime) - 1;        item_flush_expired();        out_string(c, 
"OK");        return;    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {        out_string(c, "VERSION " VERSION);    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {        conn_set_state(c, conn_closing);    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {#ifdef ALLOW_SLABS_REASSIGN        int src, dst, rv;        src = strtol(tokens[2].value, NULL, 10);        dst  = strtol(tokens[3].value, NULL, 10);        if(errno == ERANGE) {            out_string(c, "CLIENT_ERROR bad command line format");            return;        }        rv = slabs_reassign(src, dst);        if (rv == 1) {            out_string(c, "DONE");            return;        }        if (rv == 0) {            out_string(c, "CANT");            return;        }        if (rv == -1) {            out_string(c, "BUSY");            return;        }#else        out_string(c, "CLIENT_ERROR Slab reassignment not supported");#endif    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {        process_verbosity_command(c, tokens, ntokens);    } else {        out_string(c, "ERROR");    }    return;}/* * if we have a complete line in the buffer, process it. */static int try_read_command(conn *c) {    char *el, *cont;    assert(c != NULL);    assert(c->rcurr <= (c->rbuf + c->rsize));    if (c->rbytes == 0)        return 0;    el = memchr(c->rcurr, '\n', c->rbytes);    if (!el)        return 0;    cont = el + 1;    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {        el--;    }    *el = '\0';    assert(cont <= (c->rcurr + c->rbytes));    process_command(c, c->rcurr);    c->rbytes -= (cont - c->rcurr);    c->rcurr = cont;    assert(c->rcurr <= (c->rbuf + c->rsize));    return 1;}/* * read a UDP request. * return 0 if there's nothing to read. 
*/
/*
 * One datagram is received into the start of c->rbuf. Datagrams of 8 bytes
 * or fewer (and recvfrom() errors) are ignored. Bytes 0-1 of the 8-byte
 * header are stored, big-endian, as c->request_id; bytes 4-5 must be 0 and 1,
 * otherwise the request is treated as multi-packet and answered with an
 * error. On success the header is stripped so the buffer holds only the
 * payload.
 *
 * Returns 1 when a payload is ready in the buffer, 0 otherwise.
 */
static int try_read_udp(conn *c) {
    int res;

    assert(c != NULL);

    c->request_addr_size = sizeof(c->request_addr);
    res = recvfrom(c->sfd, c->rbuf, c->rsize,
                   0, &c->request_addr, &c->request_addr_size);
    if (res > 8) {
        unsigned char *buf = (unsigned char *)c->rbuf;
        STATS_LOCK();
        stats.bytes_read += res;
        STATS_UNLOCK();

        /* Beginning of UDP packet is the request ID; save it. */
        c->request_id = buf[0] * 256 + buf[1];

        /* If this is a multi-packet request, drop it. */
        if (buf[4] != 0 || buf[5] != 1) {
            out_string(c, "SERVER_ERROR multi-packet request not supported");
            return 0;
        }

        /* Don't care about any of the rest of the header. */
        res -= 8;
        memmove(c->rbuf, c->rbuf + 8, res);

        /*
         * recvfrom() wrote at offset 0 of rbuf, so whatever was buffered
         * before is gone: the buffer now holds exactly this payload.
         * Accumulating (+=) would count stale bytes and let the command
         * parser read past the received data.
         */
        c->rbytes = res;
        c->rcurr = c->rbuf;
        return 1;
    }
    return 0;
}

/*
 * read from network as much as we can, handle buffer overflow and connection
 * close.
 * before reading, move the remaining incomplete fragment of a command
 * (if any) to the beginning of the buffer.
 * return 0 if there's nothing to read on the first read.
*/static int try_read_network(conn *c) {    int gotdata = 0;    int res;    assert(c != NULL);    if (c->rcurr != c->rbuf) {        if (c->rbytes != 0) /* otherwise there's nothing to copy */            memmove(c->rbuf, c->rcurr, c->rbytes);        c->rcurr = c->rbuf;    }    while (1) {        if (c->rbytes >= c->rsize) {            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);            if (!new_rbuf) {                if (settings.verbose > 0)                    fprintf(stderr, "Couldn't realloc input buffer\n");                c->rbytes = 0; /* ignore what we read */                out_string(c, "SERVER_ERROR out of memory");                c->write_and_go = conn_closing;                return 1;            }            c->rcurr = c->rbuf = new_rbuf;            c->rsize *= 2;        }        /* unix socket mode doesn't need this, so zeroed out.  but why         * is this done for every command?  presumably for UDP         * mode.  */        if (!settings.socketpath) {            c->request_addr_size = sizeof(c->request_addr);        } else {            c->request_addr_size = 0;        }        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);        if (res > 0) {            STATS_LOCK();            stats.bytes_read += res;            STATS_UNLOCK();            gotdata = 1;            c->rbytes += res;            continue;        }        if (res == 0) {            /* connection closed */            conn_set_state(c, conn_closing);            return 1;        }        if (res == -1) {            if (errno == EAGAIN || errno == EWOULDBLOCK) break;            else return 0;        }    }    return gotdata;}static bool update_event(conn *c, const int new_flags) {    assert(c != NULL);    struct event_base *base = c->event.ev_base;    if (c->ev_flags == new_flags)        return true;    if (event_del(&c->event) == -1) return false;    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);    event_base_set(base, &c->event);    
c->ev_flags = new_flags;    if (event_add(&c->event, 0) == -1) return false;    return true;}/* * Sets whether we are listening for new connections or not. */void accept_new_conns(const bool do_accept) {    if (! is_listen_thread())        return;    if (do_accept) {        update_event(listen_conn, EV_READ | EV_PERSIST);        if (listen(listen_conn->sfd, 1024) != 0) {            perror("listen");        }    }    else {        update_event(listen_conn, 0);        if (listen(listen_conn->sfd, 0) != 0) {            perror("listen");        }    }}/* * Transmit the next chunk of data from our list of msgbuf structures. * * Returns: *   TRANSMIT_COMPLETE   All done writing. *   TRANSMIT_INCOMPLETE More data remaining to write. *   TRANSMIT_SOFT_ERROR Can't write any more right now. *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing) */static int transmit(conn *c) {    assert(c != NULL);    if (c->msgcurr < c->msgused &&            c->msglist[c->msgcurr].msg_iovlen == 0) {        /* Finished writing the current msg; advance to the next. */        c->msgcurr++;    }    if (c->msgcurr < c->msgused) {        ssize_t res;        struct msghdr *m = &c->msglist[c->msgcurr];        res = sendmsg(c->sfd, m, 0);        if (res > 0) {            STATS_LOCK();            stats.bytes_written += res;            STATS_UNLOCK();            /* We've written some of the data. Remove the completed               iovec entries from the list of pending writes. */            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {                res -= m->msg_iov->iov_len;                m->msg_iovlen--;                m->msg_iov++;            }            /* Might have written just part of the last iovec entry;               adjust it so the next write will do the rest. 
*/            if (res > 0) {                m->msg_iov->iov_base += res;                m->msg_iov->iov_len -= res;            }            return TRANSMIT_INCOMPLETE;        }        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -