📄 endpointserviceimpl.java
字号:
if (null != serviceParam) { listener = incomingMessageListeners.get(serviceName + "/" + serviceParam); } // Didn't find it with param, maybe there is a generic listener for the service if (listener == null) { listener = incomingMessageListeners.get(serviceName); } // Didn't find it still, try the compatibility name. if (listener == null) { listener = incomingMessageListeners.get(serviceName + serviceParam); if ((null != listener) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Found handler only via compatibility listener : " + serviceName + serviceParam); } } return listener; } /** * {@inheritDoc} */ public EndpointListener removeIncomingMessageListener(String serviceName, String serviceParam) { if (null == serviceName) { throw new IllegalArgumentException("serviceName must not be null"); } if (-1 != serviceName.indexOf('/')) { throw new IllegalArgumentException("serviceName may not contain '/' characters"); } String address = serviceName; if (null != serviceParam) { address += "/" + serviceParam; } EndpointListener removedListener; synchronized (incomingMessageListeners) { removedListener = incomingMessageListeners.remove(address); } if (parentEndpoint != null) { if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) { parentEndpoint.removeIncomingMessageListener(serviceName, serviceParam); } else { parentEndpoint.removeIncomingMessageListener(myServiceName, address); } } return removedListener; } /** * Returns a local transport that can send to the given address. For now * this is based only on the protocol name. * * @param addr the endpoint address * @return the transport if the address protocol is supported by this transport */ private MessageSender getLocalSenderForAddress(EndpointAddress addr) { Iterator<MessageTransport> localTransports = getAllLocalTransports(); while (localTransports.hasNext()) { MessageTransport transpt = localTransports.next(); if (!transpt.getProtocolName().equals(addr.getProtocolName())) { continue; } if (!(transpt instanceof MessageSender)) { continue; } return (MessageSender) transpt; } return null; } /** * {@inheritDoc} * <p/> * Note: canonical messenger itself does not do any address rewriting. * Any address rewriting must be specified when getting a channel. However, * canonical knows the default group redirection for its owning endpoint and * will automatically skip redirection if it is the same. */ public Messenger getCanonicalMessenger(EndpointAddress addr, Object hint) { if (addr == null) { throw new IllegalArgumentException("null endpoint address not allowed."); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { Throwable trace = new Throwable("Stack Trace"); StackTraceElement elements[] = trace.getStackTrace(); int position = 1; while (elements[position].getClassName().startsWith("net.jxta.impl.endpoint.EndpointService")) { position++; } if ((elements.length - 1) == position) { position--; } LOG.fine("Get Messenger for " + addr + " by " + elements[position]); } // Check the canonical map. synchronized (messengerMap) { Reference<Messenger> ref = messengerMap.get(addr); if (ref != null) { Messenger found = ref.get(); // If it is USABLE, return it. if ((found != null) && ((found.getState() & Messenger.USABLE) != 0)) { return found; } // It has been GCed or is no longer USABLE. Make room for a new one. messengerMap.remove(addr); } if (getLocalSenderForAddress(addr) != null) { OutboundMeter messengerMeter = null; if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) { messengerMeter = endpointServiceMonitor.getOutboundMeter(addr); } // The hint is saved in the canonical messenger and will be used // when that virtual messenger first faces the need to create a // transport messenger. As of now, the logical dest is unknown. Messenger m = new CanonicalMessenger(vmQueueSize, addr, null, hint, messengerMeter); messengerMap.put(m.getDestinationAddress(), new SoftReference<Messenger>(m)); return m; } } // If we're here, we do not have any such transport. // Try our ancestors enpoints, if any. if (parentEndpoint == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Could not create messenger for : " + addr); } return null; } return parentEndpoint.getCanonicalMessenger(addr, hint); } /** * Return only the message transport registered locally. */ protected Iterator<MessageTransport> getAllLocalTransports() { List<MessageTransport> transportList; synchronized (messageTransports) { transportList = new ArrayList<MessageTransport>(messageTransports); } return transportList.iterator(); } /** * Returns a messenger for the specified address from one of the Message * Transports registered with this very endpoint service. Message * Transports inherited from parent groups will not be used. * * @param addr The destination address of the desired Messenger. * @param hint A hint provided to the Message Transport which may assist it * in creating the messenger. * @return A Messenger for the specified destination address or {@code null} * if no Messenger could be created. */ private Messenger getLocalTransportMessenger(EndpointAddress addr, Object hint) { MessageSender sender = getLocalSenderForAddress(addr); Messenger messenger = null; if (sender != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Trying address \'" + addr + "\' with : " + sender); } messenger = sender.getMessenger(addr, hint); } if (messenger == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Couldn\'t create messenger for : " + addr); } } return messenger; } /** * {@inheritDoc} */ public synchronized void addIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { if (null == listener) { throw new IllegalArgumentException("listener must be non-null"); } FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name); incomingFilterListeners.add(aFilter); } /** * {@inheritDoc} */ public synchronized void addOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { if (null == listener) { throw new IllegalArgumentException("listener must be non-null"); } FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name); outgoingFilterListeners.add(aFilter); } /** * {@inheritDoc} */ public synchronized MessageFilterListener removeIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { Iterator<FilterListenerAndMask> eachListener = incomingFilterListeners.iterator(); while (eachListener.hasNext()) { FilterListenerAndMask aFilter = eachListener.next(); if (listener == aFilter.listener) { eachListener.remove(); return listener; } } return null; } /** * {@inheritDoc} */ public synchronized MessageFilterListener removeOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) { Iterator<FilterListenerAndMask> eachListener = outgoingFilterListeners.iterator(); while (eachListener.hasNext()) { FilterListenerAndMask aFilter = eachListener.next(); if ((listener == aFilter.listener) && ((null != namespace) ? namespace.equals(aFilter.namespace) : (null == aFilter.namespace)) && ((null != name) ? name.equals(aFilter.name) : (null == aFilter.name))) { eachListener.remove(); return listener; } } return null; } /** * {@inheritDoc} * * <p/>Redistribute the event to those interested. */ public boolean messengerReady(MessengerEvent event) { // FIXME - jice@jxta.org 20040413: now that we share messengers, we // should be able to get rid of most of this mess, and in the router, // and the relay too. Messenger messenger = event.getMessenger(); Messenger messengerForHere; EndpointAddress connAddr = event.getConnectionAddress(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("New " + messenger + " for : " + messenger.getDestinationAddress() + " (" + messenger.getLogicalDestinationAddress() + ")"); } int highestPrec = EndpointService.HighPrecedence; int lowestPrec = EndpointService.LowPrecedence; // If there's no connection address we just pass the messenger around // everywhere; it is unspecified which group it is for. // Else, we must figure out if it is for this group, or must be // passed upStack (if any). if (connAddr != null) { String cgServiceName = connAddr.getServiceName(); // See if there is a mangling. If not, this means this was sent // within this group through a local xport, so it is for here. Else // it may be for here (from below) or for upstack. if (cgServiceName == null || !cgServiceName.startsWith(ChannelMessenger.InsertedServicePrefix)) { // FIXME: jice@jxta.org - 20030512 If we restrict use to "here" // we make most backchannels useless. So, the statement below is // commented out. Ideally we should not have to worry about the // group targetting of connections, only messages. However the // way the relay and the router have to split messengers makes // it necessary. This may only be fixed by re-organizing // globally the management of incoming messengers in the // endpoint, so that router and relay no-longer need to claim // exclusive use of messengers. Since relay clients set the // group properly, their messengers are not affected by this // branch of the code. // lowestPrec = EndpointService.LowPrecedence + 1; } else if (!myServiceName.equals(cgServiceName)) { // This is for upstack only highestPrec = EndpointService.LowPrecedence; } else { // Mangling present and this is for here (and therefore this is // from below). We must demangle. Wrapping is figured later, // since we may also have to wrap if there the lowestPrec = EndpointService.LowPrecedence + 1; String serviceParam = connAddr.getServiceParameter(); String realService = null; String realParam = null; if (null != serviceParam) { int slashAt = serviceParam.indexOf('/'); if (-1 == slashAt) { realService = serviceParam; } else { realService = serviceParam.substring(0, slashAt); realParam = serviceParam.substring(slashAt + 1); } } connAddr = new EndpointAddress(connAddr, realService, realParam); } } // We make a channel in all cases, the channel will decide if the desired grp redirection // requires address rewriting or not. // As for a MessageWatcher for implementing sendMessage-with-listener, we do not provide one // mostly because it is difficult to get a hold on the only appropriate one: that of the endpoint
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -