📄 stream.c
字号:
open_bzwstream_(const char *filename, const char *mode){ stream *s; if ((s = open_bzstream(filename, mode)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_BIN; if (s->errnr == NO__ERROR) BZ2_bzwrite((BZFILE *) s->stream_data.p, (void *) &s->byteorder, sizeof(s->byteorder)); return s;}stream *open_bzwstream(const char *filename){ return open_bzwstream_(filename, "wb");}stream *open_bzrastream(const char *filename){ stream *s; if ((s = open_bzstream(filename, "rb")) == NULL) return NULL; s->type = ST_ASCII; return s;}static stream *open_bzwastream_(const char *filename, const char *mode){ stream *s; if ((s = open_bzstream(filename, mode)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_ASCII; return s;}stream *open_bzwastream(const char *filename){ return open_bzwastream_(filename, "wb");}#endifstream *open_rstream(const char *filename){ stream *s; const char *ext;#ifdef STREAM_DEBUG printf("open_rstream %s\n", filename);#endif ext = get_extention(filename); if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ return open_gzrstream(filename);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2 return open_bzrstream(filename);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if ((s = open_stream(filename, "rb")) == NULL) return NULL; s->type = ST_BIN; if (s->errnr == NO__ERROR) { fread((void *) &s->byteorder, sizeof(s->byteorder), 1, (FILE *) s->stream_data.p); if (ferror((FILE *) s->stream_data.p)) s->errnr = OPEN_ERROR; } return s;}static stream *open_wstream_(const char *filename, char *mode){ stream *s; const char *ext;#ifdef STREAM_DEBUG printf("open_wstream %s\n", filename);#endif ext = get_extention(filename); if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ return open_gzwstream_(filename, mode);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2 return open_bzwstream_(filename, mode);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if ((s = open_stream(filename, mode)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_BIN; if (s->errnr == NO__ERROR) fwrite((void *) &s->byteorder, sizeof(s->byteorder), 1, (FILE *) s->stream_data.p); return s;}stream *open_wstream(const char *filename){ return open_wstream_(filename, "wb");}stream *append_wstream(const char *filename){ return open_wstream_(filename, "ab");}stream *open_rastream(const char *filename){ stream *s; const char *ext;#ifdef STREAM_DEBUG printf("open_rastream %s\n", filename);#endif ext = get_extention(filename); if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ return open_gzrastream(filename);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2 return open_bzrastream(filename);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if ((s = open_stream(filename, "r")) == NULL) return NULL; s->type = ST_ASCII; return s;}static stream *open_wastream_(const char *filename, char *mode){ stream *s; const char *ext;#ifdef STREAM_DEBUG printf("open_wastream %s\n", filename);#endif ext = get_extention(filename); if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ return open_gzwastream_(filename, mode);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2 return open_bzwastream_(filename, mode);#else if ((s = create_stream(filename)) != NULL) s->errnr = OPEN_ERROR; return s;#endif } if ((s = open_stream(filename, mode)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_ASCII; return s;}stream *open_wastream(const char *filename){ return open_wastream_(filename, "w");}stream *append_wastream(const char *filename){ return open_wastream_(filename, "a");}#ifdef HAVE_CURL#include <curl/curl.h>#ifdef USE_CURL_MULTIstatic CURLM *multi_handle;#endifstruct curl_data { CURL *handle; char *buffer; /* buffer to store incoming data */ size_t maxsize; /* size of allocated buffer */ size_t usesize; /* end of used data */ size_t offset; /* start of unread data */ int running; /* whether still transferring */#ifdef USE_CURL_MULTI CURLMcode result; /* result of transfer (if !running) */ struct curl_data *next; /* linked list (curl_handles) */#endif};#ifdef USE_CURL_MULTIstatic struct curl_data *curl_handles;#endif/* this function is called by libcurl when there is data for us */static size_twrite_callback(char *buffer, size_t size, size_t nitems, void *userp){ stream *s = (stream *) userp; struct curl_data *c = (struct curl_data *) s->stream_data.p; size *= nitems; /* allocate a buffer if we don't have one yet */ if (c->buffer == NULL && size != 0) { /* BLOCK had better be a power of 2! */ c->maxsize = (size + BLOCK - 1) & ~(BLOCK - 1); if ((c->buffer = malloc(c->maxsize)) == NULL) return 0; c->usesize = 0; c->offset = 0; }#ifndef USE_CURL_MULTI /* move data if we don't have enough space */ if (c->maxsize - c->usesize < size && c->offset > 0) { memmove(c->buffer, c->buffer + c->offset, c->usesize - c->offset); c->usesize -= c->offset; c->offset = 0; }#endif /* allocate more buffer space if we still don't have enough space */ if (c->maxsize - c->usesize < size) { c->maxsize = (c->usesize + size + BLOCK - 1) & ~(BLOCK - 1); c->buffer = realloc(c->buffer, c->usesize + size); } /* finally, store the data we received */ memcpy(c->buffer + c->usesize, buffer, size); c->usesize += size; return size;}static voidcurl_destroy(stream *s){ struct curl_data *c;#ifdef USE_CURL_MULTI struct curl_data **cp;#endif if ((c = (struct curl_data *) s->stream_data.p) != NULL) { s->stream_data.p = NULL;#ifdef USE_CURL_MULTI /* lock access to curl_handles */ cp = &curl_handles; while (*cp && *cp != c) cp = &(*cp)->next; if (*cp) *cp = c->next; /* unlock access to curl_handles */#endif if (c->handle) {#ifdef USE_CURL_MULTI curl_multi_remove_handle(mult_handle, c->handle);#endif curl_easy_cleanup(c->handle); } if (c->buffer) free(c->buffer); free(c); } destroy(s);}static ssize_tcurl_read(stream *s, void *buf, size_t elmsize, size_t cnt){ struct curl_data *c = (struct curl_data *) s->stream_data.p; size_t size; if (c->usesize - c->offset >= elmsize || !c->running) { /* there is at least one element's worth of data available, or we have reached the end: return as much as we have, but no more than requested */ if (cnt * elmsize > c->usesize - c->offset) cnt = (c->usesize - c->offset) / elmsize; size = cnt * elmsize; memcpy(buf, c->buffer + c->offset, size); c->offset += size; if (c->offset == c->usesize) c->usesize = c->offset = 0; return (ssize_t) cnt; } /* not enough data, we must wait until we get some */#ifndef USE_CURL_MULTI return 0;#endif}static ssize_tcurl_write(stream *s, const void *buf, size_t elmsize, size_t cnt){ (void) s; (void) buf; (void) elmsize; (void) cnt; assert(0); return -1;}static voidcurl_close(stream *s){ (void) s;}stream *open_urlstream(const char *url){ stream *s; struct curl_data *c;#ifdef USE_CURL_MULTI CURLMsg *msg;#endif if ((c = malloc(sizeof(*c))) == NULL) return NULL; c->handle = NULL; c->buffer = NULL; c->maxsize = c->usesize = c->offset = 0; c->running = 1; if ((s = create_stream(url)) == NULL) { free(c); return NULL; }#ifdef USE_CURL_MULTI /* lock access to curl_handles */ c->next = curl_handles; curl_handles = c; /* unlock access to curl_handles */#endif s->read = curl_read; s->write = curl_write; s->close = curl_close; s->destroy = curl_destroy; s->stream_data.p = (void *) c; if ((c->handle = curl_easy_init()) == NULL) { destroy(s); return NULL; } curl_easy_setopt(c->handle, CURLOPT_URL, s->name); curl_easy_setopt(c->handle, CURLOPT_WRITEDATA, s); curl_easy_setopt(c->handle, CURLOPT_VERBOSE, FALSE); curl_easy_setopt(c->handle, CURLOPT_NOSIGNAL, TRUE); curl_easy_setopt(c->handle, CURLOPT_WRITEFUNCTION, write_callback);#ifdef USE_CURL_MULTI if (multi_handle == NULL) multi_handle = curl_multi_init(); curl_multi_add_handle(multi_handle, c->handle); while (curl_multi_perform(multi_handle, NULL) == CURLM_CALL_MULTI_PERFORM) ; while ((msg = curl_multi_info_read(multi_handle, NULL)) != NULL) { struct curl_data *p; /* lock access to curl_handles */ for (p = curl_handles; p; p = p->next) { if (p->handle == msg->easy_handle) { switch (msg->msg) { case CURLMSG_DONE: p->running = 0; p->result = msg->data.result; curl_multi_remove_handle(multi_handle, p->handle); curl_easy_cleanup(p->handle); p->handle = NULL; break; default: break; } break; } } /* unlock access to curl_handles */ }#else if (curl_easy_perform(c->handle) != CURLE_OK) s->errnr = OPEN_ERROR; curl_easy_cleanup(c->handle); c->handle = NULL; c->running = 0;#endif return s;}#endif /* HAVE_CURL */static ssize_tsocket_write(stream *s, const void *buf, size_t elmsize, size_t cnt){ ssize_t nr = 0, res = 0, size = elmsize * cnt; if (!s || s->errnr) return -1; if (size == 0 || elmsize == 0) return cnt; errno = 0; while (res < size && (#ifdef NATIVE_WIN32 /* send works on int, make sure the argument fits */ ((nr = send(s->stream_data.s, (void *) ((char *) buf + res), (int) min(size - res, 1 << 16), 0)) > 0)#else ((nr = write(s->stream_data.s, (void *) ((char *) buf + res), size - res)) > 0)#endif || errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) ) { errno = 0; if (nr > 0) res += nr; } if (nr < 0) { s->errnr = WRITE_ERROR; return nr; } if (res > 0) return res / elmsize; s->errnr = WRITE_ERROR; return -1;}static ssize_tsocket_read(stream *s, void *buf, size_t elmsize, size_t cnt){ ssize_t nr = 0, res = 0, size = elmsize * cnt; if (!s || s->errnr) return(-1); errno = 0; while (res < size && (#ifdef NATIVE_WIN32 /* recv works on int, make sure the argument fits */ ((nr = recv(s->stream_data.s, (void *) ((char *) buf + res), (int) min(size - res, 1 << 16), 0)) > 0)#else ((nr = read(s->stream_data.s, (void *) ((char *) buf + res), size - res)) > 0)#endif || errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) ) { errno = 0; if (nr > 0) res += nr; } if (nr < 0) { s->errnr = READ_ERROR; return nr; } return res / elmsize;}/* Read one line (seperated by \n) of at most maxcnt characters from the stream. Returns the number of characters actually read. */static ssize_tsocket_readline(stream *s, void *buf, size_t maxcnt){ size_t len = 0; char *b = buf, *start = buf; while (socket_read(s, start, 1, 1) > 0 && len < maxcnt) { if (*start == '\n') break; start++; } if (s->errnr) return s->errnr; return (start - b);}static voidsocket_close(stream *s){ SOCKET fd = s->stream_data.s; if (fd >= 0) { /* Related read/write (in/out, from/to) streams * share a single socket which is not dup'ed (anymore) * as Windows' dup doesn't work on sockets; * hence, only one of the streams must/may close that * socket; we choose to let the read socket do the * job, since in mapi.mx it may happen that the read * stream is closed before the write stream was even * created. */ if (s->access == ST_READ) {#ifdef HAVE_SHUTDOWN shutdown(fd, SHUT_RDWR);#endif closesocket(fd); } } s->stream_data.s = -1;}static stream *socket_open(SOCKET sock, const char *name){ stream *s; if ((s = create_stream(name)) == NULL) return NULL; s->read = socket_read; s->readline = socket_readline; s->write = socket_write; s->close = socket_close; s->flush = NULL; s->stream_data.s = sock; errno = 0;#if defined(SO_KEEPALIVE) && !defined(WIN32) { int opt = 0; if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *) &opt, sizeof(opt)) < 0) { s->errnr = OPEN_ERROR; return s; } }#endif#if defined(IPTOS_THROUGHPUT) && !defined(WIN32) { int tos = IPTOS_THROUGHPUT; if (setsockopt(sock, IPPROTO_IP, IP_TOS, (void *) &tos, sizeof(tos)) < 0) { s->errnr = OPEN_ERROR; return s; } }#endif#ifdef TCP_NODELAY { int nodelay = 1; if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *) &nodelay, sizeof(nodelay)) < 0) { s->errnr = OPEN_ERROR; return s; } }#endif#ifdef HAVE_FCNTL { int fl = fcntl(sock, F_GETFL); fl &= ~O_NONBLOCK; if (fcntl(sock, F_SETFL, fl) < 0) { s->errnr = OPEN_ERROR; return s; } }#endif return s;}stream *socket_rstream(SOCKET sock, const char *name){ stream *s;#ifdef STREAM_DEBUG printf("socket_rstream " SSZFMT " %s\n", (ssize_t) sock, name);#endif if ((s = socket_open(sock, name)) == NULL) return NULL; s->type = ST_BIN; if (s->errnr == NO__ERROR) socket_read(s, (void *) &s->byteorder, sizeof(s->byteorder), 1); return s;}stream *socket_wstream(SOCKET sock, const char *name){ stream *s;#ifdef STREAM_DEBUG printf("socket_wstream " SSZFMT " %s\n", (ssize_t) sock, name);#endif if ((s = socket_open(sock, name)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_BIN; if (s->errnr == NO__ERROR) socket_write(s, (void *) &s->byteorder, sizeof(s->byteorder), 1); return s;}stream *socket_rastream(SOCKET sock, const char *name){ stream *s;#ifdef STREAM_DEBUG printf("socket_rastream " SSZFMT " %s\n", (ssize_t) sock, name);#endif if ((s = socket_open(sock, name)) != NULL) s->type = ST_ASCII; return s;}stream *socket_wastream(SOCKET sock, const char *name){ stream *s;#ifdef STREAM_DEBUG printf("socket_wastream " SSZFMT " %s\n", (ssize_t) sock, name);#endif if ((s = socket_open(sock, name)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_ASCII; return s;}#ifdef HAVE_OPENSSLstruct ssl_data { int error; int ret; SSL *ssl;};static ssize_tssl_read(stream *s, void *buf, size_t elmsize, size_t cnt){ SSL *ssl; size_t res = 0, size = elmsize * cnt; if (!s || s->errnr) return(-1); assert(s->stream_data.p); ssl = ((struct ssl_data *) s->stream_data.p)->ssl; assert(ssl); while (res < size) { int nr, err; nr = SSL_read(ssl, (void *) ((char *) buf + res), (int) (size - res));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -