📄 main.c
字号:
return controlfd;}static voidprocess_rtp_servers(struct cfg *cf, double ctime){ int j, k, sidx, len, skipfd; struct rtpp_session *sp; skipfd = 0; for (j = 0; j < cf->rtp_nsessions; j++) { sp = cf->rtp_servers[j]; if (sp == NULL) { skipfd++; continue; } if (skipfd > 0) { cf->rtp_servers[j - skipfd] = cf->rtp_servers[j]; sp->sridx = j - skipfd; } for (sidx = 0; sidx < 2; sidx++) { if (sp->rtps[sidx] == NULL || sp->addr[sidx] == NULL) continue; while ((len = rtp_server_get(sp->rtps[sidx], ctime)) != RTPS_LATER) { if (len == RTPS_EOF) { rtp_server_free(sp->rtps[sidx]); sp->rtps[sidx] = NULL; if (sp->rtps[0] == NULL && sp->rtps[1] == NULL) { assert(cf->rtp_servers[sp->sridx] == sp); cf->rtp_servers[sp->sridx] = NULL; sp->sridx = -1; } break; } for (k = (cf->dmode && len < LBR_THRS) ? 2 : 1; k > 0; k--) { sendto(sp->fds[sidx], sp->rtps[sidx]->buf, len, 0, sp->addr[sidx], SA_LEN(sp->addr[sidx])); } } } } cf->rtp_nsessions -= skipfd;}static voidrxmit_packets(struct cfg *cf, struct rtpp_session *sp, int ridx, double ctime){ int ndrain, i, port; struct rtp_packet *packet = NULL; /* Repeat since we may have several packets queued on the same socket */ for (ndrain = 0; ndrain < 5; ndrain++) { if (packet != NULL) rtp_packet_free(packet); packet = rtp_recv(sp->fds[ridx]); if (packet == NULL) break; packet->rtime = ctime; i = 0; if (sp->addr[ridx] != NULL) { /* Check that the packet is authentic, drop if it isn't */ if (sp->asymmetric[ridx] == 0) { if (memcmp(sp->addr[ridx], &packet->raddr, packet->rlen) != 0) { if (sp->canupdate[ridx] == 0) { /* * Continue, since there could be good packets in * queue. */ continue; } /* Signal that an address have to be updated */ i = 1; } } else { /* * For asymmetric clients don't check * source port since it may be different. */ if (!ishostseq(sp->addr[ridx], sstosa(&packet->raddr))) /* * Continue, since there could be good packets in * queue. */ continue; } sp->pcount[ridx]++; } else { sp->pcount[ridx]++; sp->addr[ridx] = malloc(packet->rlen); if (sp->addr[ridx] == NULL) { sp->pcount[3]++; rtpp_log_write(RTPP_LOG_ERR, sp->log, "can't allocate memory for remote address - " "removing session"); remove_session(cf, GET_RTP(sp)); /* Break, sp is invalid now */ break; } /* Signal that an address have to be updated. */ i = 1; } /* * Update recorded address if it's necessary. Set "untrusted address" * flag in the session state, so that possible future address updates * from that client won't get address changed immediately to some * bogus one. */ if (i != 0) { sp->untrusted_addr[ridx] = 1; memcpy(sp->addr[ridx], &packet->raddr, packet->rlen); sp->canupdate[ridx] = 0; port = ntohs(satosin(&packet->raddr)->sin_port); rtpp_log_write(RTPP_LOG_INFO, sp->log, "%s's address filled in: %s:%d (%s)", (ridx == 0) ? "callee" : "caller", addr2char(sstosa(&packet->raddr)), port, (sp->rtp == NULL) ? "RTP" : "RTCP"); /* * Check if we have updated RTP while RTCP is still * empty or contains address that differs from one we * used when updating RTP. Try to guess RTCP if so, * should be handy for non-NAT'ed clients, and some * NATed as well. */ if (sp->rtcp != NULL && (sp->rtcp->addr[ridx] == NULL || !ishostseq(sp->rtcp->addr[ridx], sstosa(&packet->raddr)))) { if (sp->rtcp->addr[ridx] == NULL) { sp->rtcp->addr[ridx] = malloc(packet->rlen); if (sp->rtcp->addr[ridx] == NULL) { sp->pcount[3]++; rtpp_log_write(RTPP_LOG_ERR, sp->log, "can't allocate memory for remote address - " "removing session"); remove_session(cf, sp); /* Break, sp is invalid now */ break; } } memcpy(sp->rtcp->addr[ridx], &packet->raddr, packet->rlen); satosin(sp->rtcp->addr[ridx])->sin_port = htons(port + 1); /* Use guessed value as the only true one for asymmetric clients */ sp->rtcp->canupdate[ridx] = NOT(sp->rtcp->asymmetric[ridx]); rtpp_log_write(RTPP_LOG_INFO, sp->log, "guessing RTCP port " "for %s to be %d", (ridx == 0) ? "callee" : "caller", port + 1); } } if (sp->resizers[ridx].output_nsamples > 0) rtp_resizer_enqueue(&sp->resizers[ridx], &packet); if (packet != NULL) send_packet(cf, sp, ridx, packet); } if (packet != NULL) rtp_packet_free(packet);}static voidsend_packet(struct cfg *cf, struct rtpp_session *sp, int ridx, struct rtp_packet *packet){ int i, sidx; GET_RTP(sp)->ttl = cf->max_ttl; /* Select socket for sending packet out. */ sidx = (ridx == 0) ? 1 : 0; /* * Check that we have some address to which packet is to be * sent out, drop otherwise. */ if (sp->addr[sidx] == NULL || GET_RTP(sp)->rtps[sidx] != NULL) { sp->pcount[3]++; } else { sp->pcount[2]++; for (i = (cf->dmode && packet->size < LBR_THRS) ? 2 : 1; i > 0; i--) { sendto(sp->fds[sidx], packet->buf, packet->size, 0, sp->addr[sidx], SA_LEN(sp->addr[sidx])); } } if (sp->rrcs[ridx] != NULL && GET_RTP(sp)->rtps[ridx] == NULL) rwrite(sp, sp->rrcs[ridx], packet);}static voidprocess_rtp(struct cfg *cf, double ctime, int alarm_tick){ int readyfd, skipfd, ridx; struct rtpp_session *sp; struct rtp_packet *packet; /* Relay RTP/RTCP */ skipfd = 0; for (readyfd = 1; readyfd < cf->nsessions; readyfd++) { sp = cf->sessions[readyfd]; if (alarm_tick != 0 && sp != NULL && sp->rtcp != NULL && sp->sidx[0] == readyfd) { if (sp->ttl == 0) { rtpp_log_write(RTPP_LOG_INFO, sp->log, "session timeout"); remove_session(cf, sp); } else { sp->ttl--; } } if (cf->pfds[readyfd].fd == -1) { /* Deleted session, count and move one */ skipfd++; continue; } /* Find index of the call leg within a session */ for (ridx = 0; ridx < 2; ridx++) if (cf->pfds[readyfd].fd == sp->fds[ridx]) break; /* * Can't happen. */ assert(ridx != 2); /* Compact pfds[] and sessions[] by eliminating removed sessions */ if (skipfd > 0) { cf->pfds[readyfd - skipfd] = cf->pfds[readyfd]; cf->sessions[readyfd - skipfd] = cf->sessions[readyfd]; sp->sidx[ridx] = readyfd - skipfd;; } if (sp->complete != 0) { if ((cf->pfds[readyfd].revents & POLLIN) != 0) rxmit_packets(cf, sp, ridx, ctime); if (sp->resizers[ridx].output_nsamples > 0) { while ((packet = rtp_resizer_get(&sp->resizers[ridx], ctime)) != NULL) { send_packet(cf, sp, ridx, packet); rtp_packet_free(packet); } } } } /* Trim any deleted sessions at the end */ cf->nsessions -= skipfd;}static voidprocess_commands(struct cfg *cf){ int controlfd, i; socklen_t rlen; struct sockaddr_un ifsun; if ((cf->pfds[0].revents & POLLIN) == 0) return; do { if (cf->umode == 0) { rlen = sizeof(ifsun); controlfd = accept(cf->pfds[0].fd, sstosa(&ifsun), &rlen); if (controlfd == -1) { if (errno != EWOULDBLOCK) rtpp_log_ewrite(RTPP_LOG_ERR, cf->glog, "can't accept connection on control socket"); break; } } else { controlfd = cf->pfds[0].fd; } i = handle_command(cf, controlfd); if (cf->umode == 0) { close(controlfd); } } while (i == 0);}intmain(int argc, char **argv){ int i, len, timeout, controlfd, alarm_tick; double sptime, eptime, last_tick_time; unsigned long delay; struct cfg cf; char buf[256]; memset(&cf, 0, sizeof(cf)); init_hash_table(&cf); init_config(&cf, argc, argv); controlfd = init_controlfd(&cf);#if !defined(__solaris__) if (cf.nodaemon == 0) { if (daemon(0, 0) == -1) err(1, "can't switch into daemon mode"); /* NOTREACHED */ }#endif atexit(ehandler); glog = cf.glog = rtpp_log_open("rtpproxy", NULL, LF_REOPEN); rtpp_log_write(RTPP_LOG_INFO, cf.glog, "rtpproxy started, pid %d", getpid()); i = open(pid_file, O_WRONLY | O_CREAT | O_TRUNC, DEFFILEMODE); if (i >= 0) { len = sprintf(buf, "%u\n", getpid()); write(i, buf, len); close(i); } else { rtpp_log_ewrite(RTPP_LOG_ERR, cf.glog, "can't open pidfile for writing"); } signal(SIGHUP, fatsignal); signal(SIGINT, fatsignal); signal(SIGKILL, fatsignal); signal(SIGPIPE, SIG_IGN); signal(SIGTERM, fatsignal); signal(SIGXCPU, fatsignal); signal(SIGXFSZ, fatsignal); signal(SIGVTALRM, fatsignal); signal(SIGPROF, fatsignal); signal(SIGUSR1, fatsignal); signal(SIGUSR2, fatsignal); if (cf.run_uname != NULL || cf.run_gname != NULL) { if (drop_privileges(&cf, cf.run_uname, cf.run_gname) != 0) { rtpp_log_ewrite(RTPP_LOG_ERR, cf.glog, "can't switch to requested user/group"); exit(1); } } cf.pfds[0].fd = controlfd; cf.pfds[0].events = POLLIN; cf.pfds[0].revents = 0; cf.sessions[0] = NULL; cf.nsessions = 1; cf.rtp_nsessions = 0; sptime = 0; last_tick_time = 0; for (;;) { if (cf.rtp_nsessions > 0 || cf.nsessions > 1) timeout = RTPS_TICKS_MIN; else timeout = TIMETICK * 1000; eptime = getctime(); delay = (eptime - sptime) * 1000000.0; if (delay < (1000000 / POLL_LIMIT)) { usleep((1000000 / POLL_LIMIT) - delay); sptime = getctime(); } else { sptime = eptime; } i = poll(cf.pfds, cf.nsessions, timeout); if (i < 0 && errno == EINTR) continue; eptime = getctime(); if (cf.rtp_nsessions > 0) { process_rtp_servers(&cf, eptime); } if (eptime > last_tick_time + TIMETICK) { alarm_tick = 1; last_tick_time = eptime; } else { alarm_tick = 0; } process_rtp(&cf, eptime, alarm_tick); if (i > 0) { process_commands(&cf); } } exit(0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -