📄 router.c
字号:
if(comp->tq != NULL) { log_debug(ZONE, "%s port %d is throttled, jqueueing packet", comp->ip, comp->port); jqueue_push(comp->tq, nad, 0); return; } /* packets go raw to normal components */ if(!comp->legacy) { sx_nad_write(comp->s, nad); return; } log_debug(ZONE, "packet for legacy component, munging"); attr = nad_find_attr(nad, 0, -1, "error", NULL); if(attr >= 0) { if(NAD_AVAL_L(nad, attr) == 3 && strncmp("400", NAD_AVAL(nad, attr), 3) == 0) stanza_error(nad, 1, stanza_err_BAD_REQUEST); else stanza_error(nad, 1, stanza_err_SERVICE_UNAVAILABLE); } sx_nad_write_elem(comp->s, nad, 1);}static void _router_route_log_sink(xht log_sinks, const char *key, void *val, void *arg) { component_t comp = (component_t) val; nad_t nad = (nad_t) arg; log_debug(ZONE, "copying route to '%s' (%s, port %d)", key, comp->ip, comp->port); _router_comp_write(comp, nad_copy(nad));}static void _router_process_route(component_t comp, nad_t nad) { int atype, ato, afrom; struct jid_st sto, sfrom; jid_t to = NULL, from = NULL; component_t target; if(nad_find_attr(nad, 0, -1, "error", NULL) >= 0) { log_debug(ZONE, "dropping error packet, trying to avoid loops"); nad_free(nad); return; } atype = nad_find_attr(nad, 0, -1, "type", NULL); ato = nad_find_attr(nad, 0, -1, "to", NULL); afrom = nad_find_attr(nad, 0, -1, "from", NULL); sto.pc = sfrom.pc = comp->r->pc; if(ato >= 0) to = jid_reset(&sto, NAD_AVAL(nad, ato), NAD_AVAL_L(nad, ato)); if(afrom >= 0) from = jid_reset(&sfrom, NAD_AVAL(nad, afrom), NAD_AVAL_L(nad, afrom)); /* unicast */ if(atype < 0) { if(to == NULL || from == NULL) { log_debug(ZONE, "unicast route with missing or invalid to or from, bouncing"); nad_set_attr(nad, 0, -1, "error", "400", 3); _router_comp_write(comp, nad); return; } log_debug(ZONE, "unicast route from %s to %s", from->domain, to->domain); /* check the from */ if(xhash_get(comp->routes, from->domain) == NULL) { log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to send a packet from '%s', but that name is not bound to this component", comp->ip, comp->port, from->domain); nad_set_attr(nad, 0, -1, "error", "401", 3); _router_comp_write(comp, nad); return; } /* find a target */ target = xhash_get(comp->r->routes, to->domain); if(target == NULL) { if(comp->r->default_route != NULL && strcmp(from->domain, comp->r->default_route) == 0) { log_debug(ZONE, "%s is unbound, bouncing", from->domain); nad_set_attr(nad, 0, -1, "error", "404", 3); _router_comp_write(comp, nad); return; } target = xhash_get(comp->r->routes, comp->r->default_route); } if(target == NULL) { log_debug(ZONE, "%s is unbound, and no default route, bouncing", to->domain); nad_set_attr(nad, 0, -1, "error", "404", 3); _router_comp_write(comp, nad); return; } /* copy to any log sinks */ if(xhash_count(comp->r->log_sinks) > 0) xhash_walk(comp->r->log_sinks, _router_route_log_sink, (void *) nad); /* push it out */ log_debug(ZONE, "writing route for '%s' to %s, port %d", to->domain, target->ip, target->port); _router_comp_write(target, nad); return; } /* broadcast */ if(NAD_AVAL_L(nad, atype) == 9 && strncmp("broadcast", NAD_AVAL(nad, atype), 9) == 0) { if(from == NULL) { log_debug(ZONE, "broadcast route with missing or invalid from, bouncing"); nad_set_attr(nad, 0, -1, "error", "400", 3); _router_comp_write(comp, nad); return; } log_debug(ZONE, "broadcast route from %s", from->domain); /* check the from */ if(xhash_get(comp->routes, from->domain) == NULL) { log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] tried to send a packet from '%s', but that name is not bound to this component", comp->ip, comp->port, from->domain); nad_set_attr(nad, 0, -1, "error", "401", 3); _router_comp_write(comp, nad); return; } /* loop the components and distribute */ if(xhash_iter_first(comp->r->components)) do { xhash_iter_get(comp->r->components, NULL, (void **) &target); if(target != comp) { log_debug(ZONE, "writing broadcast to %s, port %d", target->ip, target->port); _router_comp_write(target, nad_copy(nad)); } } while(xhash_iter_next(comp->r->components)); nad_free(nad); return; } log_debug(ZONE, "unknown route type '%.*s', dropping", NAD_AVAL_L(nad, atype), NAD_AVAL(nad, atype)); nad_free(nad);}static void _router_process_throttle(component_t comp, nad_t nad) { jqueue_t tq; nad_t pkt; if(comp->tq == NULL) { _router_comp_write(comp, nad); log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] throttling packets on request", comp->ip, comp->port); comp->tq = jqueue_new(); } else { log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] unthrottling packets on request", comp->ip, comp->port); tq = comp->tq; comp->tq = NULL; _router_comp_write(comp, nad); while((pkt = jqueue_pull(tq)) != NULL) _router_comp_write(comp, pkt); jqueue_free(tq); }}static int _router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) { component_t comp = (component_t) arg; sx_buf_t buf = (sx_buf_t) data; int rlen, len, attr, ns, sns; sx_error_t *sxe; nad_t nad; struct jid_st sto, sfrom; jid_t to, from; alias_t alias; switch(e) { case event_WANT_READ: log_debug(ZONE, "want read"); mio_read(comp->r->mio, comp->fd); break; case event_WANT_WRITE: log_debug(ZONE, "want write"); mio_write(comp->r->mio, comp->fd); break; case event_READ: log_debug(ZONE, "reading from %d", comp->fd); /* check rate limits */ if(comp->rate != NULL) { if(rate_check(comp->rate) == 0) { /* inform the app if we haven't already */ if(!comp->rate_log) { log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] is being byte rate limited", comp->ip, comp->port); comp->rate_log = 1; } log_debug(ZONE, "%d is throttled, delaying read", comp->fd); buf->len = 0; return 0; } /* find out how much we can have */ rlen = rate_left(comp->rate); if(rlen > buf->len) rlen = buf->len; } /* no limit, just read as much as we can */ else rlen = buf->len; /* do the read */ len = recv(comp->fd, buf->data, rlen, 0); /* update rate limits */ if(comp->rate != NULL && len > 0) { comp->rate_log = 0; rate_add(comp->rate, len); } if(len < 0) { if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN) { buf->len = 0; return 0; } log_debug(ZONE, "read failed: %s", strerror(errno)); sx_kill(comp->s); return -1; } else if(len == 0) { /* they went away */ sx_kill(comp->s); return -1; } log_debug(ZONE, "read %d bytes", len); buf->len = len; return len; case event_WRITE: log_debug(ZONE, "writing to %d", comp->fd); len = send(comp->fd, buf->data, buf->len, 0); if(len >= 0) { log_debug(ZONE, "%d bytes written", len); return len; } if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN) return 0; log_debug(ZONE, "write failed: %s", strerror(errno)); sx_kill(comp->s); return -1; case event_ERROR: sxe = (sx_error_t *) data; log_write(comp->r->log, LOG_NOTICE, "[%s, port=%d] error: %s (%s)", comp->ip, comp->port, sxe->generic, sxe->specific); break; case event_STREAM: /* legacy check */ if(s->ns == NULL || strcmp("jabber:component:accept", s->ns) != 0) return 0; /* component, old skool */ comp->legacy = 1; /* enabled? */ if(comp->r->local_secret == NULL) { sx_error(s, stream_err_INVALID_NAMESPACE, "support for legacy components not available"); /* !!! correct error? */ sx_close(s); return 0; } /* sanity */ if(s->req_to == NULL) { sx_error(s, stream_err_HOST_UNKNOWN, "no 'to' attribute on stream header"); sx_close(s); return 0; } break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -