📄 tport.c
字号:
su_timer_set(t, tport_tick, mr);}/** Flush idle connections. */int tport_flush(tport_t *tp){ tport_t *tp_next; if (tp == NULL) return -1; /* Go through all secondary transports, zap idle ones */ for (tp = tprb_first(tp->tp_pri->pri_secondary); tp; tp = tp_next) { tp_next = tprb_succ(tp); if (tp->tp_refs != 0) continue; SU_DEBUG_1(("tport_flush(%p): %szapping\n", tp, tport_is_closed(tp) ? "" : "closing and ")); if (!tport_is_closed(tp)) tport_close(tp); tport_zap_secondary(tp); } return 0;}/**Convert sockaddr_t to a transport name. * * @retval 0 when successful * @retval -1 upon an error */int tport_convert_addr(su_home_t *home, tp_name_t *tpn, char const *protoname, char const *canon, su_sockaddr_t const *su){ tp_name_t name[1] = {{ NULL }}; char const *host; char buf[TPORT_HOSTPORTSIZE]; char port[8]; int canonlen = canon ? strlen(canon) : 0; if (su == NULL) host = "*"; else if (!SU_SOCKADDR_INADDR_ANY(su)) host = tport_hostport(buf, sizeof(buf), su, 0); else if (canonlen && su->su_family == AF_INET && strspn(canon, "0123456789.") == canonlen) host = canon;#if SU_HAVE_IN6 else if (canonlen && su->su_family == AF_INET6 && strspn(canon, "0123456789abcdefABCDEF:.") == canonlen) host = canon;#endif else host = localipname(su->su_family, buf, sizeof(buf)); if (host == NULL) return -1; if (su == NULL) strcpy(port, "*"); else snprintf(port, sizeof(port), "%u", ntohs(su->su_port)); name->tpn_proto = protoname; name->tpn_host = host; name->tpn_canon = canon ? canon : host; name->tpn_port = port; return tport_name_dup(home, tpn, name);}/** Set transport object name. @internal */staticint tport_setname(tport_t *self, char const *protoname, su_addrinfo_t const *ai, char const *canon){ su_addrinfo_t *selfai = self->tp_addrinfo; if (tport_convert_addr(self->tp_home, self->tp_name, protoname, canon, (su_sockaddr_t *)ai->ai_addr) < 0) return -1; if (tport_is_secondary(self)) self->tp_ident = self->tp_pri->pri_primary->tp_ident; selfai->ai_flags = ai->ai_flags & TP_AI_MASK; selfai->ai_family = ai->ai_family; selfai->ai_socktype = ai->ai_socktype; selfai->ai_protocol = ai->ai_protocol; selfai->ai_canonname = (char *)self->tp_name->tpn_canon; if (ai->ai_addr) { assert(ai->ai_family), assert(ai->ai_socktype), assert(ai->ai_protocol); memcpy(self->tp_addr, ai->ai_addr, selfai->ai_addrlen = ai->ai_addrlen); } return 0;}/**Resolve protocol name. * * Convert a protocol name to IP protocol number and socket type used by * su_getaddrinfo(). * * @param hints hints with the protocol number and socktype [OUT] * @param proto protocol name [IN] * @param flags hint flags */staticint getprotohints(su_addrinfo_t *hints, char const *proto, int flags){ memset(hints, 0, sizeof *hints); hints->ai_flags = flags; hints->ai_canonname = (char *)proto;#if HAVE_TLS if (strcasecmp(proto, "tls") == 0) proto = "tcp";#endif#if HAVE_SCTP if (strcasecmp(proto, "sctp") == 0) { hints->ai_protocol = IPPROTO_SCTP; hints->ai_socktype = SOCK_STREAM; return 0; }#endif if (strcasecmp(proto, "udp") == 0) { hints->ai_protocol = IPPROTO_UDP; hints->ai_socktype = SOCK_DGRAM; return 0; } if (strcasecmp(proto, "tcp") == 0) { hints->ai_protocol = IPPROTO_TCP; hints->ai_socktype = SOCK_STREAM; return 0; } return -1;}/** Get local IP. * * Get primary local IP address in URI format (IPv6 address will be * []-quoted). */staticchar *localipname(int pf, char *buf, int bufsiz){ su_localinfo_t *li = NULL, hints[1] = {{ LI_NUMERIC | LI_CANONNAME }}; int n, error; hints->li_family = pf;#if SU_HAVE_IN6 if (pf == AF_INET6) { /* Link-local addresses are not usable on IPv6 */ hints->li_scope = LI_SCOPE_GLOBAL | LI_SCOPE_SITE /* | LI_SCOPE_HOST */; }#endif if ((error = su_getlocalinfo(hints, &li))) {#if SU_HAVE_IN6 if (error == ELI_NOADDRESS && pf == AF_INET6) { hints->li_family = AF_INET; error = su_getlocalinfo(hints, &li); if (error == ELI_NOADDRESS) { hints->li_family = AF_INET6; hints->li_scope |= LI_SCOPE_HOST; error = su_getlocalinfo(hints, &li); } if (error == ELI_NOADDRESS) { hints->li_family = AF_INET; error = su_getlocalinfo(hints, &li); } }#endif if (error) { SU_DEBUG_1(("tport: su_getlocalinfo: %s\n", su_gli_strerror(error))); return NULL; } } assert(li); assert(li->li_canonname); n = strlen(li->li_canonname); if (li->li_family == AF_INET) { if (n >= bufsiz) return NULL; memcpy(buf, li->li_canonname, n + 1); } else { if (n + 2 >= bufsiz) return NULL; memcpy(buf + 1, li->li_canonname, n); buf[0] = '['; buf[++n] = ']'; buf[++n] = '\0'; } su_freelocalinfo(li); return buf;}/** Process errors from transport. */void tport_error_report(tport_t *self, int errcode, su_sockaddr_t const *addr){ char const *errmsg; if (errcode == 0) return; else if (errcode > 0) errmsg = su_strerror(errcode); else errcode = 0, errmsg = "stream closed"; if (addr && addr->su_family == AF_UNSPEC) addr = NULL; /* Mark this connection as unusable */ if (errcode > 0 && tport_is_connected(self)) self->tp_reusable = 0; if (addr == NULL && tport_is_connection_oriented(self)) addr = self->tp_addr; /* Report error */ if (addr && tport_pending_error(self, addr, errcode)) ; else if (self->tp_master->mr_tpac->tpac_error) { char *dstname = NULL; char hp[TPORT_HOSTPORTSIZE]; if (addr) dstname = tport_hostport(hp, sizeof hp, addr, 1); STACK_ERROR(self, errcode, dstname); } else { if (tport_is_primary(self)) SU_DEBUG_3(("%s(%p): %s (with %s)\n", __func__, self, errmsg, self->tp_protoname)); else SU_DEBUG_3(("%s(%p): %s (with %s/%s:%s)\n", __func__, self, errmsg, self->tp_protoname, self->tp_host, self->tp_port)); } /* Close connection */ if (!self->tp_closed && errcode > 0 && tport_is_connected(self)) tport_close(self);}/** Accept a new connection. * * The function tport_accept() accepts a new connection and creates a * secondary transport object for the new socket. */int tport_accept(tport_primary_t *pri, int events){ tport_t *self; su_addrinfo_t ai[1]; su_sockaddr_t su[1]; socklen_t sulen = sizeof su; su_socket_t s = SOCKET_ERROR, l = pri->pri_primary->tp_socket; char const *reason = "accept"; if (events & SU_WAIT_ERR) tport_error_event(pri->pri_primary); if (!(events & SU_WAIT_ACCEPT)) return 0; memcpy(ai, pri->pri_primary->tp_addrinfo, sizeof ai); ai->ai_canonname = NULL; s = accept(l, &su->su_sa, &sulen); if (s < 0) { tport_error_report(pri->pri_primary, su_errno(), NULL); return 0; } ai->ai_addr = &su->su_sa, ai->ai_addrlen = sulen; /* Alloc a new transport object, then register socket events with it */ self = tport_alloc_secondary(pri, s, 1, &reason); if (self) { int i; su_root_t *root = self->tp_master->mr_root; su_wakeup_f wakeup = tport_wakeup; int events = SU_WAIT_IN|SU_WAIT_ERR|SU_WAIT_HUP; su_wait_t wait[1] = { SU_WAIT_INIT }; SU_CANONIZE_SOCKADDR(su); if (/* Create wait object with appropriate events. */ su_wait_create(wait, s, events) != -1 /* Register socket to root */ && (i = su_root_register(root, wait, wakeup, self, 0)) != -1) { self->tp_index = i; self->tp_conn_orient = 1; self->tp_events = events; if (tport_setname(self, pri->pri_protoname, ai, NULL) != -1) { SU_DEBUG_5(("%s(%p): new connection from " TPN_FORMAT "\n", __func__, self, TPN_ARGS(self->tp_name))); tprb_append(&pri->pri_secondary, self); /* Return succesfully */ return 0; } } else su_wait_destroy(wait); /* Failure: shutdown socket, */ tport_close(self); tport_zap_secondary(self); } /* XXX - report error ? */ return 0;}/** Allocate a new message object */msg_t *tport_msg_alloc(tport_t const *self, unsigned size){ if (self) { tport_master_t *mr = self->tp_master; msg_t *msg = mr->mr_tpac->tpac_alloc(mr->mr_stack, mr->mr_log, NULL, size, self, NULL); if (msg) { su_addrinfo_t *mai = msg_addrinfo(msg); su_addrinfo_t const *tai = self->tp_addrinfo; mai->ai_family = tai->ai_family; mai->ai_protocol = tai->ai_protocol; mai->ai_socktype = tai->ai_socktype; } return msg; } else { return NULL; }}/** Process events for socket waiting to be connected */static int tport_connected(su_root_magic_t *magic, su_wait_t *w, tport_t *self){ int events = su_wait_events(w, self->tp_socket); tport_master_t *mr = self->tp_master; su_wait_t wait[1] = { SU_WAIT_INIT }; int error; SU_DEBUG_7(("tport_connected(%p): events%s%s\n", self, events & SU_WAIT_CONNECT ? " CONNECTED" : "", events & SU_WAIT_ERR ? " ERR" : ""));#if HAVE_POLL assert(w->fd == self->tp_socket);#endif if (events & SU_WAIT_ERR) tport_error_event(self); if (!(events & SU_WAIT_CONNECT) || self->tp_closed) { return 0; } error = su_soerror(self->tp_socket); if (error) { tport_error_report(self, error, NULL); return 0; } su_root_deregister(mr->mr_root, self->tp_index); self->tp_index = -1; self->tp_events = SU_WAIT_IN | SU_WAIT_ERR; if (su_wait_create(wait, self->tp_socket, self->tp_events) == -1 || (self->tp_index = su_root_register(mr->mr_root, wait, tport_wakeup, self, 0)) == -1) { tport_close(self); } else if (self->tp_queue && self->tp_queue[self->tp_qhead]) { tport_send_event(self); } return 0;}/** Process events for primary socket */static int tport_wakeup_pri(su_root_magic_t *m, su_wait_t *w, tport_t *self){ tport_primary_t *pri = self->tp_pri; int events = su_wait_events(w, self->tp_socket);#if HAVE_POLL assert(w->fd == self->tp_socket);#endif SU_DEBUG_7(("%s(%p): events%s%s%s%s%s%s\n", "tport_wakeup_pri", self, events & SU_WAIT_IN ? " IN" : "", SU_WAIT_ACCEPT != SU_WAIT_IN && (events & SU_WAIT_ACCEPT) ? " ACCEPT" : "", events & SU_WAIT_OUT ? " OUT" : "", events & SU_WAIT_HUP ? " HUP" : "", events & SU_WAIT_ERR ? " ERR" : "", self->tp_closed ? " (closed)" : "")); if (pri->pri_vtable->vtp_wakeup_pri) return pri->pri_vtable->vtp_wakeup_pri(pri, events); else return tport_base_wakeup(self, events);}/** Process events for connected socket */static int tport_wakeup(su_root_magic_t *magic, su_wait_t *w, tport_t *self){ int events = su_wait_events(w, self->tp_socket);#if HAVE_POLL assert(w->fd == self->tp_socket);#endif SU_DEBUG_7(("%s(%p): events%s%s%s%s%s\n", "tport_wakeup", self, events & SU_WAIT_IN ? " IN" : "", events & SU_WAIT_OUT ? " OUT" : "", events & SU_WAIT_HUP ? " HUP" : "", events & SU_WAIT_ERR ? " ERR" : "", self->tp_closed ? " (closed)" : "")); if (self->tp_pri->pri_vtable->vtp_wakeup) return self->tp_pri->pri_vtable->vtp_wakeup(self, events); else return tport_base_wakeup(self, events);}static int tport_base_wakeup(tport_t *self, int events){ int error = 0; if (events & SU_WAIT_ERR) error = tport_error_event(self); if ((events & SU_WAIT_OUT) && !self->tp_closed) tport_send_event(self); if ((events & SU_WAIT_IN) && !self->tp_closed) tport_recv_event(self); if ((events & SU_WAIT_HUP) && !self->tp_closed) tport_hup_event(self); if (error) tport_error_report(self, error, NULL); return 0;}/** Stop reading from socket until tport_continue() is called. */int tport_stall(tport_t *self){ return tport_set_events(self, 0, SU_WAIT_IN);}/** Continue reading from socket. */int tport_continue(tport_t *self){ if (self == NULL || self->tp_recv_close) return -1; return tport_set_events(self, SU_WAIT_IN, 0);}/** Process "hangup" event. * */void tport_hup_event(tport_t *self){ SU_DEBUG_7(("%s(%p)\n", __func__, self)); if (self->tp_msg) { su_time_t now = su_no
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -