⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 io.c

📁 Rsync 3.0.5 source code
💻 C
📖 第 1 页 / 共 3 页
字号:
/*
 * Socket and pipe I/O utilities used in rsync.
 *
 * Copyright (C) 1996-2001 Andrew Tridgell
 * Copyright (C) 1996 Paul Mackerras
 * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
 * Copyright (C) 2003-2008 Wayne Davison
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, visit the http://fsf.org website.
 */

/* Rsync provides its own multiplexing system, which is used to send
 * stderr and stdout over a single socket.
 *
 * For historical reasons this is off during the start of the
 * connection, but it's switched on quite early using
 * io_start_multiplex_out() and io_start_multiplex_in().
 */

#include "rsync.h"
#include "ifuncs.h"

/** If no timeout is specified then use a 60 second select timeout */
#define SELECT_TIMEOUT 60

extern int bwlimit;
extern size_t bwlimit_writemax;
extern int io_timeout;
extern int allowed_lull;
extern int am_server;
extern int am_daemon;
extern int am_sender;
extern int am_generator;
extern int inc_recurse;
extern int io_error;
extern int eol_nulls;
extern int flist_eof;
extern int list_only;
extern int read_batch;
extern int csum_length;
extern int protect_args;
extern int checksum_seed;
extern int protocol_version;
extern int remove_source_files;
extern int preserve_hard_links;
extern struct stats stats;
extern struct file_list *cur_flist;
#ifdef ICONV_OPTION
extern int filesfrom_convert;
extern iconv_t ic_send, ic_recv;
#endif

const char phase_unknown[] = "unknown";
int ignore_timeout = 0;
int batch_fd = -1;
int msgdone_cnt = 0;

/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;

/* File descriptors for MSG_* traffic and for the main socket; -1 means
 * "not set".  They are assigned by set_msg_fd_in(), set_msg_fd_out()
 * and io_set_sock_fds(). */
int msg_fd_in = -1;
int msg_fd_out = -1;
int sock_f_in = -1;
int sock_f_out = -1;

static int iobuf_f_in = -1;
static char *iobuf_in;
static size_t iobuf_in_siz;
static size_t iobuf_in_ndx;
static size_t iobuf_in_remaining;

static int iobuf_f_out = -1;
static char *iobuf_out;
static int iobuf_out_cnt;

int flist_forward_from = -1;

static int io_multiplexing_out;
static int io_multiplexing_in;
static time_t last_io_in;
static time_t last_io_out;
static int no_flush;

static int write_batch_monitor_in = -1;
static int write_batch_monitor_out = -1;

static int io_filesfrom_f_in = -1;
static int io_filesfrom_f_out = -1;
static xbuf ff_buf = EMPTY_XBUF;
static char ff_lastchar;
#ifdef ICONV_OPTION
static xbuf iconv_buf = EMPTY_XBUF;
#endif
static int defer_forwarding_messages = 0, keep_defer_forwarding = 0;
static int select_timeout = SELECT_TIMEOUT;
static int active_filecnt = 0;
static OFF_T active_bytecnt = 0;
static int first_message = 1;

/* Lookup table indexed by (byte value)/4, as the per-row ranges note. */
static char int_byte_extra[64] = {
	0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (00 - 3F)/4 */
	0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (40 - 7F)/4 */
	1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, /* (80 - BF)/4 */
	2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */
};

/* Prefix and suffix of the remote "unknown option" complaint that
 * check_for_d_option_error() looks for. */
#define REMOTE_OPTION_ERROR "rsync: on remote machine: -"
#define REMOTE_OPTION_ERROR2 ": unknown option"

/* Per-file status values handled by got_flist_entry_status(). */
enum festatus { FES_SUCCESS, FES_REDO, FES_NO_SEND };

static void readfd(int fd, char *buffer, size_t N);
static void writefd(int fd, const char *buf, size_t len);
static void writefd_unbuffered(int fd, const char *buf, size_t len);
static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert);

static flist_ndx_list redo_list, hlink_list;

/* One queued MSG_* message.  buf[] is declared with a single element but
 * the item is allocated oversize by msg_list_add(): it holds a 4-byte
 * header followed by the message payload. */
struct msg_list_item {
	struct msg_list_item *next;
	char convert;
	char buf[1];
};

/* Singly-linked FIFO of queued messages. */
struct msg_list {
	struct msg_list_item *head, *tail;
};

static struct msg_list msg_queue;

/* Process a per-file status report.
 *
 * status: which of the FES_* outcomes is being reported.
 * buf:    4 bytes holding the file's index (decoded with IVAL).
 *
 * When remove_source_files is set the active file/byte counters are
 * reduced by this file; when inc_recurse is set the owning list's
 * in_progress count is decremented.  Then, per status:
 *   FES_SUCCESS - forwards MSG_SUCCESS if remove_source_files is set and,
 *                 if preserve_hard_links is set and the file is
 *                 hard-linked, queues the index on hlink_list and
 *                 re-increments in_progress.
 *   FES_REDO    - queues the index on redo_list (bumping to_redo under
 *                 inc_recurse); with read_batch it only restores
 *                 in_progress and queues nothing.
 *   FES_NO_SEND - no further action. */
static void got_flist_entry_status(enum festatus status, const char *buf)
{
	int ndx = IVAL(buf, 0);
	struct file_list *flist = flist_for_ndx(ndx, "got_flist_entry_status");

	if (remove_source_files) {
		active_filecnt--;
		active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
	}

	if (inc_recurse)
		flist->in_progress--;

	switch (status) {
	case FES_SUCCESS:
		if (remove_source_files)
			send_msg(MSG_SUCCESS, buf, 4, 0);
		if (preserve_hard_links) {
			struct file_struct *file = flist->files[ndx - flist->ndx_start];
			if (F_IS_HLINKED(file)) {
				flist_ndx_push(&hlink_list, ndx);
				flist->in_progress++;
			}
		}
		break;
	case FES_REDO:
		if (read_batch) {
			/* Undo the decrement above; nothing is queued. */
			if (inc_recurse)
				flist->in_progress++;
			break;
		}
		if (inc_recurse)
			flist->to_redo++;
		flist_ndx_push(&redo_list, ndx);
		break;
	case FES_NO_SEND:
		break;
	}
}

/* Exit with RERR_TIMEOUT once at least io_timeout seconds have elapsed
 * since last_io_in.  Does nothing when io_timeout is zero or
 * ignore_timeout is set.  If last_io_in has not been set yet, this call
 * only seeds it with the current time.  The error message is printed
 * only when we are neither a server nor a daemon. */
static void check_timeout(void)
{
	time_t t;

	if (!io_timeout || ignore_timeout)
		return;

	if (!last_io_in) {
		last_io_in = time(NULL);
		return;
	}

	t = time(NULL);

	if (t - last_io_in >= io_timeout) {
		if (!am_server && !am_daemon) {
			rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
				(int)(t-last_io_in));
		}
		exit_cleanup(RERR_TIMEOUT);
	}
}

/* Note the fds used for the main
socket (which might really be a pipe * for a local transfer, but we can ignore that). */void io_set_sock_fds(int f_in, int f_out){	sock_f_in = f_in;	sock_f_out = f_out;}void set_io_timeout(int secs){	io_timeout = secs;	if (!io_timeout || io_timeout > SELECT_TIMEOUT)		select_timeout = SELECT_TIMEOUT;	else		select_timeout = io_timeout;	allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;}/* Setup the fd used to receive MSG_* messages.  Only needed during the * early stages of being a local sender (up through the sending of the * file list) or when we're the generator (to fetch the messages from * the receiver). */void set_msg_fd_in(int fd){	msg_fd_in = fd;}/* Setup the fd used to send our MSG_* messages.  Only needed when * we're the receiver (to send our messages to the generator). */void set_msg_fd_out(int fd){	msg_fd_out = fd;	set_nonblocking(msg_fd_out);}/* Add a message to the pending MSG_* list. */static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len, int convert){	struct msg_list_item *m;	int sz = len + 4 + sizeof m[0] - 1;	if (!(m = (struct msg_list_item *)new_array(char, sz)))		out_of_memory("msg_list_add");	m->next = NULL;	m->convert = convert;	SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);	memcpy(m->buf + 4, buf, len);	if (lst->tail)		lst->tail->next = m;	else		lst->head = m;	lst->tail = m;}static inline int flush_a_msg(int fd){	struct msg_list_item *m = msg_queue.head;	int len = IVAL(m->buf, 0) & 0xFFFFFF;	int tag = *((uchar*)m->buf+3) - MPLEX_BASE;	if (!(msg_queue.head = m->next))		msg_queue.tail = NULL;	defer_forwarding_messages++;	mplex_write(fd, tag, m->buf + 4, len, m->convert);	defer_forwarding_messages--;	free(m);	return len;}static void msg_flush(void){	if (am_generator) {		while (msg_queue.head && io_multiplexing_out)			stats.total_written += flush_a_msg(sock_f_out) + 4;	} else {		while (msg_queue.head)			(void)flush_a_msg(msg_fd_out);	}}static void check_for_d_option_error(const char *msg){	static char 
rsync263_opts[] = "BCDHIKLPRSTWabceghlnopqrtuvxz";	char *colon;	int saw_d = 0;	if (*msg != 'r'	 || strncmp(msg, REMOTE_OPTION_ERROR, sizeof REMOTE_OPTION_ERROR - 1) != 0)		return;	msg += sizeof REMOTE_OPTION_ERROR - 1;	if (*msg == '-' || (colon = strchr(msg, ':')) == NULL	 || strncmp(colon, REMOTE_OPTION_ERROR2, sizeof REMOTE_OPTION_ERROR2 - 1) != 0)		return;	for ( ; *msg != ':'; msg++) {		if (*msg == 'd')			saw_d = 1;		else if (*msg == 'e')			break;		else if (strchr(rsync263_opts, *msg) == NULL)			return;	}	if (saw_d) {		rprintf(FWARNING,		    "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n");	}}/* Read a message from the MSG_* fd and handle it.  This is called either * during the early stages of being a local sender (up through the sending * of the file list) or when we're the generator (to fetch the messages * from the receiver). */static void read_msg_fd(void){	char buf[2048];	size_t n;	struct file_list *flist;	int fd = msg_fd_in;	int tag, len;	/* Temporarily disable msg_fd_in.  This is needed to avoid looping back	 * to this routine from writefd_unbuffered(). */	no_flush++;	msg_fd_in = -1;	defer_forwarding_messages++;	readfd(fd, buf, 4);	tag = IVAL(buf, 0);	len = tag & 0xFFFFFF;	tag = (tag >> 24) - MPLEX_BASE;	switch (tag) {	case MSG_DONE:		if (len < 0 || len > 1 || !am_generator) {		  invalid_msg:			rprintf(FERROR, "invalid message %d:%d [%s%s]\n",				tag, len, who_am_i(),				inc_recurse ? "/inc" : "");			exit_cleanup(RERR_STREAMIO);		}		if (len) {			readfd(fd, buf, len);			stats.total_read = read_varlong(fd, 3);		}		msgdone_cnt++;		break;	case MSG_REDO:		if (len != 4 || !am_generator)			goto invalid_msg;		readfd(fd, buf, 4);		got_flist_entry_status(FES_REDO, buf);		break;	case MSG_FLIST:		if (len != 4 || !am_generator || !inc_recurse)			goto invalid_msg;		readfd(fd, buf, 4);		/* Read extra file list from receiver. 
*/		assert(iobuf_in != NULL);		assert(iobuf_f_in == fd);		if (verbose > 3) {			rprintf(FINFO, "[%s] receiving flist for dir %d\n",				who_am_i(), IVAL(buf,0));		}		flist = recv_file_list(fd);		flist->parent_ndx = IVAL(buf,0);#ifdef SUPPORT_HARD_LINKS		if (preserve_hard_links)			match_hard_links(flist);#endif		break;	case MSG_FLIST_EOF:		if (len != 0 || !am_generator || !inc_recurse)			goto invalid_msg;		flist_eof = 1;		break;	case MSG_IO_ERROR:		if (len != 4)			goto invalid_msg;		readfd(fd, buf, len);		io_error |= IVAL(buf, 0);		break;	case MSG_DELETED:		if (len >= (int)sizeof buf || !am_generator)			goto invalid_msg;		readfd(fd, buf, len);		send_msg(MSG_DELETED, buf, len, 1);		break;	case MSG_SUCCESS:		if (len != 4 || !am_generator)			goto invalid_msg;		readfd(fd, buf, 4);		got_flist_entry_status(FES_SUCCESS, buf);		break;	case MSG_NO_SEND:		if (len != 4 || !am_generator)			goto invalid_msg;		readfd(fd, buf, 4);		got_flist_entry_status(FES_NO_SEND, buf);		break;	case MSG_ERROR_SOCKET:	case MSG_ERROR_UTF8:	case MSG_CLIENT:		if (!am_generator)			goto invalid_msg;		if (tag == MSG_ERROR_SOCKET)			io_end_multiplex_out();		/* FALL THROUGH */	case MSG_INFO:	case MSG_ERROR:	case MSG_ERROR_XFER:	case MSG_WARNING:	case MSG_LOG:		while (len) {			n = len;			if (n >= sizeof buf)				n = sizeof buf - 1;			readfd(fd, buf, n);			rwrite((enum logcode)tag, buf, n, !am_generator);			len -= n;		}		break;	default:		rprintf(FERROR, "unknown message %d:%d [%s]\n",			tag, len, who_am_i());		exit_cleanup(RERR_STREAMIO);	}	no_flush--;	msg_fd_in = fd;	if (!--defer_forwarding_messages && !no_flush)		msg_flush();}/* This is used by the generator to limit how many file transfers can * be active at once when --remove-source-files is specified.  Without * this, sender-side deletions were mostly happening at the end. */void increment_active_files(int ndx, int itemizing, enum logcode code){	while (1) {		/* TODO: tune these limits? */		int limit = active_bytecnt >= 128*1024 ? 
10 : 50;		if (active_filecnt < limit)			break;		check_for_finished_files(itemizing, code, 0);		if (active_filecnt < limit)			break;		if (iobuf_out_cnt)			io_flush(NORMAL_FLUSH);		else			read_msg_fd();	}	active_filecnt++;	active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]);}/* Write an message to a multiplexed stream. If this fails, rsync exits. */static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert){	char buffer[BIGPATHBUFLEN]; /* Oversized for use by iconv code. */	size_t n = len;#ifdef ICONV_OPTION	/* We need to convert buf before doing anything else so that we	 * can include the (converted) byte length in the message header. */	if (convert && ic_send != (iconv_t)-1) {		xbuf outbuf, inbuf;		INIT_XBUF(outbuf, buffer + 4, 0, sizeof buffer - 4);		INIT_XBUF(inbuf, (char*)buf, len, -1);		iconvbufs(ic_send, &inbuf, &outbuf,			  ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE);		if (inbuf.len > 0) {			rprintf(FERROR, "overflowed conversion buffer in mplex_write");			exit_cleanup(RERR_UNSUPPORTED);		}		n = len = outbuf.len;	} else#endif	if (n > 1024 - 4) /* BIGPATHBUFLEN can handle 1024 bytes */		n = 0;    /* We'd rather do 2 writes than too much memcpy(). 
*/	else		memcpy(buffer + 4, buf, n);	SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);	keep_defer_forwarding++; /* defer_forwarding_messages++ on return */	writefd_unbuffered(fd, buffer, n+4);	keep_defer_forwarding--;	if (len > n)		writefd_unbuffered(fd, buf+n, len-n);	if (!--defer_forwarding_messages && !no_flush)		msg_flush();}int send_msg(enum msgcode code, const char *buf, int len, int convert){	if (msg_fd_out < 0) {		if (!defer_forwarding_messages)			return io_multiplex_write(code, buf, len, convert);		if (!io_multiplexing_out)			return 0;		msg_list_add(&msg_queue, code, buf, len, convert);		return 1;	}	if (flist_forward_from >= 0)		msg_list_add(&msg_queue, code, buf, len, convert);	else		mplex_write(msg_fd_out, code, buf, len, convert);	return 1;}void send_msg_int(enum msgcode code, int num){	char numbuf[4];	SIVAL(numbuf, 0, num);	send_msg(code, numbuf, 4, 0);}void wait_for_receiver(void){	if (io_flush(NORMAL_FLUSH))		return;	read_msg_fd();}int get_redo_num(void){	return flist_ndx_pop(&redo_list);}int get_hlink_num(void){	return flist_ndx_pop(&hlink_list);}/** * When we're the receiver and we have a local --files-from list of names * that needs to be sent over the socket to the sender, we have to do two * things at the same time: send the sender a list of what files we're * processing and read the incoming file+info list from the sender.  We do * this by augmenting the read_timeout() function to copy this data.  It * uses ff_buf to read a block of data from f_in (when it is ready, since * it might be a pipe) and then blast it out f_out (when it is ready to * receive more data). */void io_set_filesfrom_fds(int f_in, int f_out){	io_filesfrom_f_in = f_in;	io_filesfrom_f_out = f_out;	alloc_xbuf(&ff_buf, 2048);#ifdef ICONV_OPTION	if (protect_args)		alloc_xbuf(&iconv_buf, 1024);#endif}/* It's almost always an error to get an EOF when we're trying to read from the * network, because the protocol is (for the most part) self-terminating. 
* * There is one case for the receiver when it is at the end of the transfer * (hanging around reading any keep-alive packets that might come its way): if * the sender dies before the generator's kill-signal comes through, we can end * up here needing to loop until the kill-signal arrives.  In this situation, * kluge_around_eof will be < 0. * * There is another case for older protocol versions (< 24) where the module * listing was not terminated, so we must ignore an EOF error in that case and * exit.  In this situation, kluge_around_eof will be > 0. */static void whine_about_eof(int fd){	if (kluge_around_eof && fd == sock_f_in) {		int i;		if (kluge_around_eof > 0)			exit_cleanup(0);		/* If we're still here after 10 seconds, exit with an error. */		for (i = 10*1000/20; i--; )			msleep(20);	}	rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "		"(%.0f bytes received so far) [%s]\n",		(double)stats.total_read, who_am_i());	exit_cleanup(RERR_STREAMIO);}/** * Read from a socket with I/O timeout. return the number of bytes * read. If no bytes can be read then exit, never return a number <= 0. * * TODO: If the remote shell connection fails, then current versions * actually report an "unexpected EOF" error here.  Since it's a * fairly common mistake to try to use rsh when ssh is required, we * should trap that: if we fail to read any data at all, we should * give a better explanation.  We can tell whether the connection has * started by looking e.g. at whether the remote version is known yet. */static int read_timeout(int fd, char *buf, size_t len){	int n, cnt = 0;	io_flush(FULL_FLUSH);	while (cnt == 0) {		/* until we manage to read *something* */		fd_set r_fds, w_fds;		struct timeval tv;		int maxfd = fd;		int count;		FD_ZERO(&r_fds);		FD_ZERO(&w_fds);		FD_SET(fd, &r_fds);		if (io_filesfrom_f_out >= 0) {			int new_fd;			if (ff_buf.len == 0) {				if (io_filesfrom_f_in >= 0) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -