⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stream.mx

📁 这个是内存数据库中的一个管理工具
💻 MX
📖 第 1 页 / 共 5 页
字号:
@' The contents of this file are subject to the MonetDB Public License@' Version 1.1 (the "License"); you may not use this file except in@' compliance with the License. You may obtain a copy of the License at@' http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html@'@' Software distributed under the License is distributed on an "AS IS"@' basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the@' License for the specific language governing rights and limitations@' under the License.@'@' The Original Code is the MonetDB Database System.@'@' The Initial Developer of the Original Code is CWI.@' Portions created by CWI are Copyright (C) 1997-2007 CWI.@' All Rights Reserved.@f stream@a Niels Nes@* An simple interface to streamsProcessing files, streams, and sockets is quite different on Linux and Windowsplatforms. To improve portability between both, we advise to replace the stdioactions with the stream functionality provided here.This interface can also be used to open 'non compressed, gzipped, bz2zipped'data files and sockets. Using this interface one could easily switch betweenthe various underlying storage types.@h#ifndef _STREAM_H_#define _STREAM_H_/* * File: stream.h * Auteur: Niels J. Nes * Date: 09-01-2001 * * Version 0.1: start * * This is the general interface to input/output. Each stream will * contains some stream info (for now only byteorder). This is * required for proper conversion on different byte order platforms. */#include <monet_utils.h>#include <unistd.h>#include <ctype.h>#include <assert.h>#ifdef HAVE_MALLOC_H#include <malloc.h>#endif#include <signal.h>#include <limits.h>#ifdef HAVE_SYS_SOCKET_H#include <sys/socket.h>#endif#ifdef NATIVE_WIN32# include <winsock.h>#endif#ifdef NATIVE_WIN32#ifndef LIBSTREAM#define stream_export extern __declspec(dllimport)#else#define stream_export extern __declspec(dllexport)#endif#else#define stream_export extern#endif#define EOT 4#define ST_ASCII  0#define ST_BIN 1#define ST_READ  0#define ST_WRITE 1#define short_int_SWAP(s) ((short)(((0x00ff&(s))<<8) | ((0xff00&(s))>>8)))#define normal_int_SWAP(i) (((0x000000ff&(i))<<24) | ((0x0000ff00&(i))<<8) | \	               ((0x00ff0000&(i))>>8)  | ((0xff000000&(i))>>24))#define long_long_SWAP(l) \		((((lng)normal_int_SWAP(l))<<32) |\		 (0xffffffff&normal_int_SWAP(l>>32)))typedef struct stream stream;/* some os specific initialization */stream_export int stream_init(void);/* all stream_readX/stream_writeX    return 	0 on error        !0 on success */stream_export int stream_readBte(stream *s, signed char *val);stream_export int stream_writeBte(stream *s, signed char val);stream_export int stream_readSht(stream *s, short *val);stream_export int stream_writeSht(stream *s, short val);stream_export int stream_readInt(stream *s, int *val);stream_export int stream_writeInt(stream *s, int val);stream_export int stream_readLng(stream *s, lng *val);stream_export int stream_writeLng(stream *s, lng val);stream_export int stream_readBteArray(stream *s, signed char *val, size_t cnt);stream_export int stream_writeBteArray(stream *s, const signed char *val, size_t cnt);stream_export int stream_readShtArray(stream *s, short *val, size_t cnt);stream_export int stream_writeShtArray(stream *s, const short *val, size_t cnt);stream_export int stream_readIntArray(stream *s, int *val, size_t cnt);stream_export int stream_writeIntArray(stream *s, const int *val, size_t cnt);stream_export int stream_readLngArray(stream *s, lng *val, size_t cnt);stream_export int stream_writeLngArray(stream *s, const lng *val, size_t cnt);stream_export int stream_printf(stream *s, const char *format, ...);stream_export ssize_t stream_read(stream *s, void *buf, size_t elmsize, size_t cnt);stream_export ssize_t stream_readline(stream *s, void *buf, size_t maxcnt);stream_export ssize_t stream_write(stream *s, const void *buf, size_t elmsize, size_t cnt);stream_export void stream_close(stream *s);stream_export void stream_destroy(stream *s);stream_export char *stream_error(stream *s);stream_export int stream_flush(stream *s);stream_export int stream_fsync(stream *s);stream_export char *stream_name(stream *s);stream_export int stream_errnr(stream *s);stream_export int stream_type(stream *s);stream_export int stream_byteorder(stream *s);stream_export void stream_set_byteorder(stream *s, char bigendian);stream_export stream *stream_rstream(stream *s);stream_export stream *stream_wstream(stream *s);stream_export stream *open_rstream(const char *filename);stream_export stream *open_wstream(const char *filename);/* append to stream */stream_export stream *append_wstream(const char *filename);/* open in ascii stream in read mode */stream_export stream *open_rastream(const char *filename);/* open in ascii stream in write mode*/stream_export stream *open_wastream(const char *filename);/* append to ascii stream */stream_export stream *append_wastream(const char *filename);#ifdef HAVE_LIBZstream_export stream *open_gzrstream(const char *filename);stream_export stream *open_gzwstream(const char *filename);stream_export stream *open_gzrastream(const char *filename);stream_export stream *open_gzwastream(const char *filename);#endif#ifdef HAVE_LIBBZ2stream_export stream *open_bzrstream(const char *filename);stream_export stream *open_bzwstream(const char *filename);stream_export stream *open_bzrastream(const char *filename);stream_export stream *open_bzwastream(const char *filename);#endifstream_export void close_stream(stream *s);#ifdef HAVE_CURLstream_export stream * open_urlstream(const char *url);#endifstream_export stream *socket_rstream(SOCKET socket, const char *name);stream_export stream *socket_wstream(SOCKET socket, const char *name);stream_export stream *socket_rastream(SOCKET socket, const char *name);stream_export stream *socket_wastream(SOCKET socket, const char *name);#ifdef HAVE_OPENSSL#include <openssl/ssl.h>#include <openssl/err.h>stream_export stream *ssl_rstream(SSL * ssl, const char *name);stream_export stream *ssl_wstream(SSL * ssl, const char *name);stream_export stream *ssl_rastream(SSL * ssl, const char *name);stream_export stream *ssl_wastream(SSL * ssl, const char *name);#endifstream_export stream *file_rstream(FILE *fp, const char *name);stream_export stream *file_wstream(FILE *fp, const char *name);stream_export stream *file_rastream(FILE *fp, const char *name);stream_export stream *file_wastream(FILE *fp, const char *name);stream_export int rendezvous_streams(stream **in, stream **out, const char *name);typedef struct buffer {	char *buf;	size_t pos;	size_t len;} buffer;stream_export void buffer_init(buffer *b, char *buf, size_t size);stream_export buffer *buffer_create(size_t size);stream_export char *buffer_get_buf(buffer *b);stream_export void buffer_destroy(buffer *b);stream_export stream *buffer_rastream(buffer *b, const char *name);stream_export stream *buffer_wastream(buffer *b, const char *name);stream_export buffer *stream_get_buffer(stream *s);/* note, the size is fixed to 8K, you cannot simply change it to any   value */#define BLOCK (8 * 1024 - 2)/*   Block stream is a stream which sends data in blocks of a known   size (BLOCK size or dynamically changed using CHANGE_BLOCK_SIZE msg).   A block is written once more then BLOCK size data has been written using   the write commands or when the flush command is sent.   All full blocks together with a single not full block form a major   block. Major blocks can be used to synchronize the communication.   Example server sends some reply, ie a major block consisting of   various minor blocks. The header of the major block can contain   special info which the client can interpret.   Each read attempt tries to return the number of bytes. Once a lower number   of bytes can be read the end of the major block is found. The next   read will then start with a new major block. */stream_export stream *block_stream(stream *s);stream_export ssize_t bs_read_next(stream *s, void *buf, size_t nbytes, int *last);stream_export int isa_block_stream(stream *s);/* read block of data including the end of block marker */stream_export ssize_t stream_read_block(stream *s, void *buf, size_t elmsize, size_t cnt);@+ buffered streamsThe bstream (or buffered_stream) can be used for efficient readingof a stream. Reading can be done in large chunks andaccess can be done in smaller bits, by directly accessing the underlyingbuffer.Beware that a flush on a buffered stream emits an empty block tosynchronize with the other side, telling it has reached the endof the sequence and can close its descriptors.@htypedef struct bstream {	stream *s;	char *buf;	size_t size;		/* size of buf */	size_t pos;		/* the data cursor (ie read uptil pos) */	size_t len;		/* len of the data (could < size but usually == size) */	int eof;	int mode;		/* 0 line mode else size for block mode */} bstream;stream_export bstream *bstream_create(stream *rs, size_t chunk_size);stream_export void bstream_destroy(bstream *s);stream_export ssize_t bstream_read(bstream *s, size_t size);stream_export ssize_t bstream_next(bstream *s);@+ bstream functionsThe bstream_create gets a read stream (rs) as input and the initial chunk sizeand creates a buffered stream from this. A spare byte is kept at the endof the buffer.The bstream_read will at least read the next 'size' bytes. If the not read data(aka pos < len) together with the new data will not fit in the current bufferit is resized. The spare byte is kept.@htypedef enum stream_errors {	NO__ERROR = 0,	OPEN_ERROR,	READ_ERROR,	WRITE_ERROR} stream_errors;#endif /*_STREAM_H_*/@c#include "monetdb_config.h"#include "stream.h"#include <string.h>#ifdef HAVE_NETDB_H# include <sys/types.h># include <netinet/in_systm.h># include <netinet/in.h># include <netinet/ip.h># include <netinet/tcp.h># include <netdb.h>#endif#ifdef NATIVE_WIN32#include <winsock.h>#include <io.h>#endif#ifdef HAVE_FCNTL_H#include <fcntl.h>#endif#ifdef HAVE_LIBZ#include <zlib.h>#endif#ifdef HAVE_LIBBZ2#include <bzlib.h>#endif#ifndef SHUT_RD#define SHUT_RD		0#define SHUT_WR		1#define SHUT_RDWR	2#endif#ifndef EWOULDBLOCK#define EWOULDBLOCK EAGAIN#endif#ifndef EINTR#define EINTR 	EAGAIN#endifstruct stream {	short byteorder;	short access;		/* read/write */	short type;		/* ascii/binary */	char *name;	union {		void *p;		int i;		SOCKET s;	} stream_data;	int errnr;	ssize_t (*read) (stream *s, void *buf, size_t elmsize, size_t cnt);	ssize_t (*readline) (stream *s, void *buf, size_t maxcnt);	ssize_t (*write) (stream *s, const void *buf, size_t elmsize, size_t cnt);	void (*close) (stream *s);	char *(*error) (stream *s);	void (*destroy) (stream *s);	int (*flush) (stream *s);	int (*fsync) (stream *s);};#if !HAVE_DECL_STRDUP#ifdef HAVE_STRDUPextern char *strdup(const char *);#else#define strdup(s)	strcpy(malloc(strlen(s)+1),(s))#endif#endif#ifdef NATIVE_WIN32#define strdup _strdup#endifintstream_init(void){	static int inited = 0;	if (inited)		return 0;#ifdef NATIVE_WIN32	{		int sockopt = SO_SYNCHRONOUS_NONALERT;		WSADATA w;		if (WSAStartup(0x0101, &w) != 0)			return -1;		if (setsockopt(INVALID_SOCKET, SOL_SOCKET, SO_OPENTYPE, (void *) &sockopt, sizeof(sockopt)) < 0) {			WSACleanup();			return -1;		}	}#endif#ifdef HAVE_OPENSSL	SSL_load_error_strings();	(void) SSL_library_init();#endif	inited = 1;	return 0;}/* #define STREAM_DEBUG 1  *//* #define BSTREAM_DEBUG 1 */#ifdef STREAM_DEBUG/* code adapted from gdk.mx *//* define printf formats for printing size_t and ssize_t variables */#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901 && !defined(__svr4__) && !defined(WIN32) && !defined(__sgi)#define SZFMT "%zu"#elif SIZEOF_SIZE_T == SIZEOF_INT#define SZFMT "%u"#elif SIZEOF_SIZE_T == SIZEOF_LONG#define SZFMT "%lu"#elif SIZEOF_SIZE_T == SIZEOF_LONG_LONG#define SZFMT "%llu"#elif SIZEOF_SIZE_T == SIZEOF___INT64#define SZFMT "%I64u"#else#error no definition for SZFMT#endif/* define printf format for printing pointer values */#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901#define PTRFMT		"%p"#define PTRFMTCAST		/* no cast needed */#elif SIZEOF_VOID_P == SIZEOF_INT#define PTRFMT		"%x"#define PTRFMTCAST	(unsigned int)#elif SIZEOF_VOID_P == SIZEOF_LONG#define PTRFMT		"%lx"#define PTRFMTCAST	(unsigned long)#elif SIZEOF_VOID_P == SIZEOF_LONG_LONG#define PTRFMT		"%llx"#define PTRFMTCAST	(unsigned long long)#elif SIZEOF_VOID_P == SIZEOF___INT64#define PTRFMT		"%I64x"#define PTRFMTCAST	(unsigned __int64)#else#error no definition for PTRFMT#endif#endif /* STREAM_DEBUG *//* Read at most cnt elements of size elmsize from the stream.  Returns   the number of elements actually read. */ssize_tstream_read(stream *s, void *buf, size_t elmsize, size_t cnt){#ifdef STREAM_DEBUG	printf("read %s " SZFMT " " SZFMT "\n", s->name ? s->name : "<unnamed>", elmsize, cnt);#endif	assert(s->access == ST_READ);	if (s->errnr) return s->errnr;	return (*s->read) (s, buf, elmsize, cnt);}/* Read one line (seperated by \n) of atmost maxcnt characters from the stream.    Returns the number of characters actually read. */ssize_tstream_readline(stream *s, void *buf, size_t maxcnt){#ifdef STREAM_DEBUG	printf("readline %s " SZFMT "\n", s->name ? s->name : "<unnamed>", cnt);#endif	assert(s->access == ST_READ);	if (s->errnr) return s->errnr;	if (!s->readline) {		size_t len = 0;		char *b = buf, *start = buf;		while ((*s->read)(s, start, 1, 1) > 0 && len < maxcnt) {			if (*start == '\n')				break;			start++;		}		if (s->errnr) return s->errnr;		return (start - b);	} else		return (*s->readline) (s, buf, maxcnt);}/* Write cnt elements of size elmsize to the stream.  Returns the   number of elements actually written.  If elmsize or cnt equals zero,   returns cnt. */ssize_tstream_write(stream *s, const void *buf, size_t elmsize, size_t cnt){#ifdef STREAM_DEBUG	printf("write %s " SZFMT " " SZFMT "\n", s->name ? s->name : "<unnamed>", elmsize, cnt);#endif	assert(s->access == ST_WRITE);	if (s->errnr) return s->errnr;	return (*s->write) (s, buf, elmsize, cnt);}voidstream_close(stream *s){#ifdef STREAM_DEBUG	printf("close %s\n", s->name ? s->name : "<unnamed>");#endif	if (s)		(*s->close) (s);}voidstream_destroy(stream *s){#ifdef STREAM_DEBUG	printf("destroy %s\n", s->name ? s->name : "<unnamed>");#endif	if (s)		(*s->destroy) (s);}char *stream_error(stream *s){	if (s == 0)		return "Connection terminated";	return (*s->error) (s);}intstream_flush(stream *s){#ifdef STREAM_DEBUG	printf("flush %s\n", s->name ? s->name : "<unnamed>");#endif	if (!s) return -1;	assert(s->access == ST_WRITE);	if (s->errnr) return s->errnr;	if (s->flush)		return (*s->flush) (s);	return 0;}intstream_fsync(stream *s){#ifdef STREAM_DEBUG	printf("fsync %s\n", s->name ? s->name : "<unnamed>");#endif	if (!s) return -1;	assert(s->access == ST_WRITE);	if (s->errnr) return s->errnr;	if (s->fsync)		return (*s->fsync) (s);	return 0;}char *stream_name(stream *s){	if (s == 0)		return "connection terminated";	return s->name;}intstream_errnr(stream *s){	if (s == 0)		return READ_ERROR;	return s->errnr;}intstream_type(stream *s){	if (s == 0)		return 0;	return s->type;}intstream_byteorder(stream *s){	if (s == 0)		return 0;	return s->byteorder;}void stream_set_byteorder(stream *s, char bigendian){#ifdef STREAM_DEBUG	printf("stream_set_byteorder %s, %d\n", s->name ? s->name : "<unnamed>");#endif	assert(s->access == ST_READ);	s->type = ST_BIN;#ifdef WORDS_BIGENDIAN	s->byteorder = bigendian ? 1234 : 3412;#else	s->byteorder = bigendian ? 3412 : 1234;#endif}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -