📄 transport_udp.c
字号:
/* $Id: transport_udp.c 1266 2007-05-11 15:14:34Z bennylp $ */
/*
* Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <pjmedia/transport_udp.h>
#include <pj/addr_resolv.h>
#include <pj/assert.h>
#include <pj/errno.h>
#include <pj/ioqueue.h>
#include <pj/log.h>
#include <pj/rand.h>
#include <pj/string.h>
/* Maximum size of an RTP packet. Sizes both the incoming RTP packet buffer
 * (rtp_pkt) and the buffer inside each pending_write entry. */
#define RTP_LEN 1500
/* Maximum size of incoming RTCP packet (size of the rtcp_pkt buffer) */
#define RTCP_LEN 600
/* Maximum pending write operations (number of rtp_pending_write entries) */
#define MAX_PENDING 4
/*
 * Pending write buffer: an RTP_LEN-byte data buffer paired with the ioqueue
 * operation key for one write operation.
 * NOTE(review): presumably the buffer keeps a copy of an outgoing packet
 * while its asynchronous send is in flight -- confirm against the send path.
 */
typedef struct pending_write
{
char buffer[RTP_LEN];
pj_ioqueue_op_key_t op_key;
} pending_write;
/*
 * UDP media transport instance.
 *
 * The pjmedia_transport base is the first member: the creation function
 * hands out &tp->base to the application, and pjmedia_transport_udp_close()
 * casts that pjmedia_transport pointer back to struct transport_udp.
 * The structure is zero-allocated from its own pool in
 * pjmedia_transport_udp_attach().
 */
struct transport_udp
{
pjmedia_transport base; /**< Base transport. */
pj_pool_t *pool; /**< Memory pool */
unsigned options; /**< Transport options. */
void *user_data; /**< Only valid when attached */
pj_bool_t attached; /**< Has attachment? */
pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address */
pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address */
void (*rtp_cb)( void*, /**< To report incoming RTP. */
const void*,
pj_ssize_t);
void (*rtcp_cb)( void*, /**< To report incoming RTCP. */
const void*,
pj_ssize_t);
unsigned tx_drop_pct; /**< Percent of tx pkts to drop. */
unsigned rx_drop_pct; /**< Percent of rx pkts to drop. */
pj_sock_t rtp_sock; /**< RTP socket */
pj_sockaddr_in rtp_addr_name; /**< Published RTP address. */
pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */
pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */
unsigned rtp_write_op_id;/**< Next write_op to use */
pending_write rtp_pending_write[MAX_PENDING]; /**< Pending write */
pj_sockaddr_in rtp_src_addr; /**< Actual packet src addr. */
unsigned rtp_src_cnt; /**< How many pkt from this addr. */
int rtp_addrlen; /**< Address length. */
char rtp_pkt[RTP_LEN];/**< Incoming RTP packet buffer */
pj_sock_t rtcp_sock; /**< RTCP socket */
pj_sockaddr_in rtcp_addr_name; /**< Published RTCP address. */
pj_sockaddr_in rtcp_src_addr; /**< Actual source RTCP address. */
int rtcp_addr_len; /**< Length of RTCP src address. */
pj_ioqueue_key_t *rtcp_key; /**< RTCP socket key in ioqueue */
pj_ioqueue_op_key_t rtcp_read_op; /**< Pending read operation */
pj_ioqueue_op_key_t rtcp_write_op; /**< Pending write operation */
char rtcp_pkt[RTCP_LEN];/**< Incoming RTCP packet buffer */
};
/*
 * ioqueue read-completion callbacks. pjmedia_transport_udp_attach() installs
 * them as on_read_complete for the RTP and RTCP sockets respectively.
 */
static void on_rx_rtp( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_read);
static void on_rx_rtcp(pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_read);
/*
 * Transport operations. Their addresses populate the transport_udp_op
 * table that follows these declarations.
 */
static pj_status_t transport_get_info(pjmedia_transport *tp,
pjmedia_sock_info *info);
static pj_status_t transport_attach( pjmedia_transport *tp,
void *user_data,
const pj_sockaddr_t *rem_addr,
const pj_sockaddr_t *rem_rtcp,
unsigned addr_len,
void (*rtp_cb)(void*,
const void*,
pj_ssize_t),
void (*rtcp_cb)(void*,
const void*,
pj_ssize_t));
static void transport_detach( pjmedia_transport *tp,
void *strm);
static pj_status_t transport_send_rtp( pjmedia_transport *tp,
const void *pkt,
pj_size_t size);
static pj_status_t transport_send_rtcp(pjmedia_transport *tp,
const void *pkt,
pj_size_t size);
/*
 * Operation table for the UDP transport. pjmedia_transport_udp_attach()
 * stores its address in base.op of every transport it creates.
 *
 * The initializer is positional, so the entries must stay in the order of
 * the members of pjmedia_transport_op.
 */
static pjmedia_transport_op transport_udp_op =
{
    transport_get_info,
    transport_attach,
    transport_detach,
    transport_send_rtp,
    transport_send_rtcp,
    pjmedia_transport_udp_close
};
/**
 * Create UDP stream transport without specifying a bind address.
 *
 * Convenience wrapper: forwards every argument unchanged to
 * pjmedia_transport_udp_create2(), passing NULL for the address, and
 * returns that function's status.
 */
PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt,
                                                  const char *name,
                                                  int port,
                                                  unsigned options,
                                                  pjmedia_transport **p_tp)
{
    const pj_str_t *no_addr = NULL;
    pj_status_t rc;

    rc = pjmedia_transport_udp_create2(endpt, name, no_addr, port,
                                       options, p_tp);
    return rc;
}
/**
 * Create UDP stream transport bound to the given address.
 *
 * Creates two UDP sockets, binds the RTP socket to addr:port and the RTCP
 * socket to addr:(port+1), then passes both sockets to
 * pjmedia_transport_udp_attach() to build the transport.
 *
 * @param endpt     Media endpoint; must not be NULL.
 * @param name      Transport name, passed through to the attach function.
 * @param addr      Address passed to pj_sockaddr_in_init() for both sockets.
 * @param port      RTP port. RTCP uses port+1, so the accepted range is
 *                  1..65534.
 * @param options   Transport options, passed through to the attach function.
 * @param p_tp      Receives the created transport; must not be NULL.
 *
 * @return          PJ_EINVAL on invalid arguments, the status of the first
 *                  failing socket/address operation, or otherwise the
 *                  status returned by pjmedia_transport_udp_attach().
 */
PJ_DEF(pj_status_t) pjmedia_transport_udp_create2(pjmedia_endpt *endpt,
                                                  const char *name,
                                                  const pj_str_t *addr,
                                                  int port,
                                                  unsigned options,
                                                  pjmedia_transport **p_tp)
{
    pjmedia_sock_info si;
    pj_status_t status;

    /* Sanity check */
    PJ_ASSERT_RETURN(endpt && port && p_tp, PJ_EINVAL);

    /* Both port and port+1 are narrowed to pj_uint16_t below. Reject values
     * that would be truncated, and 65535 whose RTCP port would wrap to 0.
     */
    if (port < 0 || port > 0xFFFE)
        return PJ_EINVAL;

    pj_bzero(&si, sizeof(pjmedia_sock_info));
    si.rtp_sock = si.rtcp_sock = PJ_INVALID_SOCKET;

    /* Create RTP socket */
    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtp_sock);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Bind RTP socket. The address initialization can fail on its own
     * (e.g. the address string cannot be resolved), so check it instead of
     * letting bind report a less specific error.
     */
    status = pj_sockaddr_in_init(&si.rtp_addr_name, addr, (pj_uint16_t)port);
    if (status != PJ_SUCCESS)
        goto on_error;

    status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name,
                          sizeof(si.rtp_addr_name));
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Create RTCP socket */
    status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtcp_sock);
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Bind RTCP socket to the port right after the RTP port */
    status = pj_sockaddr_in_init(&si.rtcp_addr_name, addr,
                                 (pj_uint16_t)(port+1));
    if (status != PJ_SUCCESS)
        goto on_error;

    status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name,
                          sizeof(si.rtcp_addr_name));
    if (status != PJ_SUCCESS)
        goto on_error;

    /* Create UDP transport by attaching socket info. The sockets are not
     * closed here if this fails: the attach function's own error path calls
     * pjmedia_transport_udp_close() on the transport holding them.
     */
    return pjmedia_transport_udp_attach( endpt, name, &si, options, p_tp);

on_error:
    /* Close whichever sockets were opened before the failure */
    if (si.rtp_sock != PJ_INVALID_SOCKET)
        pj_sock_close(si.rtp_sock);
    if (si.rtcp_sock != PJ_INVALID_SOCKET)
        pj_sock_close(si.rtcp_sock);
    return status;
}
/**
 * Create UDP stream transport from existing socket info.
 *
 * Allocates the transport from a new pool, copies the sockets and published
 * addresses from @a si, registers both sockets with the endpoint's ioqueue
 * and starts one asynchronous read on each of them.
 *
 * @param endpt     Media endpoint; must not be NULL.
 * @param name      Transport name. When NULL, "udpmedia" is used. Names
 *                  longer than the transport's name buffer are truncated.
 * @param si        Socket info holding the RTP/RTCP sockets and their
 *                  published addresses; must not be NULL.
 * @param options   Transport options, stored in the transport.
 * @param p_tp      Receives the created transport on success; must not be
 *                  NULL.
 *
 * @return          PJ_SUCCESS on success, PJ_EINVAL on invalid arguments,
 *                  PJ_ENOMEM when the pool cannot be created, or the status
 *                  of the failing operation. On failure after the transport
 *                  has been allocated, pjmedia_transport_udp_close() is
 *                  called on it.
 *                  NOTE(review): that close path closes the sockets taken
 *                  from @a si, so callers should treat them as consumed in
 *                  that case -- confirm against pjmedia_transport_udp_close().
 */
PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
                                                  const char *name,
                                                  const pjmedia_sock_info *si,
                                                  unsigned options,
                                                  pjmedia_transport **p_tp)
{
    struct transport_udp *tp;
    pj_pool_t *pool;
    pj_ioqueue_t *ioqueue;
    pj_ioqueue_callback rtp_cb, rtcp_cb;
    pj_ssize_t size;
    unsigned i;
    pj_status_t status;

    /* Sanity check */
    PJ_ASSERT_RETURN(endpt && si && p_tp, PJ_EINVAL);

    /* Check name */
    if (!name)
        name = "udpmedia";

    /* Get ioqueue instance */
    ioqueue = pjmedia_endpt_get_ioqueue(endpt);

    /* Create transport structure */
    pool = pjmedia_endpt_create_pool(endpt, name, 512, 512);
    if (!pool)
        return PJ_ENOMEM;

    tp = PJ_POOL_ZALLOC_T(pool, struct transport_udp);
    tp->pool = pool;
    tp->options = options;

    /* The name comes from the caller and may be longer than the fixed-size
     * name buffer, so bound the copy and terminate explicitly (strncpy does
     * not terminate when the source fills the buffer).
     */
    pj_ansi_strncpy(tp->base.name, name, sizeof(tp->base.name));
    tp->base.name[sizeof(tp->base.name)-1] = '\0';

    tp->base.op = &transport_udp_op;
    tp->base.type = PJMEDIA_TRANSPORT_TYPE_UDP;

    /* Copy socket infos */
    tp->rtp_sock = si->rtp_sock;
    tp->rtp_addr_name = si->rtp_addr_name;
    tp->rtcp_sock = si->rtcp_sock;
    tp->rtcp_addr_name = si->rtcp_addr_name;

    /* If address is 0.0.0.0, use host's IP address */
    if (tp->rtp_addr_name.sin_addr.s_addr == 0) {
        pj_in_addr hostip;

        status = pj_gethostip(&hostip);
        if (status != PJ_SUCCESS)
            goto on_error;

        tp->rtp_addr_name.sin_addr = hostip;
    }

    /* Same with RTCP: reuse whatever address RTP ended up with */
    if (tp->rtcp_addr_name.sin_addr.s_addr == 0) {
        tp->rtcp_addr_name.sin_addr.s_addr = tp->rtp_addr_name.sin_addr.s_addr;
    }

    /* Setup RTP socket with the ioqueue */
    pj_bzero(&rtp_cb, sizeof(rtp_cb));
    rtp_cb.on_read_complete = &on_rx_rtp;

    status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtp_sock, tp,
                                      &rtp_cb, &tp->rtp_key);
    if (status != PJ_SUCCESS)
        goto on_error;

    pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op));
    for (i=0; i<PJ_ARRAY_SIZE(tp->rtp_pending_write); ++i)
        pj_ioqueue_op_key_init(&tp->rtp_pending_write[i].op_key,
                               sizeof(tp->rtp_pending_write[i].op_key));

    /* Kick off pending RTP read from the ioqueue. The read is requested
     * with PJ_IOQUEUE_ALWAYS_ASYNC, so PJ_EPENDING is the only status
     * treated as success here.
     */
    tp->rtp_addrlen = sizeof(tp->rtp_src_addr);
    size = sizeof(tp->rtp_pkt);
    status = pj_ioqueue_recvfrom(tp->rtp_key, &tp->rtp_read_op,
                                 tp->rtp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC,
                                 &tp->rtp_src_addr, &tp->rtp_addrlen);
    if (status != PJ_EPENDING)
        goto on_error;

    /* Setup RTCP socket with ioqueue */
    pj_bzero(&rtcp_cb, sizeof(rtcp_cb));
    rtcp_cb.on_read_complete = &on_rx_rtcp;

    status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtcp_sock, tp,
                                      &rtcp_cb, &tp->rtcp_key);
    if (status != PJ_SUCCESS)
        goto on_error;

    pj_ioqueue_op_key_init(&tp->rtcp_read_op, sizeof(tp->rtcp_read_op));
    pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op));

    /* Kick off pending RTCP read from the ioqueue */
    size = sizeof(tp->rtcp_pkt);
    tp->rtcp_addr_len = sizeof(tp->rtcp_src_addr);
    status = pj_ioqueue_recvfrom( tp->rtcp_key, &tp->rtcp_read_op,
                                  tp->rtcp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC,
                                  &tp->rtcp_src_addr, &tp->rtcp_addr_len);
    if (status != PJ_EPENDING)
        goto on_error;

    /* Done */
    *p_tp = &tp->base;
    return PJ_SUCCESS;

on_error:
    /* Tear down whatever was set up so far */
    pjmedia_transport_udp_close(&tp->base);
    return status;
}
/*
 * Get media socket info.
 *
 * Fills inf->skinfo by delegating to transport_get_info() and returns that
 * function's status. Returns PJ_EINVAL when tp or inf is NULL, matching the
 * sanity checks of the other public functions in this file.
 */
PJ_DEF(pj_status_t)
pjmedia_transport_udp_get_info( pjmedia_transport *tp,
                                pjmedia_transport_udp_info *inf)
{
    /* Sanity check: inf is dereferenced below to reach its skinfo member */
    PJ_ASSERT_RETURN(tp && inf, PJ_EINVAL);

    return transport_get_info(tp, &inf->skinfo);
}
/**
* Close UDP transport.
*/
PJ_DEF(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp)
{
struct transport_udp *udp = (struct transport_udp*) tp;
/* Sanity check */
PJ_ASSERT_RETURN(tp, PJ_EINVAL);
/* Must not close while application is using this */
PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP);
if (udp->rtp_key) {
pj_ioqueue_unregister(udp->rtp_key);
udp->rtp_key = NULL;
} else if (udp->rtp_sock != PJ_INVALID_SOCKET) {
pj_sock_close(udp->rtp_sock);
udp->rtp_sock = PJ_INVALID_SOCKET;
}
if (udp->rtcp_key) {
pj_ioqueue_unregister(udp->rtcp_key);
udp->rtcp_key = NULL;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -