📄 endpointserviceimpl.java
字号:
String serviceParam) { if (null == serviceName) { throw new IllegalArgumentException("serviceName must not be null"); } if (-1 != serviceName.indexOf('/')) { throw new IllegalArgumentException("serviceName may not contain '/' characters"); } String address = serviceName; if (null != serviceParam) { address += "/" + serviceParam; } QuotaIncomingMessageListener removedListener = null; EndpointListener result = null; synchronized (incomingMessageListeners) { removedListener = (QuotaIncomingMessageListener) incomingMessageListeners.remove(address); if (removedListener != null) { result = removedListener.getListener(); // We need to explicitly close the QuotaIncomingMessageListener removedListener.close(); } } if (parentEndpoint != null) { if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) { parentEndpoint.removeIncomingMessageListener(serviceName, serviceParam); } else { parentEndpoint.removeIncomingMessageListener(myServiceName, address); } } return result; } /** * Returns a local transport that can send to the given address For now this * is based only on the protocol name. **/ private MessageSender getLocalSenderForAddress(EndpointAddress addr) { Iterator localTransports = getAllLocalTransports(); while (localTransports.hasNext()) { MessageTransport transpt = (MessageTransport) localTransports.next(); if (!transpt.getProtocolName().equals(addr.getProtocolName())) { continue; } if (!(transpt instanceof MessageSender)) { // Do we allow non-senders in the list ? continue; } return (MessageSender) transpt; } return null; } /** * {@inheritDoc} * * <p/>Note: canonical messenger itself does not do any address rewritting. * Any address rewritting must be specified when getting a channel. However, * canonical knows the default group redirection for its owning endpoint and * will automatically skip redirection if it is the same. */ public Messenger getCanonicalMessenger(EndpointAddress addr, Object hint) { if (LOG.isEnabledFor(Level.DEBUG)) { Throwable trace = new Throwable("Stack Trace"); StackTraceElement elements[] = trace.getStackTrace(); int position = 1; while (elements[position].getClassName().startsWith("net.jxta.impl.endpoint.EndpointService")) { position++; } if ((elements.length - 1) == position) { position--; } LOG.debug("Get Messenger for " + addr + " by " + elements[position]); } if (addr == null) { throw new IllegalArgumentException("null endpoint address not allowed."); } // Check the canonical map. synchronized (messengerMap) { Reference ref = (Reference) messengerMap.get(addr); if (ref != null) { Messenger found = (Messenger) ref.get(); // If it is USABLE, return it. if ((found != null) && ((found.getState() & Messenger.USABLE) != 0)) { return found; } // It has been GCed or is nolonger USABLE. Make room for a new one. messengerMap.remove(ref); } if (getLocalSenderForAddress(addr) != null) { OutboundMeter messengerMeter = null; if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) { messengerMeter = endpointServiceMonitor.getOutboundMeter(addr); } // The hint is saved in the canonical messenger and will be used // when that virtual messenger first faces the need to create a // transport messenger. As of now, the logical dest is unknown. Messenger m = new CanonicalMessenger(vmQueueSize, addr, null, hint, messengerMeter); messengerMap.put(m.getDestinationAddressObject(), new SoftReference(m)); return m; } } // If we're here, we do not have any such transport. // Try our ancestors enpoints, if any. if (parentEndpoint == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Couldn't create messenger for : " + addr); } return null; } return parentEndpoint.getCanonicalMessenger(addr, hint); } /** * Return only the message transport registered locally. */ protected Iterator getAllLocalTransports() { List transportList; synchronized (messageTransports) { transportList = Arrays.asList(messageTransports.toArray()); } return transportList.iterator(); } /** * Returns a messenger from one of the transports registered with this very endpoint * service. Not one of of the parent ones. **/ protected Messenger getLocalTransportMessenger(EndpointAddress addr, Object hint) { MessageSender sender = (MessageSender) getLocalSenderForAddress(addr); Messenger messenger = null; if (sender != null) { EndpointAddress addressToUse = (EndpointAddress) addr.clone(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Trying address '" + addressToUse + "' with : " + sender); } messenger = sender.getMessenger(addressToUse, hint); } if (messenger == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Couldn't create messenger for : " + addr); } } return messenger; } /** * {@inheritDoc} **/ public synchronized void addIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { if (null == listener) { throw new IllegalArgumentException("listener must be non-null"); } FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name); incomingFilterListeners.add(aFilter); } /** * {@inheritDoc} **/ public synchronized void addOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { if (null == listener) { throw new IllegalArgumentException("listener must be non-null"); } FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name); outgoingFilterListeners.add(aFilter); } /** * {@inheritDoc} **/ public synchronized MessageFilterListener removeIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { Iterator eachListener = incomingFilterListeners.iterator(); while (eachListener.hasNext()) { FilterListenerAndMask aFilter = (FilterListenerAndMask) eachListener.next(); if (listener == aFilter.listener) { eachListener.remove(); return listener; } } return null; } /** * {@inheritDoc} **/ public synchronized MessageFilterListener removeOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { Iterator eachListener = outgoingFilterListeners.iterator(); while (eachListener.hasNext()) { FilterListenerAndMask aFilter = (FilterListenerAndMask) eachListener.next(); if ((listener == aFilter.listener) && ((null != namespace) ? namespace.equals(aFilter.namespace) : (null == aFilter.namespace)) && ((null != name) ? name.equals(aFilter.name) : (null == aFilter.name))) { eachListener.remove(); return listener; } } return null; } /** * A messenger from a transport is ready. Redistribute the event to those interrested. */ public boolean messengerReady(MessengerEvent event) { // FIXME - jice@jxta.org 20040413: now that we share messengers, we should be able to get rid of most of this // mess, and in the router, and the relay too. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("New messenger event for : " + event.getMessenger().getDestinationAddress()); } if (!(event.getSource() instanceof MessageTransport)) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("New messenger from non-transport. Ignored"); } return false; } MessageTransport source = (MessageTransport) event.getSource(); Messenger messenger = event.getMessenger(); Messenger messengerForHere = messenger; EndpointAddress connAddr = event.getConnectionAddress(); int highestPrec = EndpointService.HighPrecedence; int lowestPrec = EndpointService.LowPrecedence; // If there's no connection address we just pass the messenger // around everywhere; it is unspecified which group it is for. // Else, we must figure out if it is for this group, or must be // passed upStack (if any). if (connAddr != null) { String cgServiceName = connAddr.getServiceName(); // See if there is a mangling. If not, this means this // was sent within this group through a local xport, // so it is for here. Else it may be for here (from below) // or for upstack. if (cgServiceName == null || !cgServiceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {// FIXME: jice@jxta.org - 20030512 If we restrict use // to "here" we make most backchannels useless. So, // the statement below is commented out. Ideally we // should not have to worry about the group targetting // of connections, only messages. However the way the // relay and the router have to split messengers makes // it necessary. This may only be fixed by // re-organizing globaly the management of incoming // messengers in the endpoint, so that router and // relay no-longer need to claim exclusive use of // messengers. Since relay clients set the group // properly, their messengers are not affected by this // branch of the code. // lowestPrec = EndpointService.LowPrecedence + 1; } else if (!myServiceName.equals(cgServiceName)) { // This is for upstack only highestPrec = EndpointService.LowPrecedence; } else { // Mangling present and this is for here (and therefore this is // from below). We must demangle. Wrapping is figured // later, since we may also have to wrap if there the lowestPrec = EndpointService.LowPrecedence + 1; String serviceParam = connAddr.getServiceParameter(); String realService = null; String realParam = null; if (null != serviceParam) { int slashAt = serviceParam.indexOf('/'); if (-1 == slashAt) { realService = serviceParam; } else { realService = serviceParam.substring(0, slashAt); realParam = serviceParam.substring(slashAt + 1); } } connAddr.setServiceName(realService); connAddr.setServiceParameter(realParam); } } // We make a channel in all cases, the channel will d
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -