📄 endpointserviceimpl.java
字号:
} if (null == filtered) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(" message " + myMsg + " discarded upon filter decision"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING) { metrics.numFilteredOut++; } break; } propagater.propagate(filtered.clone(), serviceName, serviceParam, initialTTL); if (EndpointMeterBuildSettings.ENDPOINT_METERING) { metrics.numPropagatedTo++; } } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failed propagating message " + filtered + " on message transport " + aTransport, e); } if (EndpointMeterBuildSettings.ENDPOINT_METERING) { metrics.numErrorsPropagated++; } } } } /** * {@inheritDoc} */ public void propagate(Message msg, String serviceName, String serviceParam) { propagate(msg, serviceName, serviceParam, Integer.MAX_VALUE); } /** * {@inheritDoc} */ public void propagate(Message msg, String serviceName, String serviceParam, int initialTTL) { long startPropagationTime = 0; if (null == serviceName) { throw new IllegalArgumentException("serviceName may not be null"); } Metrics metrics = null; if (EndpointMeterBuildSettings.ENDPOINT_METERING) { metrics = new Metrics(); } // Keep the orig unchanged for metering reference and caller's benefit, but // we are forced to clone it here, because we add a header. msg = msg.clone(); if (EndpointMeterBuildSettings.ENDPOINT_METERING) { startPropagationTime = System.currentTimeMillis(); } // Add our header. MessageElement srcHdrElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME, localPeerId, null); msg.replaceMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, srcHdrElement); // Do the local transports with the plain address. Iterator<MessageTransport> eachProto = getAllLocalTransports(); propagateThroughAll(eachProto, msg.clone(), serviceName, serviceParam, initialTTL, metrics); // Do the parent transports with a mangled address. if (parentEndpoint != null) { eachProto = parentEndpoint.getAllMessageTransports(); StringBuilder mangled = new StringBuilder(serviceName); if (null != serviceParam) { mangled.append('/'); mangled.append(serviceParam); } propagateThroughAll(eachProto, msg.clone(), myServiceName, mangled.toString(), initialTTL, metrics); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) { PropagationMeter propagationMeter = endpointServiceMonitor.getPropagationMeter(serviceName, serviceParam); propagationMeter.registerPropagateMessageStats(metrics.numPropagatedTo, metrics.numFilteredOut, metrics.numErrorsPropagated, System.currentTimeMillis() - startPropagationTime); } } /** * Process the filters for this message. */ private Message processFilters(Message message, EndpointAddress srcAddress, EndpointAddress dstAddress, boolean incoming) { Iterator<FilterListenerAndMask> eachFilter = incoming ? incomingFilterListeners.iterator() : outgoingFilterListeners.iterator(); while (eachFilter.hasNext()) { FilterListenerAndMask aFilter = eachFilter.next(); Message.ElementIterator eachElement = message.getMessageElements(); while (eachElement.hasNext()) { MessageElement anElement = eachElement.next(); if ((null != aFilter.namespace) && (!aFilter.namespace.equals(eachElement.getNamespace()))) { continue; } if ((null != aFilter.name) && (!aFilter.name.equals(anElement.getElementName()))) { continue; } message = aFilter.listener.filterMessage(message, srcAddress, dstAddress); if (null == message) { return null; } } } // If we got here, no filter has rejected the message. Keep processing it. return message; } private static EndpointAddress demangleAddress(EndpointAddress mangled) { String serviceName = mangled.getServiceName(); if (null == serviceName) { // not a mangled address return mangled; } if (!serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) { // not a mangled address return mangled; } String serviceParam = mangled.getServiceParameter(); if (null == serviceParam) { // it has no param, its a null destination. // XXX bondolo 20050907 I'm not sure this is correct. return new EndpointAddress(mangled, null, null); } int slashAt = serviceParam.indexOf('/'); if (-1 == slashAt) { // param has no param portion. return new EndpointAddress(mangled, serviceParam, null); } return new EndpointAddress(mangled, serviceParam.substring(0, slashAt), serviceParam.substring(slashAt + 1)); } /** * {@inheritDoc} */ public void processIncomingMessage(Message msg, EndpointAddress srcAddress, EndpointAddress dstAddress) { // check for propagate loopback. MessageElement srcPeerElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME); if (null != srcPeerElement) { msg.removeMessageElement(srcPeerElement); String srcPeer = srcPeerElement.toString(); if (localPeerId.equals(srcPeer)) { // This is a loopback. Discard. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(msg + " is a propagate loopback. Discarded"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.discardedLoopbackDemuxMessage(); } return; } } if (null == srcAddress) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("null src address, discarding message " + msg); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.invalidIncomingMessage(); } return; } if (null == dstAddress) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("null destination address, discarding message " + msg); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.invalidIncomingMessage(); } return; } // Decode the destination address. // We want: // 1 - a version of the address that does not have the grp redirection. // 2 - a version of the serviceName that includes BOTH the group redirection and the original service name. // 3 - the original service param; without the original service name stuck to it. // So, basically we want the original serviceName part stuck to the group mangling, not stuck to the original // serviceParam. We do that by cut/pasting from both the mangled and demangled versions of the address. EndpointAddress demangledAddress = demangleAddress(dstAddress); String decodedServiceName = demangledAddress.getServiceName(); String decodedServiceParam = demangledAddress.getServiceParameter(); if ((null == decodedServiceName) || (0 == decodedServiceName.length())) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("dest serviceName must not be null, discarding message " + msg); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.invalidIncomingMessage(); } return; } // Do filters for this message: // FIXME - jice 20040417 : filters are likely broken, now. They do not see messages // from xports in parent groups. For those messages that are seen, demangled address seems to be the useful one. msg = processFilters(msg, srcAddress, demangledAddress, true); // If processFilters retuns null, the message is to be discarded. if (msg == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Message discarded during filter processing"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.incomingMessageFilteredOut(); } return; } // Now that we know the original service name is valid, finish building the decoded version. if (demangledAddress != dstAddress) { decodedServiceName = dstAddress.getServiceName() + "/" + decodedServiceName; } // Look up the listener EndpointListener listener = getIncomingMessageListener(decodedServiceName, decodedServiceParam); // No listener? oh well. if (listener == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("No listener for \'" + dstAddress + "\' in group " + group + "\n\tdecodedServiceName :" + decodedServiceName + "\tdecodedServiceParam :" + decodedServiceParam); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.noListenerForIncomingMessage(); } return; // noone cares for this message } // call the listener try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { if (null != decodedServiceParam) { LOG.fine("Calling listener for \'" + decodedServiceName + "/" + decodedServiceParam + "\' with " + msg); } else { LOG.fine("Calling listener for \'" + decodedServiceName + "\' with " + msg); } } listener.processIncomingMessage(msg, srcAddress, demangledAddress); if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.incomingMessageSentToEndpointListener(); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.demuxMessageProcessed(); } } catch (Throwable all) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught throwable from listener for " + dstAddress, all); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.errorProcessingIncomingMessage(); } } } /** * {@inheritDoc} */ public void demux(Message msg) { // Get the message destination MessageElement dstAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, EndpointServiceImpl.MESSAGE_DESTINATION_NAME); if (null == dstAddressElement) { // No destination address... Just discard if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning(msg + " has no destination address. Discarded"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.noDestinationAddressForDemuxMessage(); } return; } msg.removeMessageElement(dstAddressElement); EndpointAddress dstAddress = new EndpointAddress(dstAddressElement.toString()); // Get the message source MessageElement srcAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, EndpointServiceImpl.MESSAGE_SOURCE_NAME); if (null == srcAddressElement) { // No src address... Just discard if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning(msg + " has no source address. Discarded"); } if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) { endpointMeter.noSourceAddressForDemuxMessage(); } return; } msg.removeMessageElement(srcAddressElement);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -