📄 endpointserviceimpl.java
字号:
/** * close this canonical messenger. **/ public void close() {// No way. Not form the outside. } /** * Drop the current messenger. **/ protected void closeImpl() { if (cachedMessenger != null) { cachedMessenger.close(); cachedMessenger = null; } else { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Internal messenger error: close requested while not connected."); } } } /** * Get a transport messenger to the destination. * * FIXME - jice@jxta.org 20040413: do better hint management. **/ protected boolean connectImpl() { if (cachedMessenger != null) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Internal messenger error: connect requested while connected."); } cachedMessenger.close(); cachedMessenger = null; } // Consume the hint, if any. Object theHint = hint; hint = null; cachedMessenger = getLocalTransportMessenger(getDestinationAddress(), theHint); if (cachedMessenger == null) { return false; } // FIXME - jice@jxta.org 20040413: it's not too clean: we assume // that all transports use BlockingMessenger as the base class for // their messengers. If they don't we can't force them to hold the // strong reference to the canonical messenger. try { ((BlockingMessenger) cachedMessenger).setOwner(this); } catch (ClassCastException cce) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error( "Transport messengers must all extend BlockingMessenger for now. 
" + cachedMessenger + " may remain open beyond its use."); } } return true; } /** * {@inheritDoc} **/ protected EndpointAddress getLogicalDestinationImpl() { if (cachedMessenger == null) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Internal messenger error: logical destination requested while not connected."); } return null; } return cachedMessenger.getLogicalDestinationAddress(); } /** * {@inheritDoc} **/ protected void sendMessageBImpl(Message msg, String service, String param) throws IOException { if (cachedMessenger == null) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Internal messenger error: send requested while not connected."); } throw new IOException("Internal messenger error."); } try { cachedMessenger.sendMessageB(msg, service, param); } catch (IOException any) { // FIXME - jice@jxta.org 20040413: beware of the funky runtime ones. cachedMessenger = null; throw any; } } } /** * Create a new EndpointService. **/ public EndpointServiceImpl() {} /** * Initialize the application passing it its peer group and advertisement. * * @param group PeerGroup this application is started from * @param assignedID The ID which this instance should be known by. * @param impl The advertisement for this application * * @exception PeerGroupException failure to initialize this application. */ public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException { if (initialized) { throw new PeerGroupException("Cannot initialize service more than once"); } // There's no config of interest in our implAdv, but we must be able // to return it if queried. We also need our assigned ID; that's the // selector for the element of the peer adv that we have to update. 
this.implAdv = (ModuleImplAdvertisement) impl; this.assignedID = assignedID; this.group = group; this.localPeerId = group.getPeerID().toString(); this.myServiceName = ChannelMessenger.InsertedServicePrefix + group.getPeerGroupID().getUniqueValue().toString(); ConfigParams confAdv = (ConfigParams) group.getConfigAdvertisement(); XMLElement paramBlock = null; if (confAdv != null) { paramBlock = (XMLElement) confAdv.getServiceParam(assignedID); } if (paramBlock != null) { // get our two tunables: virtual messenger queue size, and whether to use the parent endpoint Enumeration param; param = paramBlock.getChildren("MessengerQueueSize"); if (param.hasMoreElements()) { String textQSz = ((XMLElement) param.nextElement()).getTextValue(); try { vmQueueSize = Integer.parseInt(textQSz.trim()); } catch (NumberFormatException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("could not parse MessengerQueueSize string", e); } } } param = paramBlock.getChildren("UseParentEndpoint"); if (param.hasMoreElements()) { // if it's absent, the default is "true" String textUPE = ((XMLElement) param.nextElement()).getTextValue(); useParentEndpoint = textUPE.trim().equalsIgnoreCase("true"); } } parentGroup = group.getParentGroup(); if (useParentEndpoint && parentGroup != null) { parentEndpoint = parentGroup.getEndpointService(); parentEndpoint.addMessengerEventListener(this, EndpointService.LowPrecedence); } initialized = true; if (LOG.isEnabledFor(Level.INFO)) { StringBuffer configInfo = new StringBuffer("Configuring Endpoint Service : " + assignedID); configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tImpl Description : " + implAdv.getDescription()); configInfo.append("\n\t\tImpl URI : " + implAdv.getUri()); configInfo.append("\n\t\tImpl Code : " + implAdv.getCode()); configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : " + group.getPeerGroupName()); configInfo.append("\n\t\tGroup ID : " + group.getPeerGroupID()); configInfo.append("\n\t\tPeer ID : " 
+ group.getPeerID()); configInfo.append("\n\tConfiguration :"); if (null == parentGroup) { configInfo.append("\n\t\tHome Group : (none)"); } else { configInfo.append("\n\t\tHome Group : " + parentGroup.getPeerGroupName() + " / " + parentGroup.getPeerGroupID()); } configInfo.append("\n\t\tVirtual Messenger Queue Size : " + vmQueueSize); if (group.getPeerGroupID().equals(PeerGroupID.worldPeerGroupID)) { configInfo.append("\n\tQuota Incoming Message Params :"); configInfo.append("\n\t\tMax message size : " + QuotaIncomingMessageListener.GmaxMsgSize); configInfo.append("\n\t\tMax message senders : " + QuotaIncomingMessageListener.GmaxSenders); } LOG.info(configInfo); } } /** * {@inheritDoc} **/ public int startApp(String[] args) { // Fix-Me: when Load order Issue is resolved this should fail // until it is able to get a non-failing service Monitor (or // null = not monitoring) Fix-Me: it is ok because of the hack // in StdPeerGroup that starts endpoint service first if (EndpointMeterBuildSettings.ENDPOINT_METERING) { // Fix-Me: Move to startApp() when load order issue is resolved endpointServiceMonitor = (EndpointServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.endpointServiceMonitorClassID); if (endpointServiceMonitor != null) { endpointMeter = endpointServiceMonitor.getEndpointMeter(); } } if (parentEndpoint != null) { Iterator parentMTs = parentEndpoint.getAllMessageTransports(); synchronized (this) { while (parentMTs.hasNext()) { addProtoToAdv((MessageTransport) parentMTs.next()); } } } return 0; } /** * {@inheritDoc} * * <p/>The protocols and services are going * to be stopped as well. When they are, they will unreference us and * we'll go into oblivion. 
*/ public void stopApp() { if (parentEndpoint != null) { parentEndpoint.removeMessengerEventListener(this, EndpointService.LowPrecedence); } // Clear up the passiveMessengersListeners for (int i = 0; i < 3; ++i) { List list = passiveMessengerListeners[i]; if (list != null) { list.clear(); } } // Clear up the HashMap if (incomingMessageListeners != null) { try { incomingMessageListeners.clear(); } catch (Exception ez) {// Not much can be done } } // Avoid cross-reference problems with the GC // group = null; // parentEndpoint = null; // parentGroup = null; // The above is not really needed and until we have a very orderly shutdown, it causes NPEs // that are hard to prevent. } /** * Returns the group to which this EndpointServiceImpl is attached. * * @return PeerGroup the group. */ public PeerGroup getGroup() { return group; } /** * Service objects are not manipulated directly to protect usage * of the service. A Service interface is returned to access the service * methods. * * @return Service public interface of the service * */ public Service getInterface() { return new EndpointServiceInterface(this); } /** * Returns the advertisment for this service. * * @return Advertisement the advertisement. * */ public Advertisement getImplAdvertisement() { return implAdv; } // A vector for statistics between propagateThroughAll and its invoker. 
/**
 * Plain mutable holder for three integer counters, passed into
 * {@code propagateThroughAll} so it can report back to its caller.
 *
 * NOTE(review): the code that updates these counters lies beyond this
 * excerpt; the per-field meanings below are read off the field names only
 * and should be confirmed against the rest of the method.
 */
private static class Metrics {
    // presumably: messages dropped by the filters
    int numFilteredOut = 0;
    // presumably: transports the message was handed to
    int numPropagatedTo = 0;
    // presumably: transports on which propagation failed
    int numErrorsPropagated = 0;
}

/**
 * Walks the given message transports and, for each one that is a
 * {@code MessageSender} with propagation enabled, runs the message through
 * the process filters before going on.
 *
 * NOTE(review): the remainder of this method is outside the visible
 * excerpt, so what happens after filtering is not documented here.
 *
 * @param eachProto iterator over {@code MessageTransport} instances.
 * @param myMsg the message to propagate.
 * @param serviceName destination service name used to build the
 *        destination address passed to the filters.
 * @param serviceParam destination service parameter used to build the
 *        destination address passed to the filters.
 * @param metrics counters shared with the caller.
 */
private void propagateThroughAll(Iterator eachProto, Message myMsg, String serviceName, String serviceParam, Metrics metrics) {
    // Filter result, computed lazily on the first eligible transport.
    Message filtered = null;

    while (eachProto.hasNext()) {
        MessageTransport aTransport = (MessageTransport) eachProto.next();

        try {
            // Only transports able to send are of interest.
            if (!(aTransport instanceof MessageSender)) {
                continue;
            }

            MessageSender sender = (MessageSender) aTransport;

            if (!sender.isPropagateEnabled()) {
                //no sense in consuming resources
                continue;
            }

            if (null == filtered) {
                // run process filters only once
                filtered = processFilters(myMsg, sender.getPublicAddress(), new EndpointAddress(group.getPeerGroupID(), serviceName, serviceParam), false);
            }

            if (null == filtered) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    // NOTE(review): filtered is null on this branch, so the
                    // text logged is " message null discarded ..."; myMsg
                    // was probably the intended operand - confirm.
                    LOG.debug(" message "+filtered+ " discarded upon filter decision");
                }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -