⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stream.c

📁 RESIN 3.2 最新源码
💻 C
📖 第 1 页 / 共 3 页
字号:
  s->write_length = 0;  return 0;}/** * Flushes the output buffer and fills the read buffer.  The two buffers * are combined so we can try another srun if the request fails. */intcse_fill_buffer(stream_t *s){  int len = 0;  int retry;    if (s->socket < 0)    return -1;  /* flush the buffer */  if (s->write_length > 0) {    LOG(("%s:%d:cse_fill_buffer(): write %d %d\n",	 __FILE__, __LINE__, s->socket, s->write_length));    /* config read/save has no cluster_srun */    if (s->cluster_srun)      len = s->cluster_srun->srun->write(s, s->write_buf, s->write_length);    else      len = write(s->socket, s->write_buf, s->write_length);    if (len != s->write_length) {      cse_close(s, "flush");      return -1;    }  }  s->read_offset = 0;  retry = 3;  do {    /* config read/save has no cluster_srun */    if (s->cluster_srun)      s->read_length = s->cluster_srun->srun->read(s, s->read_buf, BUF_LENGTH);    else      s->read_length = read(s->socket, s->read_buf, BUF_LENGTH);    // repeat for EINTR, EAGAIN  } while (s->read_length < 0	   && errno != EPIPE && errno != ECONNRESET	   && retry-- > 0);    if (s->read_length <= 0) {    cse_close(s, "fill_buffer");        return -1;  }  s->sent_data = 1;  s->write_length = 0;    return s->read_length;}intcse_read_byte(stream_t *s){  if (s->read_offset >= s->read_length) {    if (cse_fill_buffer(s) < 0)      return -1;  }  return s->read_buf[s->read_offset++];}voidcse_write(stream_t *s, const char *buf, int length){  int write_length = s->write_length;    /* XXX: writev??? */  if (BUF_LENGTH < write_length + length) {    if (write_length > 0) {      if (cse_flush(s) < 0) {        s->sent_data = 1;        return;      }      write_length = 0;    }    if (BUF_LENGTH <= length) {      int len;      /* config read/save has no cluster_srun */      if (s->cluster_srun)	len = s->cluster_srun->srun->write(s, buf, length);      else	len = write(s->socket, buf, length);      s->sent_data = 1;			             if (len < 0)	cse_close(s, "write");            return;    }  }  memcpy(s->write_buf + write_length, buf, length);  s->write_length = write_length + length;}voidcse_write_byte(stream_t *s, int ch){  /* XXX: writev??? */  if (BUF_LENGTH < s->write_length + 1) {    if (s->write_length > 0) {      if (cse_flush(s) < 0) {        s->sent_data = 1;        return;      }    }  }  s->write_buf[s->write_length++] = ch;}intcse_read_all(stream_t *s, char *buf, int len){  while (len > 0) {    int sublen;    if (s->read_offset >= s->read_length) {      if (cse_fill_buffer(s) < 0)        return -1;    }    sublen = s->read_length - s->read_offset;    if (len < sublen)      sublen = len;    memcpy(buf, s->read_buf + s->read_offset, sublen);    buf += sublen;    len -= sublen;    s->read_offset += sublen;  }  return 1;}intcse_skip(stream_t *s, int len){  while (len > 0) {    int sublen;    if (s->read_offset >= s->read_length) {      if (cse_fill_buffer(s) < 0)	return -1;    }    sublen = s->read_length - s->read_offset;    if (len < sublen)      sublen = len;    len -= sublen;    s->read_offset += sublen;  }  return 1;}intcse_read_limit(stream_t *s, char *buf, int buflen, int readlen){  int result;    if (readlen <= buflen) {    result = cse_read_all(s, buf, readlen);    buf[readlen] = 0;  }  else {    result = cse_read_all(s, buf, buflen);    buf[buflen - 1] = 0;    cse_skip(s, readlen - buflen);  }  return result > 0 ? readlen : 0;}inthmux_read_len(stream_t *s){  int l1 = cse_read_byte(s) & 0xff;  int l2 = cse_read_byte(s) & 0xff;  return (l1 << 8) + l2;}/** * write a packet to srun * * @param s stream to srun * @param code packet code * @param buf data buffer * @param length length of data in buffer */voidcse_write_packet(stream_t *s, char code, const char *buf, int length){  char temp[4];  temp[0] = code;  temp[1] = (length >> 8) & 0xff;  temp[2] = (length) & 0xff;  cse_write(s, temp, 3);  if (length >= 0)    cse_write(s, buf, length);}/** * writes a string to srun */voidcse_write_string(stream_t *s, char code, const char *buf){  if (buf)    cse_write_packet(s, code, buf, strlen(buf));}/** * writes a string to srun */voidhmux_write_string(stream_t *s, char code, const char *buf){  if (buf)    cse_write_packet(s, code, buf, strlen(buf));}/** * write a packet to srun * * @param s stream to srun * @param code packet code * @param int data int */voidhmux_write_int(stream_t *s, char code, int i){  char temp[8];  temp[0] = code;  temp[1] = 0;  temp[2] = 4;  temp[3] = (char) (i >> 24);  temp[4] = (char) (i >> 16);  temp[5] = (char) (i >> 8);  temp[6] = (char) (i);  cse_write(s, temp, 7);}voidhmux_start_channel(stream_t *s, unsigned short channel){  cse_write_byte(s, HMUX_CHANNEL);  cse_write_byte(s, channel >> 8);  cse_write_byte(s, channel);}voidhmux_write_close(stream_t *s){  cse_write_byte(s, HMUX_QUIT);}voidhmux_write_exit(stream_t *s){  cse_write_byte(s, HMUX_EXIT);}inthmux_read_string(stream_t *s, char *buf, int length){  int l1, l2;  int read_length;  length--;  l1 = cse_read_byte(s) & 0xff;  l2 = cse_read_byte(s) & 0xff;  read_length = (l1 << 8) + l2;  if (s->socket < 0) {    *buf = 0;    return -1;  }  if (length > read_length)    length = read_length;  if (cse_read_all(s, buf, length) < 0) {    *buf = 0;    return -1;  }  buf[length] = 0;  /* scan extra */  for (read_length -= length; read_length > 0; read_length--)    cse_read_byte(s);  return length;}intcse_read_string(stream_t *s, char *buf, int length){  int code;  int read_length;  length--;  code = cse_read_byte(s);  read_length = hmux_read_string(s, buf, length);  if (read_length < 0)    return -1;  else    return code;}/** * Decodes the first 3 characters of the session to see which * JVM owns it. */static intdecode(char code){  if ('a' <= code && code <= 'z')    return code - 'a';  else if ('A' <= code && code <= 'Z')    return code - 'A' + 26;  else if ('0' <= code && code <= '9')    return code - '0' + 52;  else if (code == '_')    return 62;  else if (code == '-')    return 63;  else    return -1;}/** * Returns the session id from a cookie. */intcse_session_from_string(char *source, char *cookie, int *backup){  char *match = strstr(source, cookie);  if (match) {    int len = strlen(cookie);    if (match[len] == '=')      len++;    *backup = decode(match[len + 1]);        return decode(match[len]);  }  return -1;}static srun_t *cse_find_config_srun(config_t *config, const char *hostname, int port, int ssl){  int i;  srun_t *srun;    for (i = 0; i < config->srun_capacity; i++) {    srun = config->srun_list[i];        if (! strcmp(srun->hostname, hostname)	&& srun->port == port	&& (srun->ssl != 0) == ssl) {      return srun;    }  }  return 0;}static voidcse_add_config_srun(config_t *config, srun_t *srun){  int size;  srun_t **srun_list;  size = config->srun_capacity;  srun_list = malloc((size + 1) * sizeof(srun_t *));  memcpy(srun_list, config->srun_list, size * sizeof(srun_t *));  srun_list[size] = srun;  config->srun_list = srun_list;  config->srun_capacity = size + 1;}static srun_t *cse_add_srun(cluster_t *cluster, const char *hostname, int port, int ssl){  struct hostent *hostent = 0;  srun_t *srun = 0;  config_t *config = cluster->config;  int i;  LOG(("%s:%d:cse_add_srun(): adding host %s:%d cluster=%p\n",       __FILE__, __LINE__, hostname, port, cluster));  srun = cse_find_config_srun(config, hostname, port, ssl);  if (srun)    return srun;    srun = malloc(sizeof(srun_t));  memset(srun, 0, sizeof(srun_t));  hostent = gethostbyname(hostname);  if (hostent && hostent->h_addr) {    srun->hostname = strdup(hostname);    srun->host = (struct in_addr *) malloc(sizeof (struct in_addr));    memcpy(srun->host, hostent->h_addr, sizeof(struct in_addr));    srun->port = port;    srun->conn_head = 0;    srun->conn_tail = 0;    srun->max_sockets = 32;    srun->connect_timeout = CONNECT_TIMEOUT;    srun->live_time = LIVE_TIME;    srun->dead_time = DEAD_TIME;    srun->read_timeout = WINDOWS_READ_TIMEOUT;    srun->open = std_open;    srun->read = std_read;    srun->write = std_write;    srun->close = std_close;#ifdef OPENSSL    if (ssl) {      SSL_CTX* ctx;      SSL_METHOD *meth;      SSLeay_add_ssl_algorithms();      meth = SSLv3_client_method();      SSL_load_error_strings();      ctx = SSL_CTX_new(meth);      if (ctx) {        srun->ssl = ctx;        srun->open = ssl_open;        srun->read = ssl_read;        srun->write = ssl_write;        srun->close = ssl_close;      }      else {        ERR(("%s:%d:cse_add_srun(): can't initialize ssl",	     __FILE__, __LINE__));      }    }#endif    srun->lock = cse_create_lock(config);    LOG(("%s:%d:cse_add_srun(): srun lock %x\n",	 __FILE__, __LINE__, srun->lock));    cse_add_config_srun(config, srun);        return srun;  }  return 0;}/** * Adds a new host to the configuration */cluster_srun_t *cse_add_cluster_server(mem_pool_t *pool, cluster_t *cluster,		       const char *hostname, int port, const char *id,		       int index, int is_backup, int is_ssl){  config_t *config = cluster->config;  srun_t *srun;  cluster_srun_t *cluster_srun;  if (index < 0)    index = cluster->srun_size;  /* Resize if too many hosts. */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -