⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointserviceimpl.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
         * @param hint               route hint         * @param messengerMeter     the metering object if any         */        public CanonicalMessenger(int vmQueueSize, EndpointAddress destination, EndpointAddress logicalDestination, Object hint, OutboundMeter messengerMeter) {            super(group.getPeerGroupID(), destination, logicalDestination, vmQueueSize);            this.hint = hint;        }        /**         * close this canonical messenger.         */        @Override        public void close() {            // No way. Not form the outside.        }        /**         * Drop the current messenger.         */        @Override        protected void closeImpl() {            if (cachedMessenger != null) {                cachedMessenger.close();                cachedMessenger = null;            } else {                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                    LOG.severe("Internal messenger error: close requested while not connected.");                }            }        }        /**         * Get a transport messenger to the destination.         * <p/>         * FIXME 20040413 jice : Do better hint management.         */        @Override        protected boolean connectImpl() {            if (cachedMessenger != null) {                if ((cachedMessenger.getState() & Messenger.TERMINAL) != 0) {                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.SEVERE)) {                        LOG.fine("Closing TERMINAL internal messenger : attempting requested connect.");                    }                    cachedMessenger.close();                    cachedMessenger = null;                } else {                    return true;                }            }            // Consume the hint, if any.            Object theHint = hint;            hint = null;            cachedMessenger = getLocalTransportMessenger(getDestinationAddress(), theHint);            if (cachedMessenger == null) {                return false;            }            // FIXME 20040413 jice : it's not too clean: we assume            // that all transports use BlockingMessenger as the base class for            // their messengers. If they don't we can't force them to hold the            // strong reference to the canonical messenger.            try {                ((BlockingMessenger) cachedMessenger).setOwner(this);            } catch (ClassCastException cce) {                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                    LOG.severe("Transport messengers must all extend BlockingMessenger for now. " +                            cachedMessenger + " may remain open beyond its use.");                }            }            return true;        }        /**         * {@inheritDoc}         */        @Override        protected EndpointAddress getLogicalDestinationImpl() {            if (cachedMessenger == null) {                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                    LOG.severe("Internal messenger error: logical destination requested while not connected.");                }                return null;            }            return cachedMessenger.getLogicalDestinationAddress();        }        /**         * {@inheritDoc}         */        @Override        protected void sendMessageBImpl(Message msg, String service, String param) throws IOException {            if (cachedMessenger == null) {                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                    LOG.severe("Internal messenger error: send requested while not connected.");                }                throw new IOException("Internal messenger error.");            }            try {                cachedMessenger.sendMessageB(msg, service, param);            } catch (IOException any) {                cachedMessenger = null;                throw any;            } catch (RuntimeException any) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.log(Level.WARNING, "Failure sending " + msg, any);                }                throw any;            }        }    }    /**     * Create a new EndpointService.     */    public EndpointServiceImpl() {    }    /**     * {@inheritDoc}     */    public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {        if (initialized) {            throw new PeerGroupException("Cannot initialize service more than once");        }        this.group = group;        // The selector for the element of the peer adv params that we have to update.        this.assignedID = assignedID;        this.implAdvertisement = (ModuleImplAdvertisement) impl;        this.localPeerId = group.getPeerID().toString();        this.myServiceName = ChannelMessenger.InsertedServicePrefix + group.getPeerGroupID().getUniqueValue().toString();        ConfigParams confAdv = group.getConfigAdvertisement();        XMLElement paramBlock = null;        if (confAdv != null) {            paramBlock = (XMLElement) confAdv.getServiceParam(assignedID);        }        if (paramBlock != null) {            // get our two tunables: virtual messenger queue size, and whether to use the parent endpoint            Enumeration param;            param = paramBlock.getChildren("MessengerQueueSize");            if (param.hasMoreElements()) {                String textQSz = ((XMLElement) param.nextElement()).getTextValue();                try {                    Integer requestedSize = Integer.parseInt(textQSz.trim());                    if (requestedSize > 0) {                        vmQueueSize = requestedSize;                    } else {                        LOG.warning("Illegal MessengerQueueSize : " + textQSz);                    }                } catch (NumberFormatException e) {                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                        LOG.log(Level.WARNING, "could not parse MessengerQueueSize string", e);                    }                }            }            param = paramBlock.getChildren("UseParentEndpoint");            if (param.hasMoreElements()) {                String textUPE = ((XMLElement) param.nextElement()).getTextValue();                useParentEndpoint = textUPE.trim().equalsIgnoreCase("true");            }        }        PeerGroup parentGroup = group.getParentGroup();        if (useParentEndpoint && parentGroup != null) {            parentEndpoint = parentGroup.getEndpointService();            parentEndpoint.addMessengerEventListener(this, EndpointService.LowPrecedence);        }        initialized = true;        if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {            StringBuilder configInfo = new StringBuilder("Configuring Endpoint Service : " + assignedID);            if (implAdvertisement != null) {                configInfo.append("\n\tImplementation :");                configInfo.append("\n\t\tModule Spec ID: ");                configInfo.append(implAdvertisement.getModuleSpecID());                configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());                configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());                configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());            }            configInfo.append("\n\tGroup Params :");            configInfo.append("\n\t\tGroup : ").append(group);            configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());            configInfo.append("\n\tConfiguration :");            if (null == parentGroup) {                configInfo.append("\n\t\tHome Group : (none)");            } else {                configInfo.append("\n\t\tHome Group : ").append(parentGroup.getPeerGroupName()).append(" / ").append(                        parentGroup.getPeerGroupID());            }            configInfo.append("\n\t\tUsing home group endpoint : ").append(parentEndpoint);            configInfo.append("\n\t\tVirtual Messenger Queue Size : ").append(vmQueueSize);            LOG.config(configInfo.toString());        }    }    /**     * {@inheritDoc}     */    public int startApp(String[] args) {        if (!initialized) {            return -1;        }        // FIXME  when Load order Issue is resolved this should fail        // until it is able to get a non-failing service Monitor (or        // null = not monitoring)         // FIXME it is ok because of the hack in StdPeerGroup that starts         // endpoint service first        if (EndpointMeterBuildSettings.ENDPOINT_METERING) { // Fix-Me: Move to startApp() when load order issue is resolved            endpointServiceMonitor = (EndpointServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.endpointServiceMonitorClassID);            if (endpointServiceMonitor != null) {                endpointMeter = endpointServiceMonitor.getEndpointMeter();            }        }        if (parentEndpoint != null) {            Iterator<MessageTransport> parentMTs = parentEndpoint.getAllMessageTransports();            synchronized (this) {                while (parentMTs.hasNext()) {                    addProtoToAdv(parentMTs.next());                }            }        }        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {            LOG.info("Endpoint Service started.");        }        return Module.START_OK;    }    /**     * {@inheritDoc}     * <p/>     * The transports and services are going to be stopped as well. When     * they are, they will dereference us and we'll go into oblivion.     */    public void stopApp() {        if (parentEndpoint != null) {            parentEndpoint.removeMessengerEventListener(this, EndpointService.LowPrecedence);        }        // Clear up the passiveMessengersListeners        int prec = EndpointService.HighPrecedence;        while (prec >= EndpointService.LowPrecedence) {            passiveMessengerListeners[prec--].clear();        }        // Clear up any messengers.        messengerMap.clear();        directMessengerMap.clear();        // Clear up the listeners        incomingMessageListeners.clear();        // Forget about any message filters.        incomingFilterListeners.clear();        outgoingFilterListeners.clear();        // Forget any message transports        messageTransports.clear();        // Avoid cross-reference problems with the GC        // group = null;        // parentEndpoint = null;        // parentGroup = null;        // The above is not really needed and until we have a very orderly        // shutdown, it causes NPEs that are hard to prevent.        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {            LOG.info("Endpoint Service stopped.");        }    }    /**     * {@inheritDoc}     */    public PeerGroup getGroup() {        return group;    }    /**     * {@inheritDoc}     * <p/>     * We create a new instance each time because our interface actually     * has state (channel messengers and listener callback adaptor).     */    public EndpointService getInterface() {        return new EndpointServiceInterface(this);    }    /**     * {@inheritDoc}     */    public ModuleImplAdvertisement getImplAdvertisement() {        return implAdvertisement;    }    // A vector for statistics between propagateThroughAll and its invoker.    private static class Metrics {        int numFilteredOut = 0;        int numPropagatedTo = 0;        int numErrorsPropagated = 0;    }    private void propagateThroughAll(Iterator<MessageTransport> eachProto, Message myMsg, String serviceName, String serviceParam, int initialTTL, Metrics metrics) {        Message filtered = null;        while (eachProto.hasNext()) {            MessageTransport aTransport = eachProto.next();            try {                if (!(aTransport instanceof MessagePropagater)) {                    continue;                }                MessagePropagater propagater = (MessagePropagater) aTransport;                if (null == filtered) {                    // run process filters only once                    filtered = processFilters(myMsg,                            propagater.getPublicAddress(),                            new EndpointAddress(group.getPeerGroupID(), serviceName, serviceParam),                            false);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -