📄 memcached.c
字号:
* periodic large "get" requests from permanently chewing lots of server * memory. * * This should only be called in between requests since it can wipe output * buffers! */static void conn_shrink(conn *c) { assert(c != NULL); if (c->udp) return; if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { char *newbuf; if (c->rcurr != c->rbuf) memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); if (newbuf) { c->rbuf = newbuf; c->rsize = DATA_BUFFER_SIZE; } /* TODO check other branch... */ c->rcurr = c->rbuf; } if (c->isize > ITEM_LIST_HIGHWAT) { item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0])); if (newbuf) { c->ilist = newbuf; c->isize = ITEM_LIST_INITIAL; } /* TODO check error condition? */ } if (c->msgsize > MSG_LIST_HIGHWAT) { struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0])); if (newbuf) { c->msglist = newbuf; c->msgsize = MSG_LIST_INITIAL; } /* TODO check error condition? */ } if (c->iovsize > IOV_LIST_HIGHWAT) { struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0])); if (newbuf) { c->iov = newbuf; c->iovsize = IOV_LIST_INITIAL; } /* TODO check return value */ }}/* * Sets a connection's current state in the state machine. Any special * processing that needs to happen on certain state transitions can * happen here. */static void conn_set_state(conn *c, int state) { assert(c != NULL); if (state != c->state) { if (state == conn_read) { conn_shrink(c); assoc_move_next_bucket(); } c->state = state; }}/* * Ensures that there is room for another struct iovec in a connection's * iov list. * * Returns 0 on success, -1 on out-of-memory. */static int ensure_iov_space(conn *c) { assert(c != NULL); if (c->iovused >= c->iovsize) { int i, iovnum; struct iovec *new_iov = (struct iovec *)realloc(c->iov, (c->iovsize * 2) * sizeof(struct iovec)); if (! 
new_iov) return -1; c->iov = new_iov; c->iovsize *= 2; /* Point all the msghdr structures at the new list. */ for (i = 0, iovnum = 0; i < c->msgused; i++) { c->msglist[i].msg_iov = &c->iov[iovnum]; iovnum += c->msglist[i].msg_iovlen; } } return 0;}/* * Adds data to the list of pending data that will be written out to a * connection. * * Returns 0 on success, -1 on out-of-memory. */static int add_iov(conn *c, const void *buf, int len) { struct msghdr *m; int leftover; bool limit_to_mtu; assert(c != NULL); do { m = &c->msglist[c->msgused - 1]; /* * Limit UDP packets, and the first payloads of TCP replies, to * UDP_MAX_PAYLOAD_SIZE bytes. */ limit_to_mtu = c->udp || (1 == c->msgused); /* We may need to start a new msghdr if this one is full. */ if (m->msg_iovlen == IOV_MAX || (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) { add_msghdr(c); m = &c->msglist[c->msgused - 1]; } if (ensure_iov_space(c) != 0) return -1; /* If the fragment is too big to fit in the datagram, split it up */ if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) { leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE; len -= leftover; } else { leftover = 0; } m = &c->msglist[c->msgused - 1]; m->msg_iov[m->msg_iovlen].iov_base = (void *)buf; m->msg_iov[m->msg_iovlen].iov_len = len; c->msgbytes += len; c->iovused++; m->msg_iovlen++; buf = ((char *)buf) + len; len = leftover; } while (leftover > 0); return 0;}/* * Constructs a set of UDP headers and attaches them to the outgoing messages. */static int build_udp_headers(conn *c) { int i; unsigned char *hdr; assert(c != NULL); if (c->msgused > c->hdrsize) { void *new_hdrbuf; if (c->hdrbuf) new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE); else new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE); if (! 
new_hdrbuf) return -1; c->hdrbuf = (unsigned char *)new_hdrbuf; c->hdrsize = c->msgused * 2; } hdr = c->hdrbuf; for (i = 0; i < c->msgused; i++) { c->msglist[i].msg_iov[0].iov_base = hdr; c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE; *hdr++ = c->request_id / 256; *hdr++ = c->request_id % 256; *hdr++ = i / 256; *hdr++ = i % 256; *hdr++ = c->msgused / 256; *hdr++ = c->msgused % 256; *hdr++ = 0; *hdr++ = 0; assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE); } return 0;}static void out_string(conn *c, const char *str) { size_t len; assert(c != NULL); if (settings.verbose > 1) fprintf(stderr, ">%d %s\n", c->sfd, str); len = strlen(str); if ((len + 2) > c->wsize) { /* ought to be always enough. just fail for simplicity */ str = "SERVER_ERROR output line too long"; len = strlen(str); } memcpy(c->wbuf, str, len); memcpy(c->wbuf + len, "\r\n", 3); c->wbytes = len + 2; c->wcurr = c->wbuf; conn_set_state(c, conn_write); c->write_and_go = conn_read; return;}/* * we get here after reading the value in set/add/replace commands. The command * has been stored in c->item_comm, and the item is ready in c->item. */static void complete_nread(conn *c) { assert(c != NULL); item *it = c->item; int comm = c->item_comm; STATS_LOCK(); stats.set_cmds++; STATS_UNLOCK(); if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { out_string(c, "CLIENT_ERROR bad data chunk"); } else { if (store_item(it, comm)) { out_string(c, "STORED"); } else { out_string(c, "NOT_STORED"); } } item_remove(c->item); /* release the c->item reference */ c->item = 0;}/* * Stores an item in the cache according to the semantics of one of the set * commands. In threaded mode, this is protected by the cache lock. * * Returns true if the item was stored. 
*/int do_store_item(item *it, int comm) { char *key = ITEM_key(it); bool delete_locked = false; item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked); int stored = 0; if (old_it != NULL && comm == NREAD_ADD) { /* add only adds a nonexistent item, but promote to head of LRU */ do_item_update(old_it); } else if (!old_it && comm == NREAD_REPLACE) { /* replace only replaces an existing value; don't store */ } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) { /* replace and add can't override delete locks; don't store */ } else { /* "set" commands can override the delete lock window... in which case we have to find the old hidden item that's in the namespace/LRU but wasn't returned by item_get.... because we need to replace it */ if (delete_locked) old_it = do_item_get_nocheck(key, it->nkey); if (old_it != NULL) do_item_replace(old_it, it); else do_item_link(it); stored = 1; } if (old_it) do_item_remove(old_it); /* release our reference */ return stored;}typedef struct token_s { char *value; size_t length;} token_t;#define COMMAND_TOKEN 0#define SUBCOMMAND_TOKEN 1#define KEY_TOKEN 1#define KEY_MAX_LENGTH 250#define MAX_TOKENS 6/* * Tokenize the command string by replacing whitespace with '\0' and update * the token array tokens with pointer to start of each token and length. * Returns total number of tokens. The last valid token is the terminal * token (value points to the first unprocessed character of the string and * length zero). * * Usage example: * * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) { * for(int ix = 0; tokens[ix].length != 0; ix++) { * ... 
*      }
 *      ncommand = tokens[ix].value - command;
 *      command  = tokens[ix].value;
 *   }
 *
 * NOTE(review): the usage example passes four arguments, but the function
 * below takes three (command, tokens, max_tokens) -- the example looks stale.
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;

    assert(command != NULL && tokens != NULL && max_tokens > 1);

    /*
     * s marks the start of the token being scanned, e is the scan cursor.
     * Scanning stops at max_tokens - 1 so that one slot is always left for
     * the terminal token written after the loop.
     */
    for (s = e = command; ntokens < max_tokens - 1; ++e) {
        if (*e == ' ') {
            /* s == e means consecutive spaces: no token to record. */
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
            }
            s = e + 1;
        }
        else if (*e == '\0') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
            }

            break; /* string end */
        }
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value =  *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}

/*
 * Handles the argument of "stats detail": "on" and "off" set
 * settings.detail_enabled to 1 or 0 and reply "OK"; "dump" sends the buffer
 * returned by stats_prefix_dump(); anything else gets a usage error.
 */
inline void process_stats_detail(conn *c, const char *command) {
    assert(c != NULL);

    if (strcmp(command, "on") == 0) {
        settings.detail_enabled = 1;
        out_string(c, "OK");
    }
    else if (strcmp(command, "off") == 0) {
        settings.detail_enabled = 0;
        out_string(c, "OK");
    }
    else if (strcmp(command, "dump") == 0) {
        int len;
        char *stats = stats_prefix_dump(&len);
        if (NULL != stats) {
            /*
             * Write straight from the dump buffer instead of copying it into
             * wbuf. Presumably write_and_free causes the buffer to be freed
             * once it has been written -- confirm in the write path.
             */
            c->write_and_free = stats;
            c->wcurr = stats;
            c->wbytes = len;
            conn_set_state(c, conn_write);
            c->write_and_go = conn_read;
        }
        else {
            out_string(c, "SERVER_ERROR");
        }
    }
    else {
        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
    }
}

/*
 * Handles the "stats" command. With no subcommand (two tokens, counting the
 * terminal token) it replies with the general statistics listing; otherwise
 * it dispatches on the subcommand, e.g. "reset" and, when the malloc feature
 * macros are defined, "malloc".
 */
static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
    rel_time_t now = current_time;
    char *command;
    char *subcommand;

    assert(c != NULL);

    if(ntokens < 2) {
        out_string(c, "CLIENT_ERROR bad command line");
        return;
    }

    command = tokens[COMMAND_TOKEN].value;

    if (ntokens == 2 && strcmp(command, "stats") == 0) {
        /*
         * NOTE(review): the listing is built with unbounded sprintf calls
         * into this fixed 1024-byte buffer; confirm the worst-case output
         * fits.
         */
        char temp[1024];
        pid_t pid = getpid();
        char *pos = temp;

        struct rusage usage;
        getrusage(RUSAGE_SELF, &usage);

        /* The stats lock is held for the whole snapshot. */
        STATS_LOCK();
        pos += sprintf(pos, "STAT pid %u\r\n", pid);
        pos += sprintf(pos, "STAT uptime %u\r\n", now);
        /*
         * NOTE(review): in this copy of the file the next string literal is
         * split by a line break after "STAT time "; presumably it reads
         * "STAT time %ld\r\n" on a single line upstream -- confirm.
         */
        pos += sprintf(pos, "STAT time 
%ld\r\n", now + stats.started);
        pos += sprintf(pos, "STAT version " VERSION "\r\n");
        /*
         * NOTE(review): 8 * sizeof(void *) has type size_t but is formatted
         * with %d; the conversion and argument types do not match.
         */
        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
        /*
         * NOTE(review): %llu expects unsigned long long; uint64_t is not
         * guaranteed to be that type on every platform.
         */
        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
        /* No trailing "\r\n" here: out_string() appends the terminator. */
        pos += sprintf(pos, "END");
        STATS_UNLOCK();
        out_string(c, temp);
        return;
    }

    subcommand = tokens[SUBCOMMAND_TOKEN].value;

    if (strcmp(subcommand, "reset") == 0) {
        stats_reset();
        out_string(c, "RESET");
        return;
    }

#ifdef HAVE_MALLOC_H
#ifdef HAVE_STRUCT_MALLINFO
    if (strcmp(subcommand, "malloc") == 0) {
        char temp[512];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -