📄 memcached.c
字号:
it = item_get(key, nkey); if (it) { if (exptime == 0) { item_unlink(it); item_remove(it); /* release our reference */ out_string(c, "DELETED"); } else { /* our reference will be transfered to the delete queue */ out_string(c, defer_delete(it, exptime)); } } else { out_string(c, "NOT_FOUND"); }}/* * Adds an item to the deferred-delete list so it can be reaped later. * * Returns the result to send to the client. */char *do_defer_delete(item *it, time_t exptime){ if (delcurr >= deltotal) { item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2); if (new_delete) { todelete = new_delete; deltotal *= 2; } else { /* * can't delete it immediately, user wants a delay, * but we ran out of memory for the delete queue */ item_remove(it); /* release reference */ return "SERVER_ERROR out of memory"; } } /* use its expiration time as its deletion time now */ it->exptime = realtime(exptime); it->it_flags |= ITEM_DELETED; todelete[delcurr++] = it; return "DELETED";}static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) { unsigned int level; assert(c != NULL); level = strtoul(tokens[1].value, NULL, 10); settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level; out_string(c, "OK"); return;}static void process_command(conn *c, char *command) { token_t tokens[MAX_TOKENS]; size_t ntokens; int comm; assert(c != NULL); if (settings.verbose > 1) fprintf(stderr, "<%d %s\n", c->sfd, command); /* * for commands set/add/replace, we build an item and read the data * directly into it, then continue in nread_complete(). */ c->msgcurr = 0; c->msgused = 0; c->iovused = 0; if (add_msghdr(c) != 0) { out_string(c, "SERVER_ERROR out of memory"); return; } ntokens = tokenize_command(command, tokens, MAX_TOKENS); if (ntokens >= 3 && ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) || (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) { process_get_command(c, tokens, ntokens); } else if (ntokens == 6 && ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) || (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) { process_update_command(c, tokens, ntokens, comm); } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 1); } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 0); } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) { process_delete_command(c, tokens, ntokens); } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) { unsigned int bucket, gen; if (!settings.managed) { out_string(c, "CLIENT_ERROR not a managed instance"); return; } if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) { if ((bucket < 0) || (bucket >= MAX_BUCKETS)) { out_string(c, "CLIENT_ERROR bucket number out of range"); return; } buckets[bucket] = gen; out_string(c, "OWNED"); return; } else { out_string(c, "CLIENT_ERROR bad format"); return; } } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) { int bucket; if (!settings.managed) { out_string(c, "CLIENT_ERROR not a managed instance"); return; } if (sscanf(tokens[1].value, "%u", &bucket) == 1) { if ((bucket < 0) || (bucket >= MAX_BUCKETS)) { out_string(c, "CLIENT_ERROR bucket number out of range"); return; } buckets[bucket] = 0; out_string(c, "DISOWNED"); return; } else { out_string(c, "CLIENT_ERROR bad format"); return; } } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) { int bucket, gen; if (!settings.managed) { out_string(c, "CLIENT_ERROR not a managed instance"); return; } if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) { /* we never write anything back, even if input's wrong */ if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) { /* do nothing, bad input */ } else { c->bucket = bucket; c->gen = gen; } conn_set_state(c, conn_read); return; } else { out_string(c, "CLIENT_ERROR bad format"); return; } } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) { process_stat(c, tokens, ntokens); } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) { time_t exptime = 0; set_current_time(); if(ntokens == 2) { settings.oldest_live = current_time - 1; item_flush_expired(); out_string(c, "OK"); return; } exptime = strtol(tokens[1].value, NULL, 10); if(errno == ERANGE) { out_string(c, "CLIENT_ERROR bad command line format"); return; } settings.oldest_live = realtime(exptime) - 1; item_flush_expired(); out_string(c, "OK"); return; } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) { out_string(c, "VERSION " VERSION); } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) { conn_set_state(c, conn_closing); } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 && strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {#ifdef ALLOW_SLABS_REASSIGN int src, dst, rv; src = strtol(tokens[2].value, NULL, 10); dst = strtol(tokens[3].value, NULL, 10); if(errno == ERANGE) { out_string(c, "CLIENT_ERROR bad command line format"); return; } rv = slabs_reassign(src, dst); if (rv == 1) { out_string(c, "DONE"); return; } if (rv == 0) { out_string(c, "CANT"); return; } if (rv == -1) { out_string(c, "BUSY"); return; }#else out_string(c, "CLIENT_ERROR Slab reassignment not supported");#endif } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) { process_verbosity_command(c, tokens, ntokens); } else { out_string(c, "ERROR"); } return;}/* * if we have a complete line in the buffer, process it. */static int try_read_command(conn *c) { char *el, *cont; assert(c != NULL); assert(c->rcurr <= (c->rbuf + c->rsize)); if (c->rbytes == 0) return 0; el = memchr(c->rcurr, '\n', c->rbytes); if (!el) return 0; cont = el + 1; if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { el--; } *el = '\0'; assert(cont <= (c->rcurr + c->rbytes)); process_command(c, c->rcurr); c->rbytes -= (cont - c->rcurr); c->rcurr = cont; assert(c->rcurr <= (c->rbuf + c->rsize)); return 1;}/* * read a UDP request. * return 0 if there's nothing to read. */static int try_read_udp(conn *c) { int res; assert(c != NULL); c->request_addr_size = sizeof(c->request_addr); res = recvfrom(c->sfd, c->rbuf, c->rsize, 0, &c->request_addr, &c->request_addr_size); if (res > 8) { unsigned char *buf = (unsigned char *)c->rbuf; STATS_LOCK(); stats.bytes_read += res; STATS_UNLOCK(); /* Beginning of UDP packet is the request ID; save it. */ c->request_id = buf[0] * 256 + buf[1]; /* If this is a multi-packet request, drop it. */ if (buf[4] != 0 || buf[5] != 1) { out_string(c, "SERVER_ERROR multi-packet request not supported"); return 0; } /* Don't care about any of the rest of the header. */ res -= 8; memmove(c->rbuf, c->rbuf + 8, res); c->rbytes += res; c->rcurr = c->rbuf; return 1; } return 0;}/* * read from network as much as we can, handle buffer overflow and connection * close. * before reading, move the remaining incomplete fragment of a command * (if any) to the beginning of the buffer. * return 0 if there's nothing to read on the first read. */static int try_read_network(conn *c) { int gotdata = 0; int res; assert(c != NULL); if (c->rcurr != c->rbuf) { if (c->rbytes != 0) /* otherwise there's nothing to copy */ memmove(c->rbuf, c->rcurr, c->rbytes); c->rcurr = c->rbuf; } while (1) { if (c->rbytes >= c->rsize) { char *new_rbuf = realloc(c->rbuf, c->rsize * 2); if (!new_rbuf) { if (settings.verbose > 0) fprintf(stderr, "Couldn't realloc input buffer\n"); c->rbytes = 0; /* ignore what we read */ out_string(c, "SERVER_ERROR out of memory"); c->write_and_go = conn_closing; return 1; } c->rcurr = c->rbuf = new_rbuf; c->rsize *= 2; } /* unix socket mode doesn't need this, so zeroed out. but why * is this done for every command? presumably for UDP * mode. */ if (!settings.socketpath) { c->request_addr_size = sizeof(c->request_addr); } else { c->request_addr_size = 0; } res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes); if (res > 0) { STATS_LOCK(); stats.bytes_read += res; STATS_UNLOCK(); gotdata = 1; c->rbytes += res; continue; } if (res == 0) { /* connection closed */ conn_set_state(c, conn_closing); return 1; } if (res == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; else return 0; } } return gotdata;}static bool update_event(conn *c, const int new_flags) { assert(c != NULL); struct event_base *base = c->event.ev_base; if (c->ev_flags == new_flags) return true; if (event_del(&c->event) == -1) return false; event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c); event_base_set(base, &c->event); c->ev_flags = new_flags; if (event_add(&c->event, 0) == -1) return false; return true;}/* * Sets whether we are listening for new connections or not. */void accept_new_conns(const bool do_accept) { if (! is_listen_thread()) return; if (do_accept) { update_event(listen_conn, EV_READ | EV_PERSIST); if (listen(listen_conn->sfd, 1024) != 0) { perror("listen"); } } else { update_event(listen_conn, 0); if (listen(listen_conn->sfd, 0) != 0) { perror("listen"); } }}/* * Transmit the next chunk of data from our list of msgbuf structures. * * Returns: * TRANSMIT_COMPLETE All done writing. * TRANSMIT_INCOMPLETE More data remaining to write. * TRANSMIT_SOFT_ERROR Can't write any more right now. * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing) */static int transmit(conn *c) { assert(c != NULL); if (c->msgcurr < c->msgused && c->msglist[c->msgcurr].msg_iovlen == 0) { /* Finished writing the current msg; advance to the next. */ c->msgcurr++; } if (c->msgcurr < c->msgused) { ssize_t res; struct msghdr *m = &c->msglist[c->msgcurr]; res = sendmsg(c->sfd, m, 0); if (res > 0) { STATS_LOCK(); stats.bytes_written += res; STATS_UNLOCK(); /* We've written some of the data. Remove the completed iovec entries from the list of pending writes. */ while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) { res -= m->msg_iov->iov_len; m->msg_iovlen--; m->msg_iov++; } /* Might have written just part of the last iovec entry; adjust it so the next write will do the rest. */ if (res > 0) { m->msg_iov->iov_base += res; m->msg_iov->iov_len -= res; } return TRANSMIT_INCOMPLETE; } if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -