.#stream.c.1.21
来自「《jsp编程起步》里面的所有源代码」· 21 代码 · 共 809 行 · 第 1/2 页
21
809 行
* write a packet to srun * * @param s stream to srun * @param code packet code * @param buf data buffer * @param length length of data in buffer */voidcse_write_packet(stream_t *s, char code, const char *buf, int length){ char temp[4]; temp[0] = code; temp[1] = (length >> 16) & 0xff; temp[2] = (length >> 8) & 0xff; temp[3] = (length) & 0xff; cse_write(s, temp, 4); if (length >= 0) cse_write(s, buf, length);}/** * writes a string to srun */voidcse_write_string(stream_t *s, char code, const char *buf){ if (buf) cse_write_packet(s, code, buf, strlen(buf));}intcse_read_string(stream_t *s, char *buf, int length){ int code; int l1, l2, l3; int read_length; length--; code = cse_read_byte(s); l1 = cse_read_byte(s) & 0xff; l2 = cse_read_byte(s) & 0xff; l3 = cse_read_byte(s) & 0xff; read_length = (l1 << 16) + (l2 << 8) + (l3); if (s->socket < 0) return -1; if (length > read_length) length = read_length; if (cse_read_all(s, buf, length) < 0) return -1; buf[length] = 0; /* scan extra */ for (read_length -= length; read_length > 0; read_length--) cse_read_byte(s); return code;}/** * Decodes the first 3 characters of the session to see which * JVM owns it. */static intdecode_session(char *session){ int value = 0; int i; for (i = 2; i >= 0; i--) { int code = session[i]; if (code >= 'a' && code <= 'z') value = 64 * value + code - 'a'; else if (code >= 'A' && code <= 'Z') value = 64 * value + code - 'A' + 26; else if (code >= '0' && code <= '9') value = 64 * value + code - 'A' + 52; else if (code == '_') value = 64 * value + 62; else if (code == '/') value = 64 * value + 63; else return -1; } if (i > -1) return -1; else return value;}intcse_session_from_string(char *source, char *cookie){ char *match = strstr(source, cookie); if (match) return decode_session(match + strlen(cookie)); return -1;}/** * Adds a new host to the configuration */srun_t *cse_add_host_int(config_t *config, const char *hostname, int port, int is_backup, int is_default){ struct hostent *hostent; srun_t *srun; if (config->srun_size == 1 && (config->srun_list->is_default || ! config->srun_list->host)) config->srun_size = 0; /* Resize if too many hosts. */ if (config->srun_size >= config->srun_capacity) { int capacity = config->srun_capacity; srun_t *srun_list; if (capacity == 0) capacity = 4; srun_list = (srun_t *) cse_alloc(config->p, 2 * capacity * sizeof(srun_t)); memset(srun_list, 0, 2 * capacity * sizeof(srun_t)); if (config->srun_list) memcpy(srun_list, config->srun_list, capacity * sizeof(srun_t)); config->srun_capacity = 2 * capacity; config->srun_list = srun_list; } hostent = gethostbyname(hostname); if (hostent && hostent->h_addr) { srun = &config->srun_list[config->srun_size]; srun->hostname = cse_strdup(config->p, hostname); srun->host = (struct in_addr *) cse_alloc(config->p, sizeof (struct in_addr)); memcpy(srun->host, hostent->h_addr, sizeof(struct in_addr)); srun->port = port; srun->is_backup = is_backup; srun->n_sockets = 0; srun->max_sockets = 32; srun->is_default = is_default && config->srun_size == 0; srun->connect_timeout = CONNECT_TIMEOUT; srun->live_time = LIVE_TIME; srun->dead_time = DEAD_TIME; /* * XXX: for dynamic srun * srun->session = cse_query_session(config, srun, config->pool); */ srun->session = config->srun_size; config->srun_size++; return srun; } else { cse_error(config, "CSE can't find host %s\n", hostname); return 0; }}/** * Select a client JVM randomly. */static intrandom_host(char *ip, int requestTime, int rand){ int host = requestTime * 23 + g_count++; host = 23 * host + rand; for (; *ip; ip++) { host = 23 * host + *ip; } if (host < 0) return -host; else return host;}/** * reuse the socket */static voidcse_reuse(stream_t *s, config_t *config, srun_t *srun, int request_time, void *web_pool){ memset(s, 0, sizeof(*s)); s->pool = config->p; s->web_pool = web_pool; s->config = config; s->update_count = config->update_count; s->write_length = 0; s->read_length = 0; s->read_offset = 0; s->socket = srun->sockets[--srun->n_sockets]; s->srun = srun; srun->is_dead = 0; srun->last_time = request_time; LOG(("reopen %d\n", s->socket));}/** * Try to recycle the socket so the next request can reuse it. */voidcse_recycle(stream_t *s){ int socket = s->socket; srun_t *srun = s->srun; cse_lock(s->config->lock); if (socket >= 0 && s->config->update_count == s->update_count && srun && srun->n_sockets < srun->max_sockets) { s->socket = -1; cse_kill_socket_cleanup(socket, s->web_pool); srun->sockets[srun->n_sockets++] = socket; cse_unlock(s->config->lock); LOG(("recycle %d\n", socket)); } else { cse_unlock(s->config->lock); LOG(("close2 %d update1:%d update2:%d n-sock:%d max-sock:%d\n", socket, s->config->update_count, s->update_count, srun ? srun->n_sockets : -1, srun ? srun->max_sockets : -1)); cse_write_packet(s, CSE_CLOSE, 0, 0); cse_close(s, "recycle"); }}/** * Try to reuse a socket */static intcse_reuse_socket(stream_t *s, config_t *config, srun_t *srun, unsigned int request_time, void *web_pool){ cse_lock(config->lock); if (request_time < srun->last_time) srun->last_time = request_time; if (srun->n_sockets > 0 && srun->last_time + srun->live_time >= request_time) { cse_reuse(s, config, srun, request_time, web_pool); cse_unlock(config->lock); return 1; } else if (srun->n_sockets > 0) { while (srun->n_sockets > 0) { int socket = srun->sockets[--srun->n_sockets]; send(socket, "X\0\0\0", 4, 0); closesocket(socket); LOG(("close reuse-timeout %d\n", socket)); } } cse_unlock(config->lock); return 0;}voidcse_close_sockets(config_t *config){ int i; for (i = 0; i < config->srun_size; i++) { srun_t *srun = config->srun_list + i; int j; for (j = 0; j < srun->n_sockets; j++) { int socket = srun->sockets[j]; if (socket >= 0) { send(socket, "X\0\0\0", 4, 0); closesocket(socket); } } srun->n_sockets = 0; } config->srun_size = 0;}static intopen_connection(stream_t *s, config_t *config, int session_index, char *ip, unsigned int request_time, int rand, void *web_pool){ int size; int i; int host; int incr; size = config->srun_size; if (size < 1) size = 1; if (session_index < 0) { host = random_host(ip, request_time, rand) % size; } else { for (host = 0; host < size; host++) { if (config->srun_list[host].session == session_index) break; } if (host == size) { host = random_host(ip, request_time, rand) % size; } } if (host < 0) host = -host; incr = 2 * (host & 1) - 1; /* // XXX: if the session belongs to a host, we should retry for a few seconds? */ for (i = 0; i < size; i++) { srun_t *srun = config->srun_list + (host + i * (incr + size)) % size; if (session_index < 0 && srun->is_backup) { } else if (cse_reuse_socket(s, config, srun, request_time, web_pool)) { return 1; } else if (srun->is_dead && srun->last_time + srun->dead_time >= request_time) { } else if (cse_open(s, config, srun, web_pool, 0)) { s->srun = srun; srun->is_dead = 0; srun->last_time = request_time; return 1; } } /* // Okay, the primaries failed. So try the secondaries. */ for (i = 0; i < size; i++) { srun_t *srun = config->srun_list + (host + i * (incr + size)) % size; if (cse_reuse_socket(s, config, srun, request_time, web_pool)) { return 1; } else if (cse_open(s, config, srun, web_pool, 1)) { s->srun = srun; srun->is_dead = 0; srun->last_time = request_time; return 1; } else { srun->is_dead = 1; srun->last_time = request_time; } } return 0;}intcse_open_connection(stream_t *s, config_t *config, int session_index, char *ip, unsigned int request_time, int rand, void *web_pool){ if (open_connection(s, config, session_index, ip, request_time, rand, web_pool)) { cse_set_socket_cleanup(s->socket, web_pool); return 1; } else { return 0; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?