📄 endpointserviceimpl.java
字号:
* @param hint route hint * @param messengerMeter the metering object if any */ public CanonicalMessenger(int vmQueueSize, EndpointAddress destination, EndpointAddress logicalDestination, Object hint, OutboundMeter messengerMeter) { super(group.getPeerGroupID(), destination, logicalDestination, vmQueueSize); this.hint = hint; } /** * close this canonical messenger. */ @Override public void close() { // No way. Not form the outside. } /** * Drop the current messenger. */ @Override protected void closeImpl() { if (cachedMessenger != null) { cachedMessenger.close(); cachedMessenger = null; } else { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Internal messenger error: close requested while not connected."); } } } /** * Get a transport messenger to the destination. * <p/> * FIXME 20040413 jice : Do better hint management. */ @Override protected boolean connectImpl() { if (cachedMessenger != null) { if ((cachedMessenger.getState() & Messenger.TERMINAL) != 0) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.SEVERE)) { LOG.fine("Closing TERMINAL internal messenger : attempting requested connect."); } cachedMessenger.close(); cachedMessenger = null; } else { return true; } } // Consume the hint, if any. Object theHint = hint; hint = null; cachedMessenger = getLocalTransportMessenger(getDestinationAddress(), theHint); if (cachedMessenger == null) { return false; } // FIXME 20040413 jice : it's not too clean: we assume // that all transports use BlockingMessenger as the base class for // their messengers. If they don't we can't force them to hold the // strong reference to the canonical messenger. try { ((BlockingMessenger) cachedMessenger).setOwner(this); } catch (ClassCastException cce) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Transport messengers must all extend BlockingMessenger for now. 
" + cachedMessenger + " may remain open beyond its use."); } } return true; } /** * {@inheritDoc} */ @Override protected EndpointAddress getLogicalDestinationImpl() { if (cachedMessenger == null) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Internal messenger error: logical destination requested while not connected."); } return null; } return cachedMessenger.getLogicalDestinationAddress(); } /** * {@inheritDoc} */ @Override protected void sendMessageBImpl(Message msg, String service, String param) throws IOException { if (cachedMessenger == null) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Internal messenger error: send requested while not connected."); } throw new IOException("Internal messenger error."); } try { cachedMessenger.sendMessageB(msg, service, param); } catch (IOException any) { cachedMessenger = null; throw any; } catch (RuntimeException any) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failure sending " + msg, any); } throw any; } } } /** * Create a new EndpointService. */ public EndpointServiceImpl() { } /** * {@inheritDoc} */ public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException { if (initialized) { throw new PeerGroupException("Cannot initialize service more than once"); } this.group = group; // The selector for the element of the peer adv params that we have to update. 
this.assignedID = assignedID; this.implAdvertisement = (ModuleImplAdvertisement) impl; this.localPeerId = group.getPeerID().toString(); this.myServiceName = ChannelMessenger.InsertedServicePrefix + group.getPeerGroupID().getUniqueValue().toString(); ConfigParams confAdv = group.getConfigAdvertisement(); XMLElement paramBlock = null; if (confAdv != null) { paramBlock = (XMLElement) confAdv.getServiceParam(assignedID); } if (paramBlock != null) { // get our two tunables: virtual messenger queue size, and whether to use the parent endpoint Enumeration param; param = paramBlock.getChildren("MessengerQueueSize"); if (param.hasMoreElements()) { String textQSz = ((XMLElement) param.nextElement()).getTextValue(); try { Integer requestedSize = Integer.parseInt(textQSz.trim()); if (requestedSize > 0) { vmQueueSize = requestedSize; } else { LOG.warning("Illegal MessengerQueueSize : " + textQSz); } } catch (NumberFormatException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "could not parse MessengerQueueSize string", e); } } } param = paramBlock.getChildren("UseParentEndpoint"); if (param.hasMoreElements()) { String textUPE = ((XMLElement) param.nextElement()).getTextValue(); useParentEndpoint = textUPE.trim().equalsIgnoreCase("true"); } } PeerGroup parentGroup = group.getParentGroup(); if (useParentEndpoint && parentGroup != null) { parentEndpoint = parentGroup.getEndpointService(); parentEndpoint.addMessengerEventListener(this, EndpointService.LowPrecedence); } initialized = true; if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) { StringBuilder configInfo = new StringBuilder("Configuring Endpoint Service : " + assignedID); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: "); configInfo.append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : 
").append(implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : ").append(group); configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID()); configInfo.append("\n\tConfiguration :"); if (null == parentGroup) { configInfo.append("\n\t\tHome Group : (none)"); } else { configInfo.append("\n\t\tHome Group : ").append(parentGroup.getPeerGroupName()).append(" / ").append( parentGroup.getPeerGroupID()); } configInfo.append("\n\t\tUsing home group endpoint : ").append(parentEndpoint); configInfo.append("\n\t\tVirtual Messenger Queue Size : ").append(vmQueueSize); LOG.config(configInfo.toString()); } } /** * {@inheritDoc} */ public int startApp(String[] args) { if (!initialized) { return -1; } // FIXME when Load order Issue is resolved this should fail // until it is able to get a non-failing service Monitor (or // null = not monitoring) // FIXME it is ok because of the hack in StdPeerGroup that starts // endpoint service first if (EndpointMeterBuildSettings.ENDPOINT_METERING) { // Fix-Me: Move to startApp() when load order issue is resolved endpointServiceMonitor = (EndpointServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.endpointServiceMonitorClassID); if (endpointServiceMonitor != null) { endpointMeter = endpointServiceMonitor.getEndpointMeter(); } } if (parentEndpoint != null) { Iterator<MessageTransport> parentMTs = parentEndpoint.getAllMessageTransports(); synchronized (this) { while (parentMTs.hasNext()) { addProtoToAdv(parentMTs.next()); } } } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Endpoint Service started."); } return Module.START_OK; } /** * {@inheritDoc} * <p/> * The transports and services are going to be stopped as well. When * they are, they will dereference us and we'll go into oblivion. 
*/ public void stopApp() { if (parentEndpoint != null) { parentEndpoint.removeMessengerEventListener(this, EndpointService.LowPrecedence); } // Clear up the passiveMessengersListeners int prec = EndpointService.HighPrecedence; while (prec >= EndpointService.LowPrecedence) { passiveMessengerListeners[prec--].clear(); } // Clear up any messengers. messengerMap.clear(); directMessengerMap.clear(); // Clear up the listeners incomingMessageListeners.clear(); // Forget about any message filters. incomingFilterListeners.clear(); outgoingFilterListeners.clear(); // Forget any message transports messageTransports.clear(); // Avoid cross-reference problems with the GC // group = null; // parentEndpoint = null; // parentGroup = null; // The above is not really needed and until we have a very orderly // shutdown, it causes NPEs that are hard to prevent. if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Endpoint Service stopped."); } } /** * {@inheritDoc} */ public PeerGroup getGroup() { return group; } /** * {@inheritDoc} * <p/> * We create a new instance each time because our interface actually * has state (channel messengers and listener callback adaptor). */ public EndpointService getInterface() { return new EndpointServiceInterface(this); } /** * {@inheritDoc} */ public ModuleImplAdvertisement getImplAdvertisement() { return implAdvertisement; } // A vector for statistics between propagateThroughAll and its invoker. 
/**
 * Plain holder of three int counters, each starting at 0. An instance is
 * accepted by propagateThroughAll as its {@code metrics} parameter.
 * <p/>
 * NOTE(review): the code that updates these counters is not visible here;
 * the per-field meanings below are read off the field names - confirm.
 */
private static class Metrics {
    // presumably the number of messages dropped by filters - verify against caller
    int numFilteredOut = 0;
    // presumably the number of transports successfully propagated to - verify against caller
    int numPropagatedTo = 0;
    // presumably the number of propagation attempts that failed - verify against caller
    int numErrorsPropagated = 0;
}
private void propagateThroughAll(Iterator<MessageTransport> eachProto, Message myMsg, String serviceName, String serviceParam, int initialTTL, Metrics metrics) { Message filtered = null; while (eachProto.hasNext()) { MessageTransport aTransport = eachProto.next(); try { if (!(aTransport instanceof MessagePropagater)) { continue; } MessagePropagater propagater = (MessagePropagater) aTransport; if (null == filtered) { // run process filters only once filtered = processFilters(myMsg, propagater.getPublicAddress(), new EndpointAddress(group.getPeerGroupID(), serviceName, serviceParam), false);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -