📄 stream.c
字号:
while (cluster->srun_capacity <= index) { int capacity = cluster->srun_capacity; cluster_srun_t *srun_list; if (capacity == 0) capacity = 16; srun_list = (cluster_srun_t *) cse_alloc(pool, 2 * capacity * sizeof(cluster_srun_t)); memset(srun_list, 0, 2 * capacity * sizeof(cluster_srun_t)); if (cluster->srun_list) { memcpy(srun_list, cluster->srun_list, capacity * sizeof(cluster_srun_t)); } cluster->srun_capacity = 2 * capacity; cluster->srun_list = srun_list; } srun = cse_add_srun(cluster, hostname, port, is_ssl); if (srun) { cluster_srun = &cluster->srun_list[index]; cluster_srun->srun = srun; cluster_srun->is_backup = is_backup; cluster_srun->id = cse_strdup(pool, id); cluster_srun->index = index; cluster_srun->is_valid = 1; if (cluster->srun_size <= index) cluster->srun_size = index + 1; cluster->round_robin_index = -1; return cluster_srun; } else { cse_error(config, "Resin can't find host %s\n", hostname); return 0; }}/** * reuse the socket */static voidcse_reuse(stream_t *s, cluster_t *cluster, cluster_srun_t *srun, int socket, void *ssl, time_t request_time, void *web_pool){ config_t *config = cluster->config; s->socket = socket; s->ssl = ssl; s->pool = config->p; s->web_pool = web_pool; s->config = config; s->update_count = config->update_count; s->write_length = 0; s->read_length = 0; s->read_offset = 0; s->cluster_srun = srun; s->sent_data = 0; srun->srun->is_dead = 0; LOG(("%s:%d:cse_reuse(): reopen %d\n", __FILE__, __LINE__, s->socket));}/** * Closes the idle sockets. * * Must be called from within a lock. */static voidcse_close_idle(srun_t *srun, time_t now){ int tail; int next_tail; if (! srun) return; for (tail = srun->conn_tail; tail != srun->conn_head; tail = next_tail) { struct conn_t *conn; next_tail = (tail + 1) % CONN_POOL_SIZE; conn = &srun->conn_pool[tail]; /* from here on, it's live connections. */ if (now < conn->last_time + srun->live_time) return; srun->conn_tail = next_tail; LOG(("%s:%d:cse_close_idle(): closing idle socket:%d\n", __FILE__, __LINE__, conn->socket)); srun->close(conn->socket, conn->ssl); }}/** * Try to recycle the socket so the next request can reuse it. */voidcse_recycle(stream_t *s, time_t now){ int socket = s->socket; cluster_srun_t *cluster_srun = s->cluster_srun; srun_t *srun = cluster_srun ? cluster_srun->srun : 0; if (! srun) { cse_close(s, "recycle"); return; } cse_lock(srun->lock); cse_close_idle(srun, now); if (socket >= 0 && s->config->update_count == s->update_count) { int head = srun->conn_head; int next_head = (head + 1) % CONN_POOL_SIZE; /* If there's room in the ring, add it. */ if (next_head != srun->conn_tail) { s->socket = -1; cse_kill_socket_cleanup(socket, s->web_pool); srun->conn_pool[head].socket = socket; srun->conn_pool[head].ssl = s->ssl; srun->conn_pool[head].last_time = now; srun->conn_head = next_head; cse_unlock(srun->lock); LOG(("%s:%d:cse_recycle(): recycle %d\n", __FILE__, __LINE__, socket)); return; } } cse_unlock(srun->lock); if (socket >= 0) { LOG(("%s:%d:cse_recycle(): close2 %d update1:%d update2:%d max-sock:%d\n", __FILE__, __LINE__, socket, s->config->update_count, s->update_count, srun ? srun->max_sockets : -1)); cse_close(s, "recycle"); }}voidclose_srun(srun_t *srun, time_t now){ int tail; cse_lock(srun->lock); for (tail = srun->conn_tail; tail != srun->conn_head; tail = (tail + 1) % CONN_POOL_SIZE) { struct conn_t *conn = &srun->conn_pool[tail]; srun->close(conn->socket, conn->ssl); LOG(("%s:%d:close_srun(): close timeout %d\n", __FILE__, __LINE__, srun->conn_pool[tail]));; } srun->conn_head = srun->conn_tail = 0; cse_unlock(srun->lock);}/** * Try to reuse a socket */static intcse_reuse_socket(stream_t *s, cluster_t *cluster, cluster_srun_t *cluster_srun, time_t now, void *web_pool){ int head; int next_head; srun_t *srun = cluster_srun->srun; LOG(("%s:%d:cse_reuse_socket(): reuse head:%d tail:%d\n", __FILE__, __LINE__, srun->conn_head, srun->conn_tail)); if (! srun || srun->conn_head == srun->conn_tail) return 0; cse_lock(srun->lock); for (head = srun->conn_head; head != srun->conn_tail; head = next_head) { struct conn_t *conn; next_head = (head + CONN_POOL_SIZE - 1) % CONN_POOL_SIZE; conn = &srun->conn_pool[next_head]; if (conn->last_time + srun->live_time < now) { LOG(("%s:%d:cse_reuse_socket(): closing idle socket:%d\n", __FILE__, __LINE__, conn->socket)); srun->close(conn->socket, conn->ssl); } else { int socket; void *ssl; socket = conn->socket; ssl = conn->ssl; srun->conn_head = next_head; cse_reuse(s, cluster, cluster_srun, socket, ssl, now, web_pool); cse_unlock(srun->lock); return 1; } } srun->conn_head = head; cse_unlock(srun->lock); return 0;}voidcse_close_sockets(config_t *config){}voidcse_close_all(){}static intselect_host(cluster_t *cluster, time_t now){ int size; int round_robin; int i; int best_srun; int best_cost = 0x7fffffff; size = cluster->srun_size; if (size < 1) return -1; if (cluster->round_robin_index < 0) { srand(65521 * time(0) + getpid() + (int) cluster); round_robin = rand(); if (round_robin < 0) round_robin = -round_robin; cluster->round_robin_index = round_robin % size; } round_robin = (cluster->round_robin_index + 1) % size; for (i = 0; i < size; i++) { cluster_srun_t *cluster_srun = &cluster->srun_list[round_robin]; if (! cluster_srun->is_backup) break; round_robin = (round_robin + 1) % size; } cluster->round_robin_index = round_robin; best_srun = round_robin; for (i = 0; i < size; i++) { int index = (i + round_robin) % size; cluster_srun_t *cluster_srun = &cluster->srun_list[index]; srun_t *srun = cluster_srun->srun; /* int tail; */ int cost; if (! srun) continue; cost = srun->active_sockets; if (cluster_srun->is_backup) cost += 10000; if (srun->is_dead && now < srun->fail_time + srun->dead_time) continue; else if (cost < best_cost) { best_srun = index; best_cost = cost; } } return best_srun;}/** * Opens any connection within the current group. */static intopen_connection_group(stream_t *s, cluster_t *cluster, cluster_srun_t *owner_item, int offset, time_t now, void *web_pool, int ignore_dead){ cluster_srun_t *cluster_srun = 0; srun_t *srun; if (offset < 0) cluster_srun = owner_item; /* else if (owner_item->group_size < 2) return 0; else { int delta = offset % (owner_item->group_size - 1) + 1; int index = (owner_item->group_index + delta) % owner_item->group_size; cluster_srun = owner_item->group[index]; } */ else cluster_srun = owner_item; srun = cluster_srun->srun; if (! srun) return 0; if (cse_reuse_socket(s, cluster, cluster_srun, now, web_pool)) { return 1; } else if (ignore_dead && srun->is_dead && now < srun->fail_time + srun->dead_time) { } else if (cse_open(s, cluster, cluster_srun, web_pool, 0)) { srun->is_dead = 0; return 1; } else { srun->is_dead = 1; srun->fail_time = now; } return 0;}static intopen_connection_any_host(stream_t *s, cluster_t *cluster, int host, time_t now, void *web_pool, int ignore_dead){ int i; int size = cluster->srun_size; /* * Okay, the primaries failed. So try the secondaries. */ for (i = 0; i < size; i++) { cluster_srun_t *cluster_srun = cluster->srun_list + (host + i) % size; srun_t *srun = cluster_srun->srun; if (! srun) { } else if (cse_reuse_socket(s, cluster, cluster_srun, now, web_pool)) { srun->is_dead = 0; return 1; } else if (ignore_dead && cluster_srun->is_backup) { } else if (ignore_dead && srun->is_dead && now < srun->fail_time + srun->dead_time) { } else if (cse_open(s, cluster, cluster_srun, web_pool, 0)) { srun->is_dead = 0; return 1; } else { srun->is_dead = 1; srun->fail_time = now; } } return 0;}static intopen_session_host(stream_t *s, cluster_t *cluster, int session_index, int backup_index, time_t now, void *web_pool){ int host; int size = cluster->srun_size; cluster_srun_t *owner = 0; cluster_srun_t *backup = 0; if (size > 0) { session_index = session_index % size; backup_index = backup_index % size; } for (host = 0; host < size; host++) { if (cluster->srun_list[host].index == session_index) owner = &cluster->srun_list[host]; else if (cluster->srun_list[host].index == backup_index) backup = &cluster->srun_list[host]; } /* try to open a connection to the session owner */ if (owner && open_connection_group(s, cluster, owner, -1, now, web_pool, 1)) { return 1; } /* or the backup */ else if (backup && open_connection_group(s, cluster, backup, -1, now, web_pool, 1)) { return 1; } else return 0;}static intopen_connection(stream_t *s, cluster_t *cluster, int session_index, int backup_index, time_t now, void *web_pool){ int size; int host; size = cluster->srun_size; if (session_index < 0) host = select_host(cluster, now); else if (open_session_host(s, cluster, session_index, backup_index, now, web_pool)) return 1; else host = select_host(cluster, now); if (host < 0) return 0; /* try opening while ignoring dead servers and backups */ if (open_connection_any_host(s, cluster, host, now, web_pool, 1) > 0) return 1; /* otherwise try the dead servers and backups too */ else return open_connection_any_host(s, cluster, host, now, web_pool, 0) > 0;}intcse_open_connection(stream_t *s, cluster_t *cluster, int session_index, int backup_index, time_t now, void *web_pool){ config_t *config = cluster->config; memset(s, 0, sizeof(stream_t)); s->config = config; s->socket = -1; s->update_count = config->update_count; s->pool = s->config->p; s->web_pool = web_pool; if (config->disable_sticky_sessions) session_index = -1; if (open_connection(s, cluster, session_index, backup_index, now, web_pool)) { cse_set_socket_cleanup(s->socket, web_pool); return 1; } else { return 0; }}intcse_open_any_connection(stream_t *s, cluster_t *cluster, time_t now){ return cse_open_connection(s, cluster, -1, -1, now, cluster->config->web_pool);}intcse_open_live_connection(stream_t *s, cluster_t *cluster, time_t now){ int host; void *web_pool = cluster->config->web_pool; host = select_host(cluster, now); if (host < 0) return 0; /* open, but ignore dead servers and backups */ return open_connection_any_host(s, cluster, host, now, web_pool, 1) > 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -