⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointserviceimpl.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
                }                if (null == filtered) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("   message " + myMsg + " discarded upon filter decision");                    }                    if (EndpointMeterBuildSettings.ENDPOINT_METERING) {                        metrics.numFilteredOut++;                    }                    break;                }                propagater.propagate(filtered.clone(), serviceName, serviceParam, initialTTL);                if (EndpointMeterBuildSettings.ENDPOINT_METERING) {                    metrics.numPropagatedTo++;                }            } catch (Exception e) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.log(Level.WARNING, "Failed propagating message " + filtered + " on message transport " + aTransport, e);                }                if (EndpointMeterBuildSettings.ENDPOINT_METERING) {                    metrics.numErrorsPropagated++;                }            }        }    }    /**     * {@inheritDoc}     */    public void propagate(Message msg, String serviceName, String serviceParam) {        propagate(msg, serviceName, serviceParam, Integer.MAX_VALUE);    }    /**     * {@inheritDoc}     */    public void propagate(Message msg, String serviceName, String serviceParam, int initialTTL) {        long startPropagationTime = 0;        if (null == serviceName) {            throw new IllegalArgumentException("serviceName may not be null");        }        Metrics metrics = null;        if (EndpointMeterBuildSettings.ENDPOINT_METERING) {            metrics = new Metrics();        }        // Keep the orig unchanged for metering reference and caller's benefit, but        // we are forced to clone it here, because we add a header.        
msg = msg.clone();        if (EndpointMeterBuildSettings.ENDPOINT_METERING) {            startPropagationTime = System.currentTimeMillis();        }        // Add our header.        MessageElement srcHdrElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME, localPeerId, null);        msg.replaceMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, srcHdrElement);        // Do the local transports with the plain address.        Iterator<MessageTransport> eachProto = getAllLocalTransports();        propagateThroughAll(eachProto, msg.clone(), serviceName, serviceParam, initialTTL, metrics);        // Do the parent transports with a mangled address.        if (parentEndpoint != null) {            eachProto = parentEndpoint.getAllMessageTransports();            StringBuilder mangled = new StringBuilder(serviceName);            if (null != serviceParam) {                mangled.append('/');                mangled.append(serviceParam);            }            propagateThroughAll(eachProto, msg.clone(), myServiceName, mangled.toString(), initialTTL, metrics);        }        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {            PropagationMeter propagationMeter = endpointServiceMonitor.getPropagationMeter(serviceName, serviceParam);            propagationMeter.registerPropagateMessageStats(metrics.numPropagatedTo, metrics.numFilteredOut, metrics.numErrorsPropagated,                    System.currentTimeMillis() - startPropagationTime);        }    }    /**     * Process the filters for this message.     */    private Message processFilters(Message message, EndpointAddress srcAddress, EndpointAddress dstAddress, boolean incoming) {        Iterator<FilterListenerAndMask> eachFilter = incoming                ? 
incomingFilterListeners.iterator()                : outgoingFilterListeners.iterator();        while (eachFilter.hasNext()) {            FilterListenerAndMask aFilter = eachFilter.next();            Message.ElementIterator eachElement = message.getMessageElements();            while (eachElement.hasNext()) {                MessageElement anElement = eachElement.next();                if ((null != aFilter.namespace) && (!aFilter.namespace.equals(eachElement.getNamespace()))) {                    continue;                }                if ((null != aFilter.name) && (!aFilter.name.equals(anElement.getElementName()))) {                    continue;                }                message = aFilter.listener.filterMessage(message, srcAddress, dstAddress);                if (null == message) {                    return null;                }            }        }        // If we got here, no filter has rejected the message. Keep processing it.        return message;    }    private static EndpointAddress demangleAddress(EndpointAddress mangled) {        String serviceName = mangled.getServiceName();        if (null == serviceName) {            // not a mangled address            return mangled;        }        if (!serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {            // not a mangled address            return mangled;        }        String serviceParam = mangled.getServiceParameter();        if (null == serviceParam) {            // it has no param, its a null destination.            // XXX bondolo 20050907 I'm not sure this is correct.            return new EndpointAddress(mangled, null, null);        }        int slashAt = serviceParam.indexOf('/');        if (-1 == slashAt) {            // param has no param portion.            
return new EndpointAddress(mangled, serviceParam, null);        }        return new EndpointAddress(mangled, serviceParam.substring(0, slashAt), serviceParam.substring(slashAt + 1));    }    /**     * {@inheritDoc}     */    public void processIncomingMessage(Message msg, EndpointAddress srcAddress, EndpointAddress dstAddress) {        // check for propagate loopback.        MessageElement srcPeerElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME);        if (null != srcPeerElement) {            msg.removeMessageElement(srcPeerElement);            String srcPeer = srcPeerElement.toString();            if (localPeerId.equals(srcPeer)) {                // This is a loopback. Discard.                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine(msg + " is a propagate loopback. Discarded");                }                if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                    endpointMeter.discardedLoopbackDemuxMessage();                }                return;            }        }        if (null == srcAddress) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("null src address, discarding message " + msg);            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.invalidIncomingMessage();            }            return;        }        if (null == dstAddress) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("null destination address, discarding message " + msg);            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.invalidIncomingMessage();            }            return;        }        // Decode the destination address.        
// We want:        // 1 - a version of the address that does not have the grp redirection.        // 2 - a version of the serviceName that includes BOTH the group redirection and the original service name.        // 3 - the original service param; without the original service name stuck to it.        // So, basically we want the original serviceName part stuck to the group mangling, not stuck to the original        // serviceParam. We do that by cut/pasting from both the mangled and demangled versions of the address.        EndpointAddress demangledAddress = demangleAddress(dstAddress);        String decodedServiceName = demangledAddress.getServiceName();        String decodedServiceParam = demangledAddress.getServiceParameter();        if ((null == decodedServiceName) || (0 == decodedServiceName.length())) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("dest serviceName must not be null, discarding message " + msg);            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.invalidIncomingMessage();            }            return;        }        // Do filters for this message:        // FIXME - jice 20040417 : filters are likely broken, now. They do not see messages        // from xports in parent groups.  For those messages that are seen, demangled address seems to be the useful one.        msg = processFilters(msg, srcAddress, demangledAddress, true);        // If processFilters retuns null, the message is to be discarded.        
if (msg == null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Message discarded during filter processing");            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.incomingMessageFilteredOut();            }            return;        }        // Now that we know the original service name is valid, finish building the decoded version.        if (demangledAddress != dstAddress) {            decodedServiceName = dstAddress.getServiceName() + "/" + decodedServiceName;        }        // Look up the listener        EndpointListener listener = getIncomingMessageListener(decodedServiceName, decodedServiceParam);        // No listener? oh well.        if (listener == null) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("No listener for \'" + dstAddress + "\' in group " +                        group + "\n\tdecodedServiceName :" +                        decodedServiceName + "\tdecodedServiceParam :" +                        decodedServiceParam);            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.noListenerForIncomingMessage();            }            return; // noone cares for this message        }        // call the listener        try {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                if (null != decodedServiceParam) {                    LOG.fine("Calling listener for \'" + decodedServiceName + "/" + decodedServiceParam + "\' with " + msg);                } else {                    LOG.fine("Calling listener for \'" + decodedServiceName + "\' with " + msg);                }            }            listener.processIncomingMessage(msg, srcAddress, demangledAddress);            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                
endpointMeter.incomingMessageSentToEndpointListener();            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.demuxMessageProcessed();            }        } catch (Throwable all) {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.log(Level.SEVERE, "Uncaught throwable from listener for " + dstAddress, all);            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.errorProcessingIncomingMessage();            }        }    }    /**     * {@inheritDoc}     */    public void demux(Message msg) {        // Get the message destination        MessageElement dstAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS,                EndpointServiceImpl.MESSAGE_DESTINATION_NAME);        if (null == dstAddressElement) {            // No destination address... Just discard            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning(msg + " has no destination address. Discarded");            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.noDestinationAddressForDemuxMessage();            }            return;        }        msg.removeMessageElement(dstAddressElement);        EndpointAddress dstAddress = new EndpointAddress(dstAddressElement.toString());        // Get the message source        MessageElement srcAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, EndpointServiceImpl.MESSAGE_SOURCE_NAME);        if (null == srcAddressElement) {            // No src address... Just discard            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning(msg + " has no source address. 
Discarded");            }            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.noSourceAddressForDemuxMessage();            }            return;        }        msg.removeMessageElement(srcAddressElement);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -