📄 stream.c
字号:
s->write_length = 0; return 0;}/** * Flushes the output buffer and fills the read buffer. The two buffers * are combined so we can try another srun if the request fails. */intcse_fill_buffer(stream_t *s){ int len = 0; int retry; if (s->socket < 0) return -1; /* flush the buffer */ if (s->write_length > 0) { LOG(("%s:%d:cse_fill_buffer(): write %d %d\n", __FILE__, __LINE__, s->socket, s->write_length)); /* config read/save has no cluster_srun */ if (s->cluster_srun) len = s->cluster_srun->srun->write(s, s->write_buf, s->write_length); else len = write(s->socket, s->write_buf, s->write_length); if (len != s->write_length) { cse_close(s, "flush"); return -1; } } s->read_offset = 0; retry = 3; do { /* config read/save has no cluster_srun */ if (s->cluster_srun) s->read_length = s->cluster_srun->srun->read(s, s->read_buf, BUF_LENGTH); else s->read_length = read(s->socket, s->read_buf, BUF_LENGTH); // repeat for EINTR, EAGAIN } while (s->read_length < 0 && errno != EPIPE && errno != ECONNRESET && retry-- > 0); if (s->read_length <= 0) { cse_close(s, "fill_buffer"); return -1; } s->sent_data = 1; s->write_length = 0; return s->read_length;}intcse_read_byte(stream_t *s){ if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } return s->read_buf[s->read_offset++];}voidcse_write(stream_t *s, const char *buf, int length){ int write_length = s->write_length; /* XXX: writev??? */ if (BUF_LENGTH < write_length + length) { if (write_length > 0) { if (cse_flush(s) < 0) { s->sent_data = 1; return; } write_length = 0; } if (BUF_LENGTH <= length) { int len; /* config read/save has no cluster_srun */ if (s->cluster_srun) len = s->cluster_srun->srun->write(s, buf, length); else len = write(s->socket, buf, length); s->sent_data = 1; if (len < 0) cse_close(s, "write"); return; } } memcpy(s->write_buf + write_length, buf, length); s->write_length = write_length + length;}voidcse_write_byte(stream_t *s, int ch){ /* XXX: writev??? */ if (BUF_LENGTH < s->write_length + 1) { if (s->write_length > 0) { if (cse_flush(s) < 0) { s->sent_data = 1; return; } } } s->write_buf[s->write_length++] = ch;}intcse_read_all(stream_t *s, char *buf, int len){ while (len > 0) { int sublen; if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } sublen = s->read_length - s->read_offset; if (len < sublen) sublen = len; memcpy(buf, s->read_buf + s->read_offset, sublen); buf += sublen; len -= sublen; s->read_offset += sublen; } return 1;}intcse_skip(stream_t *s, int len){ while (len > 0) { int sublen; if (s->read_offset >= s->read_length) { if (cse_fill_buffer(s) < 0) return -1; } sublen = s->read_length - s->read_offset; if (len < sublen) sublen = len; len -= sublen; s->read_offset += sublen; } return 1;}intcse_read_limit(stream_t *s, char *buf, int buflen, int readlen){ int result; if (readlen <= buflen) { result = cse_read_all(s, buf, readlen); buf[readlen] = 0; } else { result = cse_read_all(s, buf, buflen); buf[buflen - 1] = 0; cse_skip(s, readlen - buflen); } return result > 0 ? readlen : 0;}inthmux_read_len(stream_t *s){ int l1 = cse_read_byte(s) & 0xff; int l2 = cse_read_byte(s) & 0xff; return (l1 << 8) + l2;}/** * write a packet to srun * * @param s stream to srun * @param code packet code * @param buf data buffer * @param length length of data in buffer */voidcse_write_packet(stream_t *s, char code, const char *buf, int length){ char temp[4]; temp[0] = code; temp[1] = (length >> 8) & 0xff; temp[2] = (length) & 0xff; cse_write(s, temp, 3); if (length >= 0) cse_write(s, buf, length);}/** * writes a string to srun */voidcse_write_string(stream_t *s, char code, const char *buf){ if (buf) cse_write_packet(s, code, buf, strlen(buf));}/** * writes a string to srun */voidhmux_write_string(stream_t *s, char code, const char *buf){ if (buf) cse_write_packet(s, code, buf, strlen(buf));}/** * write a packet to srun * * @param s stream to srun * @param code packet code * @param int data int */voidhmux_write_int(stream_t *s, char code, int i){ char temp[8]; temp[0] = code; temp[1] = 0; temp[2] = 4; temp[3] = (char) (i >> 24); temp[4] = (char) (i >> 16); temp[5] = (char) (i >> 8); temp[6] = (char) (i); cse_write(s, temp, 7);}voidhmux_start_channel(stream_t *s, unsigned short channel){ cse_write_byte(s, HMUX_CHANNEL); cse_write_byte(s, channel >> 8); cse_write_byte(s, channel);}voidhmux_write_close(stream_t *s){ cse_write_byte(s, HMUX_QUIT);}voidhmux_write_exit(stream_t *s){ cse_write_byte(s, HMUX_EXIT);}inthmux_read_string(stream_t *s, char *buf, int length){ int l1, l2; int read_length; length--; l1 = cse_read_byte(s) & 0xff; l2 = cse_read_byte(s) & 0xff; read_length = (l1 << 8) + l2; if (s->socket < 0) { *buf = 0; return -1; } if (length > read_length) length = read_length; if (cse_read_all(s, buf, length) < 0) { *buf = 0; return -1; } buf[length] = 0; /* scan extra */ for (read_length -= length; read_length > 0; read_length--) cse_read_byte(s); return length;}intcse_read_string(stream_t *s, char *buf, int length){ int code; int read_length; length--; code = cse_read_byte(s); read_length = hmux_read_string(s, buf, length); if (read_length < 0) return -1; else return code;}/** * Decodes the first 3 characters of the session to see which * JVM owns it. */static intdecode(char code){ if ('a' <= code && code <= 'z') return code - 'a'; else if ('A' <= code && code <= 'Z') return code - 'A' + 26; else if ('0' <= code && code <= '9') return code - '0' + 52; else if (code == '_') return 62; else if (code == '-') return 63; else return -1;}/** * Returns the session id from a cookie. */intcse_session_from_string(char *source, char *cookie, int *backup){ char *match = strstr(source, cookie); if (match) { int len = strlen(cookie); if (match[len] == '=') len++; *backup = decode(match[len + 1]); return decode(match[len]); } return -1;}static srun_t *cse_find_config_srun(config_t *config, const char *hostname, int port, int ssl){ int i; srun_t *srun; for (i = 0; i < config->srun_capacity; i++) { srun = config->srun_list[i]; if (! strcmp(srun->hostname, hostname) && srun->port == port && (srun->ssl != 0) == ssl) { return srun; } } return 0;}static voidcse_add_config_srun(config_t *config, srun_t *srun){ int size; srun_t **srun_list; size = config->srun_capacity; srun_list = malloc((size + 1) * sizeof(srun_t *)); memcpy(srun_list, config->srun_list, size * sizeof(srun_t *)); srun_list[size] = srun; config->srun_list = srun_list; config->srun_capacity = size + 1;}static srun_t *cse_add_srun(cluster_t *cluster, const char *hostname, int port, int ssl){ struct hostent *hostent = 0; srun_t *srun = 0; config_t *config = cluster->config; int i; LOG(("%s:%d:cse_add_srun(): adding host %s:%d cluster=%p\n", __FILE__, __LINE__, hostname, port, cluster)); srun = cse_find_config_srun(config, hostname, port, ssl); if (srun) return srun; srun = malloc(sizeof(srun_t)); memset(srun, 0, sizeof(srun_t)); hostent = gethostbyname(hostname); if (hostent && hostent->h_addr) { srun->hostname = strdup(hostname); srun->host = (struct in_addr *) malloc(sizeof (struct in_addr)); memcpy(srun->host, hostent->h_addr, sizeof(struct in_addr)); srun->port = port; srun->conn_head = 0; srun->conn_tail = 0; srun->max_sockets = 32; srun->connect_timeout = CONNECT_TIMEOUT; srun->live_time = LIVE_TIME; srun->dead_time = DEAD_TIME; srun->read_timeout = WINDOWS_READ_TIMEOUT; srun->open = std_open; srun->read = std_read; srun->write = std_write; srun->close = std_close;#ifdef OPENSSL if (ssl) { SSL_CTX* ctx; SSL_METHOD *meth; SSLeay_add_ssl_algorithms(); meth = SSLv3_client_method(); SSL_load_error_strings(); ctx = SSL_CTX_new(meth); if (ctx) { srun->ssl = ctx; srun->open = ssl_open; srun->read = ssl_read; srun->write = ssl_write; srun->close = ssl_close; } else { ERR(("%s:%d:cse_add_srun(): can't initialize ssl", __FILE__, __LINE__)); } }#endif srun->lock = cse_create_lock(config); LOG(("%s:%d:cse_add_srun(): srun lock %x\n", __FILE__, __LINE__, srun->lock)); cse_add_config_srun(config, srun); return srun; } return 0;}/** * Adds a new host to the configuration */cluster_srun_t *cse_add_cluster_server(mem_pool_t *pool, cluster_t *cluster, const char *hostname, int port, const char *id, int index, int is_backup, int is_ssl){ config_t *config = cluster->config; srun_t *srun; cluster_srun_t *cluster_srun; if (index < 0) index = cluster->srun_size; /* Resize if too many hosts. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -