⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stream.c

📁 这个是内存数据库中的一个管理工具
💻 C
📖 第 1 页 / 共 4 页
字号:
open_bzwstream_(const char *filename, const char *mode){	stream *s;	if ((s = open_bzstream(filename, mode)) == NULL)		return NULL;	s->access = ST_WRITE;	s->type = ST_BIN;	if (s->errnr == NO__ERROR)		BZ2_bzwrite((BZFILE *) s->stream_data.p, (void *) &s->byteorder, sizeof(s->byteorder));	return s;}stream *open_bzwstream(const char *filename){	return open_bzwstream_(filename, "wb");}stream *open_bzrastream(const char *filename){	stream *s;	if ((s = open_bzstream(filename, "rb")) == NULL)		return NULL;	s->type = ST_ASCII;	return s;}static stream *open_bzwastream_(const char *filename, const char *mode){	stream *s;	if ((s = open_bzstream(filename, mode)) == NULL)		return NULL;	s->access = ST_WRITE;	s->type = ST_ASCII;	return s;}stream *open_bzwastream(const char *filename){	return open_bzwastream_(filename, "wb");}#endifstream *open_rstream(const char *filename){	stream *s;	const char *ext;#ifdef STREAM_DEBUG	printf("open_rstream %s\n", filename);#endif	ext = get_extention(filename);	if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ		return open_gzrstream(filename);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2		return open_bzrstream(filename);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if ((s = open_stream(filename, "rb")) == NULL)		return NULL;	s->type = ST_BIN;	if (s->errnr == NO__ERROR) {		fread((void *) &s->byteorder, sizeof(s->byteorder), 1, (FILE *) s->stream_data.p);		if (ferror((FILE *) s->stream_data.p))			s->errnr = OPEN_ERROR;	}	return s;}static stream *open_wstream_(const char *filename, char *mode){	stream *s;	const char *ext;#ifdef STREAM_DEBUG	printf("open_wstream %s\n", filename);#endif	ext = get_extention(filename);	if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ		return open_gzwstream_(filename, mode);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2		return open_bzwstream_(filename, mode);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if ((s = open_stream(filename, mode)) == NULL)		return NULL;	s->access = ST_WRITE;	s->type = ST_BIN;	if (s->errnr == NO__ERROR)		fwrite((void *) &s->byteorder, sizeof(s->byteorder), 1, (FILE *) s->stream_data.p);	return s;}stream *open_wstream(const char *filename){	return open_wstream_(filename, "wb");}stream *append_wstream(const char *filename){	return open_wstream_(filename, "ab");}stream *open_rastream(const char *filename){	stream *s;	const char *ext;#ifdef STREAM_DEBUG	printf("open_rastream %s\n", filename);#endif	ext = get_extention(filename);	if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ		return open_gzrastream(filename);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2		return open_bzrastream(filename);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if ((s = open_stream(filename, "r")) == NULL)		return NULL;	s->type = ST_ASCII;	return s;}static stream *open_wastream_(const char *filename, char *mode){	stream *s;	const char *ext;#ifdef STREAM_DEBUG	printf("open_wastream %s\n", filename);#endif	ext = get_extention(filename);	if (strcmp(ext, "gz") == 0) {#ifdef HAVE_LIBZ		return open_gzwastream_(filename, mode);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if (strcmp(ext, "bz2") == 0) {#ifdef HAVE_LIBBZ2		return open_bzwastream_(filename, mode);#else		if ((s = create_stream(filename)) != NULL)			s->errnr = OPEN_ERROR;		return s;#endif	}	if ((s = open_stream(filename, mode)) == NULL)		return NULL;	s->access = ST_WRITE;	s->type = ST_ASCII;	return s;}stream *open_wastream(const char *filename){	return open_wastream_(filename, "w");}stream *append_wastream(const char *filename){	return open_wastream_(filename, "a");}#ifdef HAVE_CURL#include <curl/curl.h>#ifdef USE_CURL_MULTIstatic CURLM *multi_handle;#endifstruct curl_data {	CURL *handle;	char *buffer;		/* buffer to store incoming data */	size_t maxsize;		/* size of allocated buffer */	size_t usesize;		/* end of used data */	size_t offset;		/* start of unread data */	int running;		/* whether still transferring */#ifdef USE_CURL_MULTI	CURLMcode result;	/* result of transfer (if !running) */	struct curl_data *next;	/* linked list (curl_handles) */#endif};#ifdef USE_CURL_MULTIstatic struct curl_data *curl_handles;#endif/* this function is called by libcurl when there is data for us */static size_twrite_callback(char *buffer, size_t size, size_t nitems, void *userp){	stream *s = (stream *) userp;	struct curl_data *c = (struct curl_data *) s->stream_data.p;	size *= nitems;	/* allocate a buffer if we don't have one yet */	if (c->buffer == NULL && size != 0) {		/* BLOCK had better be a power of 2! */		c->maxsize = (size + BLOCK - 1) & ~(BLOCK - 1);		if ((c->buffer = malloc(c->maxsize)) == NULL)			return 0;		c->usesize = 0;		c->offset = 0;	}#ifndef USE_CURL_MULTI	/* move data if we don't have enough space */	if (c->maxsize - c->usesize < size && c->offset > 0) {		memmove(c->buffer, c->buffer + c->offset, c->usesize - c->offset);		c->usesize -= c->offset;		c->offset = 0;	}#endif	/* allocate more buffer space if we still don't have enough space */	if (c->maxsize - c->usesize < size) {		c->maxsize = (c->usesize + size + BLOCK - 1) & ~(BLOCK - 1);		c->buffer = realloc(c->buffer, c->usesize + size);	}	/* finally, store the data we received */	memcpy(c->buffer + c->usesize, buffer, size);	c->usesize += size;	return size;}static voidcurl_destroy(stream *s){	struct curl_data *c;#ifdef USE_CURL_MULTI	struct curl_data **cp;#endif	if ((c = (struct curl_data *) s->stream_data.p) != NULL) {		s->stream_data.p = NULL;#ifdef USE_CURL_MULTI		/* lock access to curl_handles */		cp = &curl_handles;		while (*cp && *cp != c)			cp = &(*cp)->next;		if (*cp)			*cp = c->next;		/* unlock access to curl_handles */#endif		if (c->handle) {#ifdef USE_CURL_MULTI			curl_multi_remove_handle(mult_handle, c->handle);#endif			curl_easy_cleanup(c->handle);		}		if (c->buffer)			free(c->buffer);		free(c);	}	destroy(s);}static ssize_tcurl_read(stream *s, void *buf, size_t elmsize, size_t cnt){	struct curl_data *c = (struct curl_data *) s->stream_data.p;	size_t size;	if (c->usesize - c->offset >= elmsize || !c->running) {		/* there is at least one element's worth of data		   available, or we have reached the end: return as		   much as we have, but no more than requested */		if (cnt * elmsize > c->usesize - c->offset)			cnt = (c->usesize - c->offset) / elmsize;		size = cnt * elmsize;		memcpy(buf, c->buffer + c->offset, size);		c->offset += size;		if (c->offset == c->usesize)			c->usesize = c->offset = 0;		return (ssize_t) cnt;	}	/* not enough data, we must wait until we get some */#ifndef USE_CURL_MULTI	return 0;#endif}static ssize_tcurl_write(stream *s, const void *buf, size_t elmsize, size_t cnt){	(void) s;	(void) buf;	(void) elmsize;	(void) cnt;	assert(0);	return -1;}static voidcurl_close(stream *s){	(void) s;}stream *open_urlstream(const char *url){	stream *s;	struct curl_data *c;#ifdef USE_CURL_MULTI	CURLMsg *msg;#endif	if ((c = malloc(sizeof(*c))) == NULL)		return NULL;	c->handle = NULL;	c->buffer = NULL;	c->maxsize = c->usesize = c->offset = 0;	c->running = 1;	if ((s = create_stream(url)) == NULL) {		free(c);		return NULL;	}#ifdef USE_CURL_MULTI	/* lock access to curl_handles */	c->next = curl_handles;	curl_handles = c;	/* unlock access to curl_handles */#endif	s->read = curl_read;	s->write = curl_write;	s->close = curl_close;	s->destroy = curl_destroy;	s->stream_data.p = (void *) c;	if ((c->handle = curl_easy_init()) == NULL) {		destroy(s);		return NULL;	}	curl_easy_setopt(c->handle, CURLOPT_URL, s->name);	curl_easy_setopt(c->handle, CURLOPT_WRITEDATA, s);	curl_easy_setopt(c->handle, CURLOPT_VERBOSE, FALSE);	curl_easy_setopt(c->handle, CURLOPT_NOSIGNAL, TRUE);	curl_easy_setopt(c->handle, CURLOPT_WRITEFUNCTION, write_callback);#ifdef USE_CURL_MULTI	if (multi_handle == NULL)		multi_handle = curl_multi_init();	curl_multi_add_handle(multi_handle, c->handle);	while (curl_multi_perform(multi_handle, NULL) == CURLM_CALL_MULTI_PERFORM)		;	while ((msg = curl_multi_info_read(multi_handle, NULL)) != NULL) {		struct curl_data *p;		/* lock access to curl_handles */		for (p = curl_handles; p; p = p->next) {			if (p->handle == msg->easy_handle) {				switch (msg->msg) {				case CURLMSG_DONE:					p->running = 0;					p->result = msg->data.result;					curl_multi_remove_handle(multi_handle, p->handle);					curl_easy_cleanup(p->handle);					p->handle = NULL;					break;				default:					break;				}				break;			}		}		/* unlock access to curl_handles */	}#else	if (curl_easy_perform(c->handle) != CURLE_OK)		s->errnr = OPEN_ERROR;	curl_easy_cleanup(c->handle);	c->handle = NULL;	c->running = 0;#endif	return s;}#endif /* HAVE_CURL */static ssize_tsocket_write(stream *s, const void *buf, size_t elmsize, size_t cnt){	ssize_t nr = 0, res = 0, size = elmsize * cnt;	if (!s || s->errnr) 		return -1;	if (size == 0 || elmsize == 0)		return cnt;	errno = 0;	while (res < size && (#ifdef NATIVE_WIN32				     /* send works on int, make sure the argument fits */				     ((nr = send(s->stream_data.s, (void *) ((char *) buf + res), (int) min(size - res, 1 << 16), 0)) > 0)#else				     ((nr = write(s->stream_data.s, (void *) ((char *) buf + res), size - res)) > 0)#endif				     || errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)	    ) {		errno = 0;		if (nr > 0)			res += nr;	}	if (nr < 0) {		s->errnr = WRITE_ERROR;		return nr;	}	if (res > 0)		return res / elmsize;	s->errnr = WRITE_ERROR;	return -1;}static ssize_tsocket_read(stream *s, void *buf, size_t elmsize, size_t cnt){	ssize_t nr = 0, res = 0, size = elmsize * cnt;	if (!s || s->errnr) 		return(-1);	errno = 0;	while (res < size && (#ifdef NATIVE_WIN32				     /* recv works on int, make sure the argument fits */				     ((nr = recv(s->stream_data.s, (void *) ((char *) buf + res), (int) min(size - res, 1 << 16), 0)) > 0)#else				     ((nr = read(s->stream_data.s, (void *) ((char *) buf + res), size - res)) > 0)#endif				     || errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)	    ) {		errno = 0;		if (nr > 0)			res += nr;	}	if (nr < 0) {		s->errnr = READ_ERROR;		return nr;	}	return res / elmsize;}/* Read one line (seperated by \n) of at most maxcnt characters from the    stream. Returns the number of characters actually read. */static ssize_tsocket_readline(stream *s, void *buf, size_t maxcnt){	size_t len = 0;	char *b = buf, *start = buf;	while (socket_read(s, start, 1, 1) > 0 && len < maxcnt) {		if (*start == '\n')			break;		start++;	}	if (s->errnr) return s->errnr;	return (start - b);}static voidsocket_close(stream *s){	SOCKET fd = s->stream_data.s;	if (fd >= 0) {		/* Related read/write (in/out, from/to) streams		 * share a single socket which is not dup'ed (anymore)		 * as Windows' dup doesn't work on sockets;		 * hence, only one of the streams must/may close that		 * socket; we choose to let the read socket do the		 * job, since in mapi.mx it may happen that the read		 * stream is closed before the write stream was even		 * created.		 */		if (s->access == ST_READ) {#ifdef HAVE_SHUTDOWN			shutdown(fd, SHUT_RDWR);#endif			closesocket(fd);		}	}	s->stream_data.s = -1;}static stream *socket_open(SOCKET sock, const char *name){	stream *s;	if ((s = create_stream(name)) == NULL)		return NULL;	s->read = socket_read;	s->readline = socket_readline;	s->write = socket_write;	s->close = socket_close;	s->flush = NULL;	s->stream_data.s = sock;	errno = 0;#if defined(SO_KEEPALIVE) && !defined(WIN32)	{		int opt = 0;		if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *) &opt, sizeof(opt)) < 0) {			s->errnr = OPEN_ERROR;			return s;		}	}#endif#if defined(IPTOS_THROUGHPUT) && !defined(WIN32)	{		int tos = IPTOS_THROUGHPUT;		if (setsockopt(sock, IPPROTO_IP, IP_TOS, (void *) &tos, sizeof(tos)) < 0) {			s->errnr = OPEN_ERROR;			return s;		}	}#endif#ifdef TCP_NODELAY	{		int nodelay = 1;		if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *) &nodelay, sizeof(nodelay)) < 0) {			s->errnr = OPEN_ERROR;			return s;		}	}#endif#ifdef HAVE_FCNTL	{		int fl = fcntl(sock, F_GETFL);		fl &= ~O_NONBLOCK;		if (fcntl(sock, F_SETFL, fl) < 0) {			s->errnr = OPEN_ERROR;			return s;		}	}#endif	return s;}stream *socket_rstream(SOCKET sock, const char *name){	stream *s;#ifdef STREAM_DEBUG	printf("socket_rstream " SSZFMT " %s\n", (ssize_t) sock, name);#endif	if ((s = socket_open(sock, name)) == NULL)		return NULL;	s->type = ST_BIN;	if (s->errnr == NO__ERROR)		socket_read(s, (void *) &s->byteorder, sizeof(s->byteorder), 1);	return s;}stream *socket_wstream(SOCKET sock, const char *name){	stream *s;#ifdef STREAM_DEBUG	printf("socket_wstream " SSZFMT " %s\n", (ssize_t) sock, name);#endif	if ((s = socket_open(sock, name)) == NULL)		return NULL;	s->access = ST_WRITE;	s->type = ST_BIN;	if (s->errnr == NO__ERROR)		socket_write(s, (void *) &s->byteorder, sizeof(s->byteorder), 1);	return s;}stream *socket_rastream(SOCKET sock, const char *name){	stream *s;#ifdef STREAM_DEBUG	printf("socket_rastream " SSZFMT " %s\n", (ssize_t) sock, name);#endif	if ((s = socket_open(sock, name)) != NULL)		s->type = ST_ASCII;	return s;}stream *socket_wastream(SOCKET sock, const char *name){	stream *s;#ifdef STREAM_DEBUG	printf("socket_wastream " SSZFMT " %s\n", (ssize_t) sock, name);#endif	if ((s = socket_open(sock, name)) == NULL)		return NULL;	s->access = ST_WRITE;	s->type = ST_ASCII;	return s;}#ifdef HAVE_OPENSSLstruct ssl_data {	int error;	int ret;	SSL *ssl;};static ssize_tssl_read(stream *s, void *buf, size_t elmsize, size_t cnt){	SSL *ssl;	size_t res = 0, size = elmsize * cnt;	if (!s || s->errnr) 		return(-1);	assert(s->stream_data.p);	ssl = ((struct ssl_data *) s->stream_data.p)->ssl;	assert(ssl);	while (res < size) {		int nr, err;		nr = SSL_read(ssl, (void *) ((char *) buf + res), (int) (size - res));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -