📄 tport.c
字号:
mr = su_home_clone(NULL, sizeof *mr); if (!mr) return NULL; SU_DEBUG_7(("%s(): %p\n", "tport_create", mr)); mr->mr_stack = stack; mr->mr_tpac = tpac; mr->mr_root = root; mr->mr_master->tp_master = mr; mr->mr_master->tp_params = tpp = mr->mr_params; mr->mr_master->tp_reusable = 1; tpp->tpp_mtu = UINT_MAX; tpp->tpp_thrprqsize = THRP_PENDING; tpp->tpp_qsize = TPORT_QUEUESIZE; tpp->tpp_sdwn_error = 1; tpp->tpp_idle = UINT_MAX; tpp->tpp_timeout = UINT_MAX; tpp->tpp_sigcomp_lifetime = UINT_MAX; tpn = mr->mr_master->tp_name; tpn->tpn_proto = "*"; tpn->tpn_host = "*"; tpn->tpn_canon = "*"; tpn->tpn_port = "*"; ta_start(ta, tag, value); tport_set_params(mr->mr_master, ta_tags(ta)); tport_open_log(mr, ta_args(ta)); tick = 5000; /* For testing, usually 30000 is enough */ if (tpp->tpp_idle < 4 * tick) tick = tpp->tpp_idle / 4; if (tpp->tpp_timeout < 4 * tick) tick = tpp->tpp_timeout / 4; if (tick < 200) tick = 200; mr->mr_timer = su_timer_create(su_root_task(root), tick); su_timer_set(mr->mr_timer, tport_tick, mr); ta_end(ta); return mr->mr_master;}/** Destroy the master transport. */void tport_destroy(tport_t *self){ tport_master_t *mr; static tp_stack_class_t tport_destroy_tpac[1] = {{ sizeof tport_destroy_tpac, /* tpac_recv */ tport_destroy_recv, /* tpac_error */ tport_destroy_error, /* tpac_alloc */ tport_destroy_alloc, /* tpac_comp_accept */ NULL }}; SU_DEBUG_7(("%s(%p)\n", __func__, self)); if (self == NULL) return; assert(tport_is_master(self)); if (!tport_is_master(self)) return; mr = (tport_master_t *)self; mr->mr_tpac = tport_destroy_tpac; while (mr->mr_primaries) tport_zap_primary(mr->mr_primaries);#if HAVE_SIGCOMP if (mr->mr_compartment) sigcomp_compartment_unref(mr->mr_compartment), mr->mr_compartment = NULL;#endif if (mr->mr_dump_file) fclose(mr->mr_dump_file), mr->mr_dump_file = NULL; if (mr->mr_timer) su_timer_destroy(mr->mr_timer), mr->mr_timer = NULL; su_home_zap(mr->mr_home);}/** Allocate a primary transport */static tport_primary_t *tport_alloc_primary(tport_master_t *mr){ tport_primary_t *pri, **next; for (next = &mr->mr_primaries; *next; next = &(*next)->pri_next) ; if ((pri = su_home_clone(mr->mr_home, sizeof (*pri)))) { tport_t *tp = pri->pri_primary; tp->tp_master = mr; tp->tp_pri = pri; tp->tp_socket = SOCKET_ERROR; tp->tp_magic = mr->mr_master->tp_magic; tp->tp_params = pri->pri_params; memcpy(tp->tp_params, mr->mr_params, sizeof (*tp->tp_params)); tp->tp_reusable = mr->mr_master->tp_reusable; SU_DEBUG_5(("%s(%p): new primary tport %p\n", __func__, mr, pri)); } *next = pri; return pri;}/**Create a primary transport object with socket. * * Creates a primary transport object with a server socket, and then * registers the socket with suitable events to the root. * * @param dad parent (master or primary) transport object * @param ai pointer to addrinfo structure * @param canon canonical name of node * @param protoname name of the protocol */statictport_primary_t *tport_listen(tport_master_t *mr, su_addrinfo_t const *ai, char const *canon, char const *protoname, int port, tagi_t *tags){ tport_primary_t *pri = NULL; su_socket_t s = SOCKET_ERROR; int index = 0, events = 0, nat_bound = 0; su_wakeup_f wakeup = NULL; su_wait_t wait[1] = { SU_WAIT_INIT }; su_sockaddr_t su[1]; socklen_t sulen = ai->ai_addrlen; int err; int errlevel = 3; char buf[TPORT_HOSTPORTSIZE]; /* Log an error, return error */#define TPORT_LISTEN_ERROR(errno, what) \ ((void)(err = errno, s != SOCKET_ERROR ? su_close(s) : 0, \ (SU_LOG_LEVEL >= errlevel ? \ su_llog(tport_log, errlevel, \ "%s(%p): %s(pf=%d %s/%s): %s\n", \ __func__, mr, #what, ai->ai_family, \ protoname, \ tport_hostport(buf, sizeof(buf), su, 2), \ su_strerror(err)) : (void)0), \ tport_zap_primary(pri), \ su_seterrno(err)), \ (void *)NULL) if (sulen > sizeof(su)) return NULL; memcpy(su, ai->ai_addr, sulen); if (port > 0) su->su_port = htons(port); /* Create a primary transport object for another transport. */ pri = tport_alloc_primary(mr); if (pri == NULL) return TPORT_LISTEN_ERROR(errno, tport_alloc_primary); pri->pri_family = ai->ai_family; pri->pri_socktype = ai->ai_socktype; pri->pri_protocol = ai->ai_protocol; if (tport_set_params(pri->pri_primary, TAG_NEXT(tags)) < 0) return TPORT_LISTEN_ERROR(su_errno(), tport_set_params); if (ai->ai_protocol == IPPROTO_SCTP) { if (pri->pri_params->tpp_mtu > TP_SCTP_MSG_MAX) pri->pri_params->tpp_mtu = TP_SCTP_MSG_MAX; } s = su_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);#if SU_HAVE_IN6 if (s == SOCKET_ERROR) { if (ai->ai_family == AF_INET6 && su_errno() == EAFNOSUPPORT) errlevel = 7; return TPORT_LISTEN_ERROR(su_errno(), socket); }#endif /* Passive open, do bind() (and listen() if connection-oriented). */#ifdef __linux__ /* Linux does not allow reusing TCP port while this one is open, so we can safely call su_setreuseaddr() before bind(). */ if (ai->ai_socktype == SOCK_STREAM || ai->ai_socktype == SOCK_SEQPACKET) su_setreuseaddr(s, 1);#endif nat_bound = tport_nat_traverse_nat(mr, su, ai, s); if (!nat_bound /* || !mr->mr_nat->stun_enabled */) { /* STUN has a problem or is not enabled */ if (bind(s, &su->su_sa, sulen) == SOCKET_ERROR) { if (su_errno() == EADDRINUSE) errlevel = 7; return TPORT_LISTEN_ERROR(su_errno(), bind); } if (getsockname(s, &su->su_sa, &sulen) == SOCKET_ERROR) return TPORT_LISTEN_ERROR(su_errno(), getsockname);#if defined (__linux__) && defined (SU_HAVE_IN6) if (ai->ai_family == AF_INET6) { if (SU_SOCKADDR_INADDR_ANY(su)) /* pri->pri_family2 = AF_INET */ ; else if (IN6_IS_ADDR_V4MAPPED(&su->su_sin6.sin6_addr) || IN6_IS_ADDR_V4COMPAT(&su->su_sin6.sin6_addr)) pri->pri_family = AF_INET; }#endif } if (ai->ai_socktype == SOCK_STREAM || ai->ai_socktype == SOCK_SEQPACKET) { /* Connection-oriented protocols listen and accept connections */ wakeup = tport_accept; /* accepting function will be registered */ events = SU_WAIT_ACCEPT; /* XXX - we should take this from the current tags */ if (listen(s, mr->mr_params->tpp_qsize) == SOCKET_ERROR) return TPORT_LISTEN_ERROR(su_errno(), listen);#if !defined(__linux__) /* Allow reusing TCP sockets * * On Solaris & BSD, call setreuseaddr() after bind in order to avoid * binding to a port owned by an existing server. */ su_setreuseaddr(s, 1);#endif } else { /* Connectionless protocols sendto() and recvfrom() messages */ wakeup = tport_recv; /* receiving function will be registered */ events = SU_WAIT_IN; if (ai->ai_protocol == IPPROTO_UDP) { unsigned rmem = 0, wmem = 0;#if HAVE_IP_RECVERR if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) { int const one = 1; if (setsockopt(s, SOL_IP, IP_RECVERR, &one, sizeof(one)) < 0) { if (ai->ai_family == AF_INET) return TPORT_LISTEN_ERROR(su_errno(), IP_RECVERR); } events |= SU_WAIT_ERR; }#endif#if HAVE_IPV6_RECVERR if (ai->ai_family == AF_INET6) { int const one = 1; if (setsockopt(s, SOL_IPV6, IPV6_RECVERR, &one, sizeof(one)) < 0) return TPORT_LISTEN_ERROR(su_errno(), IPV6_RECVERR); events |= SU_WAIT_ERR; }#endif tl_gets(tags, TPTAG_UDP_RMEM_REF(rmem), TPTAG_UDP_WMEM_REF(wmem), TAG_END()); if (rmem != 0 && setsockopt(s, SOL_SOCKET, SO_RCVBUF, (void *)&rmem, sizeof rmem) < 0) { SU_DEBUG_3(("setsockopt(SO_RCVBUF): %s\n", su_strerror(su_errno()))); } if (wmem != 0 && setsockopt(s, SOL_SOCKET, SO_SNDBUF, (void *)&wmem, sizeof wmem) < 0) { SU_DEBUG_3(("setsockopt(SO_SNDBUF): %s\n", su_strerror(su_errno()))); } } } if (su_wait_create(wait, s, events) == -1) return TPORT_LISTEN_ERROR(su_errno(), su_wait_create); /* Register receiving or accepting function with events specified above */ index = su_root_register(mr->mr_root, wait, wakeup, pri->pri_primary, 0); if (index == -1) return TPORT_LISTEN_ERROR(su_errno(), su_root_register); if (tport_setname(pri->pri_primary, protoname, su, canon) == -1) return TPORT_LISTEN_ERROR(su_errno(), tport_setname); pri->pri_primary->tp_socket = s; pri->pri_primary->tp_index = index; pri->pri_primary->tp_events = events; pri->pri_primary->tp_connected = 0; pri->pri_primary->tp_conn_orient = ai->ai_socktype != SOCK_DGRAM; pri->pri_primary->tp_addr[0] = su[0]; pri->pri_primary->tp_addrlen = sulen; if (nat_bound) { /* XXX - should set also the IP address in tp_addr? */ pri->pri_natted = 1; tport_nat_set_canon(pri->pri_primary, mr->mr_nat); } SU_DEBUG_5(("%s(%p): %s " TPN_FORMAT "\n", __func__, pri, "listening at", TPN_ARGS(pri->pri_primary->tp_name))); return pri;}/** Destroy a primary transport and its secondary transports. @internal */static void tport_zap_primary(tport_primary_t *pri){ tport_primary_t **prip; if (pri == NULL) return; assert(tport_is_primary(pri->pri_primary)); if (pri->pri_threadpool) { tport_kill_threadpool(pri); SU_DEBUG_3(("%s(%p): zapped threadpool\n", __func__, pri)); } while (pri->pri_secondary) tport_zap_secondary(pri->pri_secondary); tport_nat_finish(pri); /* We have just a single-linked list for primary transports */ for (prip = &pri->pri_master->mr_primaries; *prip != pri; prip = &(*prip)->pri_next) assert(*prip); *prip = pri->pri_next; tport_zap_secondary(pri->pri_primary);}/**Allocate a secondary transport. @internal * * The function tport_alloc_secondary() creates a secondary transport * object. The new transport initally shares parameters structure with the * original transport. * * @param dad primary transport * * @return * The function tport_alloc_seconary() returns a pointer to the newly * created transport, or NULL upon an error. */statictport_t *tport_alloc_secondary(tport_primary_t *pri){ tport_master_t *mr = pri->pri_master; tport_t *self; self = su_home_clone(mr->mr_home, sizeof *self); if (self) { SU_DEBUG_7(("%s(%p): new secondary tport %p\n", __func__, pri, self)); self->tp_refs = -1; /* Freshly allocated */ self->tp_master = mr; self->tp_pri = pri; self->tp_params = pri->pri_params; self->tp_reusable = pri->pri_primary->tp_reusable; self->tp_magic = pri->pri_primary->tp_magic; } return self;}/** Create a connected transport object with socket. * * The function tport_connect() creates a secondary transport with a * connected socket. It registers the socket with suitable events to the * root. * * @param pri primary transport object * @param ai pointer to addrinfo structure * @param canon canonical name of node * @param protoname name od the protocol */statictport_t *tport_connect(tport_primary_t *pri, su_addrinfo_t *ai, tp_name_t const *tpn){ tport_master_t *mr = pri->pri_master; tport_t *self = NULL; su_socket_t s = SOCKET_ERROR; int index = 0, err; su_wait_t wait[1] = { SU_WAIT_INIT }; su_wakeup_f wakeup = tport_recv; int events = SU_WAIT_IN | SU_WAIT_ERR; int errlevel = 3; char buf[TPORT_HOSTPORTSIZE]; if (ai == NULL || ai->ai_addrlen > sizeof (self->tp_addr)) return NULL; /* Log an error, return error */#define TPORT_CONNECT_ERROR(errno, what) \ return \ ((void)(err = errno, s != SOCKET_ERROR ? su_close(s) : 0, \ su_wait_destroy(wait), \ (SU_LOG_LEVEL >= errlevel ? \ su_llog(tport_log, errlevel, \ "%s(%p): %s(pf=%d %s/%s): %s\n", \ __func__, pri, #what, ai->ai_family, \ tpn->tpn_proto, \ tport_hostport(buf, sizeof(buf), \ (void *)ai->ai_addr, 2), \ su_strerror(err)) : (void)0), \ tport_zap_secondary(self), \ su_seterrno(err)), \ (void *)NULL) if ((self = tport_alloc_secondary(pri)) == NULL) TPORT_CONNECT_ERROR(errno, tport_alloc_secondary); s = su_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);#if SU_HAVE_IN6 if (s == SOCKET_ERROR) { if (ai->ai_family == AF_INET6 && su_errno() == EAFNOSUPPORT) errlevel = 7; TPORT_CONNECT_ERROR(su_errno(), socket); }#endif if (pri->pri_primary->tp_socket != SOCKET_ERROR) { su_sockaddr_t susa; socklen_t susalen = sizeof(susa); int pri_s = pri->pri_primary->tp_socket; if (getsockname(pri_s, &susa.su_sa, &susalen) < 0) { SU_DEBUG_3(("tport_connect: getsockname(): %s\n", su_strerror(su_errno()))); } else { susa.su_port = 0; if (bind(s, &susa.su_sa, susalen) < 0) { SU_DEBUG_3(("tport_connect: bind(local-ip): %s\n", su_strerror(su_errno()))); } } } if (/* !do_tls && */ su_setblocking(s, 0) < 0) /* asynchronous connect() */ TPORT_CONNECT_ERROR(su_errno(), su_setblocking); if (ai->ai_socktype == SOCK_STREAM) { int one = 1; if (setsockopt(s, SOL_TCP, TCP_NODELAY, (void *)&one, sizeof one) == -1) TPORT_CONNECT_ERROR(su_errno(), setsockopt(TCP_NODELAY)); } if (connect(s, ai->ai_addr, ai->ai_addrlen) == SOCKET_ERROR) { err = su_errno(); if (err != EINPROGRESS && err != EAGAIN && err != EWOULDBLOCK) TPORT_CONNECT_ERROR(err, connect); events = SU_WAIT_CONNECT | SU_WAIT_ERR; wakeup = tport_connected; } if (su_wait_create(wait, s, events) == -1) TPORT_CONNECT_ERROR(su_errno(), su_wait_create);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -