.#stream.c.1.21

来自「《jsp编程起步》里面的所有源代码」· 21 代码 · 共 809 行 · 第 1/2 页

21
809
字号
 * write a packet to srun * * @param s stream to srun * @param code packet code * @param buf data buffer * @param length length of data in buffer */voidcse_write_packet(stream_t *s, char code, const char *buf, int length){  char temp[4];  temp[0] = code;  temp[1] = (length >> 16) & 0xff;  temp[2] = (length >> 8) & 0xff;  temp[3] = (length) & 0xff;  cse_write(s, temp, 4);  if (length >= 0)    cse_write(s, buf, length);}/** * writes a string to srun */voidcse_write_string(stream_t *s, char code, const char *buf){  if (buf)    cse_write_packet(s, code, buf, strlen(buf));}intcse_read_string(stream_t *s, char *buf, int length){  int code;  int l1, l2, l3;  int read_length;  length--;  code = cse_read_byte(s);  l1 = cse_read_byte(s) & 0xff;  l2 = cse_read_byte(s) & 0xff;  l3 = cse_read_byte(s) & 0xff;  read_length = (l1 << 16) + (l2 << 8) + (l3);  if (s->socket < 0)    return -1;  if (length > read_length)    length = read_length;  if (cse_read_all(s, buf, length) < 0)    return -1;  buf[length] = 0;  /* scan extra */  for (read_length -= length; read_length > 0; read_length--)    cse_read_byte(s);  return code;}/** * Decodes the first 3 characters of the session to see which * JVM owns it. 
*/static intdecode_session(char *session){  int value = 0;  int i;  for (i = 2; i >= 0; i--) {    int code = session[i];    if (code >= 'a' && code <= 'z')      value = 64 * value + code - 'a';    else if (code >= 'A' && code <= 'Z')      value = 64 * value + code - 'A' + 26;    else if (code >= '0' && code <= '9')      value = 64 * value + code - 'A' + 52;    else if (code == '_')      value = 64 * value + 62;    else if (code == '/')      value = 64 * value + 63;    else      return -1;  }  if (i > -1)    return -1;  else    return value;}intcse_session_from_string(char *source, char *cookie){  char *match = strstr(source, cookie);    if (match)    return decode_session(match + strlen(cookie));  return -1;}/** * Adds a new host to the configuration */srun_t *cse_add_host_int(config_t *config, const char *hostname,		 int port, int is_backup, int is_default){  struct hostent *hostent;  srun_t *srun;  if (config->srun_size == 1 &&      (config->srun_list->is_default || ! config->srun_list->host))    config->srun_size = 0;  /* Resize if too many hosts. 
*/  if (config->srun_size >= config->srun_capacity) {    int capacity = config->srun_capacity;    srun_t *srun_list;    if (capacity == 0)      capacity = 4;    srun_list = (srun_t *) cse_alloc(config->p, 2 * capacity * sizeof(srun_t));    memset(srun_list, 0, 2 * capacity * sizeof(srun_t));    if (config->srun_list)      memcpy(srun_list, config->srun_list, capacity * sizeof(srun_t));    config->srun_capacity = 2 * capacity;    config->srun_list = srun_list;  }  hostent = gethostbyname(hostname);  if (hostent && hostent->h_addr) {    srun = &config->srun_list[config->srun_size];    srun->hostname = cse_strdup(config->p, hostname);    srun->host = (struct in_addr *) cse_alloc(config->p,                                              sizeof (struct in_addr));    memcpy(srun->host, hostent->h_addr, sizeof(struct in_addr));    srun->port = port;    srun->is_backup = is_backup;    srun->n_sockets = 0;    srun->max_sockets = 32;    srun->is_default = is_default && config->srun_size == 0;    srun->connect_timeout = CONNECT_TIMEOUT;    srun->live_time = LIVE_TIME;    srun->dead_time = DEAD_TIME;    /*     * XXX: for dynamic srun     * srun->session = cse_query_session(config, srun, config->pool);     */    srun->session = config->srun_size;    config->srun_size++;    return srun;  }  else {    cse_error(config, "CSE can't find host %s\n", hostname);    return 0;  }}/** * Select a client JVM randomly. 
*/
static int
random_host(char *ip, int requestTime, int rand)
{
  /* Top bit of unsigned int, i.e. the sign bit of the equivalent int. */
  const unsigned int sign_bit = ~(~0u >> 1);
  unsigned int hash;

  /*
   * The hash is accumulated in unsigned arithmetic so that wrap-around
   * is well defined; signed overflow would be undefined behavior.
   */
  hash = (unsigned int) requestTime * 23u + (unsigned int) g_count++;
  hash = 23u * hash + (unsigned int) rand;

  for (; *ip; ip++)
    hash = 23u * hash + (unsigned int) *ip;

  /* Return the magnitude of the value read as a signed int. */
  if (hash & sign_bit) {
    hash = 0u - hash;

    /* Only the most negative value has no positive counterpart. */
    if (hash & sign_bit)
      return 0;
  }

  return (int) hash;
}

/**
 * reuse the socket
 *
 * Re-initializes the stream and hands it the most recently recycled
 * socket of srun.  The only caller in this section, cse_reuse_socket(),
 * holds config->lock and has checked srun->n_sockets > 0.
 *
 * @param s stream to initialize; its previous contents are cleared
 * @param config owning configuration
 * @param srun host whose idle socket is taken
 * @param request_time stored as the host's last_time
 * @param web_pool stored in the stream
 */
static void
cse_reuse(stream_t *s, config_t *config, srun_t *srun,
	  int request_time, void *web_pool)
{
  memset(s, 0, sizeof(*s));

  s->pool = config->p;
  s->web_pool = web_pool;
  s->config = config;
  s->update_count = config->update_count;
  s->srun = srun;

  s->write_length = 0;
  s->read_length = 0;
  s->read_offset = 0;

  /* Pop the last saved socket. */
  s->socket = srun->sockets[--srun->n_sockets];

  srun->is_dead = 0;
  srun->last_time = request_time;

  LOG(("reopen %d\n", s->socket));
}

/**
 * Try to recycle the socket so the next request can reuse it.
 *
 * The socket is saved on its host when it is open, the configuration's
 * update_count still matches the stream's, and the host has a free
 * slot.  Otherwise a CSE_CLOSE packet is written and the stream closed.
 *
 * @param s stream whose socket is being given up
 */
void
cse_recycle(stream_t *s)
{
  int socket = s->socket;
  srun_t *srun = s->srun;

  cse_lock(s->config->lock);

  if (socket >= 0 && s->config->update_count == s->update_count &&
      srun && srun->n_sockets < srun->max_sockets) {
    /* Detach the socket from the stream before saving it. */
    s->socket = -1;
    cse_kill_socket_cleanup(socket, s->web_pool);
    srun->sockets[srun->n_sockets++] = socket;

    cse_unlock(s->config->lock);

    LOG(("recycle %d\n", socket));

    return;
  }

  cse_unlock(s->config->lock);

  LOG(("close2 %d update1:%d update2:%d n-sock:%d max-sock:%d\n",
       socket,
       s->config->update_count, s->update_count,
       srun ? srun->n_sockets : -1,
       srun ? srun->max_sockets : -1));

  cse_write_packet(s, CSE_CLOSE, 0, 0);
  cse_close(s, "recycle");
}

/**
 * Try to reuse a socket
 *
 * If the host has saved sockets and was used within live_time of
 * request_time, one is handed to the stream.  If the saved sockets are
 * older than that, all of them are closed.
 *
 * @return 1 if the stream now holds a reused socket, 0 otherwise.
 */
static int
cse_reuse_socket(stream_t *s, config_t *config, srun_t *srun,
		 unsigned int request_time, void *web_pool)
{
  int reused = 0;

  cse_lock(config->lock);

  /* Never let last_time lie after the current request time. */
  if (request_time < srun->last_time)
    srun->last_time = request_time;

  if (srun->n_sockets > 0) {
    if (srun->last_time + srun->live_time >= request_time) {
      cse_reuse(s, config, srun, request_time, web_pool);
      reused = 1;
    }
    else {
      /* Idle too long: drop every saved socket. */
      while (srun->n_sockets > 0) {
        int socket = srun->sockets[--srun->n_sockets];

        /* NOTE(review): presumably a zero-length close packet -- confirm
           that 'X' matches CSE_CLOSE. */
        send(socket, "X\0\0\0", 4, 0);
        closesocket(socket);

        LOG(("close reuse-timeout %d\n", socket));
      }
    }
  }

  cse_unlock(config->lock);

  return reused;
}

/**
 * Closes every saved socket of every host and then empties the host
 * list by setting srun_size to 0.
 *
 * @param config configuration whose hosts are closed
 */
void
cse_close_sockets(config_t *config)
{
  int i;

  for (i = 0; i < config->srun_size; i++) {
    srun_t *srun = config->srun_list + i;
    int j;

    for (j = 0; j < srun->n_sockets; j++) {
      int socket = srun->sockets[j];

      if (socket < 0)
        continue;

      send(socket, "X\0\0\0", 4, 0);
      closesocket(socket);
    }

    srun->n_sockets = 0;
  }

  config->srun_size = 0;
}

/**
 * Connects the stream to one of the configured hosts.
 *
 * The starting host is the one whose session equals session_index; if
 * there is none, or session_index is negative, it is chosen with
 * random_host().  Hosts are then visited one by one from that start,
 * upward for an odd start and downward for an even one, wrapping
 * around the list.
 *
 * The first pass skips backup hosts when there is no session and skips
 * hosts marked dead within their dead_time.  If it fails, a second pass
 * tries every host and marks each failure dead.
 *
 * @return 1 if the stream is connected, 0 if every host failed or no
 *         host list exists.
 */
static int
open_connection(stream_t *s, config_t *config,
                int session_index, char *ip,
                unsigned int request_time, int rand,
                void *web_pool)
{
  int size;
  int i;
  int host;
  int incr;

  /* Without a host list there is nothing to index below. */
  if (! config->srun_list)
    return 0;

  size = config->srun_size;
  if (size < 1)
    size = 1;

  /* host == size means "no host owns this session". */
  host = size;
  if (session_index >= 0) {
    for (host = 0; host < size; host++) {
      if (config->srun_list[host].session == session_index)
        break;
    }
  }

  if (host == size)
    host = random_host(ip, request_time, rand) % size;

  if (host < 0)
    host = -host;

  /* +1 for an odd start, -1 for an even one. */
  incr = 2 * (host & 1) - 1;

  /*
  // XXX: if the session belongs to a host, we should retry for a few seconds?
  */
  for (i = 0; i < size; i++) {
    /* Adding size keeps the step non-negative before the modulo. */
    srun_t *srun = config->srun_list + (host + i * (incr + size)) % size;

    if (session_index < 0 && srun->is_backup) {
      /* no session: leave backups for the second pass */
    }
    else if (cse_reuse_socket(s, config, srun, request_time, web_pool)) {
      return 1;
    }
    else if (srun->is_dead &&
             srun->last_time + srun->dead_time >= request_time) {
      /* recently marked dead: do not retry yet */
    }
    else if (cse_open(s, config, srun, web_pool, 0)) {
      s->srun = srun;
      srun->is_dead = 0;
      srun->last_time = request_time;

      return 1;
    }
  }

  /*
  // Okay, the primaries failed.  So try the secondaries.
  */
  for (i = 0; i < size; i++) {
    srun_t *srun = config->srun_list + (host + i * (incr + size)) % size;

    if (cse_reuse_socket(s, config, srun, request_time, web_pool))
      return 1;

    if (cse_open(s, config, srun, web_pool, 1)) {
      s->srun = srun;
      srun->is_dead = 0;
      srun->last_time = request_time;

      return 1;
    }

    srun->is_dead = 1;
    srun->last_time = request_time;
  }

  return 0;
}

/**
 * Connects the stream to a host through open_connection() and, on
 * success, registers the stream's socket with
 * cse_set_socket_cleanup() for web_pool.
 *
 * @return 1 if the stream is connected, 0 otherwise.
 */
int
cse_open_connection(stream_t *s, config_t *config,
                    int session_index, char *ip,
                    unsigned int request_time, int rand,
                    void *web_pool)
{
  if (! open_connection(s, config, session_index,
                        ip, request_time, rand, web_pool))
    return 0;

  cse_set_socket_cleanup(s->socket, web_pool);

  return 1;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?