📄 out.c
字号:
for(i = 0; i < npkt; i++) { pkt = jqueue_pull(q); out_packet(s2s, pkt); } jid_free(name); nad_free(nad);}/** mio callback for outgoing conns */static int _out_mio_callback(mio_t m, mio_action_t a, int fd, void *data, void *arg) { conn_t out = (conn_t) arg; char ipport[INET6_ADDRSTRLEN + 17]; int nbytes; switch(a) { case action_READ: log_debug(ZONE, "read action on fd %d", fd); /* they did something */ out->last_activity = time(NULL); ioctl(fd, FIONREAD, &nbytes); if(nbytes == 0) { sx_kill(out->s); return 0; } return sx_can_read(out->s); case action_WRITE: log_debug(ZONE, "write action on fd %d", fd); return sx_can_write(out->s); case action_CLOSE: log_debug(ZONE, "close action on fd %d", fd); /* !!! logging */ /* !!! bounce queues */ jqueue_push(out->s2s->dead, (void *) out->s, 0); /* generate the ip/port pair */ snprintf(ipport, INET6_ADDRSTRLEN + 16, "%s/%d", out->ip, out->port); log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] disconnect", fd, out->ip, out->port); xhash_zap(out->s2s->out, ipport); xhash_free(out->states); xhash_free(out->routes); free(out->key); free(out); case action_ACCEPT: break; } return 0;}void send_dialbacks(conn_t out){ char *rkey; if (xhash_iter_first(out->routes)) { log_debug(ZONE, "sending dialback packets for %s", out->key); do { xhash_iter_get(out->routes, (const char **) &rkey, NULL); _out_dialback(out, rkey); } while(xhash_iter_next(out->routes)); } return;}static int _out_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) { conn_t out = (conn_t) arg; sx_buf_t buf = (sx_buf_t) data; int len, ns, elem, starttls = 0; sx_error_t *sxe; nad_t nad; switch(e) { case event_WANT_READ: log_debug(ZONE, "want read"); mio_read(out->s2s->mio, out->fd); break; case event_WANT_WRITE: log_debug(ZONE, "want write"); mio_write(out->s2s->mio, out->fd); break; case event_READ: log_debug(ZONE, "reading from %d", out->fd); /* do the read */ len = recv(out->fd, buf->data, buf->len, 0); if(len < 0) { if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN) { buf->len = 0; return 0; } log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] read error: %s (%d)", out->fd, out->ip, out->port, strerror(errno), errno); sx_kill(s); return -1; } else if(len == 0) { /* they went away */ sx_kill(s); return -1; } log_debug(ZONE, "read %d bytes", len); buf->len = len; return len; case event_WRITE: log_debug(ZONE, "writing to %d", out->fd); len = send(out->fd, buf->data, buf->len, 0); if(len >= 0) { log_debug(ZONE, "%d bytes written", len); return len; } if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN) return 0; log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] write error: %s (%d)", out->fd, out->ip, out->port, strerror(errno), errno); sx_kill(s); return -1; case event_ERROR: sxe = (sx_error_t *) data; log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] error: %s (%s)", out->fd, out->ip, out->port, sxe->generic, sxe->specific); break; case event_OPEN: log_debug(ZONE, "OPEN event for %s", out->key); break; case event_STREAM: /* check stream version - NULl = pre-xmpp (some jabber1 servers) */ log_debug(ZONE, "STREAM event for %s stream version is %s", out->key, out->s->res_version); /* first time, bring them online */ if(!out->online) { log_debug(ZONE, "outgoing conn to %s is online", out->key); /* if no stream version from either side, kick off dialback for each route, */ /* otherwise wait for stream features */ if ((out->s->res_version==NULL) || (out->s2s->sx_ssl == NULL)) { log_debug(ZONE, "no stream version, sending dialbacks for %s immediately", out->key); out->online = 1; send_dialbacks(out); } else log_debug(ZONE, "outgoing conn to %s - waiting for STREAM features", out->key); } break; case event_PACKET: nad = (nad_t) data; /* watch for the features packet - STARTTLS and/or SASL*/ if ((out->s->res_version!=NULL) && NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_STREAMS) && strncmp(uri_STREAMS, NAD_NURI(nad, NAD_ENS(nad, 0)), strlen(uri_STREAMS)) == 0 && NAD_ENAME_L(nad, 0) == 8 && strncmp("features", NAD_ENAME(nad, 0), 8) == 0) { log_debug(ZONE, "got the stream features packet");#ifdef HAVE_SSL /* starttls if we can */ if(out->s2s->sx_ssl != NULL && out->s2s->local_pemfile != NULL && s->ssf == 0) { ns = nad_find_scoped_namespace(nad, uri_TLS, NULL); if(ns >= 0) { elem = nad_find_elem(nad, 0, ns, "starttls", 1); if(elem >= 0) { log_debug(ZONE, "got STARTTLS in stream features"); if(sx_ssl_client_starttls(out->s2s->sx_ssl, s, out->s2s->local_pemfile) == 0) { starttls = 1; nad_free(nad); return 0; } log_write(out->s2s->log, LOG_ERR, "unable to establish encrypted session with peer"); } } } /* If we're not establishing a starttls connection, send dialbacks */ if (!starttls) { log_debug(ZONE, "No STARTTLS, sending dialbacks for %s", out->key); out->online = 1; send_dialbacks(out); }#else out->online = 1; send_dialbacks(out);#endif } /* we only accept dialback packets */ if(NAD_ENS(nad, 0) < 0 || NAD_NURI_L(nad, NAD_ENS(nad, 0)) != 22 || strncmp("jabber:server:dialback", NAD_NURI(nad, NAD_ENS(nad, 0)), 22) != 0) { log_debug(ZONE, "got a non-dialback packet on an outgoing conn, dropping it"); nad_free(nad); return 0; } /* and then only result and verify */ if(NAD_ENAME_L(nad, 0) == 6) { if(strncmp("result", NAD_ENAME(nad, 0), 6) == 0) { _out_result(out, nad); return 0; } if(strncmp("verify", NAD_ENAME(nad, 0), 6) == 0) { _out_verify(out, nad); return 0; } } log_debug(ZONE, "unknown dialback packet, dropping it"); nad_free(nad); return 0; case event_CLOSED: mio_close(out->s2s->mio, out->fd); break; } return 0;}/** process incoming auth responses */static void _out_result(conn_t out, nad_t nad) { int attr; jid_t from, to; char *rkey, *c; jqueue_t q; int npkt, i; pkt_t pkt; attr = nad_find_attr(nad, 0, -1, "from", NULL); if(attr < 0 || (from = jid_new(out->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { log_debug(ZONE, "missing or invalid from on db result packet"); nad_free(nad); return; } attr = nad_find_attr(nad, 0, -1, "to", NULL); if(attr < 0 || (to = jid_new(out->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { log_debug(ZONE, "missing or invalid to on db result packet"); jid_free(from); nad_free(nad); return; } rkey = s2s_route_key(NULL, to->domain, from->domain); /* key is valid */ if(nad_find_attr(nad, 0, -1, "type", "valid") >= 0) { log_write(out->s2s->log, LOG_NOTICE, "outgoing route '%s' is now valid; destination=%s, port %d%s", rkey, out->ip, out->port, out->s->ssf ? ", SSL negotiated" : ""); xhash_put(out->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_VALID); /* !!! small leak here */ log_debug(ZONE, "%s valid, flushing queue", rkey); /* to domain */ c = strchr(rkey, '/'); c++; /* flush the queue */ q = (jqueue_t) xhash_get(out->s2s->outq, c); if(q == NULL || (npkt = jqueue_size(q)) == 0) { /* weird */ log_debug(ZONE, "nonexistent or empty queue for domain, we're done"); free(rkey); jid_free(from); jid_free(to); nad_free(nad); return; } log_debug(ZONE, "flushing %d packets to out_packet", npkt); for(i = 0; i < npkt; i++) { pkt = jqueue_pull(q); out_packet(out->s2s, pkt); } free(rkey); jid_free(from); jid_free(to); nad_free(nad); return; } /* invalid */ log_write(out->s2s->log, LOG_NOTICE, "outgoing route '%s' is now invalid; destination=%s, port %d", rkey, out->ip, out->port); xhash_put(out->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_INVALID); /* !!! small leak here */ log_debug(ZONE, "%s invalid, flushing queue", rkey); /* to domain */ c = strchr(rkey, '/'); c++; /* flush the queue */ q = (jqueue_t) xhash_get(out->s2s->outq, c); npkt = jqueue_size(q); if(q == NULL || npkt == 0) { /* weird */ log_debug(ZONE, "nonexistent or empty queue for domain, we're done"); free(rkey); jid_free(from); jid_free(to); nad_free(nad); return; } log_debug(ZONE, "flushing %d packets to out_packet", npkt); for(i = 0; i < npkt; i++) { pkt = jqueue_pull(q); out_packet(out->s2s, pkt); } free(rkey); jid_free(from); jid_free(to); nad_free(nad);}/** incoming stream authenticated */static void _out_verify(conn_t out, nad_t nad) { int attr, ns; jid_t from, to; conn_t in; char *rkey, *type; attr = nad_find_attr(nad, 0, -1, "from", NULL); if(attr < 0 || (from = jid_new(out->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { log_debug(ZONE, "missing or invalid from on db verify packet"); nad_free(nad); return; } attr = nad_find_attr(nad, 0, -1, "to", NULL); if(attr < 0 || (to = jid_new(out->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { log_debug(ZONE, "missing or invalid to on db verify packet"); jid_free(from); nad_free(nad); return; } attr = nad_find_attr(nad, 0, -1, "id", NULL); if(attr < 0) { log_debug(ZONE, "missing id on db verify packet"); jid_free(from); jid_free(to); nad_free(nad); return; } /* get the incoming conn */ in = xhash_getx(out->s2s->in, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr)); if(in == NULL) { log_debug(ZONE, "got a verify for incoming conn %.*s, but it doesn't exist, dropping the packet", NAD_AVAL_L(nad, attr), NAD_AVAL(nad, attr)); jid_free(from); jid_free(to); nad_free(nad); return; } rkey = s2s_route_key(NULL, to->domain, from->domain); attr = nad_find_attr(nad, 0, -1, "type", "valid"); if(attr >= 0) { xhash_put(in->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_VALID); log_write(in->s2s->log, LOG_NOTICE, "incoming route '%s' is now valid; source=%s, port %d%s", rkey, in->ip, in->port, in->s->ssf ? ", SSL negotiated" : ""); type = "valid"; } else { xhash_put(in->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_INVALID); log_write(in->s2s->log, LOG_NOTICE, "incoming route '%s' is now invalid; source=%s, port %d", rkey, in->ip, in->port); type = "invalid"; } free(rkey); nad_free(nad); /* let them know what happened */ nad = nad_new(in->s->nad_cache); ns = nad_add_namespace(nad, "jabber:server:dialback", "db"); nad_append_elem(nad, ns, "result", 0); nad_append_attr(nad, -1, "to", from->domain); nad_append_attr(nad, -1, "from", to->domain); nad_append_attr(nad, -1, "type", type); /* off it goes */ sx_nad_write(in->s, nad); jid_free(from); jid_free(to);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -