stream.c
来自「《jsp编程起步》里面的所有源代码」· C语言 代码 · 共 562 行
C
562 行
/* * Copyright (c) 1999 Caucho Technology. All rights reserved. * * Caucho Technology permits redistribution, modification and use * of this file in source and binary form ("the Software") under the * Caucho Developer Source License ("the License"). In particular, the following * conditions must be met: * * 1. Each copy or derived work of the Software must preserve the copyright * notice and this notice unmodified. * * 2. Redistributions of the Software in source or binary form must include * an unmodified copy of the License, normally in a plain ASCII text * * 3. The names "Resin" or "Caucho" are trademarks of Caucho Technology and * may not be used to endorse products derived from this software. * "Resin" or "Caucho" may not appear in the names of products derived * from this software. * * 4. Caucho Technology requests that attribution be given to Resin * in any manner possible. We suggest using the "Resin Powered" * button or creating a "powered by Resin(tm)" link to * http://www.caucho.com for each page served by Resin. * * This Software is provided "AS IS," without a warranty of any kind. * ALL EXPRESS OR IMPLIED REPRESENTATIONS AND WARRANTIES, INCLUDING ANY * IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE * OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. * CAUCHO TECHNOLOGY AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES * SUFFERED BY LICENSEE OR ANY THIRD PARTY AS A RESULT OF USING OR * DISTRIBUTING SOFTWARE. IN NO EVENT WILL CAUCHO OR ITS LICENSORS BE LIABLE * FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, * CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND * REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF OR * INABILITY TO USE SOFTWARE, EVEN IF HE HAS BEEN ADVISED OF THE POSSIBILITY * OF SUCH DAMAGES. * * @author Scott Ferguson * * $Id: stream.c,v 1.6 2000/10/13 01:51:35 ferg Exp $ */#include <stdlib.h>#include <errno.h>#include <stdio.h>#include <string.h>#ifdef WIN32#include <winsock2.h>#else#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <netdb.h>#include <unistd.h>#endif#include "cse.h"#define DEFAULT_PORT 6802#define DEAD_TIME 5#define LIVE_TIME 5static int g_count;voidcse_close(stream_t *s, char *msg){ int socket = s->socket; LOG(("close %d %s\n", s->socket, msg)); s->socket = -1; cse_kill_socket_cleanup(socket, s->pool); closesocket(socket);}intcse_open(stream_t *s, config_t *config, srun_t *srun, void *p){ struct sockaddr_in sin; memset(s, 0, sizeof(*s)); s->config = config; s->pool = p; s->write_length = 0; s->read_length = 0; s->read_offset = 0; s->srun = srun; s->socket = socket(AF_INET, SOCK_STREAM, 0); if (s->socket < 0) { LOG(("mod_caucho can't create socket.\n")); return 0; // bad socket } LOG(("open %d\n", s->socket)); sin.sin_family = AF_INET; if (srun->host) sin.sin_addr = *srun->host; else sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); if (srun->port <= 0) srun->port = DEFAULT_PORT; sin.sin_port = htons((short) srun->port); if (connect(s->socket, (const struct sockaddr *) &sin, sizeof(struct sockaddr_in)) < 0) { LOG(("connect %x %d %d\n", sin.sin_addr.s_addr, ntohs(sin.sin_port), errno)); // XXX: logging doesn't make sense with load balancing(?) //if (server) // ap_log_printf(server, "mod_caucho can't connect.\n"); cse_close(s, "connect"); return 0; } return 1;}intcse_flush(stream_t *s){ int len; if (s->write_length > 0) { len = send(s->socket, s->write_buf, s->write_length, 0); if (len != s->write_length) return -1; s->write_length = 0; } return 0;}intcse_fill_buffer(stream_t *s){ int i = 0; if (s->socket < 0 || cse_flush(s) < 0) return -1; s->read_offset = 0; do { s->read_length = recv(s->socket, s->read_buf, BUF_LENGTH, 0); } while (s->read_length < 0 && errno == EINTR && i++ < 10); if (s->read_length <= 0) { cse_close(s, "fill_buffer"); return -1; } return 0;}intcse_read_byte(stream_t *s){ if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } return s->read_buf[s->read_offset++];}voidcse_write(stream_t *s, const char *buf, int length){ /* XXX: writev??? */ if (s->write_length + length > BUF_LENGTH) { if (s->write_length > 0) { int len; len = send(s->socket, s->write_buf, s->write_length, 0); s->write_length = 0; if (len < 0) { cse_close(s, "write"); return; } } if (length > BUF_LENGTH) { int len; len = send(s->socket, s->write_buf, s->write_length, 0); if (len < 0) cse_close(s, "write"); return; } } memcpy(s->write_buf + s->write_length, buf, length); s->write_length += length;}intcse_read_all(stream_t *s, char *buf, int len){ while (len > 0) { int sublen; if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } sublen = s->read_length - s->read_offset; if (len < sublen) sublen = len; memcpy(buf, s->read_buf + s->read_offset, sublen); buf += sublen; len -= sublen; s->read_offset += sublen; } return 1;}/** * write a packet to srun * * @param s stream to srun * @param code packet code * @param buf data buffer * @param length length of data in buffer */voidcse_write_packet(stream_t *s, char code, const char *buf, int length){ char temp[4]; temp[0] = code; temp[1] = (length >> 16) & 0xff; temp[2] = (length >> 8) & 0xff; temp[3] = (length) & 0xff; cse_write(s, temp, 4); if (length > 0) cse_write(s, buf, length);}/** * writes a string to srun */voidcse_write_string(stream_t *s, char code, const char *buf){ if (buf) cse_write_packet(s, code, buf, strlen(buf));}intcse_read_string(stream_t *s, char *buf, int length){ int code; int l1, l2, l3; int read_length; length--; code = cse_read_byte(s); l1 = cse_read_byte(s) & 0xff; l2 = cse_read_byte(s) & 0xff; l3 = cse_read_byte(s) & 0xff; read_length = (l1 << 16) + (l2 << 8) + (l3); if (s->socket < 0) return -1; if (length > read_length) length = read_length; if (cse_read_all(s, buf, length) < 0) return -1; buf[length] = 0; // scan extra for (read_length -= length; read_length > 0; read_length--) cse_read_byte(s); return code;}/** * Decodes the first 3 characters of the session to see which * JVM owns it. */static intdecode_session(char *session){ int value = 0; int i; for (i = 2; i >= 0; i--) { int code = session[i]; if (code >= 'a' && code <= 'z') value = 64 * value + code - 'a'; else if (code >= 'A' && code <= 'Z') value = 64 * value + code - 'A' + 26; else if (code >= '0' && code <= '9') value = 64 * value + code - 'A' + 52; else if (code == '_') value = 64 * value + 62; else if (code == '/') value = 64 * value + 63; else return -1; } if (i > -1) return -1; else return value;}intcse_session_from_string(char *source, char *cookie){ char *match = strstr(source, cookie); if (match) return decode_session(match + strlen(cookie)); return -1;}/** * Adds a new host to the configuration */voidcse_add_host_int(config_t *config, const char *hostname, int port, int is_backup){ struct hostent *hostent; srun_t *srun; if (config->srun_size == 1 && ! config->srun_list->host) config->srun_size = 0; // Resize if too many hosts. if (config->srun_size >= config->srun_capacity) { int capacity = config->srun_capacity; srun_t *srun_list; if (capacity == 0) capacity = 4; srun_list = (srun_t *) cse_alloc(config, 2 * capacity * sizeof(srun_t)); memset(srun_list, 0, 2 * capacity * sizeof(srun_t)); if (config->srun_list) memcpy(srun_list, config->srun_list, capacity * sizeof(srun_t)); config->srun_capacity = 2 * capacity; cse_free(config, config->srun_list); config->srun_list = srun_list; } hostent = gethostbyname(hostname); if (hostent && hostent->h_addr) { srun = &config->srun_list[config->srun_size]; srun->hostname = cse_strdup(config, hostname); srun->host = (struct in_addr *) cse_alloc(config, sizeof (struct in_addr)); memcpy(srun->host, hostent->h_addr, sizeof(struct in_addr)); srun->port = port; srun->is_backup = is_backup; srun->n_sockets = 0; srun->max_sockets = 32; /* * XXX: for dynamic srun * srun->session = cse_query_session(config, srun, config->pool); */ srun->session = config->srun_size; config->srun_size++; } else { cse_error(config, "CSE cannot find host %s\n", hostname); }}/** * Select a client JVM randomly. */static intrandom_host(char *ip, int requestTime){ int host = requestTime * 23 + g_count++; for (; *ip; ip++) { host = 23 * host + *ip; } if (host < 0) return -host; else return host;}static voidcse_reuse(stream_t *s, config_t *config, srun_t *srun, int request_time, void *pool){ memset(s, 0, sizeof(*s)); s->pool = pool;
s->config = config; s->write_length = 0; s->read_length = 0; s->read_offset = 0; s->socket = srun->sockets[--srun->n_sockets]; cse_set_socket_cleanup(s->socket, s->pool); s->srun = srun; srun->is_dead = 0; srun->last_time = request_time; LOG(("reopen %d\n", s->socket));}voidcse_recycle(stream_t *s){ int socket = s->socket; srun_t *srun = s->srun; cse_lock(s->config->lock); if (socket >= 0 && srun && srun->n_sockets < srun->max_sockets) { s->socket = -1; cse_kill_socket_cleanup(socket, s->pool); srun->sockets[srun->n_sockets++] = socket; cse_unlock(s->config->lock); LOG(("recycle %d\n", socket)); } else { cse_unlock(s->config->lock); LOG(("close2 %d\n", socket)); cse_write_packet(s, CSE_CLOSE, 0, 0); cse_close(s, "recycle"); }}static intcse_reuse_socket(stream_t *s, config_t *config, srun_t *srun, int request_time, void *pool){ cse_lock(config->lock); if (srun->n_sockets > 0 && srun->last_time + LIVE_TIME >= request_time) { cse_reuse(s, config, srun, request_time, pool); cse_unlock(config->lock); return 1; } else if (srun->n_sockets > 0) { while (srun->n_sockets > 0) { int socket = srun->sockets[--srun->n_sockets]; send(socket, "X\0\0\0", 4, 0); closesocket(socket); LOG(("close reuse-timeout %d\n", socket)); } } cse_unlock(config->lock); return 0;}intcse_open_connection(stream_t *s, config_t *config, int session_index, char *ip, int request_time, void *pool){ int size; int i; int host; int incr; size = config->srun_size; if (size < 1) size = 1; if (session_index < 0) { host = random_host(ip, request_time) % size; } else { for (host = 0; host < size; host++) { if (config->srun_list[host].session == session_index) break; } if (host == size) { host = random_host(ip, request_time) % size; } } if (host < 0) host = -host; incr = 2 * (host & 1) - 1; // XXX: if the session belongs to a host, we should retry for a few seconds? for (i = 0; i < size; i++) { srun_t *srun = config->srun_list + (host + i * (incr + size)) % size; if (session_index < 0 && srun->is_backup) { } else if (cse_reuse_socket(s, config, srun, request_time, pool)) { return 1; } else if (srun->is_dead && size > 0 && srun->last_time + DEAD_TIME >= request_time) { } else if (cse_open(s, config, srun, pool)) { s->srun = srun; srun->is_dead = 0; srun->last_time = request_time; return 1; } else { srun->is_dead = 1; srun->last_time = request_time; } } // Okay, the primaries failed. So try the secondaries. for (i = 0; i < size; i++) { srun_t *srun = config->srun_list + (host + i * (incr + size)) % size; if (cse_reuse_socket(s, config, srun, request_time, pool)) { return 1; } else if (cse_open(s, config, srun, pool)) { s->srun = srun; srun->is_dead = 0; srun->last_time = request_time; return 1; } else { srun->is_dead = 1; srun->last_time = request_time; } } return 0;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?