📄 endpointserviceimpl.java
字号:
// check if it is already installed. if (!messageTransports.contains(transpt)) { clearProtoFromAdv(transpt); // just to be safe messageTransports.add(transpt); addProtoToAdv(transpt); // FIXME: For now, we return this. Later we might return something else, so that we can take // advantage of the fact that we know that the event is from a local transport. // That will help cleaning up the incoming messenger mess. return this; } } return null; } /** * {@inheritDoc} **/ public boolean removeMessageTransport(MessageTransport transpt) { boolean removed = false; synchronized (messageTransports) { removed = messageTransports.remove(transpt); } if (removed) { clearProtoFromAdv(transpt); } return removed; } /** * {@inheritDoc} **/ public Iterator getAllMessageTransports() { if (null != parentEndpoint) { return new SequenceIterator(getAllLocalTransports(), parentEndpoint.getAllMessageTransports()); } else { return getAllLocalTransports(); } } /** * {@inheritDoc} **/ public MessageTransport getMessageTransport(String name) { Iterator allTransports = getAllMessageTransports(); while (allTransports.hasNext()) { MessageTransport transpt = (MessageTransport) allTransports.next(); if (transpt.getProtocolName().equals(name)) { return transpt; } } return null; } private void addProtoToAdv(MessageTransport proto) { boolean relay = false; try { if (!(proto instanceof MessageReceiver)) { return; } // no value to publish for the router endpoint address if (proto instanceof EndpointRouter) { // register the corresponding group to relay connection events addActiveRelayListener(group); return; } // register this group to Relay connection events if (proto instanceof RelayClient) { relay = true; ((RelayClient) proto).addActiveRelayListener(group); } // get the list of addresses Iterator allAddresses = ((MessageReceiver) proto).getPublicAddresses(); Vector ea = new Vector(); while (allAddresses.hasNext()) { EndpointAddress anEndpointAddress = (EndpointAddress) allAddresses.next(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Adding endpoint address to route advertisement : " + anEndpointAddress); } ea.add(anEndpointAddress.toString()); } PeerAdvertisement padv = group.getPeerAdvertisement(); StructuredDocument myParam = (StructuredDocument) padv.getServiceParam(assignedID); RouteAdvertisement route = null; if (myParam != null) { Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType()); if( paramChilds.hasMoreElements()) { // we have an advertisement just add the new access points XMLElement param = (XMLElement) paramChilds.nextElement(); route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement( param); route.addDestEndpointAddresses(ea); if (relay) { // need to add the relay info if we have some Vector hops = ((RelayClient) proto).getActiveRelays(group); if ((hops != null) && !hops.isEmpty()) { route.setHops(hops); } } } } if( null == route ) { // None yet, so create a new Route Advertisement // create the RouteAdvertisement that will contain the route to // the peer. At this point we only know the peer endpoint addresses // no hops are known // create the destination access point AccessPointAdvertisement destAP = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType()); // we don't need to include the PeerID since it's already in // the PeerAdv just add the set of endpoints destAP.setEndpointAddresses(ea); // create the route advertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); route.setDest(destAP); if (relay) { // need to add the relay info if we have some Vector hops = ((RelayClient) proto).getActiveRelays(group); if ((hops != null) && !hops.isEmpty()) { route.setHops(hops); } } } // create the param route XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm"); XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8); StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc); padv.putServiceParam(assignedID, newParam); // publish the new advertisement DiscoveryService discovery = group.getDiscoveryService(); if (discovery != null) { discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION); } } catch (Exception ex) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Exception adding message transport ", ex); } } } private void clearProtoFromAdv(MessageTransport transpt) { try { if (!(transpt instanceof MessageReceiver)) { return; } // no value to publish the router endpoint address if (transpt instanceof EndpointRouter) { // register the corresponding group in the relay removeActiveRelayListener(group); return; } // register this group to Relay connection events if (transpt instanceof RelayClient) { ((RelayClient) transpt).removeActiveRelayListener(group); } Iterator allAddresses = ((MessageReceiver) transpt).getPublicAddresses(); Vector ea = new Vector(); while (allAddresses.hasNext()) { EndpointAddress anEndpointAddress = (EndpointAddress) allAddresses.next(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Removing endpoint address from route advertisement : " + anEndpointAddress); } ea.add(anEndpointAddress.toString()); } PeerAdvertisement padv = group.getPeerAdvertisement(); XMLDocument myParam = (XMLDocument) padv.getServiceParam(assignedID); if (myParam == null) { return; } Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType()); if ( !paramChilds.hasMoreElements()) { return; } XMLElement param = (XMLElement) paramChilds.nextElement(); RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param); route.removeDestEndpointAddresses(ea); // update the new route to a new parm structure. XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm"); XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8); StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc); // put the parms back. padv.putServiceParam(assignedID, newParam); // publish the new advertisement DiscoveryService discovery = group.getDiscoveryService(); if (discovery != null) { discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION); } } catch (Exception ex) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Exception removing messsage transport ", ex); } } } /** * {@inheritDoc} **/ public boolean addMessengerEventListener(MessengerEventListener listener, int prio) { int priority = prio; if (priority > EndpointService.HighPrecedence) { priority = EndpointService.HighPrecedence; } if (priority < EndpointService.LowPrecedence) { priority = EndpointService.LowPrecedence; } passiveMessengerListeners[priority].add(listener); return true; } /** * {@inheritDoc} **/ public boolean removeMessengerEventListener(MessengerEventListener listener, int prio) { int priority = prio; if (priority > EndpointService.HighPrecedence) { priority = EndpointService.HighPrecedence; } if (priority < EndpointService.LowPrecedence) { priority = EndpointService.LowPrecedence; } passiveMessengerListeners[priority].remove(listener); return true; } /** * {@inheritDoc} **/ public boolean addIncomingMessageListener(EndpointListener listener, String serviceName, String serviceParam) { if (null == listener) { throw new IllegalArgumentException("EndpointListener must be non-null"); } if (null == serviceName) { throw new IllegalArgumentException("serviceName must not be null"); } if (-1 != serviceName.indexOf('/')) { throw new IllegalArgumentException("serviceName may not contain '/' characters"); } String address = serviceName; if (null != serviceParam) { address += "/" + serviceParam; } synchronized (incomingMessageListeners) { if (incomingMessageListeners.containsKey(address)) { return false; } InboundMeter incomingMessageListenerMeter = null; if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) { incomingMessageListenerMeter = endpointServiceMonitor.getInboundMeter(serviceName, serviceParam); } if (!(listener instanceof QuotaIncomingMessageListener)) { listener = new QuotaIncomingMessageListener(address, listener, incomingMessageListenerMeter); } incomingMessageListeners.put(address, listener); } if (parentEndpoint != null) { if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) { // The listener name is already re-written. // The listener is already a quota listener; we made extra sure of that before tucking it into our local map. parentEndpoint.addIncomingMessageListener(listener, serviceName, serviceParam); } else { parentEndpoint.addIncomingMessageListener(listener, myServiceName, address); } } return true; } /** * {@inheritDoc} **/ public EndpointListener removeIncomingMessageListener(String serviceName,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -