📄 endpointserviceimpl.java
字号:
/**
* This is the Filter Listener mechanism
*
* Use full name or name space for registration. Messages can contain elements belonging
* to multiple name spaces, and single name space can have multiple elements.
*/
synchronized private
void addFilterListenerSync(String name,
EndpointFilterListener listener,
boolean incoming)
throws IllegalArgumentException
{
Hashtable filterListeners =
incoming ? incomingFilterListeners : outgoingFilterListeners;
if (filterListeners.contains(name))
throw new IllegalArgumentException("filter istener for " +
name + " is already present");
filterListeners.put(name, listener);
}
/**
* Registers an incoming messages filter listener.
* Each incoming message which contains an element with the specified name
* space will be passed to the given listener.
*
* If a listener is already registered with the given address, or the name is
* malformed, then an IOException is thrown.
*
* @param name The name is either the name space to which message element belongs
* nameSpace.length() > 0, or the full element name which is of the form
* nameSpace + ":" + name, name.length() > 0.
* @param listener a listener for these messages.
*/
public void addFilterListener(String name,
EndpointFilterListener listener,
boolean incoming)
throws IllegalArgumentException
{
int index = name.indexOf(':');
if (index == -1) { // should be a namespace
addFilterListenerNameSpace(name, listener, index, incoming);
} else { // should be a full name
addFilterListenerFullName(name, listener, index, incoming);
}
}
/**
* Registers an incoming messages filter listener.
* Each incoming message addressed to the queue containing a message element
* which has a name associated to a filter listener will be passed to the
* the given listener.
*
* If a filter listener is already registered with the given message element name
* IllegalArgumentException is thrown.
*
* @param nameSpace The name space to which message element belongs
* @param listener a filter listener for these messages.
* @param index index of ":" in nameSpace
*/
private void addFilterListenerNameSpace(String nameSpace,
EndpointFilterListener listener,
int index,
boolean incoming)
throws IllegalArgumentException
{
if (index == 0) {
throw new IllegalArgumentException("Illegal fullName = \"\"");
}
if (parentEndpoint != null)
parentEndpoint.addFilterListener(nameSpace, listener, incoming);
// If parent went through, then let's do it here.
addFilterListenerSync(nameSpace, listener, incoming);
}
/**
* Registers an incoming messages filter listener.
* Each incoming message addressed to the queue containing a message element
* which has a name associated to a filter listener will be passed to the
* the given listener.
*
* If a filter listener is already registered with the given message element name
* IllegalArgumentException is thrown.
*
* @param fullName The full element name (namespace + ":" + name)
* @param listener a filter listener for these messages.
* @param index index of ":" in fullName
*/
private void addFilterListenerFullName(String fullName,
EndpointFilterListener listener,
int index,
boolean incoming)
throws IllegalArgumentException
{
// validate fullName (must contain be of the form: namespace + ":" + name,
// namespace.length() > 0 && name.length() > 0 is required. Otherwise,
// fullName == ":" or fullName == "xx..x:", (index + 1) == "xx..x:".length()
if (index == 0 || fullName.length() == (index + 1)) {
throw new IllegalArgumentException("Illegal fullName: " +
fullName);
}
if (parentEndpoint != null)
parentEndpoint.addFilterListener(fullName, listener, incoming);
// If parent went through, then let's do it here.
addFilterListenerSync(fullName, listener, incoming);
}
private synchronized
void removeFilterListenerSync(String name,
EndpointFilterListener listener,
boolean incoming)
{
Hashtable filterListeners =
incoming ? incomingFilterListeners : outgoingFilterListeners;
if (filterListeners.get (name) == listener)
filterListeners.remove (name);
}
/**
* Removes the given listener previously registered under the given
* full name or name space.
*/
public void removeFilterListener(String name,
EndpointFilterListener listener,
boolean incoming)
{
removeFilterListenerSync (name, listener, incoming);
if (parentEndpoint != null)
parentEndpoint.removeFilterListener(name, listener, incoming);
}
private synchronized
EndpointFilterListener lookupFilterListener(String name,
boolean incoming)
{
if (incoming)
return (EndpointFilterListener) incomingFilterListeners.get (name);
else
return (EndpointFilterListener) outgoingFilterListeners.get(name);
}
// Invoke filter listeners
private Message processFilters(Message msg,
EndpointAddress srcAddress,
EndpointAddress dstAddress,
boolean incoming) {
// This enumeration is in LIFO order
StringEnumeration elements = msg.getNames();
if ((elements == null) || (!elements.hasMoreElements())) {
// Empty message. Nothing to do.
return msg;
}
while (elements.hasMoreElements()) {
try {
String fullName = elements.nextString();
// Try elementName (fullName first)
String invokee = fullName;
EndpointFilterListener listener =
lookupFilterListener(fullName, incoming);
if (listener == null) {
// break out name space from the full name
// names[0] == name space, names[1] == name
String[] names = MessageElement.parseName(fullName);
listener = lookupFilterListener (names[0], incoming);
invokee = names[0];
}
if (listener != null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug (" invoking filter listener: " + invokee);
msg = listener.processIncomingMessage (msg,
srcAddress,
dstAddress);
if (msg == null) {
// Filter has discarded the message. Drop it here as well.
return null;
}
}
} catch (Exception e) {
// Something wrong happened, it's probably safer to discard the message.
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn ("process filter failed with " + e);
}
return null;
}
}
// If we got here, no filter has rejected the message. Keep processing it.
return msg;
}
/**
* (Privileged. One must have a handle to the implementation. The interface
* officialy includes this but the interface object's method does nothing
* only endpoint protocols are given the implementation object.)
*
* Handles the given incoming message by calling the listener specified
* by its destination as returned by the getDestAddress() method of the
* message.
*
* If the message cannot be delivered an IOException is thrown.
*
* @param msg The message to be delivered.
*/
public void demux(Message msg) throws IOException {
EndpointAddress dstAddress = msg.getDestinationAddress();
if (dstAddress == null) {
// No destination address... Just discard
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("demux: no destination address. Discard");
}
return;
}
EndpointAddress srcAddress = msg.getSourceAddress();
String srcPeer = null;
InputStream ip = null;
srcPeer = msg.getString( EndpointHeaderSrcPeer );
if ((srcPeer != null) && srcPeer.equals(localPeerId)) {
// This is a loopback. Discard.
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(" discard loopback");
return;
}
// If the filters have not yet been processed for this message, do it
// now.
// If processFilters retuns null, the message is to be discarded.
if (!((MessageImpl) msg).filtered) {
((MessageImpl) msg).filtered = true;
msg = processFilters (msg, srcAddress, dstAddress, true);
if (msg == null) {
return;
}
}
// Find the appropriate destination of this message
String serviceName = dstAddress.getServiceName();
String serviceParam = dstAddress.getServiceParameter();
if (serviceName == null) serviceName = "";
if (serviceParam == null) serviceParam = "";
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("serviceName = " + serviceName);
LOG.debug("serviceParam = " + serviceParam);
}
EndpointListener h = lookupListener(serviceName + serviceParam);
if (h == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("demux: no listener for this kind of message: ");
}
return; // noone cares
}
h.processIncomingMessage(msg, srcAddress, dstAddress);
}
/**
* Returns the endpoint protocol registered under the given name.
*
* @return EndpointProtocol the protocol if found. null otherwise.
*/
public EndpointProtocol getEndpointProtocolByName(String name) {
return (EndpointProtocol) protocols.get(name);
}
/**
* Verifies that the given address can be reached.
* The verification is performed by the endpoint protocol designated
* by the given address, as returned by the getProtocolName() method
* of this address.
*
* The method, and accuracy of the verification depends upon each endpoint
* protocol.
*
* @return boolean true if the address can be reached. False otherwise.
*/
public boolean ping(EndpointAddress addr) {
if (addr == null) {
return false;
}
try {
String protoName = addr.getProtocolName();
EndpointProtocol proto = getEndpointProtocolByName(protoName);
return proto.ping(addr);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug ("Exeption while trying to ping " + addr + ": " + e);
}
return false;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -