📄 endpointserviceimpl.java
字号:
// There is already a protocol of that name.
// If it comes from an ancestor endpoint, we may overload it.
EndpointProtocol old =
parentEndpoint.getEndpointProtocolByName(proto.getProtocolName());
if (old == null) {
// Nope, our parent never heard of it. It is one of ours.
// Throw an exception
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Duplicate endpoint protocol " +
proto.getProtocolName() +
" rejected.");
throw new IllegalArgumentException
("Duplicate endpoint protocol " +
proto.getProtocolName() +
" rejected.");
}
// If we're just replicating one of our parent's then this is
// not an overloading; it is simply the same object.
// Otherwise, it is up to the overloaded proto to let us do it
// or not.
if (old != proto) {
if (! old.allowOverLoad()) {
// Our parent has it but the protocol thinks it
// shouldn't be overloaded.
// Throw an exception
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Overload endpoint protocol "
+ proto.getProtocolName() + " rejected.");
throw new IllegalArgumentException
("Overload endpoint protocol " +
proto.getProtocolName() +
" rejected.");
}
if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Protocol " + proto.getProtocolName() +
" hides a protocol of parent endpoint service.");
clearProtoFromAdv(old);
protocols.remove(old.getProtocolName());
}
}
// Work-around: when a protocol in a parent endp has been previously
// removed, its address is still hanging in our peeradv, so we'd better
// remove it preventively.
clearProtoFromAdv(proto);
protocols.put (proto.getProtocolName(), proto);
addProtoToAdv(proto);
}
/**
* Removes the given endpoint protocol from this endpoint service.
*
* protocols remove themselves from the list when stopped.
* If someone wants to remove a protocol from the list, it has to call
* stopApp explicitly if relevant. "If relevant" is not all that easy
* to figure out, since the same protocol object may be present in
* several groups. conclusion. Don't do it if you don't know what you're
* doing.
* FIXME: it would probably be a good idea to have reference counts in
* order to be resistant to naive programming.
*
* @param proto the protocol to be removed.
*/
public synchronized void removeEndpointProtocol(EndpointProtocol proto)
{
Object obj = protocols.remove (proto.getProtocolName());
if (obj != null) {
clearProtoFromAdv(proto);
}
}
/**
* Returns an enumeration of the endpoint protocols available to this
* endpoint service.
*
* @return Enumeration the enumeration.
*/
public Enumeration getEndpointProtocols() {
return protocols.elements();
}
/**
* Returns a new Message object suitable for use with this endpoint
* service.
*
* @return Message the new message.
*/
public Message newMessage() {
return new MessageImpl();
}
/**
* Builds an EndpointAddress out the the given URI string.
* The resulting EndpointAddress uniquely identifies a message listener
* at a given network address.
*
* @param Uri the uri. The structure of the Uri is as follows:
* protocol://address/[serviceName][/serviceParam]
*
*/
public EndpointAddress newEndpointAddress(String Uri) {
return new Address(Uri);
}
/**
* Builds and returns an EndpointMessager that may be used to send
* messages via this endpoint to the given destination in the given
* mode.
*
* @param addr the destination address. This address specifies an
* endpoint protocol, the address of a peer by that endpoint protocol, and
* a serviceName and serviceParam, the concatenation of which designates
* uniquely the listener to which the messages must be delivered on arrival.
* (see newEndpointAddress).
* @return EndpointMessenger the messenger.
*/
public EndpointMessenger getMessenger(EndpointAddress addr)
throws IOException {
EndpointProtocol proto =
getEndpointProtocolByName(addr.getProtocolName());
if (proto == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Could not get Protocol for name " + addr.getProtocolName() );
throw new IOException( "Could not get Protocol for name " + addr.getProtocolName() );
}
return proto.getMessenger(addr);
}
/**
* Propagates the given message through all the endpoint protocols that
* are available to this endpoint. Some or all of these endpoint
* protocols may silently drop the message. Each protocol may interpret
* the resquest for propagation differenly. The endpointService does not
* define which destinations the message will actually reach.
*
* The concatenation of the serviceName and serviceParam arguments
* uniquely designates the listener to which the message must be delivered
* on arrival.
*
* Note: the supplied message is not modified and may be re-used upon
* return.
*
* @param srcMsg the message to be propagated.
* @param serviceName a destination service name
* @param serviceParam a destination queue name within that service
* @param prunePeer specifies the PeerID of a peer that needs not
* receive the message. (This is just a hint).
*/
public void propagate(Message srcMsg,
String serviceName,
String serviceParam)
throws IOException
{
// FIXME 20010910 bondolo@jxta.org we need to figure out how to enable this
/*
if( null == ((MessageImpl)srcMsg).getSrcAddress() ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "null src address on message" );
throw new IllegalArgumentException( "null src address on message" );
}
*/
// First, synchronously process existing Filters.
// if processFilters retuns null, the message is to be discarded.
/* srcMsg = processFilters (srcMsg, srcAddress, dstAddress, false);
if (srcMsg == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug (" message discarded upon filter decision");
return;
}
*/
EndpointProtocol proto = null;
MessageImpl m = null;
Enumeration enum = protocols.elements();
while (enum.hasMoreElements()) {
try {
proto = (EndpointProtocol) enum.nextElement();
m = (MessageImpl) srcMsg.clone();
// FIXME 20010906 bondolo@jxta.org We need to figure out what
// these should be set to
//m.setDestAddress(((MessageImpl) srcMsg).getDestAddress());
//m.setSrcAddress(((MessageImpl) srcMsg).getSrcAddress());
// Add our header.
m.setString(EndpointHeaderSrcPeer, localPeerId);
srcMsg = processFilters (srcMsg,
proto.getPublicAddress (),
null,
false);
if (srcMsg == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug (" message discarded upon filter decision");
return;
}
proto.propagate(m,
serviceName,
serviceParam,
null);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Failed to send on endpoint protocol. ", e);
continue;
}
}
}
synchronized private void addListenerSync(String address,
EndpointListener listener)
throws IllegalArgumentException
{
if (listeners.contains(address))
throw new IllegalArgumentException ("listener already present " +
"for address " + address);
listeners.put(address, listener);
}
/**
* Registers an incoming messages listener.
* Each incoming message addressed to the queue specified by
* the given address will be passed to the given listener.
* The address is usually formed by concatenating the the name of the
* invoking service and a parameter unique to that service accross all
* groups (the group ID is normally included for that purpose).
*
* For a message to match this address, it must have been sent through
* an EndpointMessenger obtain by providing an EndpointAddress constructed
* with matching serviceName and serviceParam (see newEndpointAddress and
* getMessenger), or by invoking propagate with matching serviceName
* and serviceParam.
*
* If a listener is already registered with the given address, an
* IllegalArgumentException is thrown.
*
* @param address a queue name, unique accross all groups on this peer,
* @param listener a listener for these messages.
*/
public void addListener(String address, EndpointListener listener)
throws IllegalArgumentException
{
if (parentEndpoint != null)
parentEndpoint.addListener(address, listener);
// If parent went through, then let's do it here.
addListenerSync(address, new EndpointDemuxListener(address, listener));
}
// We do not trust services all that much; here, a service of a bad
// group could screw-up the endpoint of a well behaved ancestor.
private synchronized
boolean removeListenerSync(String address,
EndpointListener listener) {
// Can remove and put back; we're synchonized.
EndpointDemuxListener h = (EndpointDemuxListener) listeners.remove(address);
if (h == null) {
return false;
}
if (listener == null) return false;
if (h.getListener() == listener) {
h.unregistered();
return true;
}
// Removed the wrong one; put it back !
listeners.put(address, h);
return false;
}
/**
* Removes the given listener previously registered under the given
* address.
*
* @return boolean true is there was such a registration, false otherwise.
*/
public boolean removeListener(String address, EndpointListener listener) {
if (removeListenerSync(address, listener) == false) return false;
if (parentEndpoint == null) return true;
return parentEndpoint.removeListener(address, listener);
}
private synchronized EndpointListener lookupListener(String address) {
return (EndpointListener) listeners.get(address);
}
/**
* This is the Filter Listener mechanism
*
* Use full name or name space for registration. Messages can contain elements belonging
* to multiple name spaces, and single name space can have multiple elements.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -