📄 sip_transport.c
字号:
pj_ssize_t size){ pjsip_tx_data *tdata = token; PJ_UNUSED_ARG(transport); /* Mark pending off so that app can resend/reuse txdata from inside * the callback. */ tdata->is_pending = 0; /* Call callback, if any. */ if (tdata->cb) { (*tdata->cb)(tdata->token, tdata, size); } /* Decrement reference count. */ pjsip_tx_data_dec_ref(tdata);}/* This function is called by endpoint for on_tx_request() and on_tx_response() * notification. */static pj_status_t mod_on_tx_msg(pjsip_tx_data *tdata){ /* Allocate buffer if necessary. */ if (tdata->buf.start == NULL) { tdata->buf.start = pj_pool_alloc( tdata->pool, PJSIP_MAX_PKT_LEN); tdata->buf.cur = tdata->buf.start; tdata->buf.end = tdata->buf.start + PJSIP_MAX_PKT_LEN; } /* Do we need to reprint? */ if (!pjsip_tx_data_is_valid(tdata)) { pj_ssize_t size; size = pjsip_msg_print( tdata->msg, tdata->buf.start, tdata->buf.end - tdata->buf.start); if (size < 0) { return PJSIP_EMSGTOOLONG; } pj_assert(size != 0); tdata->buf.cur[size] = '\0'; tdata->buf.cur += size; } return PJ_SUCCESS;}/* * Send a SIP message using the specified transport. */PJ_DEF(pj_status_t) pjsip_transport_send( pjsip_transport *tr, pjsip_tx_data *tdata, const pj_sockaddr_t *addr, int addr_len, void *token, void (*cb)(void *token, pjsip_tx_data *tdata, pj_ssize_t)){ pj_status_t status; PJ_ASSERT_RETURN(tr && tdata && addr, PJ_EINVAL); /* Is it currently being sent? */ if (tdata->is_pending) { pj_assert(!"Invalid operation step!"); PJ_LOG(2,(THIS_FILE, "Unable to send %s: message is pending", pjsip_tx_data_get_info(tdata))); return PJSIP_EPENDINGTX; } /* Fill in tp_info. */ tdata->tp_info.transport = tr; pj_memcpy(&tdata->tp_info.dst_addr, addr, addr_len); tdata->tp_info.dst_addr_len = addr_len; if (((pj_sockaddr*)addr)->sa_family == PJ_AF_INET) { const char *str_addr; str_addr = pj_inet_ntoa(((pj_sockaddr_in*)addr)->sin_addr); pj_ansi_strcpy(tdata->tp_info.dst_name, str_addr); tdata->tp_info.dst_port = pj_ntohs(((pj_sockaddr_in*)addr)->sin_port); } else { pj_ansi_strcpy(tdata->tp_info.dst_name, "<unknown>"); tdata->tp_info.dst_port = 0; } /* Distribute to modules. * When the message reach mod_msg_print, the contents of the message will * be "printed" to contiguous buffer. */ if (tr->tpmgr->on_tx_msg) { status = (*tr->tpmgr->on_tx_msg)(tr->endpt, tdata); if (status != PJ_SUCCESS) return status; } /* Save callback data. */ tdata->token = token; tdata->cb = cb; /* Add reference counter. */ pjsip_tx_data_add_ref(tdata); /* Mark as pending. */ tdata->is_pending = 1; /* Send to transport. */ status = (*tr->send_msg)(tr, tdata, addr, addr_len, (void*)tdata, &transport_send_callback); if (status != PJ_EPENDING) { tdata->is_pending = 0; pjsip_tx_data_dec_ref(tdata); } return status;}static void transport_idle_callback(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry){ pjsip_transport *tp = entry->user_data; pj_assert(tp != NULL); PJ_UNUSED_ARG(timer_heap); entry->id = PJ_FALSE; pjsip_transport_destroy(tp);}/* * Add ref. */PJ_DEF(pj_status_t) pjsip_transport_add_ref( pjsip_transport *tp ){ PJ_ASSERT_RETURN(tp != NULL, PJ_EINVAL); if (pj_atomic_inc_and_get(tp->ref_cnt) == 1) { pj_lock_acquire(tp->tpmgr->lock); /* Verify again. */ if (pj_atomic_get(tp->ref_cnt) == 1) { if (tp->idle_timer.id != PJ_FALSE) { pjsip_endpt_cancel_timer(tp->tpmgr->endpt, &tp->idle_timer); tp->idle_timer.id = PJ_FALSE; } } pj_lock_release(tp->tpmgr->lock); } return PJ_SUCCESS;}/* * Dec ref. */PJ_DEF(pj_status_t) pjsip_transport_dec_ref( pjsip_transport *tp ){ PJ_ASSERT_RETURN(tp != NULL, PJ_EINVAL); pj_assert(pj_atomic_get(tp->ref_cnt) > 0); if (pj_atomic_dec_and_get(tp->ref_cnt) == 0) { pj_lock_acquire(tp->tpmgr->lock); /* Verify again. */ if (pj_atomic_get(tp->ref_cnt) == 0) { pj_time_val delay; /* If transport is in graceful shutdown, then this is the * last user who uses the transport. Schedule to destroy the * transport immediately. Otherwise schedule idle timer. */ if (tp->is_shutdown) { delay.sec = delay.msec = 0; } else { delay.sec = PJSIP_TRANSPORT_IDLE_TIME; delay.msec = 0; } pj_assert(tp->idle_timer.id == 0); tp->idle_timer.id = PJ_TRUE; pjsip_endpt_schedule_timer(tp->tpmgr->endpt, &tp->idle_timer, &delay); } pj_lock_release(tp->tpmgr->lock); } return PJ_SUCCESS;}/** * Register a transport. */PJ_DEF(pj_status_t) pjsip_transport_register( pjsip_tpmgr *mgr, pjsip_transport *tp ){ int key_len; pj_uint32_t hval; void *entry; /* Init. */ tp->tpmgr = mgr; pj_bzero(&tp->idle_timer, sizeof(tp->idle_timer)); tp->idle_timer.user_data = tp; tp->idle_timer.cb = &transport_idle_callback; /* * Register to hash table (see Trac ticket #42). */ key_len = sizeof(tp->key.type) + tp->addr_len; pj_lock_acquire(mgr->lock); /* If entry already occupied, unregister previous entry */ hval = 0; entry = pj_hash_get(mgr->table, &tp->key, key_len, &hval); if (entry != NULL) pj_hash_set(NULL, mgr->table, &tp->key, key_len, hval, NULL); /* Register new entry */ pj_hash_set(tp->pool, mgr->table, &tp->key, key_len, hval, tp); pj_lock_release(mgr->lock); TRACE_((THIS_FILE,"Transport %s registered: type=%s, remote=%s:%d", tp->obj_name, pjsip_transport_get_type_name(tp->key.type), pj_inet_ntoa(((pj_sockaddr_in*)&tp->key.rem_addr)->sin_addr), pj_ntohs(((pj_sockaddr_in*)&tp->key.rem_addr)->sin_port))); return PJ_SUCCESS;}/* Force destroy transport (e.g. during transport manager shutdown. */static pj_status_t destroy_transport( pjsip_tpmgr *mgr, pjsip_transport *tp ){ int key_len; pj_uint32_t hval; void *entry; TRACE_((THIS_FILE, "Transport %s is being destroyed", tp->obj_name)); pj_lock_acquire(tp->lock); pj_lock_acquire(mgr->lock); /* * Unregister timer, if any. */ //pj_assert(tp->idle_timer.id == PJ_FALSE); if (tp->idle_timer.id != PJ_FALSE) { pjsip_endpt_cancel_timer(mgr->endpt, &tp->idle_timer); tp->idle_timer.id = PJ_FALSE; } /* * Unregister from hash table (see Trac ticket #42). */ key_len = sizeof(tp->key.type) + tp->addr_len; hval = 0; entry = pj_hash_get(mgr->table, &tp->key, key_len, &hval); if (entry == (void*)tp) pj_hash_set(NULL, mgr->table, &tp->key, key_len, hval, NULL); pj_lock_release(mgr->lock); /* Destroy. */ return tp->destroy(tp);}/* * Start graceful shutdown procedure for this transport. */PJ_DEF(pj_status_t) pjsip_transport_shutdown(pjsip_transport *tp){ pjsip_tpmgr *mgr; pj_status_t status; TRACE_((THIS_FILE, "Transport %s shutting down", tp->obj_name)); pj_lock_acquire(tp->lock); mgr = tp->tpmgr; pj_lock_acquire(mgr->lock); /* Do nothing if transport is being shutdown already */ if (tp->is_shutdown) { pj_lock_release(tp->lock); pj_lock_release(mgr->lock); return PJ_SUCCESS; } status = PJ_SUCCESS; /* Instruct transport to shutdown itself */ if (tp->do_shutdown) status = tp->do_shutdown(tp); if (status == PJ_SUCCESS) tp->is_shutdown = PJ_TRUE; pj_lock_release(tp->lock); pj_lock_release(mgr->lock); return status;}/** * Unregister transport. */PJ_DEF(pj_status_t) pjsip_transport_destroy( pjsip_transport *tp){ /* Must have no user. */ PJ_ASSERT_RETURN(pj_atomic_get(tp->ref_cnt) == 0, PJSIP_EBUSY); /* Destroy. */ return destroy_transport(tp->tpmgr, tp);}/***************************************************************************** * * TRANSPORT FACTORY * *****************************************************************************/PJ_DEF(pj_status_t) pjsip_tpmgr_register_tpfactory( pjsip_tpmgr *mgr, pjsip_tpfactory *tpf){ pjsip_tpfactory *p; pj_status_t status; pj_lock_acquire(mgr->lock); /* Check that no factory with the same type has been registered. */ status = PJ_SUCCESS; for (p=mgr->factory_list.next; p!=&mgr->factory_list; p=p->next) { if (p->type == tpf->type) { status = PJSIP_ETYPEEXISTS; break; } if (p == tpf) { status = PJ_EEXISTS; break; } } if (status != PJ_SUCCESS) { pj_lock_release(mgr->lock); return status; } pj_list_insert_before(&mgr->factory_list, tpf); pj_lock_release(mgr->lock); return PJ_SUCCESS;}/** * Unregister factory. */PJ_DEF(pj_status_t) pjsip_tpmgr_unregister_tpfactory( pjsip_tpmgr *mgr, pjsip_tpfactory *tpf){ pj_lock_acquire(mgr->lock); pj_assert(pj_list_find_node(&mgr->factory_list, tpf) == tpf); pj_list_erase(tpf); pj_lock_release(mgr->lock); return PJ_SUCCESS;}/***************************************************************************** * * TRANSPORT MANAGER * *****************************************************************************//* * Create a new transport manager. */PJ_DEF(pj_status_t) pjsip_tpmgr_create( pj_pool_t *pool, pjsip_endpoint *endpt, void (*rx_cb)(pjsip_endpoint*, pj_status_t, pjsip_rx_data *), pj_status_t (*tx_cb)(pjsip_endpoint*, pjsip_tx_data*), pjsip_tpmgr **p_mgr){ pjsip_tpmgr *mgr; pj_status_t status; PJ_ASSERT_RETURN(pool && endpt && rx_cb && p_mgr, PJ_EINVAL); /* Register mod_msg_print module. */ status = pjsip_endpt_register_module(endpt, &mod_msg_print); if (status != PJ_SUCCESS) return status; /* Create and initialize transport manager. */ mgr = pj_pool_zalloc(pool, sizeof(*mgr)); mgr->endpt = endpt; mgr->on_rx_msg = rx_cb; mgr->on_tx_msg = tx_cb; pj_list_init(&mgr->factory_list); mgr->table = pj_hash_create(pool, PJSIP_TPMGR_HTABLE_SIZE); if (!mgr->table) return PJ_ENOMEM; status = pj_lock_create_recursive_mutex(pool, "tmgr%p", &mgr->lock); if (status != PJ_SUCCESS) return status;#if defined(PJ_DEBUG) && PJ_DEBUG!=0 status = pj_atomic_create(pool, 0, &mgr->tdata_counter); if (status != PJ_SUCCESS) return status;#endif PJ_LOG(5, (THIS_FILE, "Transport manager created.")); *p_mgr = mgr; return PJ_SUCCESS;}/* * Find out the appropriate local address info (IP address and port) to * advertise in Contact header based on the remote address to be * contacted. The local address info would be the address name of the * transport or listener which will be used to send the request. * * In this implementation, it will only select the transport based on * the transport type in the request. */PJ_DEF(pj_status_t) pjsip_tpmgr_find_local_addr( pjsip_tpmgr *tpmgr, pj_pool_t *pool, pjsip_transport_type_e type, const pjsip_tpselector *sel, pj_str_t *ip_addr, int *port){ pj_status_t status = PJSIP_EUNSUPTRANSPORT; unsigned flag; /* Sanity checks */ PJ_ASSERT_RETURN(tpmgr && pool && ip_addr && port, PJ_EINVAL); ip_addr->slen = 0; *port = 0; flag = pjsip_transport_get_flag_from_type(type); if (sel && sel->type == PJSIP_TPSELECTOR_TRANSPORT && sel->u.transport) { pj_strdup(pool, ip_addr, &sel->u.transport->local_name.host); *port = sel->u.transport->local_name.port; status = PJ_SUCCESS; } else if (sel && sel->type == PJSIP_TPSELECTOR_LISTENER && sel->u.listener) { pj_strdup(pool, ip_addr, &sel->u.listener->addr_name.host); *port = sel->u.listener->addr_name.port; status = PJ_SUCCESS; } else if ((flag & PJSIP_TRANSPORT_DATAGRAM) != 0) { pj_sockaddr_in remote; pjsip_transport *tp; pj_sockaddr_in_init(&remote, NULL, 0); status = pjsip_tpmgr_acquire_transport(tpmgr, type, &remote, sizeof(remote), NULL, &tp); if (status == PJ_SUCCESS) { pj_strdup(pool, ip_addr, &tp->local_name.host); *port = tp->local_name.port; status = PJ_SUCCESS;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -