📄 stream.c
字号:
/* * Copyright (c) 1999-2008 Caucho Technology. All rights reserved. * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * * Free Software Foundation, Inc. * 59 Temple Place, Suite 330 * Boston, MA 02111-1307 USA * * @author Scott Ferguson */#include <stdlib.h>#include <errno.h>#include <stdio.h>#include <string.h>#include <fcntl.h>#ifdef WIN32#include <winsock2.h>#else#include <sys/types.h>#include <sys/time.h>#include <sys/socket.h>#include <netinet/in.h>#include <netdb.h>#include <unistd.h>#endif#ifdef OPENSSL/* SSLeay stuff */#include <openssl/rsa.h> #include <openssl/crypto.h>#include <openssl/x509.h>#include <openssl/pem.h>#include <openssl/ssl.h>#include <openssl/err.h>#endif#include "cse.h"#define WINDOWS_READ_TIMEOUT 3600#define DEFAULT_PORT 6800#define DEAD_TIME 20#define LIVE_TIME 10#define CONNECT_TIMEOUT 2#ifndef ECONNRESET#define ECONNRESET EPIPE#endif/** * Opening method for non-ssl. */static intstd_open(stream_t *stream){ return stream->socket >= 0;}static intpoll_read(int fd, int s){ fd_set read_set; struct timeval timeout; FD_ZERO(&read_set); FD_SET(fd, &read_set); timeout.tv_sec = s; timeout.tv_usec = 0; return select(fd + 1, &read_set, 0, 0, &timeout);}/** * Read for non-ssl. */static intstd_read(stream_t *s, void *buf, int length){#ifdef WIN32 { /* windows can hang the socket even when the opposite side has closed */ int timeout = s->cluster_srun->srun->read_timeout; if (poll_read(s->socket, timeout) <= 0) return -1; }#endif return recv(s->socket, buf, length, 0);}/** * Write for non-ssl. */static intstd_write(stream_t *s, const void *buf, int length){ return send(s->socket, buf, length, 0);}/** * Close for non-ssl. */static intstd_close(int socket, void *ssl){ return closesocket(socket);}#ifdef OPENSSL/** * Opening method for ssl. */static intssl_open(stream_t *stream){ SSL_CTX *ctx = stream->cluster_srun->srun->ssl; SSL *ssl; if (stream->socket < 0) return 0; ssl = SSL_new(ctx); if (! ssl) { close(stream->socket); stream->socket = -1; ERR(("%s:%d:ssl_open(): can't allocate ssl\n", __FILE__, __LINE__)); return 0; } SSL_set_fd(ssl, stream->socket); if (SSL_connect(ssl) < 0) { closesocket(stream->socket); stream->socket = -1; SSL_free(ssl); ERR(("%s:%d:ssl_open(): can't connect with ssl\n", __FILE__, __LINE__)); return 0; } LOG(("%s:%d:ssl_open(): connect with ssl %d\n", __FILE__, __LINE__, stream->socket)); stream->ssl = ssl; return stream->socket >= 0;}/** * Read for ssl. */static intssl_read(stream_t *s, void *buf, int length){ SSL *ssl = s->ssl; if (! ssl) return -1; return SSL_read(ssl, buf, length);}/** * Write for non-ssl. */static intssl_write(stream_t *s, const void *buf, int length){ SSL *ssl = s->ssl; if (! ssl) return -1; return SSL_write(ssl, (char *) buf, length);}/** * Close for ssl. */static intssl_close(int socket, void *ssl){ if (ssl) SSL_free(ssl); return closesocket(socket);}#endifvoidcse_close(stream_t *s, char *msg){ int socket = s->socket; s->socket = -1; if (socket >= 0) { LOG(("%s:%d:cse_close(): close %d %s\n", __FILE__, __LINE__, socket, msg)); cse_kill_socket_cleanup(socket, s->web_pool); /* config read/save has no cluster_srun */ if (s->cluster_srun) s->cluster_srun->srun->close(socket, s->ssl); else closesocket(socket); }}#ifdef WIN32static intcse_connect(struct sockaddr_in *sin, srun_t *srun){ unsigned int sock; unsigned long is_nonblock; sock = socket(AF_INET, SOCK_STREAM, 0); if (sock == INVALID_SOCKET) { ERR(("%s:%d:cse_connect(): mod_caucho can't create socket.\n", __FILE__, __LINE__)); return -1; /* bad socket */ } is_nonblock = 1; ioctlsocket(sock, FIONBIO, &is_nonblock); if (connect(sock, (struct sockaddr *) sin, sizeof(struct sockaddr_in))) { WSAEVENT event = WSACreateEvent(); WSANETWORKEVENTS networkResult; int result; WSAEventSelect(sock, event, FD_CONNECT); result = WSAWaitForMultipleEvents(1, &event, 0, srun->connect_timeout * 1000, 0); WSAEnumNetworkEvents(sock, event, &networkResult); WSAEventSelect(sock, 0, 0); WSACloseEvent(event); if (result != WSA_WAIT_EVENT_0 || networkResult.iErrorCode[FD_CONNECT_BIT] != NO_ERROR) { closesocket(sock); return -1; } } is_nonblock = 0; ioctlsocket(sock, FIONBIO, &is_nonblock); LOG(("%s:%d:cse_connect(): connect %d\n", __FILE__, __LINE__, sock)); return sock;}#elsestatic intcse_connect(struct sockaddr_in *sin, srun_t *srun){ int sock; fd_set write_fds; struct timeval timeout; int flags; int error = 0; unsigned int len = sizeof(error); sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { ERR(("%s:%d:cse_connect(): mod_caucho can't create socket.\n", __FILE__, __LINE__)); return -1; /* bad socket */ } if (sock < FD_SETSIZE) { flags = fcntl(sock, F_GETFL); fcntl(sock, F_SETFL, O_NONBLOCK|flags); FD_ZERO(&write_fds); FD_SET(sock, &write_fds); } timeout.tv_sec = srun->connect_timeout; timeout.tv_usec = 0; if (! connect(sock, (const struct sockaddr *) sin, sizeof(*sin))) { if (sock < FD_SETSIZE) { fcntl(sock, F_SETFL, flags); } return sock; } else if (FD_SETSIZE <= sock) { ERR(("%s:%d:cse_connect(): connect failed %x %d %d\n", __FILE__, __LINE__, sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); close(sock); return -1; } /* * Solaris can return other errno for a connection that will succeed, * see bug #1415. So we avoid the extra error checking here and only * check after the select() call. */ /* else if (errno != EWOULDBLOCK && errno != EINPROGRESS) { ERR(("%s:%d:cse_connect(): connect quickfailed %x %d %d\n", __FILE__, __LINE__, sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); close(sock); return -1; } */ else if (select(sock + 1, 0, &write_fds, 0, &timeout) <= 0) { ERR(("%s:%d:cse_connect(): timeout %x %d %d\n", __FILE__, __LINE__, sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); fcntl(sock, F_SETFL, flags); close(sock); return -1; } else if (! FD_ISSET(sock, &write_fds) || getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0 || error) { ERR(("%s:%d:cse_connect(): connect failed %x %d %d\n", __FILE__, __LINE__, sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); close(sock); return -1; } else { fcntl(sock, F_SETFL, flags); LOG(("%s:%d:cse_connect(): connect %x:%d -> %d\n", __FILE__, __LINE__, sin->sin_addr.s_addr, ntohs(sin->sin_port), sock)); return sock; }}#endifstatic intcse_connect_wait(struct sockaddr_in *sin){ int sock; sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { ERR(("%s:%d:cse_connect_wait(): mod_caucho can't create socket.\n", __FILE__, __LINE__)); return -1; /* bad socket */ } if (! connect(sock, (const struct sockaddr *) sin, sizeof(*sin))) { return sock; } LOG(("%s:%d:cse_connect_wait(): can't connect %x %d %d\n", __FILE__, __LINE__, sin->sin_addr.s_addr, ntohs(sin->sin_port), errno)); closesocket(sock); return -1;}intcse_open(stream_t *s, cluster_t *cluster, cluster_srun_t *cluster_srun, void *web_pool, int wait){ config_t *config = cluster->config; struct sockaddr_in sin; srun_t *srun = cluster_srun->srun; if (! srun) return 0; s->config = config; s->update_count = config->update_count; s->pool = config->p; s->web_pool = web_pool; s->write_length = 0; s->ssl = 0; s->read_length = 0; s->read_offset = 0; s->cluster_srun = cluster_srun; s->sent_data = 0; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; if (srun->host) { memcpy(&sin.sin_addr, srun->host, sizeof(struct in_addr)); } else sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); if (srun->port <= 0) srun->port = DEFAULT_PORT; sin.sin_port = htons((short) srun->port); if (wait || srun->connect_timeout <= 0) s->socket = cse_connect_wait(&sin); else s->socket = cse_connect(&sin, srun); if (s->socket < 0) { ERR(("%s:%d:cse_open(): open new failed %x:%d\n", __FILE__, __LINE__, s->socket, *srun->host, srun->port)); return 0; } srun->fail_time = 0; if (srun->send_buffer_size == 0) { int size; unsigned int len = sizeof(size); #ifdef SO_SNDBUF if (getsockopt(s->socket, SOL_SOCKET, SO_SNDBUF, (char *) &size, &len) >= 0) { size -= 1024; if (size < 8192) size = 8192; srun->send_buffer_size = size; }#else srun->send_buffer_size = 16 * 1024;#endif LOG(("%s:%d:cse_open(): send buffer size %d\n", __FILE__, __LINE__, srun->send_buffer_size)); } LOG(("%s:%d:cse_open(): open new connection %d %x:%d\n", __FILE__, __LINE__, s->socket, *srun->host, srun->port)); return srun->open(s);}/** * Flush the results to the stream. * * @param s the buffered stream for the results. */intcse_flush(stream_t *s){ unsigned char *buf = s->write_buf; int length = s->write_length; while (length > 0) { int len; /* config read/save has no cluster_srun */ if (s->cluster_srun) len = s->cluster_srun->srun->write(s, buf, length); else len = write(s->socket, buf, length); if (len <= 0) { cse_close(s, "flush"); return -1; } length -= len; buf += len; } s->sent_data = 1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -