📄 stream.c
字号:
#line 276 "/export/scratch0/monet/monet.GNU.64.64.d.14791/MonetDB/src/common/stream.mx"#include "monetdb_config.h"#include "stream.h"#include <string.h>#ifdef HAVE_NETDB_H# include <sys/types.h># include <netinet/in_systm.h># include <netinet/in.h># include <netinet/ip.h># include <netinet/tcp.h># include <netdb.h>#endif#ifdef NATIVE_WIN32#include <winsock.h>#include <io.h>#endif#ifdef HAVE_FCNTL_H#include <fcntl.h>#endif#ifdef HAVE_LIBZ#include <zlib.h>#endif#ifdef HAVE_LIBBZ2#include <bzlib.h>#endif#ifndef SHUT_RD#define SHUT_RD 0#define SHUT_WR 1#define SHUT_RDWR 2#endif#ifndef EWOULDBLOCK#define EWOULDBLOCK EAGAIN#endif#ifndef EINTR#define EINTR EAGAIN#endifstruct stream { short byteorder; short access; /* read/write */ short type; /* ascii/binary */ char *name; union { void *p; int i; SOCKET s; } stream_data; int errnr; ssize_t (*read) (stream *s, void *buf, size_t elmsize, size_t cnt); ssize_t (*readline) (stream *s, void *buf, size_t maxcnt); ssize_t (*write) (stream *s, const void *buf, size_t elmsize, size_t cnt); void (*close) (stream *s); char *(*error) (stream *s); void (*destroy) (stream *s); int (*flush) (stream *s); int (*fsync) (stream *s);};#if !HAVE_DECL_STRDUP#ifdef HAVE_STRDUPextern char *strdup(const char *);#else#define strdup(s) strcpy(malloc(strlen(s)+1),(s))#endif#endif#ifdef NATIVE_WIN32#define strdup _strdup#endifintstream_init(void){ static int inited = 0; if (inited) return 0;#ifdef NATIVE_WIN32 { int sockopt = SO_SYNCHRONOUS_NONALERT; WSADATA w; if (WSAStartup(0x0101, &w) != 0) return -1; if (setsockopt(INVALID_SOCKET, SOL_SOCKET, SO_OPENTYPE, (void *) &sockopt, sizeof(sockopt)) < 0) { WSACleanup(); return -1; } }#endif#ifdef HAVE_OPENSSL SSL_load_error_strings(); (void) SSL_library_init();#endif inited = 1; return 0;}/* #define STREAM_DEBUG 1 *//* #define BSTREAM_DEBUG 1 */#ifdef STREAM_DEBUG/* code adapted from gdk.mx *//* define printf formats for printing size_t and ssize_t variables */#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901 && !defined(__svr4__) && !defined(WIN32) && !defined(__sgi)#define SZFMT "%zu"#elif SIZEOF_SIZE_T == SIZEOF_INT#define SZFMT "%u"#elif SIZEOF_SIZE_T == SIZEOF_LONG#define SZFMT "%lu"#elif SIZEOF_SIZE_T == SIZEOF_LONG_LONG#define SZFMT "%llu"#elif SIZEOF_SIZE_T == SIZEOF___INT64#define SZFMT "%I64u"#else#error no definition for SZFMT#endif/* define printf format for printing pointer values */#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901#define PTRFMT "%p"#define PTRFMTCAST /* no cast needed */#elif SIZEOF_VOID_P == SIZEOF_INT#define PTRFMT "%x"#define PTRFMTCAST (unsigned int)#elif SIZEOF_VOID_P == SIZEOF_LONG#define PTRFMT "%lx"#define PTRFMTCAST (unsigned long)#elif SIZEOF_VOID_P == SIZEOF_LONG_LONG#define PTRFMT "%llx"#define PTRFMTCAST (unsigned long long)#elif SIZEOF_VOID_P == SIZEOF___INT64#define PTRFMT "%I64x"#define PTRFMTCAST (unsigned __int64)#else#error no definition for PTRFMT#endif#endif /* STREAM_DEBUG *//* Read at most cnt elements of size elmsize from the stream. Returns the number of elements actually read. */ssize_tstream_read(stream *s, void *buf, size_t elmsize, size_t cnt){#ifdef STREAM_DEBUG printf("read %s " SZFMT " " SZFMT "\n", s->name ? s->name : "<unnamed>", elmsize, cnt);#endif assert(s->access == ST_READ); if (s->errnr) return s->errnr; return (*s->read) (s, buf, elmsize, cnt);}/* Read one line (seperated by \n) of atmost maxcnt characters from the stream. Returns the number of characters actually read. */ssize_tstream_readline(stream *s, void *buf, size_t maxcnt){#ifdef STREAM_DEBUG printf("readline %s " SZFMT "\n", s->name ? s->name : "<unnamed>", cnt);#endif assert(s->access == ST_READ); if (s->errnr) return s->errnr; if (!s->readline) { size_t len = 0; char *b = buf, *start = buf; while ((*s->read)(s, start, 1, 1) > 0 && len < maxcnt) { if (*start == '\n') break; start++; } if (s->errnr) return s->errnr; return (start - b); } else return (*s->readline) (s, buf, maxcnt);}/* Write cnt elements of size elmsize to the stream. Returns the number of elements actually written. If elmsize or cnt equals zero, returns cnt. */ssize_tstream_write(stream *s, const void *buf, size_t elmsize, size_t cnt){#ifdef STREAM_DEBUG printf("write %s " SZFMT " " SZFMT "\n", s->name ? s->name : "<unnamed>", elmsize, cnt);#endif assert(s->access == ST_WRITE); if (s->errnr) return s->errnr; return (*s->write) (s, buf, elmsize, cnt);}voidstream_close(stream *s){#ifdef STREAM_DEBUG printf("close %s\n", s->name ? s->name : "<unnamed>");#endif if (s) (*s->close) (s);}voidstream_destroy(stream *s){#ifdef STREAM_DEBUG printf("destroy %s\n", s->name ? s->name : "<unnamed>");#endif if (s) (*s->destroy) (s);}char *stream_error(stream *s){ if (s == 0) return "Connection terminated"; return (*s->error) (s);}intstream_flush(stream *s){#ifdef STREAM_DEBUG printf("flush %s\n", s->name ? s->name : "<unnamed>");#endif if (!s) return -1; assert(s->access == ST_WRITE); if (s->errnr) return s->errnr; if (s->flush) return (*s->flush) (s); return 0;}intstream_fsync(stream *s){#ifdef STREAM_DEBUG printf("fsync %s\n", s->name ? s->name : "<unnamed>");#endif if (!s) return -1; assert(s->access == ST_WRITE); if (s->errnr) return s->errnr; if (s->fsync) return (*s->fsync) (s); return 0;}char *stream_name(stream *s){ if (s == 0) return "connection terminated"; return s->name;}intstream_errnr(stream *s){ if (s == 0) return READ_ERROR; return s->errnr;}intstream_type(stream *s){ if (s == 0) return 0; return s->type;}intstream_byteorder(stream *s){ if (s == 0) return 0; return s->byteorder;}void stream_set_byteorder(stream *s, char bigendian){#ifdef STREAM_DEBUG printf("stream_set_byteorder %s, %d\n", s->name ? s->name : "<unnamed>");#endif assert(s->access == ST_READ); s->type = ST_BIN;#ifdef WORDS_BIGENDIAN s->byteorder = bigendian ? 1234 : 3412;#else s->byteorder = bigendian ? 3412 : 1234;#endif}voidclose_stream(stream *s){ s->close(s); s->destroy(s);}stream *stream_rstream(stream *s){#ifdef STREAM_DEBUG printf("stream_rstream %s\n", s->name ? s->name : "<unnamed>");#endif assert(s->access == ST_READ); s->type = ST_BIN; if (s->errnr == NO__ERROR) s->read(s, (void *) &s->byteorder, sizeof(s->byteorder), 1); return s;}stream *stream_wstream(stream *s){#ifdef STREAM_DEBUG printf("stream_wstream %s\n", s->name ? s->name : "<unnamed>");#endif assert(s->access == ST_WRITE); s->type = ST_BIN; if (s->errnr == NO__ERROR) s->write(s, (void *) &s->byteorder, sizeof(s->byteorder), 1); return s;}#define EXT_LEN 4static const char *get_extention(const char *file){ char *ext_start; return (ext_start = strrchr(file, '.')) != NULL ? ext_start + 1 : "";}static voiddestroy(stream *s){ free(s->name); free(s);}static char *error(stream *s){ char buf[BUFSIZ]; switch (s->errnr) { case OPEN_ERROR: snprintf(buf, BUFSIZ, "error could not open file %s\n", s->name); return strdup(buf); case READ_ERROR: snprintf(buf, BUFSIZ, "error reading file %s\n", s->name); return strdup(buf); case WRITE_ERROR: snprintf(buf, BUFSIZ, "error writing file %s\n", s->name); return strdup(buf); } return strdup("Unknown error");}static stream *create_stream(const char *name){ stream *s; if ((s = (stream *) malloc(sizeof(*s))) == NULL) return NULL; s->byteorder = 1234; s->access = ST_READ; s->type = ST_ASCII; s->name = strdup(name); s->stream_data.p = NULL; s->errnr = NO__ERROR; s->stream_data.p = NULL; s->read = NULL; s->readline = NULL; s->write = NULL; s->close = NULL; s->error = error; s->destroy = destroy; s->flush = NULL; s->fsync = NULL;#ifdef STREAM_DEBUG printf("create_stream %s -> " PTRFMT "\n", name ? name : "<unnamed>", PTRFMTCAST s);#endif return s;}static ssize_tfile_read(stream *s, void *buf, size_t elmsize, size_t cnt){ FILE *fp = (FILE *) s->stream_data.p; size_t rc = 0; if (!feof(fp)) { rc = fread(buf, elmsize, cnt, fp); if (ferror(fp)) s->errnr = READ_ERROR; } return rc;}static ssize_tfile_write(stream *s, const void *buf, size_t elmsize, size_t cnt){ if (elmsize && cnt) { size_t rc = fwrite(buf, elmsize, cnt, (FILE *) s->stream_data.p); if (rc < cnt) s->errnr = WRITE_ERROR; return (ssize_t) rc; } return cnt;}static voidfile_close(stream *s){ FILE *fp = (FILE *) s->stream_data.p; if (!fp) return; if (fp != stdin && fp != stdout && fp != stderr) fclose(fp); else if (s->access == ST_WRITE) fflush(fp); s->stream_data.p = NULL;}static intfile_flush(stream *s){ FILE *fp = (FILE*)s->stream_data.p; if (s->access == ST_WRITE && fflush(fp) < 0){ s->errnr = WRITE_ERROR; return 1; } return 0;}static intfile_fsync(stream *s){ FILE *fp = (FILE*)s->stream_data.p; if (s->access == ST_WRITE &&#ifdef NATIVE_WIN32 _commit(_fileno(fp)) < 0#else fsync(fileno(fp)) < 0#endif ){ s->errnr = WRITE_ERROR; return 1; } return 0;}static stream *open_stream(const char *filename, const char *flags){ stream *s; FILE *fp; if ((s = create_stream(filename)) == NULL) return NULL; if ((fp = fopen(filename, flags)) == NULL) s->errnr = OPEN_ERROR; s->read = file_read; s->readline = NULL; s->write = file_write; s->close = file_close; s->flush = file_flush; s->fsync = file_fsync; s->stream_data.p = (void *) fp; return s;}#ifdef HAVE_LIBZstatic ssize_tstream_gzread(stream *s, void *buf, size_t elmsize, size_t cnt){ gzFile *fp = (gzFile *) s->stream_data.p; int size = (int) (elmsize * cnt); if (!gzeof(fp)) { size = gzread(fp, buf, size); if (size) return size / elmsize; } return 0;}static ssize_tstream_gzwrite(stream *s, const void *buf, size_t elmsize, size_t cnt){ int size = (int) (elmsize * cnt); if (size) { size = gzwrite((gzFile *) s->stream_data.p, (void *) buf, size); return size / elmsize; } return cnt;}static voidstream_gzclose(stream *s){ if (s->stream_data.p) gzclose((gzFile *) s->stream_data.p); s->stream_data.p = NULL;}static intstream_gzflush(stream *s){ if (s->access == ST_WRITE) gzflush((gzFile *) s->stream_data.p, Z_SYNC_FLUSH); return 0;}static stream *open_gzstream(const char *filename, const char *flags){ stream *s; gzFile *fp; if ((s = create_stream(filename)) == NULL) return NULL; if ((fp = gzopen(filename, flags)) == NULL) s->errnr = OPEN_ERROR; s->read = stream_gzread; s->write = stream_gzwrite; s->close = stream_gzclose; s->flush = stream_gzflush; s->stream_data.p = (void *) fp; return s;}stream *open_gzrstream(const char *filename){ stream *s; if ((s = open_gzstream(filename, "rb")) == NULL) return NULL; s->type = ST_BIN; if (s->errnr == NO__ERROR) gzread((gzFile *) s->stream_data.p, (void *) &s->byteorder, sizeof(s->byteorder)); return s;}static stream *open_gzwstream_(const char *filename, const char *mode){ stream *s; if ((s = open_gzstream(filename, mode)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_BIN; if (s->errnr == NO__ERROR) gzwrite((gzFile *) s->stream_data.p, (void *) &s->byteorder, sizeof(s->byteorder)); return s;}stream *open_gzwstream(const char *filename){ return open_gzwstream_(filename, "wb");}stream *open_gzrastream(const char *filename){ stream *s; if ((s = open_gzstream(filename, "rb")) == NULL) return NULL; s->type = ST_ASCII; return s;}static stream *open_gzwastream_(const char *filename, const char *mode){ stream *s; if ((s = open_gzstream(filename, mode)) == NULL) return NULL; s->access = ST_WRITE; s->type = ST_ASCII; return s;}stream *open_gzwastream(const char *filename){ return open_gzwastream_(filename, "wb");}#endif#ifdef HAVE_LIBBZ2static ssize_tstream_bzread(stream *s, void *buf, size_t elmsize, size_t cnt){ int size = (int) (elmsize * cnt); size = BZ2_bzread((BZFILE *) s->stream_data.p, buf, size); if (size) return size / elmsize; return 0;}static ssize_tstream_bzwrite(stream *s, const void *buf, size_t elmsize, size_t cnt){ int size = (int) (elmsize * cnt); if (size) { size = BZ2_bzwrite((BZFILE *) s->stream_data.p, (void *) buf, size); return size / elmsize; } return cnt;}static voidstream_bzclose(stream *s){ if (s->stream_data.p) BZ2_bzclose((BZFILE *) s->stream_data.p); s->stream_data.p = NULL;}static intstream_bzflush(stream *s){ if (s->access == ST_WRITE) BZ2_bzflush((BZFILE *) s->stream_data.p); return 0;}static stream *open_bzstream(const char *filename, const char *flags){ stream *s; BZFILE *fp; if ((s = create_stream(filename)) == NULL) return NULL; if ((fp = BZ2_bzopen(filename, flags)) == NULL) s->errnr = OPEN_ERROR; s->read = stream_bzread; s->write = stream_bzwrite; s->close = stream_bzclose; s->flush = stream_bzflush; s->stream_data.p = (void *) fp; return s;}stream *open_bzrstream(const char *filename){ stream *s; if ((s = open_bzstream(filename, "rb")) == NULL) return NULL; s->type = ST_BIN; if (s->errnr == NO__ERROR) BZ2_bzread((BZFILE *) s->stream_data.p, (void *) &s->byteorder, sizeof(s->byteorder)); return s;}static stream *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -