📄 resolverserviceimpl.java
字号:
} } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING) && group.isRendezvous()) { LOG.warning("No srdi handler registered :" + handlerName + " for Group ID:" + group.getPeerGroupID()); } else if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No srdi handler registered :" + handlerName + " for Group ID:" + group.getPeerGroupID()); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.unknownHandlerForSrdiMessage(srcAddr, handlerName); } } } /** * Send a resolver message to a peer * * @param destPeer destination peer * @param route destination route advertisement * @param pName service name on the destination * @param pParam service param on the destination * @param tagName tag name of the message element * @param body the body of the message element * @param gzip If <code>true</code> then encode the message body using gzip. * @return {@code true} if successful */ private boolean sendMessage(String destPeer, RouteAdvertisement route, String pName, String pParam, String tagName, XMLDocument body, boolean gzip) { // Get the messenger ready ID dest; try { dest = IDFactory.fromURI(new URI(destPeer)); } catch (URISyntaxException badpeer) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "bad destination peerid : " + destPeer, badpeer); } return false; } EndpointAddress destAddress = mkAddress(dest, pName, pParam); // FIXME add route to responses as well Messenger messenger = null; if (route == null) { if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer("No route info available for " + destPeer); } } else { // ok we have a route let's pass it to the router if ((null == getRouteControl()) || (routeControl.addRoute(route) == RouteControl.FAILED)) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Failed to add route for " + route.getDestPeerID()); } } else { if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer("Added route for " + route.getDestPeerID()); } } } if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer("Creating a messenger immediate for :" + destAddress); } messenger = endpoint.getMessengerImmediate(destAddress, route); if (null == messenger) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Failed creating messenger for " + destAddress); } return false; } Message msg = new Message(); try { MessageElement msgEl; if (gzip) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); GZIPOutputStream gos = new GZIPOutputStream(baos); body.sendToStream(gos); gos.finish(); gos.close(); byte gzipBytes[] = baos.toByteArray(); msgEl = new ByteArrayMessageElement(tagName, GZIP_MEDIA_TYPE, gzipBytes, null); } else { msgEl = new TextDocumentMessageElement(tagName, body, null); } msg.addMessageElement("jxta", msgEl); } catch (Exception ez1) { // Not much we can do if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed building message", ez1); } return false; } // Send the message if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + " to " + destAddress + " " + tagName); } // XXX 20040924 bondolo Convert this to ListenerAdaptor messenger.sendMessage(msg, null, null, new FailureListener(dest)); return true; } private RouteControl getRouteControl() { // Obtain the route control object to manipulate route information when sending and receiving resolver queries. if (routeControl == null) { // insignificant race condition here MessageTransport endpointRouter = endpoint.getMessageTransport("jxta"); if (endpointRouter != null) { routeControl = (RouteControl) endpointRouter.transportControl(EndpointRouter.GET_ROUTE_CONTROL, null); } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Failed to get RouteControl object. Resolver will not set route hints."); } } } return routeControl; } /** * Inner class to handle incoming queries */ private class DemuxQuery implements EndpointListener { /** * {@inheritDoc} */ public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Demuxing a query message from " + srcAddr); } MessageElement element = message.getMessageElement("jxta", outQueName); if (element == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Message does not contain a query. Discarding message"); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidQueryDiscarded(srcAddr); } return; } ResolverQueryMsg query; try { StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(element); query = new ResolverQuery(asDoc); } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Ill formatted resolver query, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidQueryDiscarded(srcAddr); } return; } catch (IllegalArgumentException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Ill formatted resolver query, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidQueryDiscarded(srcAddr); } return; } int res = processQuery(query, srcAddr); if (ResolverService.Repropagate == res) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Repropagating query " + message + " from " + srcAddr); } repropagateQuery(message, query); } } } /** * Inner class to handle incoming responses */ private class DemuxResponse implements EndpointListener { /** * @inheritDoc */ public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Demuxing a response from " + srcAddr); } MessageElement element = message.getMessageElement("jxta", inQueName); if (null == element) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Message does not contain a response. Discarding message"); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidResponseDiscarded(srcAddr); } return; } ResolverResponse resolverResponse; try { StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(element); resolverResponse = new ResolverResponse(asDoc); } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Ill formatted resolver response, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidResponseDiscarded(srcAddr); } return; } catch (IllegalArgumentException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Ill formatted resolver response, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidResponseDiscarded(srcAddr); } return; } processResponse(resolverResponse, srcAddr); } } /** * Inner class to handle SRDI messages */ private class DemuxSrdi implements EndpointListener { /** * @inheritDoc */ public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Demuxing an SRDI message from : " + srcAddr); } MessageElement element = message.getMessageElement("jxta", srdiQueName); if (element == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Message does not contain a SRDI element. Discarding message"); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidSrdiMessageDiscarded(srcAddr); } return; } ResolverSrdiMsgImpl srdimsg; try { if (element.getMimeType().getBaseMimeMediaType().equals(GZIP_MEDIA_TYPE)) { InputStream gzipStream = new GZIPInputStream(element.getStream()); StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, gzipStream); srdimsg = new ResolverSrdiMsgImpl(asDoc, membership); } else { StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(element); srdimsg = new ResolverSrdiMsgImpl(asDoc, membership); } } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Ill formatted SRDI message, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidSrdiMessageDiscarded(srcAddr); } return; } processSrdi(srdimsg, srcAddr); } } /** * Listener to find bad destinations and clean srdi tables for them. */ class FailureListener implements OutgoingMessageEventListener { final ID dest; FailureListener(ID dest) { this.dest = dest; } /** * {@inheritDoc} */ public void messageSendFailed(OutgoingMessageEvent event) { // Ignore the failure if it's a case of queue overflow. if (event.getFailure() == null) { return; } if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Clearing SRDI tables for failed peer : " + dest); } for (Object o : Arrays.asList(srdiHandlers.values().toArray())) { SrdiHandler theHandler = (SrdiHandler) o; try { theHandler.messageSendFailed((PeerID) dest, event); } catch (Throwable all) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Uncaught Throwable from handler : " + theHandler, all); } } } } /** * {@inheritDoc} */ public void messageSendSucceeded(OutgoingMessageEvent event) {// great! } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -