⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointserviceimpl.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
                                        if (EndpointMeterBuildSettings.ENDPOINT_METERING) {                        metrics.numFilteredOut++;                    }                    break;                }                                sender.propagate((Message) filtered.clone(), serviceName, serviceParam, null);                                if (EndpointMeterBuildSettings.ENDPOINT_METERING) {                    metrics.numPropagatedTo++;                }            } catch (Exception e) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Failed propagating message "+filtered+ " on message transport " + aTransport, e);                }                if (EndpointMeterBuildSettings.ENDPOINT_METERING) {                    metrics.numErrorsPropagated++;                }                continue;            }        }    }        /**     * {@inheritDoc}     **/    public void propagate(Message srcMsg, String serviceName, String serviceParam)    throws IOException {        long startPropagationTime = 0;                Metrics metrics = new Metrics();                // Keep the orig unchanged for metering reference and caller's benefit, but        // we are forced to clone it here, because we add a header.        Message myMsg = (Message) srcMsg.clone();                if (EndpointMeterBuildSettings.ENDPOINT_METERING) {            startPropagationTime = System.currentTimeMillis();        }                // Add our header.        MessageElement srcHdrElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME, localPeerId, (MessageElement) null);                myMsg.replaceMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, srcHdrElement);                // Do the local transports with the plain address.        Iterator eachProto = getAllLocalTransports();                propagateThroughAll(eachProto, (Message) myMsg.clone(), serviceName, serviceParam, metrics);                // Do the parent transports with a mangled address.        if (parentEndpoint != null) {            eachProto = parentEndpoint.getAllMessageTransports();            //FIXME what happens when service name, and/or param are null            propagateThroughAll(eachProto, (Message) myMsg.clone(), myServiceName, serviceName + "/" + serviceParam, metrics);        }                if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {            PropagationMeter propagationMeter = endpointServiceMonitor.getPropagationMeter(serviceName, serviceParam);                        propagationMeter.registerPropagateMessageStats(metrics.numPropagatedTo, metrics.numFilteredOut, metrics.numErrorsPropagated,            System.currentTimeMillis() - startPropagationTime);        }    }        /**     *  Process the filters for this message.     **/    private Message processFilters(Message message,                                   EndpointAddress srcAddress,                                   EndpointAddress dstAddress,                                   boolean         incoming) {                Iterator eachFilter = incoming ? incomingFilterListeners.iterator() : outgoingFilterListeners.iterator();                while (eachFilter.hasNext()) {            FilterListenerAndMask aFilter = (FilterListenerAndMask) eachFilter.next();                        Message.ElementIterator eachElement = message.getMessageElements();                        while (eachElement.hasNext()) {                MessageElement anElement = (MessageElement) eachElement.next();                                if ((null != aFilter.namespace) && (!aFilter.namespace.equals(eachElement.getNamespace()))) {                    continue;                }                                if ((null != aFilter.name) && (!aFilter.name.equals(anElement.getElementName()))) {                    continue;                }                                message = aFilter.listener.filterMessage(message, srcAddress, dstAddress);                                if (null == message) {                    return null;                }            }        }                // If we got here, no filter has rejected the message. Keep processing it.        return message;    }        private static EndpointAddress demangleAddress(EndpointAddress mangled) {        String serviceName = mangled.getServiceName();                if (!serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {            // not a mangled address            return mangled;        }                String serviceParam = mangled.getServiceParameter();                if (null == serviceParam) {            // it has no param, its a null destination.            return new EndpointAddress(mangled, null, null);        }                int slashAt = serviceParam.indexOf('/');                if (-1 == slashAt) {            // param has no param portion.            return new EndpointAddress(mangled, serviceParam, null);        }                return new EndpointAddress(mangled, serviceParam.substring(0, slashAt), serviceParam.substring(slashAt + 1));    }        /**     * {@inheritDoc}     **/    public void processIncomingMessage(Message msg, EndpointAddress srcAddress, EndpointAddress dstAddress) {                // check for propagate loopback.        MessageElement srcPeerElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME);                if (null != srcPeerElement) {            msg.removeMessageElement(srcPeerElement);            String srcPeer = srcPeerElement.toString();                        if (localPeerId.equals(srcPeer)) {                // This is a loopback. Discard.                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug( msg + " is a propagate loopback. Discarded");                }                                if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                    endpointMeter.discardedLoopbackDemuxMessage();                }                                return;            }        }                if (null == dstAddress) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("null destination address, discarding message " + msg.toString());            }                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.invalidIncomingMessage();            }                        return;        }                // Decode the destination address.        // We want:        // 1 - a version of the address that does not have the grp redirection.        // 2 - a version of the serviceName that includes BOTH the group redirection and the original service name.        // 3 - the original service param; without the original service name stuck to it.        // So, basically we want the original serviceName part stuck to the group mangling, not stuck to the original        // serviceParam. We do that by cut/pasting from both the mangled and demangled versions of the address.                EndpointAddress demangledAddress = demangleAddress(dstAddress);        String decodedServiceName = demangledAddress.getServiceName();        String decodedServiceParam = demangledAddress.getServiceParameter();                // Do filters for this message:        // FIXME - jice 20040417 : filters are likely broken, now. They do not see messages        // from xports in parent groups.  For those messages that are seen, demangled address seems to be the useful one.        msg = processFilters(msg, srcAddress, demangledAddress, true);                // If processFilters retuns null, the message is to be discarded.        if (msg == null) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Message discarded during filter processing");            }                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.incomingMessageFilteredOut();            }                        return;        }                if ((null == decodedServiceName) || (0 == decodedServiceName.length())) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("dest serviceName must not be null, discarding message " + msg);            }                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.invalidIncomingMessage();            }                        return;        }                // Now that we know the original service name is valid, finish building the decoded version.        if (demangledAddress != dstAddress) {            decodedServiceName = dstAddress.getServiceName() + "/" + decodedServiceName;        }                // First, try the regular destination        EndpointListener h = null;                if (null != decodedServiceParam) {            h = (EndpointListener) incomingMessageListeners.get(decodedServiceName + "/" + decodedServiceParam);        }                // Didn't find it with param, maybe there is a generic listener for the service        if (h == null) {            h = (EndpointListener) incomingMessageListeners.get(decodedServiceName);        }                // Didn't find it still, try the compatibility name.        if (h == null) {            h = (EndpointListener) incomingMessageListeners.get(decodedServiceName + decodedServiceParam);        }                // Still no listener? oh well.        if (h == null) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn(                "No listener for '" + dstAddress + "' in group " + group + "\ndecodedServiceName :" + decodedServiceName                + "\tdecodedServiceParam :" + decodedServiceParam);            }                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.noListenerForIncomingMessage();            }                        return; // noone cares for this message        }                // call the listener                try {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Calling listener '" + dstAddress + "' for " + msg.toString());            }                        h.processIncomingMessage(msg, srcAddress, demangledAddress);                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.incomingMessageSentToEndpointListener();            }        } catch (Throwable all) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Uncaught throwable from listener for " + dstAddress, all);            }                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.errorProcessingIncomingMessage();            }        }    }        /**     * Handles the given incoming message by calling the listener specified     * by its destination as returned by the getDestAddress() method of the     * message.     *     * @param msg The message to be delivered.     */    public void demux(Message msg) {                // Get the message destination        MessageElement dstAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS,        EndpointServiceImpl.MESSAGE_DESTINATION_NAME);                if (null == dstAddressElement) {            // No destination address... Just discard            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn( msg + " has no destination address. Discarded");            }                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.noDestinationAddressForDemuxMessage();            }                        return;        }                msg.removeMessageElement(dstAddressElement);        EndpointAddress dstAddress = new EndpointAddress(dstAddressElement.toString());                // Get the message source        MessageElement srcAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, EndpointServiceImpl.MESSAGE_SOURCE_NAME);                if (null == srcAddressElement) {            // No destination address... Just discard            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn( msg + " has no source address. Discarded");            }                        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {                endpointMeter.noSourceAddressForDemuxMessage();            }                        return;        }        msg.removeMessageElement(srcAddressElement);        EndpointAddress msgScrAddress = new EndpointAddress(srcAddressElement.toString());                processIncomingMessage(msg, msgScrAddress, dstAddress);                if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {            endpointMeter.demuxMessageProcessed();        }    }        /**     * {@inheritDoc}     *     * @deprecated Try and get a messenger instead     **/    public boolean ping(EndpointAddress addr) {        return true;    }        /**     * {@inheritDoc}     **/    public MessengerEventListener addMessageTransport(MessageTransport transpt) {                synchronized (messageTransports) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -