📄 io.c
字号:
/* * Socket and pipe I/O utilities used in rsync. * * Copyright (C) 1996-2001 Andrew Tridgell * Copyright (C) 1996 Paul Mackerras * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org> * Copyright (C) 2003-2008 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, visit the http://fsf.org website. *//* Rsync provides its own multiplexing system, which is used to send * stderr and stdout over a single socket. * * For historical reasons this is off during the start of the * connection, but it's switched on quite early using * io_start_multiplex_out() and io_start_multiplex_in(). 
*/#include "rsync.h"#include "ifuncs.h"/** If no timeout is specified then use a 60 second select timeout */#define SELECT_TIMEOUT 60extern int bwlimit;extern size_t bwlimit_writemax;extern int io_timeout;extern int allowed_lull;extern int am_server;extern int am_daemon;extern int am_sender;extern int am_generator;extern int inc_recurse;extern int io_error;extern int eol_nulls;extern int flist_eof;extern int list_only;extern int read_batch;extern int csum_length;extern int protect_args;extern int checksum_seed;extern int protocol_version;extern int remove_source_files;extern int preserve_hard_links;extern struct stats stats;extern struct file_list *cur_flist;#ifdef ICONV_OPTIONextern int filesfrom_convert;extern iconv_t ic_send, ic_recv;#endifconst char phase_unknown[] = "unknown";int ignore_timeout = 0;int batch_fd = -1;int msgdone_cnt = 0;/* Ignore an EOF error if non-zero. See whine_about_eof(). */int kluge_around_eof = 0;int msg_fd_in = -1;int msg_fd_out = -1;int sock_f_in = -1;int sock_f_out = -1;static int iobuf_f_in = -1;static char *iobuf_in;static size_t iobuf_in_siz;static size_t iobuf_in_ndx;static size_t iobuf_in_remaining;static int iobuf_f_out = -1;static char *iobuf_out;static int iobuf_out_cnt;int flist_forward_from = -1;static int io_multiplexing_out;static int io_multiplexing_in;static time_t last_io_in;static time_t last_io_out;static int no_flush;static int write_batch_monitor_in = -1;static int write_batch_monitor_out = -1;static int io_filesfrom_f_in = -1;static int io_filesfrom_f_out = -1;static xbuf ff_buf = EMPTY_XBUF;static char ff_lastchar;#ifdef ICONV_OPTIONstatic xbuf iconv_buf = EMPTY_XBUF;#endifstatic int defer_forwarding_messages = 0, keep_defer_forwarding = 0;static int select_timeout = SELECT_TIMEOUT;static int active_filecnt = 0;static OFF_T active_bytecnt = 0;static int first_message = 1;static char int_byte_extra[64] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (00 - 3F)/4 */ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, /* (40 - 7F)/4 */ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, /* (80 - BF)/4 */ 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */};#define REMOTE_OPTION_ERROR "rsync: on remote machine: -"#define REMOTE_OPTION_ERROR2 ": unknown option"enum festatus { FES_SUCCESS, FES_REDO, FES_NO_SEND };static void readfd(int fd, char *buffer, size_t N);static void writefd(int fd, const char *buf, size_t len);static void writefd_unbuffered(int fd, const char *buf, size_t len);static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert);static flist_ndx_list redo_list, hlink_list;struct msg_list_item { struct msg_list_item *next; char convert; char buf[1];};struct msg_list { struct msg_list_item *head, *tail;};static struct msg_list msg_queue;static void got_flist_entry_status(enum festatus status, const char *buf){ int ndx = IVAL(buf, 0); struct file_list *flist = flist_for_ndx(ndx, "got_flist_entry_status"); if (remove_source_files) { active_filecnt--; active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]); } if (inc_recurse) flist->in_progress--; switch (status) { case FES_SUCCESS: if (remove_source_files) send_msg(MSG_SUCCESS, buf, 4, 0); if (preserve_hard_links) { struct file_struct *file = flist->files[ndx - flist->ndx_start]; if (F_IS_HLINKED(file)) { flist_ndx_push(&hlink_list, ndx); flist->in_progress++; } } break; case FES_REDO: if (read_batch) { if (inc_recurse) flist->in_progress++; break; } if (inc_recurse) flist->to_redo++; flist_ndx_push(&redo_list, ndx); break; case FES_NO_SEND: break; }}static void check_timeout(void){ time_t t; if (!io_timeout || ignore_timeout) return; if (!last_io_in) { last_io_in = time(NULL); return; } t = time(NULL); if (t - last_io_in >= io_timeout) { if (!am_server && !am_daemon) { rprintf(FERROR, "io timeout after %d seconds -- exiting\n", (int)(t-last_io_in)); } exit_cleanup(RERR_TIMEOUT); }}/* Note the fds used for the main socket (which might really be a pipe * for a 
local transfer, but we can ignore that). */void io_set_sock_fds(int f_in, int f_out){ sock_f_in = f_in; sock_f_out = f_out;}void set_io_timeout(int secs){ io_timeout = secs; if (!io_timeout || io_timeout > SELECT_TIMEOUT) select_timeout = SELECT_TIMEOUT; else select_timeout = io_timeout; allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;}/* Setup the fd used to receive MSG_* messages. Only needed during the * early stages of being a local sender (up through the sending of the * file list) or when we're the generator (to fetch the messages from * the receiver). */void set_msg_fd_in(int fd){ msg_fd_in = fd;}/* Setup the fd used to send our MSG_* messages. Only needed when * we're the receiver (to send our messages to the generator). */void set_msg_fd_out(int fd){ msg_fd_out = fd; set_nonblocking(msg_fd_out);}/* Add a message to the pending MSG_* list. */static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len, int convert){ struct msg_list_item *m; int sz = len + 4 + sizeof m[0] - 1; if (!(m = (struct msg_list_item *)new_array(char, sz))) out_of_memory("msg_list_add"); m->next = NULL; m->convert = convert; SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len); memcpy(m->buf + 4, buf, len); if (lst->tail) lst->tail->next = m; else lst->head = m; lst->tail = m;}static inline int flush_a_msg(int fd){ struct msg_list_item *m = msg_queue.head; int len = IVAL(m->buf, 0) & 0xFFFFFF; int tag = *((uchar*)m->buf+3) - MPLEX_BASE; if (!(msg_queue.head = m->next)) msg_queue.tail = NULL; defer_forwarding_messages++; mplex_write(fd, tag, m->buf + 4, len, m->convert); defer_forwarding_messages--; free(m); return len;}static void msg_flush(void){ if (am_generator) { while (msg_queue.head && io_multiplexing_out) stats.total_written += flush_a_msg(sock_f_out) + 4; } else { while (msg_queue.head) (void)flush_a_msg(msg_fd_out); }}static void check_for_d_option_error(const char *msg){ static char rsync263_opts[] = "BCDHIKLPRSTWabceghlnopqrtuvxz"; char *colon; int 
saw_d = 0; if (*msg != 'r' || strncmp(msg, REMOTE_OPTION_ERROR, sizeof REMOTE_OPTION_ERROR - 1) != 0) return; msg += sizeof REMOTE_OPTION_ERROR - 1; if (*msg == '-' || (colon = strchr(msg, ':')) == NULL || strncmp(colon, REMOTE_OPTION_ERROR2, sizeof REMOTE_OPTION_ERROR2 - 1) != 0) return; for ( ; *msg != ':'; msg++) { if (*msg == 'd') saw_d = 1; else if (*msg == 'e') break; else if (strchr(rsync263_opts, *msg) == NULL) return; } if (saw_d) { rprintf(FWARNING, "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); }}/* Read a message from the MSG_* fd and handle it. This is called either * during the early stages of being a local sender (up through the sending * of the file list) or when we're the generator (to fetch the messages * from the receiver). */static void read_msg_fd(void){ char buf[2048]; size_t n; struct file_list *flist; int fd = msg_fd_in; int tag, len; /* Temporarily disable msg_fd_in. This is needed to avoid looping back * to this routine from writefd_unbuffered(). */ no_flush++; msg_fd_in = -1; defer_forwarding_messages++; readfd(fd, buf, 4); tag = IVAL(buf, 0); len = tag & 0xFFFFFF; tag = (tag >> 24) - MPLEX_BASE; switch (tag) { case MSG_DONE: if (len < 0 || len > 1 || !am_generator) { invalid_msg: rprintf(FERROR, "invalid message %d:%d [%s%s]\n", tag, len, who_am_i(), inc_recurse ? "/inc" : ""); exit_cleanup(RERR_STREAMIO); } if (len) { readfd(fd, buf, len); stats.total_read = read_varlong(fd, 3); } msgdone_cnt++; break; case MSG_REDO: if (len != 4 || !am_generator) goto invalid_msg; readfd(fd, buf, 4); got_flist_entry_status(FES_REDO, buf); break; case MSG_FLIST: if (len != 4 || !am_generator || !inc_recurse) goto invalid_msg; readfd(fd, buf, 4); /* Read extra file list from receiver. 
*/ assert(iobuf_in != NULL); assert(iobuf_f_in == fd); if (verbose > 3) { rprintf(FINFO, "[%s] receiving flist for dir %d\n", who_am_i(), IVAL(buf,0)); } flist = recv_file_list(fd); flist->parent_ndx = IVAL(buf,0);#ifdef SUPPORT_HARD_LINKS if (preserve_hard_links) match_hard_links(flist);#endif break; case MSG_FLIST_EOF: if (len != 0 || !am_generator || !inc_recurse) goto invalid_msg; flist_eof = 1; break; case MSG_IO_ERROR: if (len != 4) goto invalid_msg; readfd(fd, buf, len); io_error |= IVAL(buf, 0); break; case MSG_DELETED: if (len >= (int)sizeof buf || !am_generator) goto invalid_msg; readfd(fd, buf, len); send_msg(MSG_DELETED, buf, len, 1); break; case MSG_SUCCESS: if (len != 4 || !am_generator) goto invalid_msg; readfd(fd, buf, 4); got_flist_entry_status(FES_SUCCESS, buf); break; case MSG_NO_SEND: if (len != 4 || !am_generator) goto invalid_msg; readfd(fd, buf, 4); got_flist_entry_status(FES_NO_SEND, buf); break; case MSG_ERROR_SOCKET: case MSG_ERROR_UTF8: case MSG_CLIENT: if (!am_generator) goto invalid_msg; if (tag == MSG_ERROR_SOCKET) io_end_multiplex_out(); /* FALL THROUGH */ case MSG_INFO: case MSG_ERROR: case MSG_ERROR_XFER: case MSG_WARNING: case MSG_LOG: while (len) { n = len; if (n >= sizeof buf) n = sizeof buf - 1; readfd(fd, buf, n); rwrite((enum logcode)tag, buf, n, !am_generator); len -= n; } break; default: rprintf(FERROR, "unknown message %d:%d [%s]\n", tag, len, who_am_i()); exit_cleanup(RERR_STREAMIO); } no_flush--; msg_fd_in = fd; if (!--defer_forwarding_messages && !no_flush) msg_flush();}/* This is used by the generator to limit how many file transfers can * be active at once when --remove-source-files is specified. Without * this, sender-side deletions were mostly happening at the end. */void increment_active_files(int ndx, int itemizing, enum logcode code){ while (1) { /* TODO: tune these limits? */ int limit = active_bytecnt >= 128*1024 ? 
10 : 50; if (active_filecnt < limit) break; check_for_finished_files(itemizing, code, 0); if (active_filecnt < limit) break; if (iobuf_out_cnt) io_flush(NORMAL_FLUSH); else read_msg_fd(); } active_filecnt++; active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]);}/* Write an message to a multiplexed stream. If this fails, rsync exits. */static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert){ char buffer[BIGPATHBUFLEN]; /* Oversized for use by iconv code. */ size_t n = len;#ifdef ICONV_OPTION /* We need to convert buf before doing anything else so that we * can include the (converted) byte length in the message header. */ if (convert && ic_send != (iconv_t)-1) { xbuf outbuf, inbuf; INIT_XBUF(outbuf, buffer + 4, 0, sizeof buffer - 4); INIT_XBUF(inbuf, (char*)buf, len, -1); iconvbufs(ic_send, &inbuf, &outbuf, ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE); if (inbuf.len > 0) { rprintf(FERROR, "overflowed conversion buffer in mplex_write"); exit_cleanup(RERR_UNSUPPORTED); } n = len = outbuf.len; } else#endif if (n > 1024 - 4) /* BIGPATHBUFLEN can handle 1024 bytes */ n = 0; /* We'd rather do 2 writes than too much memcpy(). 
*/ else memcpy(buffer + 4, buf, n); SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); keep_defer_forwarding++; /* defer_forwarding_messages++ on return */ writefd_unbuffered(fd, buffer, n+4); keep_defer_forwarding--; if (len > n) writefd_unbuffered(fd, buf+n, len-n); if (!--defer_forwarding_messages && !no_flush) msg_flush();}int send_msg(enum msgcode code, const char *buf, int len, int convert){ if (msg_fd_out < 0) { if (!defer_forwarding_messages) return io_multiplex_write(code, buf, len, convert); if (!io_multiplexing_out) return 0; msg_list_add(&msg_queue, code, buf, len, convert); return 1; } if (flist_forward_from >= 0) msg_list_add(&msg_queue, code, buf, len, convert); else mplex_write(msg_fd_out, code, buf, len, convert); return 1;}void send_msg_int(enum msgcode code, int num){ char numbuf[4]; SIVAL(numbuf, 0, num); send_msg(code, numbuf, 4, 0);}void wait_for_receiver(void){ if (io_flush(NORMAL_FLUSH)) return; read_msg_fd();}int get_redo_num(void){ return flist_ndx_pop(&redo_list);}int get_hlink_num(void){ return flist_ndx_pop(&hlink_list);}/** * When we're the receiver and we have a local --files-from list of names * that needs to be sent over the socket to the sender, we have to do two * things at the same time: send the sender a list of what files we're * processing and read the incoming file+info list from the sender. We do * this by augmenting the read_timeout() function to copy this data. It * uses ff_buf to read a block of data from f_in (when it is ready, since * it might be a pipe) and then blast it out f_out (when it is ready to * receive more data). */void io_set_filesfrom_fds(int f_in, int f_out){ io_filesfrom_f_in = f_in; io_filesfrom_f_out = f_out; alloc_xbuf(&ff_buf, 2048);#ifdef ICONV_OPTION if (protect_args) alloc_xbuf(&iconv_buf, 1024);#endif}/* It's almost always an error to get an EOF when we're trying to read from the * network, because the protocol is (for the most part) self-terminating. 
*
 * There is one case for the receiver when it is at the end of the transfer
 * (hanging around reading any keep-alive packets that might come its way): if
 * the sender dies before the generator's kill-signal comes through, we can end
 * up here needing to loop until the kill-signal arrives.  In this situation,
 * kluge_around_eof will be < 0.
 *
 * There is another case for older protocol versions (< 24) where the module
 * listing was not terminated, so we must ignore an EOF error in that case and
 * exit.  In this situation, kluge_around_eof will be > 0. */
static void whine_about_eof(int fd)
{
	/* The kluge only applies to EOF on the main socket. */
	if (kluge_around_eof && fd == sock_f_in) {
		int i;
		if (kluge_around_eof > 0)
			exit_cleanup(0);
		/* If we're still here after 10 seconds, exit with an error. */
		for (i = 10*1000/20; i--; )
			msleep(20);
	}

	rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
		"(%.0f bytes received so far) [%s]\n",
		(double)stats.total_read, who_am_i());

	exit_cleanup(RERR_STREAMIO);
}

/**
 * Read from a socket with I/O timeout. return the number of bytes
 * read. If no bytes can be read then exit, never return a number <= 0.
 *
 * TODO: If the remote shell connection fails, then current versions
 * actually report an "unexpected EOF" error here.  Since it's a
 * fairly common mistake to try to use rsh when ssh is required, we
 * should trap that: if we fail to read any data at all, we should
 * give a better explanation.  We can tell whether the connection has
 * started by looking e.g. at whether the remote version is known yet.
 */
static int read_timeout(int fd, char *buf, size_t len)
{
	int n, cnt = 0;

	io_flush(FULL_FLUSH);

	while (cnt == 0) {
		/* until we manage to read *something* */
		fd_set r_fds, w_fds;
		struct timeval tv;
		int maxfd = fd;
		int count;

		FD_ZERO(&r_fds);
		FD_ZERO(&w_fds);
		FD_SET(fd, &r_fds);
		if (io_filesfrom_f_out >= 0) {
			int new_fd;
			if (ff_buf.len == 0) {
				if (io_filesfrom_f_in >= 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -