📄 stream.c
字号:
for (read_length -= length; read_length > 0; read_length--) cse_read_byte(s); return code;}/** * Decodes the first 3 characters of the session to see which * JVM owns it. */static intdecode(char code){ if (code >= 'a' && code <= 'z') return code - 'a'; else if (code >= 'A' && code <= 'Z') return code - 'A' + 26; else if (code >= '0' && code <= '9') return code - '0' + 52; else if (code == '_') return 62; else if (code == '-') return 63; else return -1;}/** * Returns the session id from a cookie. */intcse_session_from_string(char *source, char *cookie, int *backup){ char *match = strstr(source, cookie); if (match) { int len = strlen(cookie); *backup = decode(match[len + 1]); return decode(match[len]); } return -1;}static srun_t *cse_add_common_host(config_t *config, const char *hostname, int port, int ssl){ int i; struct hostent *hostent; srun_t *srun; LOG(("adding host %s:%d %s\n", hostname, port, ssl ? "ssl" : "")); for (i = 0; i < g_srun_count; i++) { srun_t *srun = g_srun_list[i]; if (srun && ! strcmp(srun->hostname, hostname) && srun->port == port) { LOG(("old host %d %x\n", i, srun)); return srun; } } if (g_srun_count >= 4096) { ERR(("too many hosts\n")); return 0; } srun = g_srun_list[g_srun_count]; if (! srun) { srun = malloc(sizeof(srun_t)); memset(srun, 0, sizeof(srun_t)); g_srun_list[g_srun_count] = srun; } hostent = gethostbyname(hostname); if (hostent && hostent->h_addr) { srun->hostname = strdup(hostname); srun->host = (struct in_addr *) malloc(sizeof (struct in_addr)); memcpy(srun->host, hostent->h_addr, sizeof(struct in_addr)); srun->port = port; srun->conn_head = 0; srun->conn_tail = 0; srun->max_sockets = 32; srun->connect_timeout = CONNECT_TIMEOUT; srun->live_time = LIVE_TIME; srun->dead_time = DEAD_TIME; srun->open = std_open; srun->read = std_read; srun->write = std_write; srun->close = std_close;#ifdef OPENSSL if (ssl) { SSL_CTX* ctx; SSL_METHOD *meth; SSLeay_add_ssl_algorithms(); meth = TLSv1_client_method(); SSL_load_error_strings(); ctx = SSL_CTX_new(meth); if (ctx) { srun->ssl = ctx; srun->open = ssl_open; srun->read = ssl_read; srun->write = ssl_write; srun->close = ssl_close; } else { ERR(("can't initialize ssl")); } }#endif srun->lock = cse_create_lock(config); g_srun_count++; LOG(("new host %d %x%s\n", g_srun_count, srun, ssl ? " ssl" : "")); return srun; } return 0;}/** * Adds a new host to the configuration */srun_item_t *cse_add_host_int(config_t *config, const char *hostname, int port, int session, char *id, char *group_id, int is_backup, int is_ssl){ srun_t *srun; srun_item_t *srun_item; int index; index = session >= 0 ? session : config->srun_size; /* Resize if too many hosts. */ while (index >= config->srun_capacity) { int capacity = config->srun_capacity; srun_item_t *srun_list; if (capacity == 0) capacity = 16; srun_list = (srun_item_t *) cse_alloc(config->p, 2 * capacity * sizeof(srun_item_t)); memset(srun_list, 0, 2 * capacity * sizeof(srun_item_t)); if (config->srun_list) memcpy(srun_list, config->srun_list, capacity * sizeof(srun_item_t)); config->srun_capacity = 2 * capacity; config->srun_list = srun_list; } srun = cse_add_common_host(config, hostname, port, is_ssl); if (srun) { int i, j; int group_size = 1; srun_item_t **group; if (! group_id) group_id = ""; for (i = 0; i < config->srun_size; i++) { if (config->srun_list[i].group_id && ! strcmp(config->srun_list[i].group_id, group_id)) { group_id = config->srun_list[i].group_id; group_size++; } } srun_item = &config->srun_list[index]; srun_item->srun = srun; srun_item->is_backup = is_backup; srun_item->id = id; srun_item->group_id = group_id; srun_item->session = index; srun_item->is_valid = 1; if (index >= config->srun_size) config->srun_size = index + 1; /* Configure all servers in the group. */ group = (srun_item_t **) cse_alloc(config->p, group_size * sizeof(srun_item_t *)); j = 0; for (i = 0; i < config->srun_size; i++) { if (config->srun_list[i].group_id == group_id) { config->srun_list[i].group = group; config->srun_list[i].group_index = j; config->srun_list[i].group_size = group_size; group[j++] = &config->srun_list[i]; } } return srun_item; } else { cse_error(config, "Resin can't find host %s\n", hostname); return 0; }}/** * reuse the socket */static voidcse_reuse(stream_t *s, config_t *config, srun_item_t *srun, int socket, void *ssl, int request_time, void *web_pool){ s->socket = socket; s->ssl = ssl; s->pool = config->p; s->web_pool = web_pool; s->config = config; s->update_count = config->update_count; s->write_length = 0; s->read_length = 0; s->read_offset = 0; s->srun = srun; s->sent_data = 0; srun->srun->is_dead = 0; LOG(("reopen %d\n", s->socket));}/** * Try to recycle the socket so the next request can reuse it. */voidcse_recycle(stream_t *s, unsigned int now){ int socket = s->socket; srun_item_t *srun_item = s->srun; srun_t *srun = srun_item ? srun_item->srun : 0; if (! srun) { cse_close(s, "recycle"); return; } cse_lock(srun->lock); if (socket >= 0 && s->config->update_count == s->update_count) { int head = srun->conn_head; int next_head = (head + 1) % CONN_POOL_SIZE; /* If there's room in the ring, add it. */ if (next_head != srun->conn_tail) { s->socket = -1; cse_kill_socket_cleanup(socket, s->web_pool); srun->conn_pool[head].socket = socket; srun->conn_pool[head].ssl = s->ssl; srun->conn_pool[head].last_time = now; srun->conn_head = next_head; cse_unlock(srun->lock); LOG(("recycle %d\n", socket)); return; } } cse_unlock(srun->lock); if (socket >= 0) { LOG(("close2 %d update1:%d update2:%d max-sock:%d\n", socket, s->config->update_count, s->update_count, srun ? srun->max_sockets : -1)); cse_close(s, "recycle"); }}voidclose_srun(config_t *config, srun_t *srun, unsigned int now){ int tail; cse_lock(srun->lock); for (tail = srun->conn_tail; tail != srun->conn_head; tail = (tail + 1) % CONN_POOL_SIZE) { struct conn_t *conn = &srun->conn_pool[tail]; srun->close(conn->socket, conn->ssl); LOG(("close timeout %d\n", srun->conn_pool[tail]));; } srun->conn_head = srun->conn_tail = 0; cse_unlock(srun->lock);}/** * Try to reuse a socket */static intcse_reuse_socket(stream_t *s, config_t *config, srun_item_t *srun_item, unsigned int now, void *web_pool){ int head; int next_head; srun_t *srun = srun_item->srun; LOG(("reuse head:%d tail:%d\n", srun->conn_head, srun->conn_tail)); if (! srun || srun->conn_head == srun->conn_tail) return 0; cse_lock(srun->lock); for (head = srun->conn_head; head != srun->conn_tail; head = next_head) { struct conn_t *conn; next_head = (head + CONN_POOL_SIZE - 1) % CONN_POOL_SIZE; conn = &srun->conn_pool[next_head]; if (now > conn->last_time + srun->live_time) { LOG(("closing idle socket:%d\n", conn->socket)); srun->close(conn->socket, conn->ssl); } else { int socket; void *ssl; socket = conn->socket; ssl = conn->ssl; srun->conn_head = next_head; cse_reuse(s, config, srun_item, socket, ssl, now, web_pool); cse_unlock(srun->lock); return 1; } } srun->conn_head = head; cse_unlock(srun->lock); return 0;}voidcse_close_sockets(config_t *config){ int i; for (i = 0; i < config->srun_size; i++) { srun_item_t *srun_item = config->srun_list + i; srun_t *srun = srun_item->srun; int tail; if (! srun) continue; cse_lock(srun->lock); for (tail = srun->conn_tail; tail != srun->conn_head; tail = (tail + 1) % CONN_POOL_SIZE) { struct conn_t *conn = &srun->conn_pool[tail]; int socket = conn->socket; if (socket >= 0) srun->close(socket, conn->ssl); } srun->conn_head = 0; srun->conn_tail = 0; cse_unlock(srun->lock); } config->srun_size = 0;}voidcse_close_all(){ int i; for (i = 0; i < g_srun_count; i++) { srun_t *srun = g_srun_list[i]; int tail; if (! srun) continue; cse_lock(srun->lock); for (tail = srun->conn_tail; tail != srun->conn_head; tail = (tail + 1) % CONN_POOL_SIZE) { struct conn_t *conn = &srun->conn_pool[tail]; int socket = conn->socket; if (socket >= 0) srun->close(socket, conn->ssl); } srun->conn_head = 0; srun->conn_tail = 0; cse_unlock(srun->lock); }}static intselect_host(config_t *config, unsigned int now){ int size; int round_robin; int i; int best_srun; int best_cost = 0x7fffffff; size = config->srun_size; if (size < 1) size = 1; if (config->round_robin_index < 0) { srand(65521 * time(0) + getpid() + (int) config); round_robin = rand(); if (round_robin < 0) round_robin = -round_robin; config->round_robin_index = round_robin % size; } round_robin = (config->round_robin_index + 1) % size; for (i = 0; i < size; i++) { srun_item_t *srun_item = &config->srun_list[round_robin]; if (! srun_item->is_backup) break; round_robin = (round_robin + 1) % size; } config->round_robin_index = round_robin; best_srun = round_robin; for (i = 0; i < size; i++) { int index = (i + round_robin) % size; srun_item_t *srun_item = &config->srun_list[index]; srun_t *srun = srun_item->srun; int tail; int cost; if (! srun) continue; cost = srun->active_sockets; if (srun->is_dead && now < srun->fail_time + srun->dead_time) continue; else if (cost < best_cost) { best_srun = index; best_cost = cost; } /* Close idle connections. */ for (tail = srun->conn_tail; tail != srun->conn_head; tail = (tail + 1) % CONN_POOL_SIZE) { struct conn_t *conn = &srun->conn_pool[tail]; if (now < conn->last_time + srun->live_time) break; srun->close(conn->socket, conn->ssl); srun->conn_tail = (tail + 1) % CONN_POOL_SIZE; } } return best_srun;}/** * Opens any connection within the current group. */static intopen_connection_group(stream_t *s, config_t *config, srun_item_t *owner_item, int offset, unsigned int now, void *web_pool, int ignore_dead){ srun_item_t *srun_item = 0; srun_t *srun; if (offset < 0) srun_item = owner_item; else if (owner_item->group_size < 2) return 0; else { int delta = offset % (owner_item->group_size - 1) + 1; int index = (owner_item->group_index + delta) % owner_item->group_size; srun_item = owner_item->group[index]; } srun = srun_item->srun; if (! srun) return 0; if (cse_reuse_socket(s, config, srun_item, now, web_pool)) { srun->is_dead = 0; return 1; } else if (ignore_dead && srun->is_dead && now < srun->fail_time + srun->dead_time) { } else if (cse_open(s, config, srun_item, web_pool, ! ignore_dead)) { s->srun = srun_item; srun->is_dead = 0; return 1; } else { srun->is_dead = 1; srun->fail_time = now; } return 0;}static intopen_connection_any_host(stream_t *s, config_t *config, int host, unsigned int now, void *web_pool, int ignore_dead){ int i; int size = config->srun_size; if (size < 1) size = 1; /* * Okay, the primaries failed. So try the secondaries. */ for (i = 0; i < size; i++) { srun_item_t *srun_item = config->srun_list + (host + i) % size; srun_t *srun = srun_item->srun; if (! srun) { } else if (cse_reuse_socket(s, config, srun_item, now, web_pool)) { srun->is_dead = 0; return 1; } else if (ignore_dead && srun_item->is_backup) { } else if (ignore_dead && srun->is_dead && now < srun->fail_time + srun->dead_time) { } else if (cse_open(s, config, srun_item, web_pool, ! ignore_dead)) { s->srun = srun_item; srun->is_dead = 0; return 1; } else { srun->is_dead = 1; srun->fail_time = now; } } return 0;}static intopen_session_host(stream_t *s, config_t *config, int session_index, int backup_index, unsigned int now, void *web_pool){ int host; int size = config->srun_size; if (size < 1) size = 1; for (host = 0; host < size; host++) { if (config->srun_list[host].session == session_index) { srun_item_t *owner = &config->srun_list[host]; /* try to open a connection to the session owner */ if (open_connection_group(s, config, owner, -1, now, web_pool, 1)) return 1; /* or the backup */ else if (open_connection_group(s, config, owner, backup_index, now, web_pool, 1)) return 1; /* try the original, but force a connect */ else if (open_connection_group(s, config, owner, -1, now, web_pool, 0)) return 1; /* try the backup, but force a connect */ else if (open_connection_group(s, config, owner, backup_index, now, web_pool, 0)) return 1; return 0; } } return 0;}static intopen_connection(stream_t *s, config_t *config, int session_index, int backup_index, unsigned int now, void *web_pool){ int size; int host; size = config->srun_size; if (size < 1) size = 1; if (session_index < 0) host = select_host(config, now); else if (open_session_host(s, config, session_index, backup_index, now, web_pool)) return 1; else host = select_host(config, now); if (host < 0) host = -host; /* try opening while ignoring dead servers and backups */ if (open_connection_any_host(s, config, host, now, web_pool, 1)) return 1; /* otherwise try the dead servers and backups too */ else return open_connection_any_host(s, config, host, now, web_pool, 0);}intcse_open_connection(stream_t *s, config_t *config, int session_index, int backup_index, unsigned int now, void *web_pool){ s->config = config; s->socket = -1; s->update_count = config->update_count; s->pool = config->p; s->web_pool = web_pool; s->write_length = 0; s->read_length = 0; s->read_offset = 0; s->srun = 0; s->sent_data = 0; if (config->disable_sticky_sessions) session_index = -1; if (open_connection(s, config, session_index, backup_index, now, web_pool)) { cse_set_socket_cleanup(s->socket, web_pool); return 1; } else { return 0; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -