📄 out.c
字号:
/*
 * jabberd - Jabber Open Source Server
 * Copyright (c) 2002 Jeremie Miller, Thomas Muldowney,
 *                    Ryan Eatmon, Robert Norris
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
 */

#include "s2s.h"

/*
 * we handle packets going from the router to the world, and stuff
 * that comes in on connections we initiated.
 *
 * action points:
 *
 * out_packet(s2s, nad) - send this packet out
 *   - extract to domain
 *   - check internal resolver cache for ip/port
 *   - if not found
 *     - add packet to queue for this domain
 *     - ask resolver for name
 *     - DONE
 *   - get dbconn for this ip/port
 *   - if dbconn not found
 *     - add packet to queue for this domain
 *     - create new dbconn (key ip/port)
 *     - initiate connect to ip/port
 *     - DONE
 *   - if conn in progress (tcp)
 *     - add packet to queue for this domain
 *     - DONE
 *   - if dbconn state valid for this domain, or packet is dialback
 *     - send packet
 *     - DONE
 *   - if dbconn state invalid for this domain
 *     - bounce packet (502)
 *     - DONE
 *   - add packet to queue for this domain
 *   - if dbconn state inprogress for this domain
 *     - DONE
 *   - out_dialback(dbconn, from, to)
 *
 * out_dialback(dbconn, from, to) - initiate dialback
 *   - generate dbkey: sha1(secret+remote+stream id)
 *   - send auth request: <result to='them' from='us'>dbkey</result>
 *   - set dbconn state for this domain to inprogress
 *   - DONE
 *
 * out_resolve(s2s, nad) - responses from resolver
 *   - store ip/port/ttl in resolver cache
 *   - flush domain queue -> out_packet(s2s, nad)
 *     - for each packet in queue for this domain
 *   - DONE
 *
 * event_STREAM - ip/port open
 *   - get dbconn for this sx
 *   - for each route handled by this conn, out_dialback(dbconn, from, to)
 *   - DONE
 *
 * event_PACKET: <result from='them' to='us' type='xxx'/> - response to our auth request
 *   - get dbconn for this sx
 *   - if type valid
 *     - set dbconn state for this domain to valid
 *     - flush dbconn queue for this domain -> out_packet(s2s, pkt)
 *     - DONE
 *   - set dbconn state for this domain to invalid
 *   - bounce dbconn queue for this domain (502)
 *   - DONE
 *
 * event_PACKET: <verify from='them' to='us' id='123' type='xxx'/> - incoming stream authenticated
 *   - get dbconn for given id
 *   - if type is valid
 *     - set dbconn state for this domain to valid
 *   - send result: <result to='them' from='us' type='xxx'/>
 *   - DONE
 */

/* forward decls */
static int _out_mio_callback(mio_t m, mio_action_t a, int fd, void *data, void *arg);
static int _out_sx_callback(sx_t s, sx_event_t e, void *data, void *arg);
static void _out_result(conn_t out, nad_t nad);
static void _out_verify(conn_t out, nad_t nad);

/**
 * queue the packet
 *
 * Appends pkt to the queue stored in s2s->outq under the packet's
 * destination domain (pkt->to->domain). The queue is created, and
 * registered under a pool-allocated copy of the domain, on first use.
 *
 * @param s2s  component instance holding the outq hash
 * @param pkt  packet to hold back; it is pushed with priority 0
 */
static void _out_packet_queue(s2s_t s2s, pkt_t pkt) {
    jqueue_t q = (jqueue_t) xhash_get(s2s->outq, pkt->to->domain);

    if(q == NULL) {
        log_debug(ZONE, "creating new out packet queue for %s", pkt->to->domain);
        q = jqueue_new();
        xhash_put(s2s->outq, pstrdup(xhash_pool(s2s->outq), pkt->to->domain), (void *) q);
    }

    log_debug(ZONE, "queueing packet for %s", pkt->to->domain);

    jqueue_push(q, (void *) pkt, 0);
}

/**
 * initiate dialback for one route on an outgoing connection
 *
 * Sends a db:result auth request carrying the generated dialback key and
 * records the route as conn_INPROGRESS in out->states.
 *
 * @param out   outgoing connection to write the request to
 * @param rkey  route key of the form "from/to"; it is split in place at the
 *              first '/' while the request is built and restored before
 *              returning. A key without a '/' is rejected (logged, nothing
 *              is sent and no state is recorded).
 */
static void _out_dialback(conn_t out, char *rkey) {
    char *c, *dbkey;
    nad_t nad;
    int ns;

    c = strchr(rkey, '/');
    if(c == NULL) {
        /* strchr found no separator; dereferencing the result would crash */
        log_debug(ZONE, "malformed route key '%s', not starting dialback", rkey);
        return;
    }

    /* temporarily split "from/to": rkey is now "from", c points at "to" */
    *c = '\0';
    c++;

    /* kick off the dialback */
    dbkey = s2s_db_key(NULL, out->s2s->local_secret, c, out->s->id);

    nad = nad_new(out->s->nad_cache);

    /* request auth */
    ns = nad_add_namespace(nad, "jabber:server:dialback", "db");
    nad_append_elem(nad, ns, "result", 0);
    nad_append_attr(nad, -1, "from", rkey);
    nad_append_attr(nad, -1, "to", c);
    nad_append_cdata(nad, dbkey, strlen(dbkey), 1);

    /* put the separator back so rkey is the full route key again */
    c--;
    *c = '/';

    log_debug(ZONE, "sending auth request for %s (key %s)", rkey, dbkey);

    /* off it goes */
    sx_nad_write(out->s, nad);

    free(dbkey);

    /* we're in progress now */
    xhash_put(out->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_INPROGRESS);
}

/**
 * send a packet out
 *
 * Depending on what is known about the destination, the packet is either
 * queued (resolution pending/expired, no connection yet, connection not
 * online, dialback not finished), written to the outgoing connection
 * (route is conn_VALID, or the packet is itself a dialback packet), or
 * bounced to the router with a service-unavailable error (route is
 * conn_INVALID).
 *
 * On the write and bounce paths pkt->from, pkt->to and pkt itself are freed
 * here; on every other path the packet has been handed to the domain queue.
 *
 * @param s2s  component instance
 * @param pkt  packet to deliver
 */
void out_packet(s2s_t s2s, pkt_t pkt) {
    int ns;
    dnscache_t dns;
    nad_t nad;
    char ipport[INET6_ADDRSTRLEN + 16], *rkey;
    conn_t out;
    conn_state_t state;

    /* check resolver cache for ip/port */
    dns = xhash_get(s2s->dnscache, pkt->to->domain);
    if(dns == NULL) {
        /* new resolution */
        log_debug(ZONE, "no dns for %s, preparing for resolution", pkt->to->domain);

        dns = (dnscache_t) malloc(sizeof(struct dnscache_st));
        memset(dns, 0, sizeof(struct dnscache_st));

        /* NOTE(review): unbounded copy; relies on the domain always fitting
         * in dns->name - confirm against the struct definition in s2s.h */
        strcpy(dns->name, pkt->to->domain);

        /* the entry's own name buffer doubles as the hash key */
        xhash_put(s2s->dnscache, dns->name, (void *) dns);

#if 0
        /* this is good for testing */
        dns->pending = 0;
        strcpy(dns->ip, "127.0.0.1");
        dns->port = 3000;
        dns->expiry = time(NULL) + 99999999;
#endif
    }

    /* resolution in progress */
    if(dns->pending) {
        log_debug(ZONE, "pending resolution, queueing packet");
        _out_packet_queue(s2s, pkt);
        return;
    }

    /* has it expired (this is 0 for new cache objects, so they're always expired */
    if(time(NULL) > dns->expiry) {
        /* it has, queue the packet */
        _out_packet_queue(s2s, pkt);

        /* resolution required */
        log_debug(ZONE, "requesting resolution for %s", pkt->to->domain);

        /* build a route-wrapped resolve query addressed to the local resolver */
        nad = nad_new(s2s->router->nad_cache);

        ns = nad_add_namespace(nad, uri_COMPONENT, NULL);
        nad_append_elem(nad, ns, "route", 0);
        nad_append_attr(nad, -1, "from", s2s->id);
        nad_append_attr(nad, -1, "to", s2s->local_resolver);

        ns = nad_add_namespace(nad, uri_RESOLVER, NULL);
        nad_append_elem(nad, ns, "resolve", 1);
        nad_append_attr(nad, -1, "type", "query");
        nad_append_attr(nad, -1, "name", pkt->to->domain);

        sx_nad_write(s2s->router, nad);

        dns->init_time = time(NULL);
        dns->pending = 1;

        return;
    }

    /* dns is valid */
    strcpy(pkt->ip, dns->ip);
    pkt->port = dns->port;

    /* generate the ip/port pair, this is the hash key for the conn */
    snprintf(ipport, sizeof(ipport), "%s/%d", pkt->ip, pkt->port);
    out = (conn_t) xhash_get(s2s->out, ipport);

    /* new route key (heap allocated, freed on every path below) */
    rkey = s2s_route_key(NULL, pkt->from->domain, pkt->to->domain);

    if(out == NULL) {
        _out_packet_queue(s2s, pkt);

        /* no conn, create one */
        out = (conn_t) malloc(sizeof(struct conn_st));
        memset(out, 0, sizeof(struct conn_st));

        out->s2s = s2s;

        out->key = strdup(ipport);

        strcpy(out->ip, pkt->ip);
        out->port = pkt->port;

        out->states = xhash_new(101);

        out->routes = xhash_new(101);

        out->init_time = time(NULL);

        xhash_put(s2s->out, out->key, (void *) out);

        /* remember that this route is carried by this conn */
        xhash_put(out->routes, pstrdup(xhash_pool(out->routes), rkey), (void *) 1);

        log_debug(ZONE, "initiating connection to %s", ipport);

        /* connect */
        out->fd = mio_connect(s2s->mio, pkt->port, pkt->ip, _out_mio_callback, (void *) out);

        /* NOTE(review): a failed connect is only logged; the code below
         * still creates the stream on the negative fd and the conn stays
         * registered in s2s->out - confirm that cleanup happens elsewhere */
        if (out->fd < 0) {
            log_write(out->s2s->log, LOG_NOTICE, "mio_connect error connecting to %s : %s (%d)", ipport, strerror(errno), errno);
        } else {
            log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] outgoing connection", out->fd, out->ip, out->port);
        }

        out->s = sx_new(s2s->sx_env, out->fd, _out_sx_callback, (void *) out);

#ifdef HAVE_SSL
        /* Send a stream version of 1.0 if we can do STARTTLS */
        if(out->s2s->sx_ssl != NULL && out->s2s->local_pemfile != NULL) {
            sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", pkt->to->domain, NULL, "1.0");
        } else {
            sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", NULL, NULL, NULL);
        }
#else
        sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", NULL, NULL, NULL);
#endif

        free(rkey);

        return;
    }

    /* connection in progress */
    if(!out->online) {
        log_debug(ZONE, "connection in progress, queueing packet");

        _out_packet_queue(s2s, pkt);

        xhash_put(out->routes, pstrdup(xhash_pool(out->routes), rkey), (void *) 1);

        free(rkey);

        return;
    }

    /* connection state */
    state = (conn_state_t) xhash_get(out->states, rkey);

    /* valid conns or dialback packets */
    if(state == conn_VALID || pkt->db) {
        log_debug(ZONE, "writing packet for %s to outgoing conn %s", rkey, ipport);

        /* send it straight out: dialback packets whole, anything else
         * starting from element 1 of the nad */
        if(pkt->db)
            sx_nad_write(out->s, pkt->nad);
        else
            sx_nad_write_elem(out->s, pkt->nad, 1);

        jid_free(pkt->from);
        jid_free(pkt->to);
        free(pkt);

        free(rkey);

        return;
    }

    /* invalid conns */
    if(state == conn_INVALID) {
        log_debug(ZONE, "route %s is invalid, bouncing packet", rkey);

        /* error and bounce */
        stanza_error(pkt->nad, 1, stanza_err_SERVICE_UNAVAILABLE);
        stanza_tofrom(pkt->nad, 1);
        stanza_tofrom(pkt->nad, 0);
        nad_set_attr(pkt->nad, 0, -1, "from", s2s->id, 0);
        sx_nad_write(s2s->router, pkt->nad);

        jid_free(pkt->from);
        jid_free(pkt->to);
        free(pkt);

        free(rkey);

        return;
    }

    /* can't be handled yet, queue */
    _out_packet_queue(s2s, pkt);

    /* if dialback is in progress, then we're done for now */
    if(state == conn_INPROGRESS) {
        free(rkey);
        return;
    }

    /* go */
    _out_dialback(out, rkey);

    free(rkey);
}

/** responses from the resolver */
void out_resolve(s2s_t s2s, nad_t nad) {
    int attr, port = 0, ttl = 0, npkt, i;
    jid_t name;
    char ip[INET6_ADDRSTRLEN], str[16];
    dnscache_t dns;
    jqueue_t q;
    pkt_t pkt;

    attr = nad_find_attr(nad, 1, -1, "name", NULL);
    name = jid_new(s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr));

    /* no results, resolve failed */
    if(nad->ecur == 2) {
        /* NOTE(review): the cache entry is freed before it is removed from
         * the hash, and out_packet registers it under its own name buffer -
         * confirm xhash_zap does not read the stored key */
        dns = xhash_get(s2s->dnscache, name->domain);
        free(dns);
        xhash_zap(s2s->dnscache, name->domain);

        /* bounce queue */
        /* NOTE(review): q is not checked for NULL before jqueue_pull - confirm
         * a queue always exists for a domain with a pending resolution */
        q = xhash_get(s2s->outq, name->domain);
        while((pkt = jqueue_pull(q)) != NULL) {
            /* only packets whose element 1 is in the client namespace get an
             * error reply; everything else is dropped */
            if(pkt->nad->ecur > 1 && NAD_NURI_L(pkt->nad, NAD_ENS(pkt->nad, 1)) == strlen(uri_CLIENT) && strncmp(NAD_NURI(pkt->nad, NAD_ENS(pkt->nad, 1)), uri_CLIENT, strlen(uri_CLIENT)) == 0)
                sx_nad_write(s2s->router, stanza_tofrom(stanza_tofrom(stanza_error(pkt->nad, 1, stanza_err_REMOTE_SERVER_TIMEOUT), 1), 0));
            else
                nad_free(pkt->nad);

            jid_free(pkt->to);
            jid_free(pkt->from);
            free(pkt);
        }

        jid_free(name);
        nad_free(nad);

        return;
    }

    /* address is the cdata of element 2; port and ttl are its attributes */
    snprintf(ip, INET6_ADDRSTRLEN, "%.*s", NAD_CDATA_L(nad, 2), NAD_CDATA(nad, 2));

    attr = nad_find_attr(nad, 2, -1, "port", NULL);
    if(attr >= 0) {
        snprintf(str, 16, "%.*s", NAD_AVAL_L(nad, attr), NAD_AVAL(nad, attr));
        port = atoi(str);
    }

    /* missing or unparsable port falls back to 5269 */
    if(port == 0)
        port = 5269;

    attr = nad_find_attr(nad, 2, -1, "ttl", NULL);
    if(attr >= 0) {
        snprintf(str, 16, "%.*s", NAD_AVAL_L(nad, attr), NAD_AVAL(nad, attr));
        ttl = atoi(str);
    }

    log_debug(ZONE, "%s resolved to %s, port %d, ttl %d", name->domain, ip, port, ttl);

    /* get the cache entry */
    dns = xhash_get(s2s->dnscache, name->domain);
    if(dns == NULL) {
        log_debug(ZONE, "weird, we never requested this");

        jid_free(name);
        nad_free(nad);

        return;
    }

    /* fill it out */
    strcpy(dns->ip, ip);
    dns->port = port;
    dns->expiry = time(NULL) + ttl;
    dns->pending = 0;

    /* NOTE(review): jqueue_size is called before the q == NULL test below -
     * confirm it tolerates a NULL queue */
    q = (jqueue_t) xhash_get(s2s->outq, name->domain);
    npkt = jqueue_size(q);
    if(q == NULL || npkt == 0) {
        /* weird */
        log_debug(ZONE, "nonexistent or empty queue for domain, we're done");

        jid_free(name);
        nad_free(nad);

        return;
    }

    log_debug(ZONE, "flushing %d packets to out_packet", npkt);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -