📄 memcached.c
字号:
if (!update_event(c, EV_WRITE | EV_PERSIST)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't update event\n"); conn_set_state(c, conn_closing); return TRANSMIT_HARD_ERROR; } return TRANSMIT_SOFT_ERROR; } /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK, we have a real error, on which we close the connection */ if (settings.verbose > 0) perror("Failed to write, and not due to blocking"); if (c->udp) conn_set_state(c, conn_read); else conn_set_state(c, conn_closing); return TRANSMIT_HARD_ERROR; } else { return TRANSMIT_COMPLETE; }}static void drive_machine(conn *c) { bool stop = false; int sfd, flags = 1; socklen_t addrlen; struct sockaddr addr; int res; assert(c != NULL); while (!stop) { switch(c->state) { case conn_listening: addrlen = sizeof(addr); if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { /* these are transient, so don't log anything */ stop = true; } else if (errno == EMFILE) { if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); stop = true; } else { perror("accept()"); stop = true; } break; } if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, false); break; case conn_read: if (try_read_command(c) != 0) { continue; } if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) { continue; } /* we have no command line and no data to read from network */ if (!update_event(c, EV_READ | EV_PERSIST)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't update event\n"); conn_set_state(c, conn_closing); break; } stop = true; break; case conn_nread: /* we are reading rlbytes into ritem; */ if (c->rlbytes == 0) { complete_nread(c); break; } /* first check if we have leftovers in the conn_read buffer */ if (c->rbytes > 0) { int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; memcpy(c->ritem, c->rcurr, tocopy); c->ritem += tocopy; c->rlbytes -= tocopy; c->rcurr += tocopy; c->rbytes -= tocopy; break; } /* now try reading from the socket */ res = read(c->sfd, c->ritem, c->rlbytes); if (res > 0) { STATS_LOCK(); stats.bytes_read += res; STATS_UNLOCK(); c->ritem += res; c->rlbytes -= res; break; } if (res == 0) { /* end of stream */ conn_set_state(c, conn_closing); break; } if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { if (!update_event(c, EV_READ | EV_PERSIST)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't update event\n"); conn_set_state(c, conn_closing); break; } stop = true; break; } /* otherwise we have a real error, on which we close the connection */ if (settings.verbose > 0) fprintf(stderr, "Failed to read, and not due to blocking\n"); conn_set_state(c, conn_closing); break; case conn_swallow: /* we are reading sbytes and throwing them away */ if (c->sbytes == 0) { conn_set_state(c, conn_read); break; } /* first check if we have leftovers in the conn_read buffer */ if (c->rbytes > 0) { int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes; c->sbytes -= tocopy; c->rcurr += tocopy; c->rbytes -= tocopy; break; } /* now try reading from the socket */ res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize); if (res > 0) { STATS_LOCK(); stats.bytes_read += res; STATS_UNLOCK(); c->sbytes -= res; break; } if (res == 0) { /* end of stream */ conn_set_state(c, conn_closing); break; } if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { if (!update_event(c, EV_READ | EV_PERSIST)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't update event\n"); conn_set_state(c, conn_closing); break; } stop = true; break; } /* otherwise we have a real error, on which we close the connection */ if (settings.verbose > 0) fprintf(stderr, "Failed to read, and not due to blocking\n"); conn_set_state(c, conn_closing); break; case conn_write: /* * We want to write out a simple response. If we haven't already, * assemble it into a msgbuf list (this will be a single-entry * list for TCP or a two-entry list for UDP). */ if (c->iovused == 0 || (c->udp && c->iovused == 1)) { if (add_iov(c, c->wcurr, c->wbytes) != 0 || (c->udp && build_udp_headers(c) != 0)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't build response\n"); conn_set_state(c, conn_closing); break; } } /* fall through... */ case conn_mwrite: switch (transmit(c)) { case TRANSMIT_COMPLETE: if (c->state == conn_mwrite) { while (c->ileft > 0) { item *it = *(c->icurr); assert((it->it_flags & ITEM_SLABBED) == 0); item_remove(it); c->icurr++; c->ileft--; } conn_set_state(c, conn_read); } else if (c->state == conn_write) { if (c->write_and_free) { free(c->write_and_free); c->write_and_free = 0; } conn_set_state(c, c->write_and_go); } else { if (settings.verbose > 0) fprintf(stderr, "Unexpected state %d\n", c->state); conn_set_state(c, conn_closing); } break; case TRANSMIT_INCOMPLETE: case TRANSMIT_HARD_ERROR: break; /* Continue in state machine. */ case TRANSMIT_SOFT_ERROR: stop = true; break; } break; case conn_closing: if (c->udp) conn_cleanup(c); else conn_close(c); stop = true; break; } } return;}void event_handler(const int fd, const short which, void *arg) { conn *c; c = (conn *)arg; assert(c != NULL); c->which = which; /* sanity */ if (fd != c->sfd) { if (settings.verbose > 0) fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n"); conn_close(c); return; } drive_machine(c); /* wait for next event */ return;}static int new_socket(const bool is_udp) { int sfd; int flags; if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { perror("socket()"); return -1; } if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); return -1; } return sfd;}/* * Sets a socket's send buffer size to the maximum allowed by the system. */static void maximize_sndbuf(const int sfd) { socklen_t intsize = sizeof(int); int last_good = 0; int min, max, avg; int old_size; /* Start with the default size. */ if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) { if (settings.verbose > 0) perror("getsockopt(SO_SNDBUF)"); return; } /* Binary-search for the real maximum. */ min = old_size; max = MAX_SENDBUF_SIZE; while (min <= max) { avg = ((unsigned int)(min + max)) / 2; if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) { last_good = avg; min = avg + 1; } else { max = avg - 1; } } if (settings.verbose > 1) fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);}static int server_socket(const int port, const bool is_udp) { int sfd; struct linger ling = {0, 0}; struct sockaddr_in addr; int flags =1; if ((sfd = new_socket(is_udp)) == -1) { return -1; } setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); if (is_udp) { maximize_sndbuf(sfd); } else { setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); } /* * the memset call clears nonstandard fields in some impementations * that otherwise mess things up. */ memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr = settings.interf; if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { perror("bind()"); close(sfd); return -1; } if (!is_udp && listen(sfd, 1024) == -1) { perror("listen()"); close(sfd); return -1; } return sfd;}static int new_socket_unix(void) { int sfd; int flags; if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { perror("socket()"); return -1; } if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); return -1; } return sfd;}static int server_socket_unix(const char *path) { int sfd; struct linger ling = {0, 0}; struct sockaddr_un addr; struct stat tstat; int flags =1; if (!path) { return -1; } if ((sfd = new_socket_unix()) == -1) { return -1; } /* * Clean up a previous socket file if we left it around */ if (lstat(path, &tstat) == 0) { if (S_ISSOCK(tstat.st_mode)) unlink(path); } setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); /* * the memset call clears nonstandard fields in some impementations * that otherwise mess things up. */ memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; strcpy(addr.sun_path, path); if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { perror("bind()"); close(sfd); return -1; } if (listen(sfd, 1024) == -1) { perror("listen()"); close(sfd); return -1; } return sfd;}/* listening socket */static int l_socket = 0;/* udp socket */static int u_socket = -1;/* invoke right before gdb is called, on assert */void pre_gdb(void) { int i; if (l_socket > -1) close(l_socket); if (u_socket > -1) close(u_socket); for (i = 3; i <= 500; i++) close(i); /* so lame */ kill(getpid(), SIGABRT);}/* * We keep the current time of day in a global variable that's updated by a * timer event. This saves us a bunch of time() system calls (we really only * need to get the time once a second, whereas there can be tens of thousands * of requests a second) and allows us to use server-start-relative timestamps * rather than absolute UNIX timestamps, a space savings on systems where * sizeof(time_t) > sizeof(unsigned int). */volatile rel_time_t current_time;static struct event clockevent;/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */static void set_current_time(void) { current_time = (rel_time_t) (time(0) - stats.started);}static void clock_handler(const int fd, const short which, void *arg) { struct timeval t = {.tv_sec = 1, .tv
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -