📄 main_reading.c
字号:
usage(void){ fprintf(stderr, "usage: rtpproxy [-2fv] [-l addr1[/addr2]] " "[-6 addr1[/addr2]] [-s path] [-t tos] [-r rdir [-S sdir]]\n"); exit(1);}static voidfatsignal(int sig){ rtpp_log_write(RTPP_LOG_INFO, glog, "got signal %d", sig); exit(0);}static voidehandler(void){ unlink(cmd_sock); unlink(pid_file); rtpp_log_write(RTPP_LOG_INFO, glog, "rtpproxy ended"); rtpp_log_close(glog);}intmain(int argc, char **argv){ int controlfd, i, j, k, readyfd, len, nodaemon, dmode, port, ridx, sidx; int rebuild_pending, timeout; sigset_t set, oset; struct rtpp_session *sp; struct sockaddr_un ifsun; struct sockaddr_storage ifsin, raddr; socklen_t rlen; struct itimerval tick; char buf[1024 * 8]; char ch, *bh[2], *bh6[2], *cp; double sptime, eptime; unsigned long delay; bh[0] = bh[1] = bh6[0] = bh6[1] = NULL; nodaemon = 0; dmode = 0; while ((ch = getopt(argc, argv, "vf2Rl:6:s:S:t:r:p:")) != -1) switch (ch) { case 'f': nodaemon = 1; break; case 'l': bh[0] = optarg; bh[1] = strchr(bh[0], '/'); if (bh[1] != NULL) { *bh[1] = '\0'; bh[1]++; bmode = 1; } break; case '6': bh6[0] = optarg; bh6[1] = strchr(bh6[0], '/'); if (bh6[1] != NULL) { *bh6[1] = '\0'; bh6[1]++; bmode = 1; } break; case 's': if (strncmp("udp:", optarg, 4) == 0) { umode = 1; optarg += 4; } else if (strncmp("udp6:", optarg, 5) == 0) { umode = 6; optarg += 5; } else if (strncmp("unix:", optarg, 5) == 0) { umode = 0; optarg += 5; } cmd_sock = optarg; break; case 't': tos = atoi(optarg); break; case '2': dmode = 1; break; case 'v': printf("Basic version: %d\n", CPROTOVER); for(i = 1; proto_caps[i].pc_id != NULL; ++i) { printf("Extension %s: %s\n", proto_caps[i].pc_id, proto_caps[i].pc_description); } exit(0); break; case 'r': rdir = optarg; break; case 'S': sdir = optarg; break; case 'R': rrtcp = 0; break; case 'p': pid_file = optarg; break; case '?': default: usage(); } argc -= optind; argv += optind; if (rdir == NULL && sdir != NULL) errx(1, "-S switch requires -r switch"); if (bh[0] == NULL && bh[1] == NULL && bh6[0] == NULL && bh6[1] == NULL) { if (umode != 0) errx(1, "explicit binding address has to be specified in UDP " "command mode"); bh[0] = "*"; } for (i = 0; i < 2; i++) { if (bh[i] != NULL && *bh[i] == '\0') bh[i] = NULL; if (bh6[i] != NULL && *bh6[i] == '\0') bh6[i] = NULL; } i = ((bh[0] == NULL) ? 0 : 1) + ((bh[1] == NULL) ? 0 : 1) + ((bh6[0] == NULL) ? 0 : 1) + ((bh6[1] == NULL) ? 0 : 1); if (bmode != 0) { if (bh[0] != NULL && bh6[0] != NULL) errx(1, "either IPv4 or IPv6 should be configured for external " "interface in bridging mode, not both"); if (bh[1] != NULL && bh6[1] != NULL) errx(1, "either IPv4 or IPv6 should be configured for internal " "interface in bridging mode, not both"); if (i != 2) errx(1, "incomplete configuration of the bridging mode - exactly " "2 listen addresses required, %d provided", i); } else if (i != 1) { errx(1, "exactly 1 listen addresses required, %d provided", i); } for (i = 0; i < 2; i++) { bindaddr[i] = NULL; if (bh[i] != NULL) { bindaddr[i] = alloca(sizeof(struct sockaddr_storage)); setbindhost(bindaddr[i], AF_INET, bh[i], SERVICE); continue; } if (bh6[i] != NULL) { bindaddr[i] = alloca(sizeof(struct sockaddr_storage)); setbindhost(bindaddr[i], AF_INET6, bh6[i], SERVICE); continue; } } if (bindaddr[0] == NULL) { bindaddr[0] = bindaddr[1]; bindaddr[1] = NULL; } if (umode == 0) { unlink(cmd_sock); memset(&ifsun, '\0', sizeof ifsun);#if !defined(__linux__) && !defined(__solaris__) ifsun.sun_len = strlen(cmd_sock);#endif ifsun.sun_family = AF_LOCAL; strcpy(ifsun.sun_path, cmd_sock); controlfd = socket(PF_LOCAL, SOCK_STREAM, 0); if (controlfd == -1) err(1, "can't create socket"); setsockopt(controlfd, SOL_SOCKET, SO_REUSEADDR, &controlfd, sizeof controlfd); if (bind(controlfd, sstosa(&ifsun), sizeof ifsun) < 0) err(1, "can't bind to a socket"); if (listen(controlfd, 32) != 0) err(1, "can't listen on a socket"); } else { cp = strrchr(cmd_sock, ':'); if (cp != NULL) { *cp = '\0'; cp++; } if (cp == NULL || *cp == '\0') cp = CPORT; i = (umode == 6) ? AF_INET6 : AF_INET; setbindhost(sstosa(&ifsin), i, cmd_sock, cp); controlfd = socket(i, SOCK_DGRAM, 0); if (controlfd == -1) err(1, "can't create socket"); if (bind(controlfd, sstosa(&ifsin), SS_LEN(&ifsin)) < 0) err(1, "can't bind to a socket"); }#if !defined(__solaris__) if (nodaemon == 0) { if (daemon(0, 1) == -1) err(1, "can't switch into daemon mode"); /* NOTREACHED */ for (i = 0; i < (int)FD_SETSIZE; i++) if (i != controlfd) close(i); }#endif atexit(ehandler); glog = rtpp_log_open("rtpproxy", NULL, LF_REOPEN); rtpp_log_write(RTPP_LOG_INFO, glog, "rtpproxy started, pid %d", getpid()); i = open(pid_file, O_WRONLY | O_CREAT | O_TRUNC, DEFFILEMODE); if (i >= 0) { len = sprintf(buf, "%u\n", getpid()); write(i, buf, len); close(i); } else { rtpp_log_ewrite(RTPP_LOG_ERR, glog, "can't open pidfile for writing"); } signal(SIGHUP, fatsignal); signal(SIGINT, fatsignal); signal(SIGKILL, fatsignal); signal(SIGPIPE, SIG_IGN); signal(SIGTERM, fatsignal); signal(SIGXCPU, fatsignal); signal(SIGXFSZ, fatsignal); signal(SIGVTALRM, fatsignal); signal(SIGPROF, fatsignal); signal(SIGUSR1, fatsignal); signal(SIGUSR2, fatsignal); fds[0].fd = controlfd; fds[0].events = POLLIN; fds[0].revents = 0; rebuild_tables(); memset(&tick, 0, sizeof(tick)); tick.it_interval.tv_sec = TIMETICK; tick.it_value.tv_sec = TIMETICK; signal(SIGALRM, SIG_IGN); setitimer(ITIMER_REAL, &tick, NULL); sigemptyset(&set); sigaddset(&set, SIGALRM); signal(SIGALRM, alarmhandler); rebuild_pending = 0; sptime = 0; while(1) { if (rtp_nsessions > 0) timeout = RTPS_TICKS_MIN; else timeout = INFTIM; sigprocmask(SIG_UNBLOCK, &set, &oset); eptime = getctime(); delay = (eptime - sptime) * 1000000.0; if (delay < (1000000 / POLL_LIMIT)) { usleep((1000000 / POLL_LIMIT) - delay); sptime = getctime(); } else { sptime = eptime; } i = poll(fds, nsessions + 1, timeout); if (i < 0 && errno == EINTR) continue; sigprocmask(SIG_BLOCK, &set, &oset); if (rtp_nsessions > 0) { for (j = 0; j < rtp_nsessions; j++) { sp = rtp_servers[j]; for (sidx = 0; sidx < 2; sidx++) { if (sp->rtps[sidx] == NULL || sp->addr[sidx] == NULL) continue; while ((len = rtp_server_get(sp->rtps[sidx])) != RTPS_LATER) { if (len == RTPS_EOF) { rtp_server_free(sp->rtps[sidx]); sp->rtps[sidx] = NULL; rebuild_pending = 1; break; } for (k = (dmode && len < LBR_THRS) ? 2 : 1; k > 0; k--) { sendto(sp->fds[sidx], sp->rtps[sidx]->buf, len, 0, sp->addr[sidx], SA_LEN(sp->addr[sidx])); } } } } if (i == 0) continue; } for (readyfd = 0; readyfd < nsessions + 1; readyfd++) { if ((fds[readyfd].revents & POLLIN) == 0) continue; if (readyfd == 0) { if (umode == 0) { rlen = sizeof(ifsun); controlfd = accept(fds[readyfd].fd, sstosa(&ifsun), &rlen); if (controlfd == -1) { rtpp_log_ewrite(RTPP_LOG_ERR, glog, "can't accept connection on control socket"); continue; } } else { controlfd = fds[readyfd].fd; } handle_command(controlfd); if (umode == 0) { close(controlfd); } /* * Don't use continue here, because we have cleared all * revents in rebuild_tables(). */ break; }drain: rlen = sizeof(raddr); len = recvfrom(fds[readyfd].fd, buf, sizeof(buf), 0, sstosa(&raddr), &rlen); if (len <= 0) continue; sp = sessions[readyfd - 1]; if (sp->complete == 0) continue; for (i = 0; i < 2; i++) { if (fds[readyfd].fd == sp->fds[i]) { ridx = i; break; } } /* * Can't happen. */ if (i == 2) abort(); i = 0; if (sp->addr[ridx] != NULL) { /* Check that the packet is authentic, drop if it isn't */ if (sp->asymmetric[ridx] == 0) { if (memcmp(sp->addr[ridx], &raddr, rlen) != 0) { if (sp->canupdate[ridx] == 0) continue; /* Signal that an address have to be updated */ i = 1; } } else { /* * For asymmetric clients don't check * source port since it may be different. */ if (!ishostseq(sp->addr[ridx], sstosa(&raddr))) continue; } sp->pcount[ridx]++; } else { sp->pcount[ridx]++; sp->addr[ridx] = malloc(rlen); if (sp->addr[ridx] == NULL) { sp->pcount[3]++; rtpp_log_write(RTPP_LOG_ERR, sp->log, "can't allocate memory for remote address - " "removing session"); remove_session(GET_RTP(sp), NULL); rebuild_tables(); /* * Don't use continue here, because we have cleared all * revents in rebuild_tables(). */ break; } /* Signal that an address have to be updated. */ i = 1; } /* Update recorded address if it's necessary. */ if (i != 0) { memcpy(sp->addr[ridx], &raddr, rlen); sp->canupdate[ridx] = 0; port = ntohs(satosin(&raddr)->sin_port); rtpp_log_write(RTPP_LOG_INFO, sp->log, "%s's address filled in: %s:%d (%s)", (ridx == 0) ? "callee" : "caller", addr2char(sstosa(&raddr)), port, (sp->rtp == NULL) ? "RTP" : "RTCP"); /* * Check if we have updated RTP while RTCP is still * empty or contains address that differs from one we * used when updating RTP. Try to guess RTCP if so, * should be handy for non-NAT'ed clients, and some * NATed as well. */ if (sp->rtcp != NULL && (sp->rtcp->addr[ridx] == NULL || !ishostseq(sp->rtcp->addr[ridx], sstosa(&raddr)))) { if (sp->rtcp->addr[ridx] == NULL) { sp->rtcp->addr[ridx] = malloc(rlen); if (sp->rtcp->addr[ridx] == NULL) { sp->pcount[3]++; rtpp_log_write(RTPP_LOG_ERR, sp->log, "can't allocate memory for remote address - " "removing session"); remove_session(sp, NULL); /* * Don't use continue here, because we have cleared all * revents in rebuild_tables(). */ rebuild_tables(); break; } } memcpy(sp->rtcp->addr[ridx], &raddr, rlen); satosin(sp->rtcp->addr[ridx])->sin_port = htons(port + 1); /* Use guessed value as the only true one for asymmetric clients */ sp->rtcp->canupdate[ridx] = NOT(sp->rtcp->asymmetric[ridx]); rtpp_log_write(RTPP_LOG_INFO, sp->log, "guessing RTCP port " "for %s to be %d", (ridx == 0) ? "callee" : "caller", port + 1); } } /* Select socket for sending packet out. */ sidx = (ridx == 0) ? 1 : 0; GET_RTP(sp)->ttl = SESSION_TIMEOUT; /* * Check that we have some address to which packet is to be * sent out, drop otherwise. */ if (sp->addr[sidx] == NULL || GET_RTP(sp)->rtps[sidx] != NULL) { sp->pcount[3]++; goto do_record; } sp->pcount[2]++; for (i = (dmode && len < LBR_THRS) ? 2 : 1; i > 0; i--) { sendto(sp->fds[sidx], buf, len, 0, sp->addr[sidx], SA_LEN(sp->addr[sidx])); }do_record: if (sp->rrcs[ridx] != NULL && GET_RTP(sp)->rtps[ridx] == NULL) rwrite(sp, sp->rrcs[ridx], sstosa(&raddr), buf, len); /* Repeat since we may have several packets queued */ goto drain; } if (rebuild_pending != 0) { rebuild_tables(); rebuild_pending = 0; } } exit(0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -