📄 sip_transport.c
字号:
pj_ssize_t size)
{
pjsip_tx_data *tdata = token;
PJ_UNUSED_ARG(transport);
/* Mark pending off so that app can resend/reuse txdata from inside
* the callback.
*/
tdata->is_pending = 0;
/* Call callback, if any. */
if (tdata->cb) {
(*tdata->cb)(tdata->token, tdata, size);
}
/* Decrement reference count. */
pjsip_tx_data_dec_ref(tdata);
}
/*
 * Called by the endpoint for the on_tx_request() and on_tx_response()
 * notifications. Makes sure the message held by tdata has been printed
 * into tdata's contiguous buffer.
 *
 * Returns PJ_SUCCESS when the buffer is ready, PJ_ENOMEM when the print
 * buffer could not be allocated, or PJSIP_EMSGTOOLONG when
 * pjsip_msg_print() returns a negative size.
 */
static pj_status_t mod_on_tx_msg(pjsip_tx_data *tdata)
{
    /* Allocate the print buffer on first use. */
    if (tdata->buf.start == NULL) {
        tdata->buf.start = pj_pool_alloc(tdata->pool, PJSIP_MAX_PKT_LEN);
        if (tdata->buf.start == NULL) {
            /* Report the failure instead of doing pointer arithmetic on,
             * and later writing through, a NULL buffer. buf.start stays
             * NULL, so a later call attempts the allocation again.
             */
            return PJ_ENOMEM;
        }
        tdata->buf.cur = tdata->buf.start;
        tdata->buf.end = tdata->buf.start + PJSIP_MAX_PKT_LEN;
    }

    /* Print the message again only when the buffer content is not valid. */
    if (!pjsip_tx_data_is_valid(tdata)) {
        pj_ssize_t size;

        size = pjsip_msg_print(tdata->msg, tdata->buf.start,
                               tdata->buf.end - tdata->buf.start);
        if (size < 0) {
            return PJSIP_EMSGTOOLONG;
        }
        pj_assert(size != 0);

        /* NUL-terminate the printed text and advance the write cursor.
         * NOTE(review): this assumes pjsip_msg_print() leaves room for the
         * terminator when it reports success - confirm.
         */
        tdata->buf.cur[size] = '\0';
        tdata->buf.cur += size;
    }

    return PJ_SUCCESS;
}
/*
 * Send a SIP message using the specified transport.
 *
 * tr        Transport used to send the message.
 * tdata     Transmit data to send; must not already be pending.
 * addr      Destination socket address.
 * addr_len  Length of addr in bytes; must be positive and must fit in
 *           tdata->tp_info.dst_addr.
 * token     Opaque value handed back to cb.
 * cb        Optional callback; it is stored in tdata and is invoked by
 *           transport_send_callback() with the stored token, tdata and
 *           the size reported by the transport.
 *
 * Returns PJ_EINVAL on bad arguments, PJSIP_EPENDINGTX when tdata is
 * still pending, the status of the transport manager's on_tx_msg callback
 * when that fails, otherwise the status returned by the transport's
 * send_msg().
 */
PJ_DEF(pj_status_t) pjsip_transport_send( pjsip_transport *tr,
                                          pjsip_tx_data *tdata,
                                          const pj_sockaddr_t *addr,
                                          int addr_len,
                                          void *token,
                                          void (*cb)(void *token,
                                                     pjsip_tx_data *tdata,
                                                     pj_ssize_t))
{
    pj_status_t status;

    PJ_ASSERT_RETURN(tr && tdata && addr, PJ_EINVAL);

    /* Is it currently being sent? */
    if (tdata->is_pending) {
        pj_assert(!"Invalid operation step!");
        PJ_LOG(2,(THIS_FILE, "Unable to send %s: message is pending",
                  pjsip_tx_data_get_info(tdata)));
        return PJSIP_EPENDINGTX;
    }

    /* addr_len is the byte count of the copy below. A negative value would
     * be converted to a huge size, and an oversized one would write past
     * tp_info.dst_addr, so reject both before touching tdata.
     */
    PJ_ASSERT_RETURN(addr_len > 0 &&
                     addr_len <= (int)sizeof(tdata->tp_info.dst_addr),
                     PJ_EINVAL);

    /* Fill in tp_info. */
    tdata->tp_info.transport = tr;
    pj_memcpy(&tdata->tp_info.dst_addr, addr, addr_len);
    tdata->tp_info.dst_addr_len = addr_len;
    if (((pj_sockaddr*)addr)->sa_family == PJ_AF_INET) {
        const char *str_addr;

        str_addr = pj_inet_ntoa(((pj_sockaddr_in*)addr)->sin_addr);
        pj_ansi_strcpy(tdata->tp_info.dst_name, str_addr);
        tdata->tp_info.dst_port = pj_ntohs(((pj_sockaddr_in*)addr)->sin_port);
    } else {
        /* Only PJ_AF_INET destinations get a printable name here. */
        pj_ansi_strcpy(tdata->tp_info.dst_name, "<unknown>");
        tdata->tp_info.dst_port = 0;
    }

    /* Distribute to modules.
     * When the message reaches mod_msg_print, the contents of the message
     * will be "printed" to a contiguous buffer.
     */
    if (tr->tpmgr->on_tx_msg) {
        status = (*tr->tpmgr->on_tx_msg)(tr->endpt, tdata);
        if (status != PJ_SUCCESS)
            return status;
    }

    /* Save callback data. */
    tdata->token = token;
    tdata->cb = cb;

    /* Hold a reference while the transport owns the buffer. */
    pjsip_tx_data_add_ref(tdata);

    /* Mark as pending. */
    tdata->is_pending = 1;

    /* Send to transport. */
    status = (*tr->send_msg)(tr, tdata, addr, addr_len, (void*)tdata,
                             &transport_send_callback);

    /* Any status other than PJ_EPENDING means the operation finished
     * (or failed) right here, so undo the pending mark and the reference.
     */
    if (status != PJ_EPENDING) {
        tdata->is_pending = 0;
        pjsip_tx_data_dec_ref(tdata);
    }

    return status;
}
/*
 * Callback of a transport's idle timer. Marks the timer entry as no longer
 * scheduled and destroys the transport stored in the entry's user_data.
 */
static void transport_idle_callback(pj_timer_heap_t *timer_heap,
                                    struct pj_timer_entry *entry)
{
    pjsip_transport *transport;

    PJ_UNUSED_ARG(timer_heap);

    transport = entry->user_data;
    pj_assert(transport != NULL);

    /* The timer has fired, so clear its "scheduled" flag before the
     * transport is destroyed.
     */
    entry->id = PJ_FALSE;
    pjsip_transport_destroy(transport);
}
/*
 * Add a reference to the transport.
 *
 * When the counter goes to 1, a scheduled idle timer (if any) is cancelled
 * under the transport manager lock. Returns PJ_EINVAL when tp is NULL,
 * PJ_SUCCESS otherwise.
 */
PJ_DEF(pj_status_t) pjsip_transport_add_ref( pjsip_transport *tp )
{
    pjsip_tpmgr *mgr;

    PJ_ASSERT_RETURN(tp != NULL, PJ_EINVAL);

    /* Nothing else to do unless this call produced the first reference. */
    if (pj_atomic_inc_and_get(tp->ref_cnt) != 1)
        return PJ_SUCCESS;

    mgr = tp->tpmgr;
    pj_lock_acquire(mgr->lock);

    /* Re-check the counter now that the lock is held, then cancel the
     * idle timer if it is marked as scheduled.
     */
    if (pj_atomic_get(tp->ref_cnt) == 1 && tp->idle_timer.id != PJ_FALSE) {
        pjsip_endpt_cancel_timer(mgr->endpt, &tp->idle_timer);
        tp->idle_timer.id = PJ_FALSE;
    }

    pj_lock_release(mgr->lock);
    return PJ_SUCCESS;
}
/*
 * Release a reference to the transport.
 *
 * When the counter drops to zero the idle timer is scheduled: with zero
 * delay if the transport is in graceful shutdown, otherwise after
 * PJSIP_TRANSPORT_IDLE_TIME seconds. Returns PJ_EINVAL when tp is NULL,
 * PJ_SUCCESS otherwise.
 */
PJ_DEF(pj_status_t) pjsip_transport_dec_ref( pjsip_transport *tp )
{
    pjsip_tpmgr *mgr;

    PJ_ASSERT_RETURN(tp != NULL, PJ_EINVAL);
    pj_assert(pj_atomic_get(tp->ref_cnt) > 0);

    /* Only the call that releases the last reference goes further. */
    if (pj_atomic_dec_and_get(tp->ref_cnt) != 0)
        return PJ_SUCCESS;

    mgr = tp->tpmgr;
    pj_lock_acquire(mgr->lock);

    /* Re-check the counter now that the lock is held. */
    if (pj_atomic_get(tp->ref_cnt) == 0) {
        pj_time_val timeout;

        /* A transport in graceful shutdown has just lost its last user,
         * so it is destroyed right away; any other transport is kept for
         * the idle period first.
         */
        timeout.sec = tp->is_shutdown ? 0 : PJSIP_TRANSPORT_IDLE_TIME;
        timeout.msec = 0;

        pj_assert(tp->idle_timer.id == 0);
        tp->idle_timer.id = PJ_TRUE;
        pjsip_endpt_schedule_timer(mgr->endpt, &tp->idle_timer, &timeout);
    }

    pj_lock_release(mgr->lock);
    return PJ_SUCCESS;
}
/**
 * Register a transport to the transport manager.
 *
 * Binds tp to mgr, prepares tp's idle timer entry, and stores tp in the
 * manager's hash table. The hash key is the first
 * sizeof(tp->key.type) + tp->addr_len bytes of tp->key. An entry already
 * stored under the same key is removed from the table first.
 * Always returns PJ_SUCCESS.
 */
PJ_DEF(pj_status_t) pjsip_transport_register( pjsip_tpmgr *mgr,
                                              pjsip_transport *tp )
{
    pj_uint32_t hash = 0;
    int klen;

    /* Bind the transport to its manager and set up the idle timer entry. */
    tp->tpmgr = mgr;
    pj_bzero(&tp->idle_timer, sizeof(tp->idle_timer));
    tp->idle_timer.cb = &transport_idle_callback;
    tp->idle_timer.user_data = tp;

    /*
     * Register to hash table (see Trac ticket #42).
     */
    klen = sizeof(tp->key.type) + tp->addr_len;

    pj_lock_acquire(mgr->lock);

    /* Drop whatever currently occupies this key; the lookup also fills in
     * the hash value that the pj_hash_set() calls below are given.
     */
    if (pj_hash_get(mgr->table, &tp->key, klen, &hash) != NULL)
        pj_hash_set(NULL, mgr->table, &tp->key, klen, hash, NULL);

    /* Store the new transport. */
    pj_hash_set(tp->pool, mgr->table, &tp->key, klen, hash, tp);

    pj_lock_release(mgr->lock);

    TRACE_((THIS_FILE,"Transport %s registered: type=%s, remote=%s:%d",
            tp->obj_name,
            pjsip_transport_get_type_name(tp->key.type),
            pj_inet_ntoa(((pj_sockaddr_in*)&tp->key.rem_addr)->sin_addr),
            pj_ntohs(((pj_sockaddr_in*)&tp->key.rem_addr)->sin_port)));

    return PJ_SUCCESS;
}
/*
 * Force destroy a transport (e.g. during transport manager shutdown).
 *
 * Cancels the idle timer if it is marked as scheduled, removes the
 * transport from the manager's hash table when the table entry for its key
 * is this very transport, then calls tp->destroy() and returns its status.
 *
 * NOTE(review): tp->lock is acquired here and never released in this
 * function; presumably tp->destroy() disposes of it - confirm.
 */
static pj_status_t destroy_transport( pjsip_tpmgr *mgr,
                                      pjsip_transport *tp )
{
    pj_uint32_t hash = 0;
    int klen;

    TRACE_((THIS_FILE, "Transport %s is being destroyed", tp->obj_name));

    pj_lock_acquire(tp->lock);
    pj_lock_acquire(mgr->lock);

    /* The idle timer may still be marked as scheduled at this point, so
     * cancel it rather than assert that it is not.
     */
    if (tp->idle_timer.id != PJ_FALSE) {
        pjsip_endpt_cancel_timer(mgr->endpt, &tp->idle_timer);
        tp->idle_timer.id = PJ_FALSE;
    }

    /*
     * Unregister from hash table (see Trac ticket #42). The key may map to
     * a different transport, in which case the table is left untouched.
     */
    klen = sizeof(tp->key.type) + tp->addr_len;
    if (pj_hash_get(mgr->table, &tp->key, klen, &hash) == (void*)tp)
        pj_hash_set(NULL, mgr->table, &tp->key, klen, hash, NULL);

    pj_lock_release(mgr->lock);

    /* Hand over to the transport's own destructor. */
    return tp->destroy(tp);
}
/*
 * Start the graceful shutdown procedure for this transport.
 *
 * Does nothing when the transport is already flagged as shut down.
 * Otherwise calls tp->do_shutdown() when it is set, and flags the transport
 * as shut down if the resulting status is PJ_SUCCESS. Returns that status.
 */
PJ_DEF(pj_status_t) pjsip_transport_shutdown(pjsip_transport *tp)
{
    pj_status_t status = PJ_SUCCESS;
    pjsip_tpmgr *mgr;

    TRACE_((THIS_FILE, "Transport %s shutting down", tp->obj_name));

    pj_lock_acquire(tp->lock);
    mgr = tp->tpmgr;
    pj_lock_acquire(mgr->lock);

    /* Skip everything when shutdown has been started before. */
    if (!tp->is_shutdown) {
        /* Let the transport perform its own shutdown, if it provides one. */
        if (tp->do_shutdown)
            status = tp->do_shutdown(tp);

        if (status == PJ_SUCCESS)
            tp->is_shutdown = PJ_TRUE;
    }

    pj_lock_release(tp->lock);
    pj_lock_release(mgr->lock);

    return status;
}
/**
 * Destroy a transport that has no user left.
 *
 * Returns PJSIP_EBUSY when the reference counter is not zero, otherwise
 * the status of destroy_transport().
 */
PJ_DEF(pj_status_t) pjsip_transport_destroy( pjsip_transport *tp)
{
    pj_status_t status;

    /* Refuse to destroy a transport that is still referenced. */
    PJ_ASSERT_RETURN(pj_atomic_get(tp->ref_cnt) == 0, PJSIP_EBUSY);

    status = destroy_transport(tp->tpmgr, tp);
    return status;
}
/*****************************************************************************
*
* TRANSPORT FACTORY
*
*****************************************************************************/
/*
 * Register a transport factory to the transport manager.
 *
 * Returns PJ_EEXISTS when this very factory is already in the manager's
 * factory list, PJSIP_ETYPEEXISTS when a different factory with the same
 * type is registered, and PJ_SUCCESS after the factory has been added to
 * the list.
 */
PJ_DEF(pj_status_t) pjsip_tpmgr_register_tpfactory( pjsip_tpmgr *mgr,
                                                    pjsip_tpfactory *tpf)
{
    pjsip_tpfactory *p;
    pj_status_t status = PJ_SUCCESS;

    pj_lock_acquire(mgr->lock);

    /* Look for a conflicting registration. The identity test has to come
     * first: a factory always has the same type as itself, so testing the
     * type first would make the PJ_EEXISTS case unreachable.
     */
    for (p=mgr->factory_list.next; p!=&mgr->factory_list; p=p->next) {
        if (p == tpf) {
            status = PJ_EEXISTS;
            break;
        }
        if (p->type == tpf->type) {
            status = PJSIP_ETYPEEXISTS;
            break;
        }
    }

    if (status == PJ_SUCCESS)
        pj_list_insert_before(&mgr->factory_list, tpf);

    pj_lock_release(mgr->lock);

    return status;
}
/**
 * Unregister a transport factory from the transport manager.
 *
 * Returns PJ_SUCCESS after the factory has been removed from the manager's
 * factory list, or PJ_ENOTFOUND when the factory is not in that list.
 */
PJ_DEF(pj_status_t) pjsip_tpmgr_unregister_tpfactory( pjsip_tpmgr *mgr,
                                                      pjsip_tpfactory *tpf)
{
    pj_status_t status = PJ_SUCCESS;

    pj_lock_acquire(mgr->lock);

    /* Erase only a node that really is in this manager's list. Checking
     * membership with pj_assert() alone would let the erase run on an
     * unlisted node whenever assertions are compiled out.
     */
    if (pj_list_find_node(&mgr->factory_list, tpf) == tpf) {
        pj_list_erase(tpf);
    } else {
        pj_assert(!"Transport factory is not registered");
        status = PJ_ENOTFOUND;
    }

    pj_lock_release(mgr->lock);

    return status;
}
/*****************************************************************************
*
* TRANSPORT MANAGER
*
*****************************************************************************/
/*
 * Create a new transport manager.
 *
 * pool    Pool used to allocate the manager, its hash table and its lock.
 * endpt   Endpoint that owns the manager; mod_msg_print is registered to it.
 * rx_cb   Callback stored as the manager's on_rx_msg; must not be NULL.
 * tx_cb   Callback stored as the manager's on_tx_msg; it is not validated
 *         here and may be NULL.
 * p_mgr   Receives the new manager on success.
 *
 * Returns PJ_SUCCESS, PJ_EINVAL on bad arguments, PJ_ENOMEM when the
 * manager or its hash table cannot be allocated, or the status of the step
 * that failed. On any failure after mod_msg_print has been registered, the
 * module is unregistered again so that a failed call leaves nothing behind
 * in the endpoint.
 */
PJ_DEF(pj_status_t) pjsip_tpmgr_create( pj_pool_t *pool,
                                        pjsip_endpoint *endpt,
                                        void (*rx_cb)(pjsip_endpoint*,
                                                      pj_status_t,
                                                      pjsip_rx_data *),
                                        pj_status_t (*tx_cb)(pjsip_endpoint*,
                                                             pjsip_tx_data*),
                                        pjsip_tpmgr **p_mgr)
{
    pjsip_tpmgr *mgr;
    pj_status_t status;

    PJ_ASSERT_RETURN(pool && endpt && rx_cb && p_mgr, PJ_EINVAL);

    /* Register mod_msg_print module. */
    status = pjsip_endpt_register_module(endpt, &mod_msg_print);
    if (status != PJ_SUCCESS)
        return status;

    /* Create and initialize transport manager. */
    mgr = pj_pool_zalloc(pool, sizeof(*mgr));
    if (!mgr) {
        status = PJ_ENOMEM;
        goto on_error;
    }
    mgr->endpt = endpt;
    mgr->on_rx_msg = rx_cb;
    mgr->on_tx_msg = tx_cb;
    pj_list_init(&mgr->factory_list);

    mgr->table = pj_hash_create(pool, PJSIP_TPMGR_HTABLE_SIZE);
    if (!mgr->table) {
        status = PJ_ENOMEM;
        goto on_error;
    }

    status = pj_lock_create_recursive_mutex(pool, "tmgr%p", &mgr->lock);
    if (status != PJ_SUCCESS)
        goto on_error;

#if defined(PJ_DEBUG) && PJ_DEBUG!=0
    status = pj_atomic_create(pool, 0, &mgr->tdata_counter);
    if (status != PJ_SUCCESS) {
        /* The lock exists at this point; release it before bailing out. */
        pj_lock_destroy(mgr->lock);
        goto on_error;
    }
#endif

    PJ_LOG(5, (THIS_FILE, "Transport manager created."));

    *p_mgr = mgr;
    return PJ_SUCCESS;

on_error:
    /* Pool memory is not released individually; only the module
     * registration has to be undone.
     */
    pjsip_endpt_unregister_module(endpt, &mod_msg_print);
    return status;
}
/*
* Find out the appropriate local address info (IP address and port) to
* advertise in Contact header based on the remote address to be
* contacted. The local address info would be the address name of the
* transport or listener which will be used to send the request.
*
* In this implementation, it will only select the transport based on
* the transport type in the request.
*/
PJ_DEF(pj_status_t) pjsip_tpmgr_find_local_addr( pjsip_tpmgr *tpmgr,
pj_pool_t *pool,
pjsip_transport_type_e type,
const pjsip_tpselector *sel,
pj_str_t *ip_addr,
int *port)
{
pj_status_t status = PJSIP_EUNSUPTRANSPORT;
unsigned flag;
/* Sanity checks */
PJ_ASSERT_RETURN(tpmgr && pool && ip_addr && port, PJ_EINVAL);
ip_addr->slen = 0;
*port = 0;
flag = pjsip_transport_get_flag_from_type(type);
if (sel && sel->type == PJSIP_TPSELECTOR_TRANSPORT &&
sel->u.transport)
{
pj_strdup(pool, ip_addr, &sel->u.transport->local_name.host);
*port = sel->u.transport->local_name.port;
status = PJ_SUCCESS;
} else if (sel && sel->type == PJSIP_TPSELECTOR_LISTENER &&
sel->u.listener)
{
pj_strdup(pool, ip_addr, &sel->u.listener->addr_name.host);
*port = sel->u.listener->addr_name.port;
status = PJ_SUCCESS;
} else if ((flag & PJSIP_TRANSPORT_DATAGRAM) != 0) {
pj_sockaddr_in remote;
pjsip_transport *tp;
pj_sockaddr_in_init(&remote, NULL, 0);
status = pjsip_tpmgr_acquire_transport(tpmgr, type, &remote,
sizeof(remote), NULL, &tp);
if (status == PJ_SUCCESS) {
pj_strdup(pool, ip_addr, &tp->local_name.host);
*port = tp->local_name.port;
status = PJ_SUCCESS;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -