📄 resolverserviceimpl.java
字号:
*/
public synchronized QueryHandler unregisterHandler( String name )
{
return (QueryHandler) handlers.remove(name);
}
/**
* gets the handler registered under the given name.
* @param name Handler name
* @return ResolveHandler
*/
public QueryHandler getHandler(String name) {
return (QueryHandler) handlers.get(name);
}
/**
* try getting a response for our query
* @param address
* @param query
* @throws RuntimeException
**/
public void sendQuery(String rdvPeer,
ResolverQueryMsg query)
{
if (rendezvous == null)
return;
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("sending query");
ResolverResponse doc=null;
Message propagateMsg = endpoint.newMessage();
if (rdvPeer == null)
{
try
{
propagateMsg.addElement(
propagateMsg.newMessageElement(outQueName,
new MimeMediaType ("text/xml"),
(InputStream)((Document)
(query.getDocument(new MimeMediaType("text/xml")))).getStream()));
rendezvous.propagateInGroup(propagateMsg,
handlerName,
outQueName,
7,
null);
}
catch (Exception e)
{
if (LOG.isEnabledFor(Priority.INFO))
LOG.info( "Error during propagate" ,e);
throw new RuntimeException ("Error during propagate :"
+e.toString());
}
}
else
{
//unicast instead
try
{
respond (rdvPeer,
handlerName,
outQueName,
outQueName,
((Document)
(query.getDocument(new MimeMediaType("text/xml")))).getStream());
}
catch (Exception e)
{
if (LOG.isEnabledFor(Priority.INFO))
LOG.info( "Error while unicasting query :", e );
throw new RuntimeException ("Error while unicasting query :"
+e.toString());
}
}
}
/*
* Returns the group to which this service is attached.
* @return PeerGroup the group
*/
public PeerGroup getGroup () {
return((PeerGroup)myGroup);
}
private void propagateQuery (Message msg) {
if (rendezvous == null)
return;
if (!myGroup.isRendezvous())
{
// We are not a Rendez vous peer. Just drop the message.
return;
}
// We are a Rendez vous. Re-propagate the message.
// Loop and TTL control is done in demux and propagate(). The TTL 7
// below is just a default it will be reduced appropriately
try
{
rendezvous.propagateInGroup(msg,
handlerName,
outQueName,
7,
localPeerId);
}
catch(IOException e)
{
if (LOG.isEnabledFor(Priority.DEBUG) )
LOG.debug("Error propagating query",e);
}
}
private void resendQuery (Message msg, String srcPeer) {
if (rendezvous == null)
return;
if (!myGroup.isRendezvous())
{
// We are not a Rendez vous peer. Just drop the message.
return;
}
// Rebuild a query based on the existing one
MessageElementEnumeration elements = msg.getElements();
String tagString = null;
Vector tmp = new Vector();
ResolverQuery query = null;
// We need to change the source peer value of each query.
while (elements.hasMoreMessageElements())
{
MessageElement el = elements.nextMessageElement();
tagString=el.getName();
if (!( tagString.endsWith(outQueNameShort)
|| tagString.endsWith(inQueNameShort)))
{ // usefull ?
// Tag for something else - just ignore
continue;
}
try
{
MessageElement elem = msg.getElement(tagString);
query = new ResolverQuery(elem.getStream());
// Remove the element
msg.removeElement (elem);
query.setSrc (srcPeer);
tmp.addElement (query);
}
catch (Exception e)
{
continue;
}
}
// At this point, all the resolver queries from the message have been
// removed, updated, and stored into tmp. Put them back into the
// new the message
if (tmp.size() == 0)
{
// There was no query. Discard.
return;
}
Message newMsg = endpoint.newMessage();
for (int i = 0; i < tmp.size(); ++i)
{
try
{
query = (ResolverQuery) tmp.elementAt (i);
newMsg.addElement (newMsg.newMessageElement(outQueName,
new MimeMediaType ("text/xml"),
(InputStream)((Document)
(query.getDocument(new MimeMediaType("text/xml")))).getStream()));
}
catch (Exception e)
{
// Something weird happened. Discard.
return;
}
}
// Send the message
try
{
rendezvous.propagateInGroup(newMsg,
handlerName,
outQueName,
7,
localPeerId);
}
catch(IOException e)
{
if (LOG.isEnabledFor(Priority.DEBUG) )
LOG.debug("Error propagating resending query",e);
}
}
/**
* This method will be called by the endpoint or the rendezvous service
* to handle messages of this type
* @param message
*/
public void processIncomingMessage(Message message,
EndpointAddress srcAddr,
EndpointAddress dstAddr) {
MessageElementEnumeration elements;
String tagString;
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("demuxing a query");
Message srcMsg = (Message) message.clone();
elements = message.getElements();
while (elements.hasMoreElements())
{
MessageElement element = elements.nextMessageElement();
tagString= element.getName();
if (!( tagString.endsWith(outQueNameShort)
|| tagString.endsWith(inQueNameShort)))
{ // ? usefull
// Tag for something else - just ignore
continue;
}
try
{
MessageElement elem = message.getElement(tagString);
// Remove the element fromt the message
message.removeElement (elem);
ResolverQuery doc =
new ResolverQuery(elem.getStream());
ResolverResponseMsg res =
(ResolverResponseMsg) processQuery((ResolverQueryMsg)doc);
if (res != null)
{
// There is a response to be sent
respond (doc.getSrc(),
handlerName,
inQueName,
tagString,
(InputStream)((Document)
(res.getDocument(new MimeMediaType("text/xml")))).getStream());
}
// This ResolverService Service allow to re-propagate a request even if
// a local answer has been found.
propagateQuery (srcMsg);
}
catch (ResendQueryException e2)
{
// The resolver service has no response, but is interested to get
// a response itself. Resend the query, but just like if that
// was the source of the query. The resolver service will be
// reponsible for resending the response to the original sender.
propagateQuery ((Message) srcMsg.clone());
resendQuery (srcMsg, localPeerId);
}
catch (NoResponseException e1)
{
// The resolver service has no response. Process the query to
// see if there is something else to do.
propagateQuery (srcMsg);
}
catch (DiscardQueryException e3)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -