📄 resolverserviceimpl.java
字号:
String handlerName = srdimsg.getHandlerName(); if (handlerName == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Missing handlername in response"); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidSrdiMessageDiscarded(srcAddr); } return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Processing an SRDI msg for : " + handlerName + " in Group ID:"+myGroup.getPeerGroupID()); } SrdiHandler theHandler = getSrdiHandler(handlerName); if (theHandler != null) { SrdiHandlerMeter srdiHandlerMeter = null; try { long startTime = 0; if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverServiceMonitor != null)) { startTime = System.currentTimeMillis(); srdiHandlerMeter = resolverServiceMonitor.getSrdiHandlerMeter(handlerName); } theHandler.processSrdi(srdimsg); if (ResolverMeterBuildSettings.RESOLVER_METERING && (srdiHandlerMeter != null)) { srdiHandlerMeter.messageProcessed(srdimsg, System.currentTimeMillis() - startTime, srcAddr); } } catch (Throwable all) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Uncaught Throwable from handler for: " + handlerName, all); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (srdiHandlerMeter != null)) { srdiHandlerMeter.errorWhileProcessing(srcAddr); } } } else { if (LOG.isEnabledFor(Level.WARN) && myGroup.isRendezvous()) { LOG.warn("No srdi handler registered :" + handlerName + " for Group ID:"+myGroup.getPeerGroupID()); } else if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("No srdi handler registered :" + handlerName+ " for Group ID:"+myGroup.getPeerGroupID()); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.unknownHandlerForSrdiMessage(srcAddr, handlerName); } } } /** * Send a resolver message to a peer * * @param destPeer destination peer * @param pName service name on the destination * @param pParam service param on the destination * @param tagName tag name of the message element * @param response the body of the message element * @param gzip If <code>true</code> then encode the message body using gzip. */ private boolean sendMessage(String destPeer, String pName, String pParam, String tagName, XMLDocument response, boolean gzip) throws IOException { // Get the messenger ready ID dest; try { dest = IDFactory.fromURI(new URI(destPeer)); } catch (URISyntaxException badpeer) { IOException failure = new IOException("bad destination peerid"); failure.initCause(badpeer); throw failure; } EndpointAddress destAddress = mkAddress(dest, pName, pParam); // FIXME add route to reponses as well Messenger messenger = endpoint.getMessengerImmediate(destAddress, null); // Build the Message Message msg = new Message(); try { MessageElement msgEl; if (gzip) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); GZIPOutputStream gos = new GZIPOutputStream(baos); response.sendToStream(gos); gos.finish(); gos.close(); msgEl = new ByteArrayMessageElement(tagName, GZIP_MEDIA_TYPE, baos.toByteArray(), null); } else { msgEl = new TextDocumentMessageElement(tagName, response, null); } msg.addMessageElement("jxta", msgEl); } catch (Exception ez1) { // Not much we can do if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Failed building message", ez1); } return false; } // Send the message if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending " + msg + " to " + destAddress + " " + tagName); } if (null != messenger) { // XXX 20040924 bondolo Convert this to ListenerAdaptor messenger.sendMessage(msg, null, null, new FailureListener(dest)); return true; } else { return false; } } /** * Inner class to handle incoming queries */ private class DemuxQuery implements EndpointListener { /** * {@inheritDoc} */ public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Demuxing a query message from " + srcAddr); } MessageElement element = message.getMessageElement("jxta", outQueName); if (element == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Message does not contain a query. Discarding message"); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidQueryDiscarded(srcAddr); } return; } ResolverQueryMsg query; try { StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), element.getStream()); query = new ResolverQuery(asDoc); } catch (IOException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Ill formatted resolver query, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidQueryDiscarded(srcAddr); } return; } catch (IllegalArgumentException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Ill formatted resolver query, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidQueryDiscarded(srcAddr); } return; } int res = processQuery(query, srcAddr); if (ResolverService.Repropagate == res) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Repropagating query " + message + " from " + srcAddr); } repropagateQuery(message, query); } } } /** * Inner class to handle incoming responses */ private class DemuxResponse implements EndpointListener { /** * @inheritDoc */ public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Demuxing a response from " + srcAddr); } MessageElement element = (MessageElement) message.getMessageElement("jxta", inQueName); if (null == element) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Message does not contain a response. Discarding message"); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidResponseDiscarded(srcAddr); } return; } ResolverResponse resp; try { StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), element.getStream()); resp = new ResolverResponse(asDoc); } catch (IOException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Ill formatted resolver response, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidResponseDiscarded(srcAddr); } return; } catch (IllegalArgumentException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Ill formatted resolver response, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidResponseDiscarded(srcAddr); } return; } processResponse(resp, srcAddr); } } /** * Inner class to handle SRDI messages */ private class DemuxSrdi implements EndpointListener { /** * @inheritDoc */ public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Demuxing an SRDI message from : " + srcAddr); } MessageElement element = (MessageElement) message.getMessageElement("jxta", srdiQueName); if (element == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Message does not contain a SRDI element. Discarding message"); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidSrdiMessageDiscarded(srcAddr); } return; } ResolverSrdiMsgImpl srdimsg = null; try { if (element.getMimeType().equals(GZIP_MEDIA_TYPE)) { InputStream gzipStream = new GZIPInputStream(element.getStream()); StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, gzipStream); srdimsg = new ResolverSrdiMsgImpl(asDoc, membership); } else { StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), element.getStream()); srdimsg = new ResolverSrdiMsgImpl(asDoc, membership); } } catch (IOException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Ill formatted SRDI message, ignoring.", e); } if (ResolverMeterBuildSettings.RESOLVER_METERING && (resolverMeter != null)) { resolverMeter.invalidSrdiMessageDiscarded(srcAddr); } return; } processSrdi(srdimsg, srcAddr); } } /** * Listener to find bad destinations and clean srdi tables for them. */ class FailureListener implements OutgoingMessageEventListener { final ID dest; FailureListener(ID dest) { this.dest = dest; } /** * {@inheritDoc} */ public void messageSendFailed(OutgoingMessageEvent event) { // Ignore the failure if it's a case of queue overflow. if (event.getFailure() == null) { return; } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Clearing SRDI tables for failed peer : " + dest); } Iterator it = Arrays.asList(srdiHandlers.values().toArray()).iterator(); while (it.hasNext()) { SrdiHandler theHandler = (SrdiHandler) it.next(); try { theHandler.messageSendFailed((PeerID) dest, event); } catch (Throwable all) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Uncaught Throwable from handler : " + theHandler, all); } } } } /** * {@inheritDoc} */ public void messageSendSucceeded(OutgoingMessageEvent event) { // great! } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -