📄 endpointserviceimpl.java
字号:
if (EndpointMeterBuildSettings.ENDPOINT_METERING) { metrics.numFilteredOut++; } break; } sender.propagate((Message) filtered.clone(), serviceName, serviceParam, null); if (EndpointMeterBuildSettings.ENDPOINT_METERING) { metrics.numPropagatedTo++; } } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed propagating message "+filtered+ " on message transport " + aTransport, e); } if (EndpointMeterBuildSettings.ENDPOINT_METERING) { metrics.numErrorsPropagated++; } continue; } } } /** * {@inheritDoc} **/ public void propagate(Message srcMsg, String serviceName, String serviceParam) throws IOException { long startPropagationTime = 0; Metrics metrics = new Metrics(); // Keep the orig unchanged for metering reference and caller's benefit, but // we are forced to clone it here, because we add a header. Message myMsg = (Message) srcMsg.clone(); if (EndpointMeterBuildSettings.ENDPOINT_METERING) { startPropagationTime = System.currentTimeMillis(); } // Add our header. MessageElement srcHdrElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME, localPeerId, (MessageElement) null); myMsg.replaceMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, srcHdrElement); // Do the local transports with the plain address. Iterator eachProto = getAllLocalTransports(); propagateThroughAll(eachProto, (Message) myMsg.clone(), serviceName, serviceParam, metrics); // Do the parent transports with a mangled address. if (parentEndpoint != null) { eachProto = parentEndpoint.getAllMessageTransports(); //FIXME what happens when service name, and/or param are null propagateThroughAll(eachProto, (Message) myMsg.clone(), myServiceName, serviceName + "/" + serviceParam, metrics); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) { PropagationMeter propagationMeter = endpointServiceMonitor.getPropagationMeter(serviceName, serviceParam); propagationMeter.registerPropagateMessageStats(metrics.numPropagatedTo, metrics.numFilteredOut, metrics.numErrorsPropagated, System.currentTimeMillis() - startPropagationTime); } } /** * Process the filters for this message. **/ private Message processFilters(Message message, EndpointAddress srcAddress, EndpointAddress dstAddress, boolean incoming) { Iterator eachFilter = incoming ? incomingFilterListeners.iterator() : outgoingFilterListeners.iterator(); while (eachFilter.hasNext()) { FilterListenerAndMask aFilter = (FilterListenerAndMask) eachFilter.next(); Message.ElementIterator eachElement = message.getMessageElements(); while (eachElement.hasNext()) { MessageElement anElement = (MessageElement) eachElement.next(); if ((null != aFilter.namespace) && (!aFilter.namespace.equals(eachElement.getNamespace()))) { continue; } if ((null != aFilter.name) && (!aFilter.name.equals(anElement.getElementName()))) { continue; } message = aFilter.listener.filterMessage(message, srcAddress, dstAddress); if (null == message) { return null; } } } // If we got here, no filter has rejected the message. Keep processing it. return message; } private static EndpointAddress demangleAddress(EndpointAddress mangled) { String serviceName = mangled.getServiceName(); if (!serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) { // not a mangled address return mangled; } String serviceParam = mangled.getServiceParameter(); if (null == serviceParam) { // it has no param, its a null destination. return new EndpointAddress(mangled, null, null); } int slashAt = serviceParam.indexOf('/'); if (-1 == slashAt) { // param has no param portion. return new EndpointAddress(mangled, serviceParam, null); } return new EndpointAddress(mangled, serviceParam.substring(0, slashAt), serviceParam.substring(slashAt + 1)); } /** * {@inheritDoc} **/ public void processIncomingMessage(Message msg, EndpointAddress srcAddress, EndpointAddress dstAddress) { // check for propagate loopback. MessageElement srcPeerElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME); if (null != srcPeerElement) { msg.removeMessageElement(srcPeerElement); String srcPeer = srcPeerElement.toString(); if (localPeerId.equals(srcPeer)) { // This is a loopback. Discard. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( msg + " is a propagate loopback. Discarded"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.discardedLoopbackDemuxMessage(); } return; } } if (null == dstAddress) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("null destination address, discarding message " + msg.toString()); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.invalidIncomingMessage(); } return; } // Decode the destination address. // We want: // 1 - a version of the address that does not have the grp redirection. // 2 - a version of the serviceName that includes BOTH the group redirection and the original service name. // 3 - the original service param; without the original service name stuck to it. // So, basically we want the original serviceName part stuck to the group mangling, not stuck to the original // serviceParam. We do that by cut/pasting from both the mangled and demangled versions of the address. EndpointAddress demangledAddress = demangleAddress(dstAddress); String decodedServiceName = demangledAddress.getServiceName(); String decodedServiceParam = demangledAddress.getServiceParameter(); // Do filters for this message: // FIXME - jice 20040417 : filters are likely broken, now. They do not see messages // from xports in parent groups. For those messages that are seen, demangled address seems to be the useful one. msg = processFilters(msg, srcAddress, demangledAddress, true); // If processFilters retuns null, the message is to be discarded. if (msg == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Message discarded during filter processing"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.incomingMessageFilteredOut(); } return; } if ((null == decodedServiceName) || (0 == decodedServiceName.length())) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("dest serviceName must not be null, discarding message " + msg); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.invalidIncomingMessage(); } return; } // Now that we know the original service name is valid, finish building the decoded version. if (demangledAddress != dstAddress) { decodedServiceName = dstAddress.getServiceName() + "/" + decodedServiceName; } // First, try the regular destination EndpointListener h = null; if (null != decodedServiceParam) { h = (EndpointListener) incomingMessageListeners.get(decodedServiceName + "/" + decodedServiceParam); } // Didn't find it with param, maybe there is a generic listener for the service if (h == null) { h = (EndpointListener) incomingMessageListeners.get(decodedServiceName); } // Didn't find it still, try the compatibility name. if (h == null) { h = (EndpointListener) incomingMessageListeners.get(decodedServiceName + decodedServiceParam); } // Still no listener? oh well. if (h == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "No listener for '" + dstAddress + "' in group " + group + "\ndecodedServiceName :" + decodedServiceName + "\tdecodedServiceParam :" + decodedServiceParam); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.noListenerForIncomingMessage(); } return; // noone cares for this message } // call the listener try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Calling listener '" + dstAddress + "' for " + msg.toString()); } h.processIncomingMessage(msg, srcAddress, demangledAddress); if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.incomingMessageSentToEndpointListener(); } } catch (Throwable all) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Uncaught throwable from listener for " + dstAddress, all); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.errorProcessingIncomingMessage(); } } } /** * Handles the given incoming message by calling the listener specified * by its destination as returned by the getDestAddress() method of the * message. * * @param msg The message to be delivered. */ public void demux(Message msg) { // Get the message destination MessageElement dstAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, EndpointServiceImpl.MESSAGE_DESTINATION_NAME); if (null == dstAddressElement) { // No destination address... Just discard if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( msg + " has no destination address. Discarded"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.noDestinationAddressForDemuxMessage(); } return; } msg.removeMessageElement(dstAddressElement); EndpointAddress dstAddress = new EndpointAddress(dstAddressElement.toString()); // Get the message source MessageElement srcAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, EndpointServiceImpl.MESSAGE_SOURCE_NAME); if (null == srcAddressElement) { // No destination address... Just discard if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( msg + " has no source address. Discarded"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.noSourceAddressForDemuxMessage(); } return; } msg.removeMessageElement(srcAddressElement); EndpointAddress msgScrAddress = new EndpointAddress(srcAddressElement.toString()); processIncomingMessage(msg, msgScrAddress, dstAddress); if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.demuxMessageProcessed(); } } /** * {@inheritDoc} * * @deprecated Try and get a messenger instead **/ public boolean ping(EndpointAddress addr) { return true; } /** * {@inheritDoc} **/ public MessengerEventListener addMessageTransport(MessageTransport transpt) { synchronized (messageTransports) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -