📄 sip_transport.c
字号:
pjsip_transport_dec_ref(tp);
}
} else {
/* For connection oriented transport, enum the factories */
pjsip_tpfactory *f;
pj_lock_acquire(tpmgr->lock);
f = tpmgr->factory_list.next;
while (f != &tpmgr->factory_list) {
if (f->type == type)
break;
f = f->next;
}
if (f != &tpmgr->factory_list) {
pj_strdup(pool, ip_addr, &f->addr_name.host);
*port = f->addr_name.port;
status = PJ_SUCCESS;
}
pj_lock_release(tpmgr->lock);
}
return PJ_SUCCESS;
}
/*
 * pjsip_tpmgr_destroy()
 *
 * Tear down the transport manager.
 *
 * With the manager lock held, every transport stored in the manager's
 * hash table is passed to destroy_transport() and every registered
 * factory/listener has its destroy() callback invoked. The lock is then
 * released and destroyed, and the message-printing module is unregistered
 * from the endpoint if it is currently registered (id != -1).
 *
 * In debug builds a warning is logged when the transmit-data counter is
 * not zero, i.e. some transmit buffers were never destroyed.
 *
 * Always returns PJ_SUCCESS.
 */
PJ_DEF(pj_status_t) pjsip_tpmgr_destroy( pjsip_tpmgr *mgr )
{
    pj_hash_iterator_t iter_storage;
    pj_hash_iterator_t *cursor;
    pjsip_tpfactory *listener;
    pjsip_endpoint *endpt = mgr->endpt;

    PJ_LOG(5, (THIS_FILE, "Destroying transport manager"));

    pj_lock_acquire(mgr->lock);

    /* Destroy all transports. The iterator is advanced BEFORE the current
     * transport is handed to destroy_transport(), so the walk never relies
     * on an entry that has already been destroyed.
     */
    cursor = pj_hash_first(mgr->table, &iter_storage);
    while (cursor != NULL) {
        pjsip_transport *victim = pj_hash_this(mgr->table, cursor);

        cursor = pj_hash_next(mgr->table, cursor);
        destroy_transport(mgr, victim);
    }

    /* Destroy all factories/listeners. The successor is saved first, for
     * the same reason as above.
     */
    for (listener = mgr->factory_list.next;
         listener != &mgr->factory_list; )
    {
        pjsip_tpfactory *following = listener->next;

        listener->destroy(listener);
        listener = following;
    }

    pj_lock_release(mgr->lock);
    pj_lock_destroy(mgr->lock);

    /* Unregister mod_msg_print, but only if it is registered. */
    if (mod_msg_print.id != -1) {
        pjsip_endpt_unregister_module(endpt, &mod_msg_print);
    }

#if defined(PJ_DEBUG) && PJ_DEBUG!=0
    /* A non-zero counter here means transmit data has leaked (some
     * transmit buffers have not been destroyed). This is reported as a
     * warning rather than an assertion.
     */
    if (pj_atomic_get(mgr->tdata_counter) != 0) {
        PJ_LOG(3,(THIS_FILE, "Warning: %d transmit buffer(s) not freed!",
                  pj_atomic_get(mgr->tdata_counter)));
    }
#endif

    return PJ_SUCCESS;
}
/*
 * pjsip_tpmgr_receive_packet()
 *
 * Called by transports when they receive a new packet.
 *
 * The buffer described by rdata->pkt_info may hold zero, one or several
 * SIP messages. Each complete message is parsed and reported to the
 * manager's on_rx_msg() callback.
 *
 * Parameters:
 *  mgr     The transport manager; its on_rx_msg callback and endpoint
 *          are used for reporting.
 *  rdata   Receive buffer. pkt_info.packet/pkt_info.len describe the data;
 *          msg_info is reset and refilled for every message found.
 *          NOTE: a NUL byte is written at packet[len], so the buffer must
 *          have room for at least len+1 bytes.
 *
 * Returns:
 *  -1 when pkt_info.len is not positive; pkt_info.len when an overflow
 *  is reported for a stream transport (all data is discarded); otherwise
 *  the number of bytes consumed, which may be less than pkt_info.len when
 *  a stream transport has only received part of a message.
 */
PJ_DEF(pj_ssize_t) pjsip_tpmgr_receive_packet( pjsip_tpmgr *mgr,
                                               pjsip_rx_data *rdata)
{
    pjsip_transport *tr = rdata->tp_info.transport;
    char *current_pkt;
    pj_size_t remaining_len;
    pj_size_t total_processed = 0;

    /* Check size. */
    pj_assert(rdata->pkt_info.len > 0);
    if (rdata->pkt_info.len <= 0)
        return -1;

    current_pkt = rdata->pkt_info.packet;
    remaining_len = rdata->pkt_info.len;

    /* Must NULL terminate buffer. This is the requirement of the
     * parser etc.
     */
    current_pkt[remaining_len] = '\0';

    /* Process all message fragments. */
    while (remaining_len > 0) {

        pjsip_msg *msg;
        pj_size_t msg_fragment_size;

        /* By default the fragment is everything that is left; for
         * non-datagram transports pjsip_find_msg() below may shrink it.
         */
        msg_fragment_size = remaining_len;

        /* Clear and init msg_info in rdata.
         * Endpoint might inspect the values there when we call the callback
         * to report some errors.
         */
        pj_bzero(&rdata->msg_info, sizeof(rdata->msg_info));
        pj_list_init(&rdata->msg_info.parse_err);
        rdata->msg_info.msg_buf = current_pkt;
        rdata->msg_info.len = remaining_len;

        /* For non-datagram (e.g. TCP) transports, check if the whole
         * message has been received.
         */
        if ((tr->flag & PJSIP_TRANSPORT_DATAGRAM) == 0) {
            pj_status_t msg_status;

            msg_status = pjsip_find_msg(current_pkt, remaining_len, PJ_FALSE,
                                        &msg_fragment_size);
            if (msg_status != PJ_SUCCESS) {
                if (remaining_len == PJSIP_MAX_PKT_LEN) {
                    /* The data already spans PJSIP_MAX_PKT_LEN bytes and
                     * still holds no complete message: report overflow.
                     */
                    mgr->on_rx_msg(mgr->endpt, PJSIP_ERXOVERFLOW, rdata);
                    /* Exhaust all data. */
                    return rdata->pkt_info.len;
                } else {
                    /* Not enough data in packet. */
                    return total_processed;
                }
            }
        }

        /* Update msg_info. */
        rdata->msg_info.len = msg_fragment_size;

        /* Parse the message. */
        rdata->msg_info.msg = msg =
            pjsip_parse_rdata( current_pkt, msg_fragment_size, rdata);

        /* Check for parsing syntax error */
        if (msg==NULL || !pj_list_empty(&rdata->msg_info.parse_err)) {
            pjsip_parser_err_report *err;
            char buf[128];
            pj_str_t tmp;

            /* Gather syntax error information */
            tmp.ptr = buf; tmp.slen = 0;
            err = rdata->msg_info.parse_err.next;
            while (err != &rdata->msg_info.parse_err) {
                int len;
                len = pj_ansi_snprintf(tmp.ptr+tmp.slen, sizeof(buf)-tmp.slen,
                                       ": %s exception when parsing '%.*s' "
                                       "header on line %d col %d",
                                       pj_exception_id_name(err->except_code),
                                       (int)err->hname.slen, err->hname.ptr,
                                       err->line, err->col);
                /* Only keep the text when it fitted completely. */
                if (len > 0 && len < (int) (sizeof(buf)-tmp.slen)) {
                    tmp.slen += len;
                }
                err = err->next;
            }

            /* Only print error message if there's error.
             * Sometimes we receive blank packets (packets with only CRLF)
             * which were sent to keep NAT bindings.
             */
            if (tmp.slen) {
                /* msg_fragment_size is a pj_size_t; it must be cast to int
                 * to match the "%d" conversion, otherwise the variadic
                 * argument list is misread on platforms where the two
                 * types differ in size.
                 */
                PJ_LOG(1, (THIS_FILE,
                      "Error processing %d bytes packet from %s %s:%d %.*s:\n"
                      "%.*s\n"
                      "-- end of packet.",
                      (int)msg_fragment_size,
                      rdata->tp_info.transport->type_name,
                      rdata->pkt_info.src_name,
                      rdata->pkt_info.src_port,
                      (int)tmp.slen, tmp.ptr,
                      (int)msg_fragment_size,
                      rdata->msg_info.msg_buf));
            }

            goto finish_process_fragment;
        }

        /* Perform basic header checking. */
        if (rdata->msg_info.cid == NULL ||
            rdata->msg_info.cid->id.slen == 0 ||
            rdata->msg_info.from == NULL ||
            rdata->msg_info.to == NULL ||
            rdata->msg_info.via == NULL ||
            rdata->msg_info.cseq == NULL)
        {
            mgr->on_rx_msg(mgr->endpt, PJSIP_EMISSINGHDR, rdata);
            goto finish_process_fragment;
        }

        /* Always add received parameter to the via. */
        pj_strdup2(rdata->tp_info.pool,
                   &rdata->msg_info.via->recvd_param,
                   rdata->pkt_info.src_name);

        /* RFC 3581:
         * If message contains "rport" param, put the received port there.
         */
        if (rdata->msg_info.via->rport_param == 0) {
            rdata->msg_info.via->rport_param = rdata->pkt_info.src_port;
        }

        /* Responses carrying more than one Via header are deliberately NOT
         * dropped here: a proxy does legitimately receive such responses
         * (thanks Aldo <acampi at deis.unibo.it> for pointing this out).
         */

        /* Call the transport manager's upstream message callback.
         */
        mgr->on_rx_msg(mgr->endpt, PJ_SUCCESS, rdata);

finish_process_fragment:
        /* Step over the fragment just handled, whether it was delivered,
         * rejected, or failed to parse.
         */
        total_processed += msg_fragment_size;
        current_pkt += msg_fragment_size;
        remaining_len -= msg_fragment_size;

    }   /* while (remaining_len > 0) */

    return total_processed;
}
/*
 * pjsip_tpmgr_acquire_transport()
 *
 * Get transport suitable to communicate to remote. Create a new one
 * if necessary.
 *
 * Parameters:
 *  mgr       The transport manager.
 *  type      Requested transport type.
 *  remote    Destination socket address.
 *  addr_len  Length of the destination address, in bytes.
 *  sel       Optional transport selector (may be NULL). When it names a
 *            specific transport, that transport is used if its type
 *            matches; when it names a listener, that listener is asked to
 *            create the transport and the hash table lookup is skipped.
 *  tp        Receives the acquired transport. A reference is added with
 *            pjsip_transport_add_ref() on every successful path.
 *
 * Returns:
 *  PJ_SUCCESS on success; PJSIP_ETPNOTSUITABLE when the selected
 *  transport/listener type differs from `type`; PJSIP_EUNSUPTRANSPORT when
 *  no factory of the requested type is registered; PJ_EBUG when a factory
 *  reports success without producing a transport; otherwise the status
 *  returned by the factory's create_transport().
 */
PJ_DEF(pj_status_t) pjsip_tpmgr_acquire_transport(pjsip_tpmgr *mgr,
                                                  pjsip_transport_type_e type,
                                                  const pj_sockaddr_t *remote,
                                                  int addr_len,
                                                  const pjsip_tpselector *sel,
                                                  pjsip_transport **tp)
{
    pjsip_tpfactory *factory;
    pj_status_t status;

    TRACE_((THIS_FILE,"Acquiring transport type=%s, remote=%s:%d",
                       pjsip_transport_get_type_name(type),
                       pj_inet_ntoa(((pj_sockaddr_in*)remote)->sin_addr),
                       pj_ntohs(((pj_sockaddr_in*)remote)->sin_port)));

    pj_lock_acquire(mgr->lock);

    /* If transport is specified, then just use it if it is suitable
     * for the destination.
     */
    if (sel && sel->type == PJSIP_TPSELECTOR_TRANSPORT &&
        sel->u.transport)
    {
        pjsip_transport *seltp = sel->u.transport;

        /* See if the transport is (not) suitable */
        if (seltp->key.type != type) {
            pj_lock_release(mgr->lock);
            return PJSIP_ETPNOTSUITABLE;
        }

        /* We could also verify that the destination address is reachable
         * from this transport (i.e. both are equal), but if application
         * has requested a specific transport to be used, assume that
         * it knows what to do.
         *
         * In other words, I don't think destination verification is a good
         * idea for now.
         */

        /* Transport looks to be suitable to use, so just use it. */
        pjsip_transport_add_ref(seltp);
        pj_lock_release(mgr->lock);
        *tp = seltp;

        TRACE_((THIS_FILE, "Transport %s acquired", seltp->obj_name));
        return PJ_SUCCESS;

    } else if (sel && sel->type == PJSIP_TPSELECTOR_LISTENER &&
               sel->u.listener)
    {
        /* Application has requested that a specific listener is to
         * be used. In this case, skip transport hash table lookup.
         */

        /* Verify that the listener type matches the destination type */
        if (sel->u.listener->type != type) {
            pj_lock_release(mgr->lock);
            return PJSIP_ETPNOTSUITABLE;
        }

        /* We'll use this listener to create transport */
        factory = sel->u.listener;

    } else {

        /*
         * This is the "normal" flow, where application doesn't specify
         * specific transport/listener to be used to send message to.
         * In this case, lookup the transport from the hash table.
         */
        struct transport_key key;
        int key_len;
        pjsip_transport *transport;

        /* NOTE(review): addr_len is not validated against sizeof(key.addr)
         * before the copy below; confirm that all callers pass a length
         * that fits the key's address field.
         */
        key_len = sizeof(key.type) + addr_len;

        /* First try to get exact destination. */
        key.type = type;
        pj_memcpy(&key.addr, remote, addr_len);

        transport = pj_hash_get(mgr->table, &key, key_len, NULL);
        if (transport == NULL) {
            unsigned flag = pjsip_transport_get_flag_from_type(type);
            const pj_sockaddr *remote_addr = (const pj_sockaddr*)remote;

            /* Ignore address for loop transports. */
            if (type == PJSIP_TRANSPORT_LOOP ||
                     type == PJSIP_TRANSPORT_LOOP_DGRAM)
            {
                pj_sockaddr_in *addr = (pj_sockaddr_in*)&key.addr;

                pj_bzero(addr, sizeof(pj_sockaddr_in));
                key_len = sizeof(key.type) + sizeof(pj_sockaddr_in);
                transport = pj_hash_get(mgr->table, &key, key_len, NULL);
            }
            /* For datagram INET transports, try lookup with zero address.
             */
            else if ((flag & PJSIP_TRANSPORT_DATAGRAM) &&
                     (remote_addr->sa_family == PJ_AF_INET))
            {
                pj_sockaddr_in *addr = (pj_sockaddr_in*)&key.addr;

                pj_bzero(addr, sizeof(pj_sockaddr_in));
                addr->sin_family = PJ_AF_INET;

                key_len = sizeof(key.type) + sizeof(pj_sockaddr_in);
                transport = pj_hash_get(mgr->table, &key, key_len, NULL);
            }
        }

        /* A transport that is shutting down is not reused; a new one is
         * created through a factory instead.
         */
        if (transport!=NULL && !transport->is_shutdown) {
            /*
             * Transport found!
             */
            pjsip_transport_add_ref(transport);
            pj_lock_release(mgr->lock);
            *tp = transport;

            TRACE_((THIS_FILE, "Transport %s acquired", transport->obj_name));
            return PJ_SUCCESS;
        }

        /*
         * Transport not found!
         * Find factory that can create such transport.
         */
        factory = mgr->factory_list.next;
        while (factory != &mgr->factory_list) {
            if (factory->type == type)
                break;
            factory = factory->next;
        }

        if (factory == &mgr->factory_list) {
            /* No factory can create the transport! */
            pj_lock_release(mgr->lock);
            TRACE_((THIS_FILE, "No suitable factory was found either"));
            return PJSIP_EUNSUPTRANSPORT;
        }
    }

    TRACE_((THIS_FILE, "Creating new transport from factory"));

    /* Request factory to create transport. */
    status = factory->create_transport(factory, mgr, mgr->endpt,
                                       remote, addr_len, tp);
    if (status == PJ_SUCCESS) {
        /* Verify that the factory really produced a transport: the
         * created object (*tp), not just the caller's out-pointer, must
         * be non-NULL before a reference is added to it.
         */
        PJ_ASSERT_ON_FAIL(tp!=NULL && *tp!=NULL,
            {pj_lock_release(mgr->lock); return PJ_EBUG;});
        pjsip_transport_add_ref(*tp);
    }
    pj_lock_release(mgr->lock);
    return status;
}
/**
 * Dump transport info.
 *
 * Writes, at log level 3 and with the manager lock held, the list of
 * registered factories/listeners followed by every transport stored in
 * the manager's hash table (name, info string, reference count and
 * whether its idle timer id is set). Debug builds also print the number
 * of outstanding transmit buffers.
 *
 * When logging at level 3 is compiled out (PJ_LOG_MAX_LEVEL < 3) the
 * function does nothing.
 */
PJ_DEF(void) pjsip_tpmgr_dump_transports(pjsip_tpmgr *mgr)
{
#if PJ_LOG_MAX_LEVEL >= 3
    pj_hash_iterator_t iter_storage;
    pj_hash_iterator_t *cursor;
    pjsip_tpfactory *listener;

    pj_lock_acquire(mgr->lock);

#if defined(PJ_DEBUG) && PJ_DEBUG!=0
    PJ_LOG(3,(THIS_FILE, " Outstanding transmit buffers: %d",
              pj_atomic_get(mgr->tdata_counter)));
#endif

    /* Listeners: walk the circular factory list until back at its head. */
    PJ_LOG(3, (THIS_FILE, " Dumping listeners:"));
    for (listener = mgr->factory_list.next;
         listener != &mgr->factory_list;
         listener = listener->next)
    {
        PJ_LOG(3, (THIS_FILE, " %s %s:%.*s:%d",
                   listener->obj_name,
                   listener->type_name,
                   (int)listener->addr_name.host.slen,
                   listener->addr_name.host.ptr,
                   (int)listener->addr_name.port));
    }

    /* Transports: the heading is only printed when the table is not
     * empty.
     */
    cursor = pj_hash_first(mgr->table, &iter_storage);
    if (cursor != NULL) {
        PJ_LOG(3, (THIS_FILE, " Dumping transports:"));

        for (; cursor != NULL; cursor = pj_hash_next(mgr->table, cursor)) {
            pjsip_transport *entry = pj_hash_this(mgr->table, cursor);

            PJ_LOG(3, (THIS_FILE, " %s %s (refcnt=%d%s)",
                       entry->obj_name,
                       entry->info,
                       pj_atomic_get(entry->ref_cnt),
                       (entry->idle_timer.id ? " [idle]" : "")));
        }
    }

    pj_lock_release(mgr->lock);
#else
    PJ_UNUSED_ARG(mgr);
#endif
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -