main.c

来自「大名鼎鼎的 RTP Proxy 源代码 在OpenSER 中成熟应用的」· C语言 代码 · 共 782 行 · 第 1/2 页

C
782
字号
process_rtp_servers(struct cfg *cf, double ctime){    int j, k, sidx, len, skipfd;    struct rtpp_session *sp;    skipfd = 0;    for (j = 0; j < cf->rtp_nsessions; j++) {		sp = cf->rtp_servers[j];		if (sp == NULL) {		    skipfd++;		    continue;		}		if (skipfd > 0) {		    cf->rtp_servers[j - skipfd] = cf->rtp_servers[j];		    sp->sridx = j - skipfd;		}		for (sidx = 0; sidx < 2; sidx++) {		    if (sp->rtps[sidx] == NULL || sp->addr[sidx] == NULL)			continue;		    while ((len = rtp_server_get(sp->rtps[sidx], ctime)) != RTPS_LATER) {				if (len == RTPS_EOF) {				    rtp_server_free(sp->rtps[sidx]);				    sp->rtps[sidx] = NULL;				    if (sp->rtps[0] == NULL && sp->rtps[1] == NULL) {					assert(cf->rtp_servers[sp->sridx] == sp);					cf->rtp_servers[sp->sridx] = NULL;					sp->sridx = -1;				    }				    break;				}				for (k = (cf->dmode && len < LBR_THRS) ? 2 : 1; k > 0; k--) {				    sendto(sp->fds[sidx], sp->rtps[sidx]->buf, len, 0,				      sp->addr[sidx], SA_LEN(sp->addr[sidx]));				}		    }		}    }    cf->rtp_nsessions -= skipfd;}static voidrxmit_packets(struct cfg *cf, struct rtpp_session *sp, int ridx,  double ctime){    int ndrain, i, port;    struct rtp_packet *packet = NULL;    /* Repeat since we may have several packets queued on the same socket */    for (ndrain = 0; ndrain < 5; ndrain++) {		if (packet != NULL)		    rtp_packet_free(packet);		packet = rtp_recv(sp->fds[ridx]);	//接收RTP数据包			if (packet == NULL)		    break;		packet->rtime = ctime;		i = 0;		if (sp->addr[ridx] != NULL) {		    /* Check that the packet is authentic, drop if it isn't */		    if (sp->asymmetric[ridx] == 0) {				if (memcmp(sp->addr[ridx], &packet->raddr, packet->rlen) != 0) {				    if (sp->canupdate[ridx] == 0) {					/*					 * Continue, since there could be good packets in					 * queue.					 */					continue;				    }				    /* Signal that an address have to be updated */				    i = 1;				}		    } else {			/*			 * For asymmetric clients don't check			 * source port since it may be different.			 */			if (!ishostseq(sp->addr[ridx], sstosa(&packet->raddr)))			    /*			     * Continue, since there could be good packets in			     * queue.			     */			    continue;		    }		    sp->pcount[ridx]++;					} else {		    sp->pcount[ridx]++;		    sp->addr[ridx] = malloc(packet->rlen);		    if (sp->addr[ridx] == NULL) {			sp->pcount[3]++;			rtpp_log_write(RTPP_LOG_ERR, sp->log,			  "can't allocate memory for remote address - "			  "removing session");			remove_session(cf, GET_RTP(sp));			/* Break, sp is invalid now */			break;		    }		    /* Signal that an address have to be updated. */		    i = 1;		}		/*		 * Update recorded address if it's necessary. Set "untrusted address"		 * flag in the session state, so that possible future address updates		 * from that client won't get address changed immediately to some		 * bogus one.		 */		if (i != 0) {		    sp->untrusted_addr[ridx] = 1;		    memcpy(sp->addr[ridx], &packet->raddr, packet->rlen);		    sp->canupdate[ridx] = 0;		    port = ntohs(satosin(&packet->raddr)->sin_port);		    rtpp_log_write(RTPP_LOG_INFO, sp->log,		      "%s's address filled in: %s:%d (%s)",		      (ridx == 0) ? "callee" : "caller",		      addr2char(sstosa(&packet->raddr)), port,		      (sp->rtp == NULL) ? "RTP" : "RTCP");		    /*		     * Check if we have updated RTP while RTCP is still		     * empty or contains address that differs from one we		     * used when updating RTP. Try to guess RTCP if so,		     * should be handy for non-NAT'ed clients, and some		     * NATed as well.		     */		    if (sp->rtcp != NULL && (sp->rtcp->addr[ridx] == NULL ||		      !ishostseq(sp->rtcp->addr[ridx], sstosa(&packet->raddr)))) {			if (sp->rtcp->addr[ridx] == NULL) {			    sp->rtcp->addr[ridx] = malloc(packet->rlen);			    if (sp->rtcp->addr[ridx] == NULL) {				sp->pcount[3]++;				rtpp_log_write(RTPP_LOG_ERR, sp->log,				  "can't allocate memory for remote address - "				  "removing session");				remove_session(cf, sp);				/* Break, sp is invalid now */				break;			    }			}			memcpy(sp->rtcp->addr[ridx], &packet->raddr, packet->rlen);			satosin(sp->rtcp->addr[ridx])->sin_port = htons(port + 1);			/* Use guessed value as the only true one for asymmetric clients */			sp->rtcp->canupdate[ridx] = NOT(sp->rtcp->asymmetric[ridx]);			rtpp_log_write(RTPP_LOG_INFO, sp->log, "guessing RTCP port "			  "for %s to be %d",			  (ridx == 0) ? "callee" : "caller", port + 1);		    }		}		if (sp->resizers[ridx].output_nsamples > 0)		    rtp_resizer_enqueue(&sp->resizers[ridx], &packet);				if (packet != NULL)		    send_packet(cf, sp, ridx, packet);								//发送数据包    }    if (packet != NULL)	rtp_packet_free(packet);}static voidsend_packet(struct cfg *cf, struct rtpp_session *sp, int ridx,  struct rtp_packet *packet){    int i, sidx;    GET_RTP(sp)->ttl = cf->max_ttl;    /* Select socket for sending packet out. */    sidx = (ridx == 0) ? 1 : 0;    /*     * Check that we have some address to which packet is to be     * sent out, drop otherwise.     */    if (sp->addr[sidx] == NULL || GET_RTP(sp)->rtps[sidx] != NULL) {		sp->pcount[3]++;    } else {		sp->pcount[2]++;		for (i = (cf->dmode && packet->size < LBR_THRS) ? 2 : 1; i > 0; i--) {		    sendto(sp->fds[sidx], packet->buf, packet->size, 0, sp->addr[sidx],		      SA_LEN(sp->addr[sidx]));		}    }    if (sp->rrcs[ridx] != NULL && GET_RTP(sp)->rtps[ridx] == NULL)	rwrite(sp, sp->rrcs[ridx], packet);}static voidprocess_rtp(struct cfg *cf, double ctime, int alarm_tick){    int readyfd, skipfd, ridx;    struct rtpp_session *sp;    struct rtp_packet *packet;    /* Relay RTP/RTCP */    skipfd = 0;    for (readyfd = 1; readyfd < cf->nsessions; readyfd++) {	//		sp = cf->sessions[readyfd];		if (alarm_tick != 0 && sp != NULL && sp->rtcp != NULL &&		  sp->sidx[0] == readyfd) {		    if (sp->ttl == 0) {				rtpp_log_write(RTPP_LOG_INFO, sp->log, "session timeout");				remove_session(cf, sp);		    } else {				sp->ttl--;		    }		}		if (cf->pfds[readyfd].fd == -1) {		    /* Deleted session, count and move one */		    skipfd++;		    continue;		}	/* Find index of the call leg within a session */	for (ridx = 0; ridx < 2; ridx++)	    if (cf->pfds[readyfd].fd == sp->fds[ridx])		break;		/*		 * Can't happen.		 */		assert(ridx != 2);		/* Compact pfds[] and sessions[] by eliminating removed sessions */		if (skipfd > 0) {		    cf->pfds[readyfd - skipfd] = cf->pfds[readyfd];		    cf->sessions[readyfd - skipfd] = cf->sessions[readyfd];		    sp->sidx[ridx] = readyfd - skipfd;;		}		if (sp->complete != 0) {		    if ((cf->pfds[readyfd].revents & POLLIN) != 0)				rxmit_packets(cf, sp, ridx, ctime);			    if (sp->resizers[ridx].output_nsamples > 0) {					while ((packet = rtp_resizer_get(&sp->resizers[ridx], ctime)) != NULL) {					    send_packet(cf, sp, ridx, packet);					    rtp_packet_free(packet);					}		    }		}    }    /* Trim any deleted sessions at the end */    cf->nsessions -= skipfd;}static voidprocess_commands(struct cfg *cf){    int controlfd, i;    socklen_t rlen;    struct sockaddr_un ifsun;    if ((cf->pfds[0].revents & POLLIN) == 0)	return;    do {		if (cf->umode == 0) {		    rlen = sizeof(ifsun);		    controlfd = accept(cf->pfds[0].fd, sstosa(&ifsun), &rlen);		    if (controlfd == -1) {			if (errno != EWOULDBLOCK)			    rtpp_log_ewrite(RTPP_LOG_ERR, cf->glog,			      "can't accept connection on control socket");			break;		    }		} else {		    controlfd = cf->pfds[0].fd;		}		i = handle_command(cf, controlfd);		if (cf->umode == 0) {		    close(controlfd);		}    } while (i == 0);}intmain(int argc, char **argv){    int i, len, timeout, controlfd, alarm_tick;    double sptime, eptime, last_tick_time;    unsigned long delay;    struct cfg cf;    char buf[256];    memset(&cf, 0, sizeof(cf));    init_hash_table(&cf);		/*randtable 设定随机数*/    init_config(&cf, argc, argv);    controlfd = init_controlfd(&cf);#if !defined(__solaris__)    if (cf.nodaemon == 0) {	if (daemon(0, 0) == -1)	    err(1, "can't switch into daemon mode");	    /* NOTREACHED */    }#endif    atexit(ehandler);    glog = cf.glog = rtpp_log_open("rtpproxy", NULL, LF_REOPEN);    rtpp_log_write(RTPP_LOG_INFO, cf.glog, "rtpproxy started, pid %d", getpid());    i = open(pid_file, O_WRONLY | O_CREAT | O_TRUNC, DEFFILEMODE);    if (i >= 0) {	len = sprintf(buf, "%u\n", getpid());	write(i, buf, len);	close(i);    } else {	rtpp_log_ewrite(RTPP_LOG_ERR, cf.glog, "can't open pidfile for writing");    }    signal(SIGHUP, fatsignal);    signal(SIGINT, fatsignal);    signal(SIGKILL, fatsignal);    signal(SIGPIPE, SIG_IGN);    signal(SIGTERM, fatsignal);    signal(SIGXCPU, fatsignal);    signal(SIGXFSZ, fatsignal);    signal(SIGVTALRM, fatsignal);    signal(SIGPROF, fatsignal);    signal(SIGUSR1, fatsignal);    signal(SIGUSR2, fatsignal);    if (cf.run_uname != NULL || cf.run_gname != NULL) {	if (drop_privileges(&cf, cf.run_uname, cf.run_gname) != 0) {	    rtpp_log_ewrite(RTPP_LOG_ERR, cf.glog,	      "can't switch to requested user/group");	    exit(1);	}    }    cf.pfds[0].fd = controlfd;    cf.pfds[0].events = POLLIN;    cf.pfds[0].revents = 0;    cf.sessions[0] = NULL;    cf.nsessions = 1;    cf.rtp_nsessions = 0;    sptime = 0;    last_tick_time = 0;    for (;;) {		if (cf.rtp_nsessions > 0 || cf.nsessions > 1)		    timeout = RTPS_TICKS_MIN;		else		    timeout = TIMETICK * 1000;		eptime = getctime();		delay = (eptime - sptime) * 1000000.0;		if (delay < (1000000 / POLL_LIMIT)) {		    usleep((1000000 / POLL_LIMIT) - delay);		    sptime = getctime();		} else {		    sptime = eptime;		}				i = poll(cf.pfds, cf.nsessions, timeout);		if (i < 0 && errno == EINTR)		    continue;				eptime = getctime();				if (cf.rtp_nsessions > 0) {		    process_rtp_servers(&cf, eptime);		}				if (eptime > last_tick_time + TIMETICK) {		    alarm_tick = 1;		    last_tick_time = eptime;		} else {		    alarm_tick = 0;		}				process_rtp(&cf, eptime, alarm_tick);				if (i > 0) {		    process_commands(&cf);		}    }    exit(0);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?