stream.c

来自「《jsp编程起步》里面的所有源代码」· C语言 代码 · 共 562 行

C
562
字号
/* * Copyright (c) 1999 Caucho Technology.  All rights reserved. * * Caucho Technology permits redistribution, modification and use * of this file in source and binary form ("the Software") under the * Caucho Developer Source License ("the License").  In particular, the following * conditions must be met: * * 1. Each copy or derived work of the Software must preserve the copyright *    notice and this notice unmodified. * * 2. Redistributions of the Software in source or binary form must include  *    an unmodified copy of the License, normally in a plain ASCII text * * 3. The names "Resin" or "Caucho" are trademarks of Caucho Technology and *    may not be used to endorse products derived from this software. *    "Resin" or "Caucho" may not appear in the names of products derived *    from this software. * * 4. Caucho Technology requests that attribution be given to Resin *    in any manner possible.  We suggest using the "Resin Powered" *    button or creating a "powered by Resin(tm)" link to *    http://www.caucho.com for each page served by Resin. * * This Software is provided "AS IS," without a warranty of any kind.  * ALL EXPRESS OR IMPLIED REPRESENTATIONS AND WARRANTIES, INCLUDING ANY * IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE * OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. * CAUCHO TECHNOLOGY AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES * SUFFERED BY LICENSEE OR ANY THIRD PARTY AS A RESULT OF USING OR * DISTRIBUTING SOFTWARE. IN NO EVENT WILL CAUCHO OR ITS LICENSORS BE LIABLE * FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, * CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND * REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF OR * INABILITY TO USE SOFTWARE, EVEN IF HE HAS BEEN ADVISED OF THE POSSIBILITY * OF SUCH DAMAGES. * * @author Scott Ferguson * * $Id: stream.c,v 1.6 2000/10/13 01:51:35 ferg Exp $ */#include <stdlib.h>#include <errno.h>#include <stdio.h>#include <string.h>#ifdef WIN32#include <winsock2.h>#else#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <netdb.h>#include <unistd.h>#endif#include "cse.h"#define DEFAULT_PORT 6802#define DEAD_TIME 5#define LIVE_TIME 5static int g_count;voidcse_close(stream_t *s, char *msg){  int socket = s->socket;  LOG(("close %d %s\n", s->socket, msg));  s->socket = -1;  cse_kill_socket_cleanup(socket, s->pool);  closesocket(socket);}intcse_open(stream_t *s, config_t *config, srun_t *srun, void *p){  struct sockaddr_in sin;  memset(s, 0, sizeof(*s));  s->config = config;  s->pool = p;  s->write_length = 0;  s->read_length = 0;  s->read_offset = 0;  s->srun = srun;  s->socket = socket(AF_INET, SOCK_STREAM, 0);  if (s->socket < 0) {    LOG(("mod_caucho can't create socket.\n"));    return 0; // bad socket  }  LOG(("open %d\n", s->socket));  sin.sin_family = AF_INET;  if (srun->host)    sin.sin_addr = *srun->host;  else    sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK);  if (srun->port <= 0)    srun->port = DEFAULT_PORT;  sin.sin_port = htons((short) srun->port);  if (connect(s->socket, (const struct sockaddr *) &sin,	      sizeof(struct sockaddr_in)) < 0) {    LOG(("connect %x %d %d\n", sin.sin_addr.s_addr,	 ntohs(sin.sin_port), errno));    // XXX: logging doesn't make sense with load balancing(?)    //if (server)    //  ap_log_printf(server, "mod_caucho can't connect.\n");    cse_close(s, "connect");    return 0;  }  return 1;}intcse_flush(stream_t *s){  int len;  if (s->write_length > 0) {    len = send(s->socket, s->write_buf, s->write_length, 0);    if (len != s->write_length)      return -1;    s->write_length = 0;  }  return 0;}intcse_fill_buffer(stream_t *s){  int i = 0;    if (s->socket < 0 || cse_flush(s) < 0)    return -1;  s->read_offset = 0;  do {    s->read_length = recv(s->socket, s->read_buf, BUF_LENGTH, 0);  } while (s->read_length < 0 && errno == EINTR && i++ < 10);  if (s->read_length <= 0) {    cse_close(s, "fill_buffer");        return -1;  }    return 0;}intcse_read_byte(stream_t *s){  if (s->read_offset >= s->read_length) {    if (cse_fill_buffer(s) < 0)      return -1;  }  return s->read_buf[s->read_offset++];}voidcse_write(stream_t *s, const char *buf, int length){  /* XXX: writev??? */  if (s->write_length + length > BUF_LENGTH) {    if (s->write_length > 0) {      int len;      len = send(s->socket, s->write_buf, s->write_length, 0);			             s->write_length = 0;            if (len < 0) {	cse_close(s, "write");	return;      }    }    if (length > BUF_LENGTH) {      int len;      len = send(s->socket, s->write_buf, s->write_length, 0);			             if (len < 0)	cse_close(s, "write");      return;    }  }  memcpy(s->write_buf + s->write_length, buf, length);  s->write_length += length;}intcse_read_all(stream_t *s, char *buf, int len){  while (len > 0) {    int sublen;    if (s->read_offset >= s->read_length) {      if (cse_fill_buffer(s) < 0)	return -1;    }    sublen = s->read_length - s->read_offset;    if (len < sublen)      sublen = len;    memcpy(buf, s->read_buf + s->read_offset, sublen);    buf += sublen;    len -= sublen;    s->read_offset += sublen;  }  return 1;}/** * write a packet to srun * * @param s stream to srun * @param code packet code * @param buf data buffer * @param length length of data in buffer */voidcse_write_packet(stream_t *s, char code, const char *buf, int length){  char temp[4];  temp[0] = code;  temp[1] = (length >> 16) & 0xff;  temp[2] = (length >> 8) & 0xff;  temp[3] = (length) & 0xff;  cse_write(s, temp, 4);  if (length > 0)    cse_write(s, buf, length);}/** * writes a string to srun */voidcse_write_string(stream_t *s, char code, const char *buf){  if (buf)    cse_write_packet(s, code, buf, strlen(buf));}intcse_read_string(stream_t *s, char *buf, int length){  int code;  int l1, l2, l3;  int read_length;  length--;  code = cse_read_byte(s);  l1 = cse_read_byte(s) & 0xff;  l2 = cse_read_byte(s) & 0xff;  l3 = cse_read_byte(s) & 0xff;  read_length = (l1 << 16) + (l2 << 8) + (l3);  if (s->socket < 0)    return -1;  if (length > read_length)    length = read_length;  if (cse_read_all(s, buf, length) < 0)    return -1;  buf[length] = 0;  // scan extra  for (read_length -= length; read_length > 0; read_length--)    cse_read_byte(s);  return code;}/** * Decodes the first 3 characters of the session to see which * JVM owns it. */static intdecode_session(char *session){  int value = 0;  int i;  for (i = 2; i >= 0; i--) {    int code = session[i];    if (code >= 'a' && code <= 'z')      value = 64 * value + code - 'a';    else if (code >= 'A' && code <= 'Z')      value = 64 * value + code - 'A' + 26;    else if (code >= '0' && code <= '9')      value = 64 * value + code - 'A' + 52;    else if (code == '_')      value = 64 * value + 62;    else if (code == '/')      value = 64 * value + 63;    else      return -1;  }  if (i > -1)    return -1;  else    return value;}intcse_session_from_string(char *source, char *cookie){  char *match = strstr(source, cookie);    if (match)    return decode_session(match + strlen(cookie));  return -1;}/** * Adds a new host to the configuration */voidcse_add_host_int(config_t *config, const char *hostname,		 int port, int is_backup){  struct hostent *hostent;  srun_t *srun;  if (config->srun_size == 1 && ! config->srun_list->host)    config->srun_size = 0;  // Resize if too many hosts.  if (config->srun_size >= config->srun_capacity) {    int capacity = config->srun_capacity;    srun_t *srun_list;    if (capacity == 0)      capacity = 4;    srun_list = (srun_t *) cse_alloc(config, 2 * capacity * sizeof(srun_t));    memset(srun_list, 0, 2 * capacity * sizeof(srun_t));    if (config->srun_list)      memcpy(srun_list, config->srun_list, capacity * sizeof(srun_t));    config->srun_capacity = 2 * capacity;    cse_free(config, config->srun_list);    config->srun_list = srun_list;  }  hostent = gethostbyname(hostname);  if (hostent && hostent->h_addr) {    srun = &config->srun_list[config->srun_size];    srun->hostname = cse_strdup(config, hostname);    srun->host = (struct in_addr *) cse_alloc(config, sizeof (struct in_addr));    memcpy(srun->host, hostent->h_addr, sizeof(struct in_addr));    srun->port = port;    srun->is_backup = is_backup;    srun->n_sockets = 0;    srun->max_sockets = 32;    /*     * XXX: for dynamic srun     * srun->session = cse_query_session(config, srun, config->pool);     */    srun->session = config->srun_size;    config->srun_size++;  }  else {    cse_error(config, "CSE cannot find host %s\n", hostname);  }}/** * Select a client JVM randomly. */static intrandom_host(char *ip, int requestTime){  int host = requestTime * 23 + g_count++;  for (; *ip; ip++) {    host = 23 * host + *ip;  }  if (host < 0)    return -host;  else    return host;}static voidcse_reuse(stream_t *s, config_t *config, srun_t *srun,	  int request_time, void *pool){  memset(s, 0, sizeof(*s));  s->pool = pool;
  s->config = config;  s->write_length = 0;  s->read_length = 0;  s->read_offset = 0;  s->socket = srun->sockets[--srun->n_sockets];  cse_set_socket_cleanup(s->socket, s->pool);  s->srun = srun;  srun->is_dead = 0;  srun->last_time = request_time;    LOG(("reopen %d\n", s->socket));}voidcse_recycle(stream_t *s){  int socket = s->socket;  srun_t *srun = s->srun;  cse_lock(s->config->lock);  if (socket >= 0 && srun && srun->n_sockets < srun->max_sockets) {    s->socket = -1;    cse_kill_socket_cleanup(socket, s->pool);    srun->sockets[srun->n_sockets++] = socket;    cse_unlock(s->config->lock);    LOG(("recycle %d\n", socket));  }  else {	  cse_unlock(s->config->lock);    LOG(("close2 %d\n", socket));    cse_write_packet(s, CSE_CLOSE, 0, 0);    cse_close(s, "recycle");  }}static intcse_reuse_socket(stream_t *s, config_t *config, srun_t *srun, int request_time, void *pool){  cse_lock(config->lock);     if (srun->n_sockets > 0 && srun->last_time + LIVE_TIME >= request_time) {      cse_reuse(s, config, srun, request_time, pool);      cse_unlock(config->lock);      return 1;   }   else if (srun->n_sockets > 0) {     while (srun->n_sockets > 0) {	int socket = srun->sockets[--srun->n_sockets];	send(socket, "X\0\0\0", 4, 0);	closesocket(socket);	LOG(("close reuse-timeout %d\n", socket));      }   }   cse_unlock(config->lock);   return 0;}intcse_open_connection(stream_t *s, config_t *config,		    int session_index, char *ip, int request_time, void *pool){  int size;  int i;  int host;  int incr;  size = config->srun_size;  if (size < 1)    size = 1;  if (session_index < 0) {    host = random_host(ip, request_time) % size;  }  else {    for (host = 0; host < size; host++) {      if (config->srun_list[host].session == session_index)	break;    }    if (host == size) {      host = random_host(ip, request_time) % size;    }  }    if (host < 0)    host = -host;    incr = 2 * (host & 1) - 1;  // XXX: if the session belongs to a host, we should retry for a few seconds?  for (i = 0; i < size; i++) {    srun_t *srun = config->srun_list + (host + i * (incr + size)) % size;    if (session_index < 0 && srun->is_backup) {    }    else if (cse_reuse_socket(s, config, srun, request_time, pool)) {      return 1;    }    else if (srun->is_dead && size > 0 &&	     srun->last_time + DEAD_TIME >= request_time) {    }    else if (cse_open(s, config, srun, pool)) {      s->srun = srun;      srun->is_dead = 0;      srun->last_time = request_time;      return 1;    }    else {      srun->is_dead = 1;      srun->last_time = request_time;    }  }  // Okay, the primaries failed.  So try the secondaries.  for (i = 0; i < size; i++) {    srun_t *srun = config->srun_list + (host + i * (incr + size)) % size;    if (cse_reuse_socket(s, config, srun, request_time, pool)) {      return 1;    }    else if (cse_open(s, config, srun, pool)) {      s->srun = srun;      srun->is_dead = 0;      srun->last_time = request_time;      return 1;    }    else {      srun->is_dead = 1;      srun->last_time = request_time;    }  }  return 0;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?