
📄 endpointserviceimpl.java

jxta_src_2.41b: JXTA 2.41b source code (latest version), from www.jxta.org
Page 1 of 5
        /**
         * Close this canonical messenger.
         **/
        public void close() {
            // No way. Not from the outside.
        }

        /**
         * Drop the current messenger.
         **/
        protected void closeImpl() {
            if (cachedMessenger != null) {
                cachedMessenger.close();
                cachedMessenger = null;
            } else {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Internal messenger error: close requested while not connected.");
                }
            }
        }

        /**
         * Get a transport messenger to the destination.
         *
         * FIXME - jice@jxta.org 20040413: do better hint management.
         **/
        protected boolean connectImpl() {
            if (cachedMessenger != null) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Internal messenger error: connect requested while connected.");
                }
                cachedMessenger.close();
                cachedMessenger = null;
            }

            // Consume the hint, if any.
            Object theHint = hint;

            hint = null;
            cachedMessenger = getLocalTransportMessenger(getDestinationAddress(), theHint);

            if (cachedMessenger == null) {
                return false;
            }

            // FIXME - jice@jxta.org 20040413: it's not too clean: we assume
            // that all transports use BlockingMessenger as the base class for
            // their messengers. If they don't we can't force them to hold the
            // strong reference to the canonical messenger.
            try {
                ((BlockingMessenger) cachedMessenger).setOwner(this);
            } catch (ClassCastException cce) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error(
                            "Transport messengers must all extend BlockingMessenger for now. "
                            + cachedMessenger + " may remain open beyond its use.");
                }
            }
            return true;
        }

        /**
         *  {@inheritDoc}
         **/
        protected EndpointAddress getLogicalDestinationImpl() {
            if (cachedMessenger == null) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Internal messenger error: logical destination requested while not connected.");
                }
                return null;
            }
            return cachedMessenger.getLogicalDestinationAddress();
        }

        /**
         *  {@inheritDoc}
         **/
        protected void sendMessageBImpl(Message msg, String service, String param) throws IOException {
            if (cachedMessenger == null) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Internal messenger error: send requested while not connected.");
                }
                throw new IOException("Internal messenger error.");
            }
            try {
                cachedMessenger.sendMessageB(msg, service, param);
            } catch (IOException any) {
                // FIXME - jice@jxta.org 20040413: beware of the funky runtime ones.
                cachedMessenger = null;
                throw any;
            }
        }
    }

    /**
     *  Create a new EndpointService.
     **/
    public EndpointServiceImpl() {}

    /**
     *  Initialize the application passing it its peer group and advertisement.
     *
     *  @param group PeerGroup this application is started from
     *  @param assignedID The ID which this instance should be known by.
     *  @param impl The advertisement for this application
     *
     *  @exception PeerGroupException failure to initialize this application.
     */
    public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl)
            throws PeerGroupException {
        if (initialized) {
            throw new PeerGroupException("Cannot initialize service more than once");
        }

        // There's no config of interest in our implAdv, but we must be able
        // to return it if queried. We also need our assigned ID; that's the
        // selector for the element of the peer adv that we have to update.
        this.implAdv = (ModuleImplAdvertisement) impl;
        this.assignedID = assignedID;
        this.group = group;
        this.localPeerId = group.getPeerID().toString();

        this.myServiceName = ChannelMessenger.InsertedServicePrefix + group.getPeerGroupID().getUniqueValue().toString();

        ConfigParams confAdv = (ConfigParams) group.getConfigAdvertisement();
        XMLElement paramBlock = null;

        if (confAdv != null) {
            paramBlock = (XMLElement) confAdv.getServiceParam(assignedID);
        }

        if (paramBlock != null) {
            // Get our two tunables: virtual messenger queue size, and whether to use the parent endpoint.
            Enumeration param;

            param = paramBlock.getChildren("MessengerQueueSize");
            if (param.hasMoreElements()) {
                String textQSz = ((XMLElement) param.nextElement()).getTextValue();

                try {
                    vmQueueSize = Integer.parseInt(textQSz.trim());
                } catch (NumberFormatException e) {
                    if (LOG.isEnabledFor(Level.WARN)) {
                        LOG.warn("could not parse MessengerQueueSize string", e);
                    }
                }
            }

            param = paramBlock.getChildren("UseParentEndpoint");
            if (param.hasMoreElements()) { // if it's absent, the default is "true"
                String textUPE = ((XMLElement) param.nextElement()).getTextValue();

                useParentEndpoint = textUPE.trim().equalsIgnoreCase("true");
            }
        }

        parentGroup = group.getParentGroup();

        if (useParentEndpoint && parentGroup != null) {
            parentEndpoint = parentGroup.getEndpointService();
            parentEndpoint.addMessengerEventListener(this, EndpointService.LowPrecedence);
        }

        initialized = true;

        if (LOG.isEnabledFor(Level.INFO)) {
            StringBuffer configInfo = new StringBuffer("Configuring Endpoint Service : " + assignedID);

            configInfo.append("\n\tImplementation :");
            configInfo.append("\n\t\tImpl Description : " + implAdv.getDescription());
            configInfo.append("\n\t\tImpl URI : " + implAdv.getUri());
            configInfo.append("\n\t\tImpl Code : " + implAdv.getCode());

            configInfo.append("\n\tGroup Params :");
            configInfo.append("\n\t\tGroup : " + group.getPeerGroupName());
            configInfo.append("\n\t\tGroup ID : " + group.getPeerGroupID());
            configInfo.append("\n\t\tPeer ID : " + group.getPeerID());

            configInfo.append("\n\tConfiguration :");
            if (null == parentGroup) {
                configInfo.append("\n\t\tHome Group : (none)");
            } else {
                configInfo.append("\n\t\tHome Group : " + parentGroup.getPeerGroupName() + " / " + parentGroup.getPeerGroupID());
            }
            configInfo.append("\n\t\tVirtual Messenger Queue Size : " + vmQueueSize);
            if (group.getPeerGroupID().equals(PeerGroupID.worldPeerGroupID)) {
                configInfo.append("\n\tQuota Incoming Message Params :");
                configInfo.append("\n\t\tMax message size : " + QuotaIncomingMessageListener.GmaxMsgSize);
                configInfo.append("\n\t\tMax message senders : " + QuotaIncomingMessageListener.GmaxSenders);
            }

            LOG.info(configInfo);
        }
    }

    /**
     *  {@inheritDoc}
     **/
    public int startApp(String[] args) {
        // Fix-Me: when Load order Issue is resolved this should fail
        // until it is able to get a non-failing service Monitor (or
        // null = not monitoring) Fix-Me: it is ok because of the hack
        // in StdPeerGroup that starts endpoint service first
        if (EndpointMeterBuildSettings.ENDPOINT_METERING) { // Fix-Me: Move to startApp() when load order issue is resolved
            endpointServiceMonitor = (EndpointServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.endpointServiceMonitorClassID);

            if (endpointServiceMonitor != null) {
                endpointMeter = endpointServiceMonitor.getEndpointMeter();
            }
        }

        if (parentEndpoint != null) {
            Iterator parentMTs = parentEndpoint.getAllMessageTransports();

            synchronized (this) {
                while (parentMTs.hasNext()) {
                    addProtoToAdv((MessageTransport) parentMTs.next());
                }
            }
        }

        return 0;
    }

    /**
     *  {@inheritDoc}
     *
     * <p/>The protocols and services are going
     * to be stopped as well. When they are, they will unreference us and
     * we'll go into oblivion.
     */
    public void stopApp() {

        if (parentEndpoint != null) {
            parentEndpoint.removeMessengerEventListener(this, EndpointService.LowPrecedence);
        }

        // Clear up the passiveMessengerListeners
        for (int i = 0; i < 3; ++i) {
            List list = passiveMessengerListeners[i];

            if (list != null) {
                list.clear();
            }
        }

        // Clear up the HashMap
        if (incomingMessageListeners != null) {
            try {
                incomingMessageListeners.clear();
            } catch (Exception ez) {
                // Not much can be done
            }
        }

        // Avoid cross-reference problems with the GC

        // group = null;
        // parentEndpoint = null;
        // parentGroup = null;

        // The above is not really needed and until we have a very orderly shutdown, it causes NPEs
        // that are hard to prevent.
    }

    /**
     * Returns the group to which this EndpointServiceImpl is attached.
     *
     * @return PeerGroup the group.
     */
    public PeerGroup getGroup() {
        return group;
    }

    /**
     * Service objects are not manipulated directly to protect usage
     * of the service. A Service interface is returned to access the service
     * methods.
     *
     * @return Service public interface of the service
     *
     */
    public Service getInterface() {
        return new EndpointServiceInterface(this);
    }

    /**
     * Returns the advertisement for this service.
     *
     * @return Advertisement the advertisement.
     *
     */
    public Advertisement getImplAdvertisement() {
        return implAdv;
    }

    // A vector for statistics between propagateThroughAll and its invoker.
    private static class Metrics {
        int numFilteredOut = 0;
        int numPropagatedTo = 0;
        int numErrorsPropagated = 0;
    }

    private void propagateThroughAll(Iterator eachProto,
                                     Message myMsg,
                                     String serviceName,
                                     String serviceParam,
                                     Metrics metrics) {
        Message filtered = null;

        while (eachProto.hasNext()) {
            MessageTransport aTransport = (MessageTransport) eachProto.next();

            try {
                if (!(aTransport instanceof MessageSender)) {
                    continue;
                }
                MessageSender sender = (MessageSender) aTransport;

                if (!sender.isPropagateEnabled()) {
                    // no sense in consuming resources
                    continue;
                }
                if (null == filtered) {
                    // run process filters only once
                    filtered = processFilters(myMsg,
                                              sender.getPublicAddress(),
                                              new EndpointAddress(group.getPeerGroupID(), serviceName, serviceParam),
                                              false);
                }

                if (null == filtered) {
                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        // filtered is always null here, so log the original message instead
                        LOG.debug("   message " + myMsg + " discarded upon filter decision");
                    }

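The tunable handling in init() above can be read in isolation. The following is a minimal sketch, not JXTA code: the class `EndpointTunables` and its two methods are hypothetical names introduced for illustration, and a `null` argument stands in for "element absent from the parameter block". It assumes only what init() shows: an unparsable `MessengerQueueSize` leaves the previous value in place, and `UseParentEndpoint` defaults to true when absent.

```java
// Hypothetical helper, not part of JXTA: mirrors how init() reads its two tunables.
final class EndpointTunables {

    private EndpointTunables() {}

    /** Parse a queue size; keep the fallback when the text is absent or not an integer. */
    static int parseQueueSize(String text, int fallback) {
        if (text == null) {
            return fallback; // element absent: keep the current value
        }
        try {
            return Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            return fallback; // init() logs a warning here and keeps the old value
        }
    }

    /** Absent means true; a present element is true only for "true" (case-insensitive). */
    static boolean parseUseParentEndpoint(String text) {
        if (text == null) {
            return true;
        }
        return text.trim().equalsIgnoreCase("true");
    }

    public static void main(String[] args) {
        // Placeholder default; the real initial value of vmQueueSize is not shown on this page.
        int assumedDefault = 20;

        System.out.println(parseQueueSize(" 64 ", assumedDefault));
        System.out.println(parseQueueSize("lots", assumedDefault));
        System.out.println(parseUseParentEndpoint(null));
        System.out.println(parseUseParentEndpoint(" FALSE "));
    }
}
```

Note that any present value other than "true", including a typo such as "ture", disables the parent endpoint, which matches the `equalsIgnoreCase("true")` check in init().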