📄 io.c
字号:
FD_SET(io_filesfrom_f_in, &r_fds); new_fd = io_filesfrom_f_in; } else { io_filesfrom_f_out = -1; new_fd = -1; } } else { FD_SET(io_filesfrom_f_out, &w_fds); new_fd = io_filesfrom_f_out; } if (new_fd > maxfd) maxfd = new_fd; } tv.tv_sec = select_timeout; tv.tv_usec = 0; errno = 0; count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv); if (count <= 0) { if (errno == EBADF) { defer_forwarding_messages = 0; exit_cleanup(RERR_SOCKETIO); } check_timeout(); continue; } if (io_filesfrom_f_out >= 0) { if (ff_buf.len) { if (FD_ISSET(io_filesfrom_f_out, &w_fds)) { int l = write(io_filesfrom_f_out, ff_buf.buf + ff_buf.pos, ff_buf.len); if (l > 0) { if (!(ff_buf.len -= l)) ff_buf.pos = 0; else ff_buf.pos += l; } else if (errno != EINTR) { /* XXX should we complain? */ io_filesfrom_f_out = -1; } } } else if (io_filesfrom_f_in >= 0) { if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {#ifdef ICONV_OPTION xbuf *ibuf = filesfrom_convert ? &iconv_buf : &ff_buf;#else xbuf *ibuf = &ff_buf;#endif int l = read(io_filesfrom_f_in, ibuf->buf, ibuf->size); if (l <= 0) { if (l == 0 || errno != EINTR) { /* Send end-of-file marker */ memcpy(ff_buf.buf, "\0\0", 2); ff_buf.len = ff_lastchar? 2 : 1; ff_buf.pos = 0; io_filesfrom_f_in = -1; } } else {#ifdef ICONV_OPTION if (filesfrom_convert) { iconv_buf.pos = 0; iconv_buf.len = l; iconvbufs(ic_send, &iconv_buf, &ff_buf, ICB_EXPAND_OUT|ICB_INCLUDE_BAD|ICB_INCLUDE_INCOMPLETE); l = ff_buf.len; }#endif if (!eol_nulls) { char *s = ff_buf.buf + l; /* Transform CR and/or LF into '\0' */ while (s-- > ff_buf.buf) { if (*s == '\n' || *s == '\r') *s = '\0'; } } if (!ff_lastchar) { /* Last buf ended with a '\0', so don't * let this buf start with one. */ while (l && ff_buf.buf[ff_buf.pos] == '\0') ff_buf.pos++, l--; } if (!l) ff_buf.pos = 0; else { char *f = ff_buf.buf + ff_buf.pos; char *t = f; char *eob = f + l; /* Eliminate any multi-'\0' runs. */ while (f != eob) { if (!(*t++ = *f++)) { while (f != eob && !*f) f++, l--; } } ff_lastchar = f[-1]; } ff_buf.len = l; } } } } if (!FD_ISSET(fd, &r_fds)) continue; n = read(fd, buf, len); if (n <= 0) { if (n == 0) whine_about_eof(fd); /* Doesn't return. */ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) continue; /* Don't write errors on a dead socket. */ if (fd == sock_f_in) { io_end_multiplex_out(); rsyserr(FERROR_SOCKET, errno, "read error"); } else rsyserr(FERROR, errno, "read error"); exit_cleanup(RERR_STREAMIO); } buf += n; len -= n; cnt += n; if (fd == sock_f_in && io_timeout) last_io_in = time(NULL); } return cnt;}/* Read a line into the "buf" buffer. */int read_line(int fd, char *buf, size_t bufsiz, int flags){ char ch, *s, *eob; int cnt;#ifdef ICONV_OPTION if (flags & RL_CONVERT && iconv_buf.size < bufsiz) realloc_xbuf(&iconv_buf, bufsiz + 1024);#endif start:#ifdef ICONV_OPTION s = flags & RL_CONVERT ? iconv_buf.buf : buf;#else s = buf;#endif eob = s + bufsiz - 1; while (1) { cnt = read(fd, &ch, 1); if (cnt < 0 && (errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN)) { struct timeval tv; fd_set r_fds, e_fds; FD_ZERO(&r_fds); FD_SET(fd, &r_fds); FD_ZERO(&e_fds); FD_SET(fd, &e_fds); tv.tv_sec = select_timeout; tv.tv_usec = 0; if (!select(fd+1, &r_fds, NULL, &e_fds, &tv)) check_timeout(); /*if (FD_ISSET(fd, &e_fds)) rprintf(FINFO, "select exception on fd %d\n", fd); */ continue; } if (cnt != 1) break; if (flags & RL_EOL_NULLS ? ch == '\0' : (ch == '\r' || ch == '\n')) { /* Skip empty lines if dumping comments. */ if (flags & RL_DUMP_COMMENTS && s == buf) continue; break; } if (s < eob) *s++ = ch; } *s = '\0'; if (flags & RL_DUMP_COMMENTS && (*buf == '#' || *buf == ';')) goto start;#ifdef ICONV_OPTION if (flags & RL_CONVERT) { xbuf outbuf; INIT_XBUF(outbuf, buf, 0, bufsiz); iconv_buf.pos = 0; iconv_buf.len = s - iconv_buf.buf; iconvbufs(ic_recv, &iconv_buf, &outbuf, ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE); outbuf.buf[outbuf.len] = '\0'; return outbuf.len; }#endif return s - buf;}void read_args(int f_in, char *mod_name, char *buf, size_t bufsiz, int rl_nulls, char ***argv_p, int *argc_p, char **request_p){ int maxargs = MAX_ARGS; int dot_pos = 0; int argc = 0; char **argv, *p; int rl_flags = (rl_nulls ? RL_EOL_NULLS : 0);#ifdef ICONV_OPTION rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0);#endif if (!(argv = new_array(char *, maxargs))) out_of_memory("read_args"); if (mod_name && !protect_args) argv[argc++] = "rsyncd"; while (1) { if (read_line(f_in, buf, bufsiz, rl_flags) == 0) break; if (argc == maxargs-1) { maxargs += MAX_ARGS; if (!(argv = realloc_array(argv, char *, maxargs))) out_of_memory("read_args"); } if (dot_pos) { if (request_p) { *request_p = strdup(buf); request_p = NULL; } if (mod_name) glob_expand_module(mod_name, buf, &argv, &argc, &maxargs); else glob_expand(buf, &argv, &argc, &maxargs); } else { if (!(p = strdup(buf))) out_of_memory("read_args"); argv[argc++] = p; if (*p == '.' && p[1] == '\0') dot_pos = argc; } } argv[argc] = NULL; glob_expand(NULL, NULL, NULL, NULL); *argc_p = argc; *argv_p = argv;}int io_start_buffering_out(int f_out){ if (iobuf_out) { assert(f_out == iobuf_f_out); return 0; } if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE))) out_of_memory("io_start_buffering_out"); iobuf_out_cnt = 0; iobuf_f_out = f_out; return 1;}int io_start_buffering_in(int f_in){ if (iobuf_in) { assert(f_in == iobuf_f_in); return 0; } iobuf_in_siz = 2 * IO_BUFFER_SIZE; if (!(iobuf_in = new_array(char, iobuf_in_siz))) out_of_memory("io_start_buffering_in"); iobuf_f_in = f_in; return 1;}void io_end_buffering_in(void){ if (!iobuf_in) return; free(iobuf_in); iobuf_in = NULL; iobuf_in_ndx = 0; iobuf_in_remaining = 0; iobuf_f_in = -1;}void io_end_buffering_out(void){ if (!iobuf_out) return; io_flush(FULL_FLUSH); free(iobuf_out); iobuf_out = NULL; iobuf_f_out = -1;}void maybe_flush_socket(int important){ if (iobuf_out && iobuf_out_cnt && (important || time(NULL) - last_io_out >= 5)) io_flush(NORMAL_FLUSH);}void maybe_send_keepalive(void){ if (time(NULL) - last_io_out >= allowed_lull) { if (!iobuf_out || !iobuf_out_cnt) { if (protocol_version < 29) return; /* there's nothing we can do */ if (protocol_version >= 30) send_msg(MSG_NOOP, "", 0, 0); else { write_int(sock_f_out, cur_flist->used); write_shortint(sock_f_out, ITEM_IS_NEW); } } if (iobuf_out) io_flush(NORMAL_FLUSH); }}void start_flist_forward(int f_in){ assert(iobuf_out != NULL); assert(iobuf_f_out == msg_fd_out); flist_forward_from = f_in;}void stop_flist_forward(){ flist_forward_from = -1; io_flush(FULL_FLUSH);}/** * Continue trying to read len bytes - don't return until len has been * read. **/static void read_loop(int fd, char *buf, size_t len){ while (len) { int n = read_timeout(fd, buf, len); buf += n; len -= n; }}/** * Read from the file descriptor handling multiplexing - return number * of bytes read. * * Never returns <= 0. */static int readfd_unbuffered(int fd, char *buf, size_t len){ size_t msg_bytes; int tag, cnt = 0; char line[BIGPATHBUFLEN]; if (!iobuf_in || fd != iobuf_f_in) return read_timeout(fd, buf, len); if (!io_multiplexing_in && iobuf_in_remaining == 0) { iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz); iobuf_in_ndx = 0; } while (cnt == 0) { if (iobuf_in_remaining) { len = MIN(len, iobuf_in_remaining); memcpy(buf, iobuf_in + iobuf_in_ndx, len); iobuf_in_ndx += len; iobuf_in_remaining -= len; cnt = len; break; } read_loop(fd, line, 4); tag = IVAL(line, 0); msg_bytes = tag & 0xFFFFFF; tag = (tag >> 24) - MPLEX_BASE; switch (tag) { case MSG_DATA: if (msg_bytes > iobuf_in_siz) { if (!(iobuf_in = realloc_array(iobuf_in, char, msg_bytes))) out_of_memory("readfd_unbuffered"); iobuf_in_siz = msg_bytes; } read_loop(fd, iobuf_in, msg_bytes); iobuf_in_remaining = msg_bytes; iobuf_in_ndx = 0; break; case MSG_NOOP: if (am_sender) maybe_send_keepalive(); break; case MSG_IO_ERROR: if (msg_bytes != 4) goto invalid_msg; read_loop(fd, line, msg_bytes); send_msg_int(MSG_IO_ERROR, IVAL(line, 0)); io_error |= IVAL(line, 0); break; case MSG_DELETED: if (msg_bytes >= sizeof line) goto overflow;#ifdef ICONV_OPTION if (ic_recv != (iconv_t)-1) { xbuf outbuf, inbuf; char ibuf[512]; int add_null = 0; int pos = 0; INIT_CONST_XBUF(outbuf, line); INIT_XBUF(inbuf, ibuf, 0, -1); while (msg_bytes) { inbuf.len = msg_bytes > sizeof ibuf ? sizeof ibuf : msg_bytes; read_loop(fd, inbuf.buf, inbuf.len); if (!(msg_bytes -= inbuf.len) && !ibuf[inbuf.len-1]) inbuf.len--, add_null = 1; if (iconvbufs(ic_send, &inbuf, &outbuf, ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0) goto overflow; pos = -1; } if (add_null) { if (outbuf.len == outbuf.size) goto overflow; outbuf.buf[outbuf.len++] = '\0'; } msg_bytes = outbuf.len; } else#endif read_loop(fd, line, msg_bytes); /* A directory name was sent with the trailing null */ if (msg_bytes > 0 && !line[msg_bytes-1]) log_delete(line, S_IFDIR); else { line[msg_bytes] = '\0'; log_delete(line, S_IFREG); } break; case MSG_SUCCESS: if (msg_bytes != 4) { invalid_msg: rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n", tag, (long)msg_bytes, who_am_i()); exit_cleanup(RERR_STREAMIO); } read_loop(fd, line, msg_bytes); successful_send(IVAL(line, 0)); break; case MSG_NO_SEND: if (msg_bytes != 4) goto invalid_msg; read_loop(fd, line, msg_bytes); send_msg_int(MSG_NO_SEND, IVAL(line, 0)); break; case MSG_INFO: case MSG_ERROR: case MSG_ERROR_XFER: case MSG_WARNING: if (msg_bytes >= sizeof line) { overflow: rprintf(FERROR, "multiplexing overflow %d:%ld [%s]\n", tag, (long)msg_bytes, who_am_i()); exit_cleanup(RERR_STREAMIO); } read_loop(fd, line, msg_bytes); rwrite((enum logcode)tag, line, msg_bytes, 1); if (first_message) { if (list_only && !am_sender && tag == 1) { line[msg_bytes] = '\0'; check_for_d_option_error(line); } first_message = 0; } break; default: rprintf(FERROR, "unexpected tag %d [%s]\n", tag, who_am_i()); exit_cleanup(RERR_STREAMIO); } } if (iobuf_in_remaining == 0) io_flush(NORMAL_FLUSH); return cnt;}/* Do a buffered read from fd. Don't return until all N bytes have * been read. If all N can't be read then exit with an error. */static void readfd(int fd, char *buffer, size_t N){ int cnt; size_t total = 0; while (total < N) { cnt = readfd_unbuffered(fd, buffer + total, N-total); total += cnt; } if (fd == write_batch_monitor_in) { if ((size_t)write(batch_fd, buffer, total) != total) exit_cleanup(RERR_FILEIO); } if (fd == flist_forward_from) writefd(iobuf_f_out, buffer, total); if (fd == sock_f_in) stats.total_read += total;}unsigned short read_shortint(int f){ char b[2]; readfd(f, b, 2); return (UVAL(b, 1) << 8) + UVAL(b, 0);}int32 read_int(int f){ char b[4]; int32 num; readfd(f, b, 4); num = IVAL(b, 0);#if SIZEOF_INT32 > 4 if (num & (int32)0x80000000) num |= ~(int32)0xffffffff;#endif return num;}int32 read_varint(int f){ union { char b[5]; int32 x; } u; uchar ch; int extra; u.x = 0; readfd(f, (char*)&ch, 1); extra = int_byte_extra[ch / 4]; if (extra) { uchar bit = ((uchar)1<<(8-extra)); if (extra >= (int)sizeof u.b) { rprintf(FERROR, "Overflow in read_varint()\n"); exit_cleanup(RERR_STREAMIO); } readfd(f, u.b, extra); u.b[extra] = ch & (bit-1); } else u.b[0] = ch;#if CAREFUL_ALIGNMENT u.x = IVAL(u.b,0);#endif#if SIZEOF_INT32 > 4 if (u.x & (int32)0x80000000) u.x |= ~(int32)0xffffffff;#endif return u.x;}int64 read_varlong(int f, uchar min_bytes){ union { char b[9]; int64 x; } u; char b2[8]; int extra;#if SIZEOF_INT64 < 8 memset(u.b, 0, 8);#else u.x = 0;#endif readfd(f, b2, min_bytes); memcpy(u.b, b2+1, min_bytes-1); extra = int_byte_extra[CVAL(b2, 0) / 4]; if (extra) { uchar bit = ((uchar)1<<(8-extra)); if (min_bytes + extra > (int)sizeof u.b) { rprintf(FERROR, "Overflow in read_varlong()\n"); exit_cleanup(RERR_STREAMIO); } readfd(f, u.b + min_bytes - 1, extra); u.b[min_bytes + extra - 1] = CVAL(b2, 0) & (bit-1);#if SIZEOF_INT64 < 8 if (min_bytes + extra > 5 || u.b[4] || CVAL(u.b,3) & 0x80) { rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); exit_cleanup(RERR_UNSUPPORTED); }#endif } else u.b[min_bytes + extra - 1] = CVAL(b2, 0);#if SIZEOF_INT64 < 8 u.x = IVAL(u.b,0);#elif CAREFUL_ALIGNMENT u.x = IVAL(u.b,0) | (((int64)IVAL(u.b,4))<<32);#endif return u.x;}int64 read_longint(int f){#if SIZEOF_INT64 >= 8 char b[9];#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -