📄 endpointserviceimpl.java
字号:
EndpointAddress msgScrAddress = new EndpointAddress(srcAddressElement.toString()); processIncomingMessage(msg, msgScrAddress, dstAddress); } /** * {@inheritDoc} */ public MessengerEventListener addMessageTransport(MessageTransport transpt) { synchronized (messageTransports) { // check if it is already installed. if (!messageTransports.contains(transpt)) { clearProtoFromAdv(transpt); // just to be safe messageTransports.add(transpt); addProtoToAdv(transpt); // FIXME: For now, we return this. Later we might return something else, so that we can take // advantage of the fact that we know that the event is from a local transport. // That will help cleaning up the incoming messenger mess. return this; } } return null; } /** * {@inheritDoc} */ public boolean removeMessageTransport(MessageTransport transpt) { boolean removed; synchronized (messageTransports) { removed = messageTransports.remove(transpt); } if (removed) { clearProtoFromAdv(transpt); } return removed; } /** * {@inheritDoc} */ public Iterator<MessageTransport> getAllMessageTransports() { if (null != parentEndpoint) { return new SequenceIterator(getAllLocalTransports(), parentEndpoint.getAllMessageTransports()); } else { return getAllLocalTransports(); } } /** * {@inheritDoc} */ public MessageTransport getMessageTransport(String name) { Iterator<MessageTransport> allTransports = getAllMessageTransports(); while (allTransports.hasNext()) { MessageTransport transpt = allTransports.next(); if (transpt.getProtocolName().equals(name)) { return transpt; } } return null; } private void addProtoToAdv(MessageTransport proto) { boolean relay = false; try { if (!(proto instanceof MessageReceiver)) { return; } // no value to publish for the router endpoint address if (proto instanceof EndpointRouter) { // register the corresponding group to relay connection events addActiveRelayListener(group); return; } // register this group to Relay connection events if (proto instanceof RelayClient) { relay = true; ((RelayClient) proto).addActiveRelayListener(group); } // get the list of addresses Iterator<EndpointAddress> allAddresses = ((MessageReceiver) proto).getPublicAddresses(); Vector<String> ea = new Vector<String>(); while (allAddresses.hasNext()) { EndpointAddress anEndpointAddress = allAddresses.next(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Adding endpoint address to route advertisement : " + anEndpointAddress); } ea.add(anEndpointAddress.toString()); } PeerAdvertisement padv = group.getPeerAdvertisement(); StructuredDocument myParam = padv.getServiceParam(assignedID); RouteAdvertisement route = null; if (myParam != null) { Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType()); if (paramChilds.hasMoreElements()) { // we have an advertisement just add the new access points XMLElement param = (XMLElement) paramChilds.nextElement(); route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param); route.addDestEndpointAddresses(ea); if (relay) { // need to add the relay info if we have some Vector<AccessPointAdvertisement> hops = ((RelayClient) proto).getActiveRelays(group); if (!hops.isEmpty()) { route.setHops(hops); } } } } if (null == route) { // None yet, so create a new Route Advertisement // create the RouteAdvertisement that will contain the route to // the peer. At this point we only know the peer endpoint addresses // no hops are known // create the destination access point AccessPointAdvertisement destAP = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement( AccessPointAdvertisement.getAdvertisementType()); destAP.setPeerID(group.getPeerID()); destAP.setEndpointAddresses(ea); // create the route advertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); route.setDest(destAP); if (relay) { // need to add the relay info if we have some Vector<AccessPointAdvertisement> hops = ((RelayClient) proto).getActiveRelays(group); if (!hops.isEmpty()) { route.setHops(hops); } } } // create the param route XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm"); XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8); StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc); padv.putServiceParam(assignedID, newParam); // publish the new advertisement DiscoveryService discovery = group.getDiscoveryService(); if (discovery != null) { discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION); } } catch (Exception ex) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Exception adding message transport ", ex); } } } private void clearProtoFromAdv(MessageTransport transpt) { try { if (!(transpt instanceof MessageReceiver)) { return; } // no value to publish the router endpoint address if (transpt instanceof EndpointRouter) { // register the corresponding group in the relay removeActiveRelayListener(group); return; } // register this group to Relay connection events if (transpt instanceof RelayClient) { ((RelayClient) transpt).removeActiveRelayListener(group); } Iterator<EndpointAddress> allAddresses = ((MessageReceiver) transpt).getPublicAddresses(); Vector<String> ea = new Vector<String>(); while (allAddresses.hasNext()) { EndpointAddress anEndpointAddress = allAddresses.next(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Removing endpoint address from route advertisement : " + anEndpointAddress); } ea.add(anEndpointAddress.toString()); } PeerAdvertisement padv = group.getPeerAdvertisement(); XMLDocument myParam = (XMLDocument) padv.getServiceParam(assignedID); if (myParam == null) { return; } Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType()); if (!paramChilds.hasMoreElements()) { return; } XMLElement param = (XMLElement) paramChilds.nextElement(); RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param); route.removeDestEndpointAddresses(ea); // update the new route to a new parm structure. XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm"); XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8); StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc); // put the parms back. padv.putServiceParam(assignedID, newParam); // publish the new advertisement DiscoveryService discovery = group.getDiscoveryService(); if (discovery != null) { discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION); } } catch (Exception ex) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Exception removing messsage transport ", ex); } } } /** * {@inheritDoc} */ public boolean addMessengerEventListener(MessengerEventListener listener, int prio) { int priority = prio; if (priority > EndpointService.HighPrecedence) { priority = EndpointService.HighPrecedence; } if (priority < EndpointService.LowPrecedence) { priority = EndpointService.LowPrecedence; } return passiveMessengerListeners[priority].add(listener); } /** * {@inheritDoc} */ public boolean removeMessengerEventListener(MessengerEventListener listener, int prio) { int priority = prio; if (priority > EndpointService.HighPrecedence) { priority = EndpointService.HighPrecedence; } if (priority < EndpointService.LowPrecedence) { priority = EndpointService.LowPrecedence; } return passiveMessengerListeners[priority].remove(listener); } /** * {@inheritDoc} */ public boolean addIncomingMessageListener(EndpointListener listener, String serviceName, String serviceParam) { if (null == listener) { throw new IllegalArgumentException("EndpointListener must be non-null"); } if (null == serviceName) { throw new IllegalArgumentException("serviceName must not be null"); } if (-1 != serviceName.indexOf('/')) { throw new IllegalArgumentException("serviceName may not contain '/' characters"); } String address = serviceName; if (null != serviceParam) { address += "/" + serviceParam; } synchronized (incomingMessageListeners) { if (incomingMessageListeners.containsKey(address)) { return false; } InboundMeter incomingMessageListenerMeter = null; if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) { incomingMessageListenerMeter = endpointServiceMonitor.getInboundMeter(serviceName, serviceParam); } incomingMessageListeners.put(address, listener); } if (parentEndpoint != null) { if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) { // The listener name is already re-written. // The listener is already a quota listener; we made extra sure of that before tucking it into our local map. parentEndpoint.addIncomingMessageListener(listener, serviceName, serviceParam); } else { parentEndpoint.addIncomingMessageListener(listener, myServiceName, address); } } return true; } /** * {@inheritDoc} */ public EndpointListener getIncomingMessageListener(String serviceName, String serviceParam) { if (null == serviceName) { throw new IllegalArgumentException("serviceName must not be null"); } EndpointListener listener = null;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -