⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stream.c

📁 RESIN 3.2 最新源码
💻 C
📖 第 1 页 / 共 3 页
字号:
  while (cluster->srun_capacity <= index) {    int capacity = cluster->srun_capacity;    cluster_srun_t *srun_list;    if (capacity == 0)      capacity = 16;    srun_list =      (cluster_srun_t *) cse_alloc(pool,				   2 * capacity * sizeof(cluster_srun_t));        memset(srun_list, 0, 2 * capacity * sizeof(cluster_srun_t));        if (cluster->srun_list) {      memcpy(srun_list, cluster->srun_list,	     capacity * sizeof(cluster_srun_t));    }        cluster->srun_capacity = 2 * capacity;    cluster->srun_list = srun_list;  }  srun = cse_add_srun(cluster, hostname, port, is_ssl);  if (srun) {    cluster_srun = &cluster->srun_list[index];    cluster_srun->srun = srun;    cluster_srun->is_backup = is_backup;    cluster_srun->id = cse_strdup(pool, id);    cluster_srun->index = index;    cluster_srun->is_valid = 1;    if (cluster->srun_size <= index)      cluster->srun_size = index + 1;    cluster->round_robin_index = -1;        return cluster_srun;  }  else {    cse_error(config, "Resin can't find host %s\n", hostname);    return 0;  }}/** * reuse the socket */static voidcse_reuse(stream_t *s, cluster_t *cluster, cluster_srun_t *srun,          int socket, void *ssl,	  time_t request_time, void *web_pool){  config_t *config = cluster->config;    s->socket = socket;  s->ssl = ssl;                       s->pool = config->p;  s->web_pool = web_pool;  s->config = config;  s->update_count = config->update_count;  s->write_length = 0;  s->read_length = 0;  s->read_offset = 0;  s->cluster_srun = srun;  s->sent_data = 0;    srun->srun->is_dead = 0;    LOG(("%s:%d:cse_reuse(): reopen %d\n", __FILE__, __LINE__, s->socket));}/** * Closes the idle sockets. * * Must be called from within a lock. */static voidcse_close_idle(srun_t *srun, time_t now){  int tail;  int next_tail;    if (! srun)    return;  for (tail = srun->conn_tail;       tail != srun->conn_head;       tail = next_tail) {    struct conn_t *conn;        next_tail = (tail + 1) % CONN_POOL_SIZE;    conn = &srun->conn_pool[tail];    /* from here on, it's live connections. */    if (now < conn->last_time + srun->live_time)      return;        srun->conn_tail = next_tail;    LOG(("%s:%d:cse_close_idle(): closing idle socket:%d\n",	 __FILE__, __LINE__, conn->socket));    srun->close(conn->socket, conn->ssl);  }}/** * Try to recycle the socket so the next request can reuse it. */voidcse_recycle(stream_t *s, time_t now){  int socket = s->socket;  cluster_srun_t *cluster_srun = s->cluster_srun;  srun_t *srun = cluster_srun ? cluster_srun->srun : 0;  if (! srun) {    cse_close(s, "recycle");    return;  }    cse_lock(srun->lock);  cse_close_idle(srun, now);    if (socket >= 0 && s->config->update_count == s->update_count) {    int head = srun->conn_head;    int next_head = (head + 1) % CONN_POOL_SIZE;    /* If there's room in the ring, add it. */    if (next_head != srun->conn_tail) {      s->socket = -1;      cse_kill_socket_cleanup(socket, s->web_pool);      srun->conn_pool[head].socket = socket;      srun->conn_pool[head].ssl = s->ssl;      srun->conn_pool[head].last_time = now;      srun->conn_head = next_head;      cse_unlock(srun->lock);      LOG(("%s:%d:cse_recycle(): recycle %d\n", __FILE__, __LINE__, socket));      return;    }  }    cse_unlock(srun->lock);    if (socket >= 0) {    LOG(("%s:%d:cse_recycle(): close2 %d update1:%d update2:%d max-sock:%d\n",	 __FILE__, __LINE__,         socket, s->config->update_count, s->update_count,         srun ? srun->max_sockets : -1));        cse_close(s, "recycle");  }}voidclose_srun(srun_t *srun, time_t now){  int tail;  cse_lock(srun->lock);  for (tail = srun->conn_tail;       tail != srun->conn_head;       tail = (tail + 1) % CONN_POOL_SIZE) {    struct conn_t *conn = &srun->conn_pool[tail];    srun->close(conn->socket, conn->ssl);    LOG(("%s:%d:close_srun(): close timeout %d\n",	 __FILE__, __LINE__, srun->conn_pool[tail]));;  }  srun->conn_head = srun->conn_tail = 0;    cse_unlock(srun->lock);}/** * Try to reuse a socket */static intcse_reuse_socket(stream_t *s, cluster_t *cluster, cluster_srun_t *cluster_srun,		 time_t now, void *web_pool){  int head;  int next_head;  srun_t *srun = cluster_srun->srun;  LOG(("%s:%d:cse_reuse_socket(): reuse head:%d tail:%d\n",       __FILE__, __LINE__, srun->conn_head, srun->conn_tail));  if (! srun || srun->conn_head == srun->conn_tail)    return 0;    cse_lock(srun->lock);  for (head = srun->conn_head;       head != srun->conn_tail;       head = next_head) {    struct conn_t *conn;        next_head = (head + CONN_POOL_SIZE - 1) % CONN_POOL_SIZE;    conn = &srun->conn_pool[next_head];        if (conn->last_time + srun->live_time < now) {      LOG(("%s:%d:cse_reuse_socket(): closing idle socket:%d\n",	   __FILE__, __LINE__, conn->socket));      srun->close(conn->socket, conn->ssl);    }    else {      int socket;      void *ssl;      socket = conn->socket;      ssl = conn->ssl;      srun->conn_head = next_head;      cse_reuse(s, cluster, cluster_srun, socket, ssl, now, web_pool);      cse_unlock(srun->lock);            return 1;    }  }  srun->conn_head = head;  cse_unlock(srun->lock);  return 0;}voidcse_close_sockets(config_t *config){}voidcse_close_all(){}static intselect_host(cluster_t *cluster, time_t now){  int size;  int round_robin;  int i;  int best_srun;  int best_cost = 0x7fffffff;    size = cluster->srun_size;    if (size < 1)    return -1;  if (cluster->round_robin_index < 0) {    srand(65521 * time(0) + getpid() + (int) cluster);    round_robin = rand();    if (round_robin < 0)      round_robin = -round_robin;        cluster->round_robin_index = round_robin % size;  }  round_robin = (cluster->round_robin_index + 1) % size;  for (i = 0; i < size; i++) {    cluster_srun_t *cluster_srun = &cluster->srun_list[round_robin];    if (! cluster_srun->is_backup)      break;    round_robin = (round_robin + 1) % size;  }    cluster->round_robin_index = round_robin;  best_srun = round_robin;  for (i = 0; i < size; i++) {    int index = (i + round_robin) % size;    cluster_srun_t *cluster_srun = &cluster->srun_list[index];    srun_t *srun = cluster_srun->srun;    /* int tail; */    int cost;    if (! srun)      continue;    cost = srun->active_sockets;        if (cluster_srun->is_backup)      cost += 10000;        if (srun->is_dead && now < srun->fail_time + srun->dead_time)      continue;    else if (cost < best_cost) {      best_srun = index;      best_cost = cost;    }  }  return best_srun;}/** * Opens any connection within the current group. */static intopen_connection_group(stream_t *s, cluster_t *cluster,                      cluster_srun_t *owner_item, int offset,                      time_t now, void *web_pool,                      int ignore_dead){  cluster_srun_t *cluster_srun = 0;  srun_t *srun;  if (offset < 0)    cluster_srun = owner_item;  /*  else if (owner_item->group_size < 2)    return 0;  else {    int delta = offset % (owner_item->group_size - 1) + 1;    int index = (owner_item->group_index + delta) % owner_item->group_size;    cluster_srun = owner_item->group[index];  }  */  else    cluster_srun = owner_item;    srun = cluster_srun->srun;  if (! srun)    return 0;  if (cse_reuse_socket(s, cluster, cluster_srun, now, web_pool)) {    return 1;  }  else if (ignore_dead &&           srun->is_dead && now < srun->fail_time + srun->dead_time) {  }  else if (cse_open(s, cluster, cluster_srun, web_pool, 0)) {    srun->is_dead = 0;    return 1;  }  else {    srun->is_dead = 1;    srun->fail_time = now;  }  return 0;}static intopen_connection_any_host(stream_t *s, cluster_t *cluster, int host,                         time_t now, void *web_pool, int ignore_dead){  int i;  int size = cluster->srun_size;  /*   * Okay, the primaries failed.  So try the secondaries.   */  for (i = 0; i < size; i++) {    cluster_srun_t *cluster_srun = cluster->srun_list + (host + i) % size;    srun_t *srun = cluster_srun->srun;    if (! srun) {    }    else if (cse_reuse_socket(s, cluster, cluster_srun, now, web_pool)) {      srun->is_dead = 0;      return 1;    }    else if (ignore_dead && cluster_srun->is_backup) {    }    else if (ignore_dead &&             srun->is_dead && now < srun->fail_time + srun->dead_time) {    }    else if (cse_open(s, cluster, cluster_srun, web_pool, 0)) {      srun->is_dead = 0;      return 1;    }    else {      srun->is_dead = 1;      srun->fail_time = now;    }  }  return 0;}static intopen_session_host(stream_t *s, cluster_t *cluster,                  int session_index, int backup_index,                  time_t now, void *web_pool){  int host;  int size = cluster->srun_size;  cluster_srun_t *owner = 0;  cluster_srun_t *backup = 0;  if (size > 0) {    session_index = session_index % size;    backup_index = backup_index % size;  }  for (host = 0; host < size; host++) {    if (cluster->srun_list[host].index == session_index)      owner = &cluster->srun_list[host];    else if (cluster->srun_list[host].index == backup_index)      backup = &cluster->srun_list[host];  }  /* try to open a connection to the session owner */  if (owner      && open_connection_group(s, cluster, owner, -1, now, web_pool, 1)) {        return 1;  }  /* or the backup */  else if (backup	   && open_connection_group(s, cluster, backup, -1,				    now, web_pool, 1)) {    return 1;  }  else    return 0;}static intopen_connection(stream_t *s, cluster_t *cluster,                int session_index, int backup_index,                time_t now, void *web_pool){  int size;  int host;  size = cluster->srun_size;  if (session_index < 0)    host = select_host(cluster, now);  else if (open_session_host(s, cluster,                             session_index, backup_index,                             now, web_pool))    return 1;  else    host = select_host(cluster, now);  if (host < 0)    return 0;  /* try opening while ignoring dead servers and backups */  if (open_connection_any_host(s, cluster, host, now, web_pool, 1) > 0)    return 1;  /* otherwise try the dead servers and backups too */  else    return open_connection_any_host(s, cluster, host, now, web_pool, 0) > 0;}intcse_open_connection(stream_t *s, cluster_t *cluster,                    int session_index, int backup_index,                    time_t now, void *web_pool){  config_t *config = cluster->config;  memset(s, 0, sizeof(stream_t));    s->config = config;  s->socket = -1;  s->update_count = config->update_count;  s->pool = s->config->p;  s->web_pool = web_pool;  if (config->disable_sticky_sessions)    session_index = -1;    if (open_connection(s, cluster, session_index, backup_index, now, web_pool)) {    cse_set_socket_cleanup(s->socket, web_pool);    return 1;  }  else {    return 0;  }}intcse_open_any_connection(stream_t *s, cluster_t *cluster, time_t now){  return cse_open_connection(s, cluster, -1, -1, now,			     cluster->config->web_pool);}intcse_open_live_connection(stream_t *s, cluster_t *cluster, time_t now){  int host;  void *web_pool = cluster->config->web_pool;  host = select_host(cluster, now);  if (host < 0)    return 0;  /* open, but ignore dead servers and backups */  return open_connection_any_host(s, cluster, host, now, web_pool, 1) > 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -