📄 stream.c
字号:
/* * Copyright (c) 1999-2002 Caucho Technology. All rights reserved. * * Caucho Technology permits modification and use of this file in * source and binary form ("the Software") subject to the Caucho * Developer Source License 1.1 ("the License") which accompanies * this file. The License is also available at * http://www.caucho.com/download/cdsl1-1.xtp * * In addition to the terms of the License, the following conditions * must be met: * * 1. Each copy or derived work of the Software must preserve the copyright * notice and this notice unmodified. * * 2. Each copy of the Software in source or binary form must include * an unmodified copy of the License in a plain ASCII text file named * LICENSE. * * 3. Caucho reserves all rights to its names, trademarks and logos. * In particular, the names "Resin" and "Caucho" are trademarks of * Caucho and may not be used to endorse products derived from * this software. "Resin" and "Caucho" may not appear in the names * of products derived from this software. * * This Software is provided "AS IS," without a warranty of any kind. * ALL EXPRESS OR IMPLIED REPRESENTATIONS AND WARRANTIES, INCLUDING ANY * IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE * OR NON-INFRINGEMENT, ARE HEREBY EXCLUDED. * * CAUCHO TECHNOLOGY AND ITS LICENSORS SHALL NOT BE LIABLE FOR ANY DAMAGES * SUFFERED BY LICENSEE OR ANY THIRD PARTY AS A RESULT OF USING OR * DISTRIBUTING SOFTWARE. IN NO EVENT WILL CAUCHO OR ITS LICENSORS BE LIABLE * FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL, * CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND * REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF OR * INABILITY TO USE SOFTWARE, EVEN IF HE HAS BEEN ADVISED OF THE POSSIBILITY * OF SUCH DAMAGES. * * @author Scott Ferguson */#include <stdlib.h>#include <errno.h>#include <stdio.h>#include <string.h>#include <fcntl.h>#ifdef WIN32#include <winsock2.h>#else#include <sys/types.h>#include <sys/time.h>#include <sys/socket.h>#include <netinet/in.h>#include <netdb.h>#include <unistd.h>#endif#ifdef OPENSSL/* SSLeay stuff */#include <openssl/rsa.h> #include <openssl/crypto.h>#include <openssl/x509.h>#include <openssl/pem.h>#include <openssl/ssl.h>#include <openssl/err.h>#endif#include "cse.h"#define DEFAULT_PORT 6802#define DEAD_TIME 120#define LIVE_TIME 10#define CONNECT_TIMEOUT 2static srun_t *g_srun_list[4096];static int g_srun_count;static int g_ssl_init;/** * Opening method for non-ssl. */static intstd_open(stream_t *stream){ return stream->socket >= 0;}/** * Read for non-ssl. */static intstd_read(stream_t *s, char *buf, int length){ return recv(s->socket, buf, length, 0);}/** * Write for non-ssl. */static intstd_write(stream_t *s, const char *buf, int length){ return send(s->socket, buf, length, 0);}/** * Close for non-ssl. */static intstd_close(int socket, void *ssl){ return closesocket(socket);}#ifdef OPENSSL/** * Opening method for ssl. */static intssl_open(stream_t *stream){ SSL_CTX *ctx; SSL *ssl; int fd; SSL_METHOD *meth; fd = stream->socket; LOG(("Trying ssl %d\n", fd)); if (fd < 0) return 0; if (! stream->ssl_ctx) { if (! g_ssl_init) { OpenSSL_add_ssl_algorithms(); SSL_load_error_strings(); g_ssl_init = 1; } meth = SSLv2_client_method(); ctx = SSL_CTX_new(meth); if (! ctx) { stream->socket = -1; closesocket(fd); LOG(("Failed SSL context\n")); return 0; } stream->ssl_ctx = ctx; } ctx = stream->ssl_ctx; /* ctx = stream->srun->srun->ssl; */ if (! ctx) { stream->socket = -1; closesocket(fd); LOG(("Can't allocate ssl context\n")); return 0; } ssl = SSL_new(ctx); if (! ssl) { stream->socket = -1; closesocket(fd); LOG(("Can't allocate ssl\n")); return 0; } SSL_set_fd(ssl, fd); if (SSL_connect(ssl) < 0) { LOG(("Can't open SSL connection %p\n", ssl)); ERR_print_errors_fp(stderr); stream->socket = -1; closesocket(fd); SSL_free(ssl); return 0; } LOG(("Connect with ssl %d\n", fd)); stream->ssl = ssl; return 1;}/** * Read for ssl. */static intssl_read(stream_t *s, char *buf, int length){ SSL *ssl = s->ssl; if (! ssl) return -1; return SSL_read(ssl, buf, length);}/** * Write for non-ssl. */static intssl_write(stream_t *s, const char *buf, int length){ SSL *ssl = s->ssl; if (! ssl) return -1; return SSL_write(ssl, (char *) buf, length);}/** * Close for ssl. */static intssl_close(int socket, void *ssl){ if (ssl) SSL_free(ssl); return closesocket(socket);}#endifvoidcse_close(stream_t *s, char *msg){ int socket = s->socket; s->socket = -1; if (socket >= 0) { LOG(("close %d %s\n", socket, msg)); cse_kill_socket_cleanup(socket, s->web_pool); s->srun->srun->close(socket, s->ssl); }}#ifdef WIN32static intcse_connect(struct sockaddr_in *sin, srun_t *srun){ unsigned int sock; unsigned long is_nonblock; sock = socket(AF_INET, SOCK_STREAM, 0); if (sock == INVALID_SOCKET) { LOG(("mod_caucho can't create socket.\n")); return -1; /* bad socket */ } is_nonblock = 1; ioctlsocket(sock, FIONBIO, &is_nonblock); if (connect(sock, (struct sockaddr *) sin, sizeof(struct sockaddr_in))) { WSAEVENT event = WSACreateEvent(); WSANETWORKEVENTS networkResult; int result; WSAEventSelect(sock, event, FD_CONNECT); result = WSAWaitForMultipleEvents(1, &event, 0, srun->connect_timeout * 1000, 0); WSAEnumNetworkEvents(sock, event, &networkResult); WSAEventSelect(sock, 0, 0); WSACloseEvent(event); if (result != WSA_WAIT_EVENT_0 || networkResult.iErrorCode[FD_CONNECT_BIT] != NO_ERROR) { closesocket(sock); return -1; } } is_nonblock = 0; ioctlsocket(sock, FIONBIO, &is_nonblock); LOG(("connect %d\n", sock)); return sock;}#elsestatic intcse_connect(struct sockaddr_in *sin, srun_t *srun){ int sock; fd_set write_fds; struct timeval timeout; int flags; int error = 0; int len = sizeof(error); sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { LOG(("mod_caucho can't create socket.\n")); return -1; /* bad socket */ } flags = fcntl(sock, F_GETFL); fcntl(sock, F_SETFL, O_NONBLOCK|flags); FD_ZERO(&write_fds); FD_SET(sock, &write_fds); timeout.tv_sec = srun->connect_timeout; timeout.tv_usec = 0; if (! connect(sock, (const struct sockaddr *) sin, sizeof(*sin))) { fcntl(sock, F_SETFL, flags); return sock; } else if (errno != EWOULDBLOCK && errno != EINPROGRESS) { LOG(("connect quickfailed %x %d %d\n", sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); close(sock); return -1; } else if (select(sock + 1, 0, &write_fds, 0, &timeout) <= 0) { LOG(("timeout %x %d %d\n", sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); fcntl(sock, F_SETFL, flags); close(sock); return -1; } else if (! FD_ISSET(sock, &write_fds) || getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0 || error) { LOG(("connect failed %x %d %d\n", sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); close(sock); return -1; } else { fcntl(sock, F_SETFL, flags); LOG(("connect %x:%d -> %d\n", sin->sin_addr.s_addr, ntohs(sin->sin_port), sock)); return sock; }}#endifstatic intcse_connect_wait(struct sockaddr_in *sin){ int sock; sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { LOG(("mod_caucho can't create socket.\n")); return -1; /* bad socket */ } if (! connect(sock, (const struct sockaddr *) sin, sizeof(*sin))) { return sock; } LOG(("cse_connect_wait can't connect %x %d %d\n", sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); closesocket(sock); return -1;}intcse_open(stream_t *s, config_t *config, srun_item_t *srun_item, void *web_pool, int wait){ struct sockaddr_in sin; srun_t *srun = srun_item->srun; if (! srun) return 0; s->config = config; s->update_count = config->update_count; s->pool = config->p; s->web_pool = web_pool; s->write_length = 0; s->read_length = 0; s->read_offset = 0; s->srun = srun_item; s->sent_data = 0; sin.sin_family = AF_INET; if (srun->host) sin.sin_addr = *srun->host; else sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); if (srun->port <= 0) srun->port = DEFAULT_PORT; sin.sin_port = htons((short) srun->port); if (wait || srun->connect_timeout <= 0) s->socket = cse_connect_wait(&sin); else s->socket = cse_connect(&sin, srun); if (s->socket < 0) return 0; LOG(("open new connection %d %x:%d\n", s->socket, *srun->host, srun->port)); return srun->open(s);}/** * Flush the results to the stream. * * @param s the buffered stream for the results. */intcse_flush(stream_t *s){ char *buf = s->write_buf; int length = s->write_length; while (length > 0) { int len = s->srun->srun->write(s, buf, length); if (len <= 0) { cse_close(s, "flush"); return -1; } length -= len; buf += len; } s->sent_data = 1; s->write_length = 0; return 0;}/** * Flushes the output buffer and fills the read buffer. The two buffers * are combined so we can try another srun if the request fails. */intcse_fill_buffer(stream_t *s){ int len = 0; if (s->socket < 0) return -1; /* flush the buffer */ if (s->write_length > 0) { LOG(("write %d %d\n", s->socket, s->write_length)); len = s->srun->srun->write(s, s->write_buf, s->write_length); if (len != s->write_length) { cse_close(s, "flush"); return -1; } } s->read_offset = 0; s->read_length = s->srun->srun->read(s, s->read_buf, BUF_LENGTH); if (s->read_length <= 0) { cse_close(s, "fill_buffer"); return -1; } s->sent_data = 1; s->write_length = 0; return s->read_length;}intcse_read_byte(stream_t *s){ if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } return s->read_buf[s->read_offset++];}voidcse_write(stream_t *s, const char *buf, int length){ /* XXX: writev??? */ if (s->write_length + length > BUF_LENGTH) { if (s->write_length > 0) { if (cse_flush(s) < 0) { s->sent_data = 1; return; } } if (length >= BUF_LENGTH) { int len; len = s->srun->srun->write(s, buf, length); s->sent_data = 1; if (len < 0) cse_close(s, "write"); return; } } memcpy(s->write_buf + s->write_length, buf, length); s->write_length += length;}intcse_read_all(stream_t *s, char *buf, int len){ while (len > 0) { int sublen; if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } sublen = s->read_length - s->read_offset; if (len < sublen) sublen = len; memcpy(buf, s->read_buf + s->read_offset, sublen); buf += sublen; len -= sublen; s->read_offset += sublen; } return 1;}intcse_skip(stream_t *s, int len){ while (len > 0) { int sublen; if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } sublen = s->read_length - s->read_offset; if (len < sublen) sublen = len; len -= sublen; s->read_offset += sublen; } return 1;}intcse_read_limit(stream_t *s, char *buf, int buflen, int readlen){ int result; if (buflen >= readlen) { result = cse_read_all(s, buf, readlen); buf[readlen] = 0; } else { result = cse_read_all(s, buf, buflen); buf[buflen - 1] = 0; cse_skip(s, readlen - buflen); } return result > 0 ? readlen : 0;}/** * write a packet to srun * * @param s stream to srun * @param code packet code * @param buf data buffer * @param length length of data in buffer */voidcse_write_packet(stream_t *s, char code, const char *buf, int length){ char temp[4]; temp[0] = code; temp[1] = (length >> 16) & 0xff; temp[2] = (length >> 8) & 0xff; temp[3] = (length) & 0xff; cse_write(s, temp, 4); if (length >= 0) cse_write(s, buf, length);}/** * writes a string to srun */voidcse_write_string(stream_t *s, char code, const char *buf){ if (buf) cse_write_packet(s, code, buf, strlen(buf));}intcse_read_string(stream_t *s, char *buf, int length){ int code; int l1, l2, l3; int read_length; length--; code = cse_read_byte(s); l1 = cse_read_byte(s) & 0xff; l2 = cse_read_byte(s) & 0xff; l3 = cse_read_byte(s) & 0xff; read_length = (l1 << 16) + (l2 << 8) + (l3); if (s->socket < 0) { *buf = 0; return -1; } if (length > read_length) length = read_length; if (cse_read_all(s, buf, length) < 0) { *buf = 0; return -1; } buf[length] = 0; /* scan extra */ for (read_length -= length; read_length > 0; read_length--) cse_read_byte(s);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -