// File: resolverserviceimpl.java
// (code-viewer page control: "Font size:")
{
// We are asked to discard this query.
return;
}
catch (Exception e4)
{
// The query was invalid somehow. Just discard.
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(e4);
return;
}
}
}
/**
 * Endpoint listener that picks resolver responses out of incoming messages
 * and hands each of them to {@code processResponse}.
 */
class RecvDemux implements EndpointListener {

    /**
     * Scans an incoming message for elements whose name ends with
     * {@code outQueNameShort} or {@code inQueNameShort}. Every such element
     * is removed from the message, parsed as a {@link ResolverResponse} and
     * passed to {@code processResponse}. Elements with other names are left
     * untouched.
     *
     * @param message the received message; a {@code null} message is ignored
     * @param srcAddr source address of the message (not used by this method)
     * @param dstAddr destination address of the message (not used by this method)
     */
    public void processIncomingMessage(
        Message message,
        EndpointAddress srcAddr,
        EndpointAddress dstAddr
    ) {
        if (message == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("RecvDemux.demux: got a null message");
            }
            return;
        }
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("demuxing a response ");
        }

        // The names enumeration is only used as a guard: nothing to do when
        // the message cannot list its element names.
        Enumeration tags = message.getNames();
        if (tags == null) {
            return;
        }

        // NOTE(review): elements are removed from the message while this
        // enumeration is live; confirm the enumeration tolerates that.
        MessageElementEnumeration elements = message.getElements();
        while (elements.hasMoreElements()) {
            MessageElement el = elements.nextMessageElement();
            String tagString = el.getName();
            boolean resolverTag = tagString.endsWith(outQueNameShort)
                || tagString.endsWith(inQueNameShort);
            if (!resolverTag) {
                // Tag for something else - just ignore
                continue;
            }
            try {
                // Drain every element stored under this tag.
                while (true) {
                    MessageElement elem = message.getElement(tagString);
                    if (elem == null) {
                        // Presumably how the message reports "no more
                        // elements with this name"; stop without relying
                        // on an exception to leave the loop.
                        break;
                    }
                    // Remove the element from the message
                    message.removeElement(elem);
                    ResolverResponse resp =
                        new ResolverResponse(elem.getStream());
                    processResponse(resp);
                }
            } catch (Exception e) {
                // Either the message signalled the end of this tag's
                // elements by throwing, or a response could not be parsed
                // or processed. Keep going with the remaining elements, but
                // leave a trace instead of dropping the failure silently.
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("RecvDemux: stopped processing elements named "
                              + tagString, e);
                }
            }
        }
    }
}
/**
 * Hands a query to the handler registered under the query's handler name
 * and returns that handler's answer.
 *
 * @param query the query to process; it is cast to {@link ResolverQuery}
 *              to read its handler name
 * @return the response produced by the handler, or {@code null} when no
 *         handler is registered under the query's handler name
 * @throws NoResponseException  propagated unchanged from the handler call
 * @throws ResendQueryException propagated unchanged from the handler call
 * @throws DiscardQueryException propagated unchanged from the handler call
 * @throws IOException          propagated unchanged from the handler call
 */
public ResolverResponseMsg processQuery(ResolverQueryMsg query)
    throws NoResponseException,
           ResendQueryException,
           DiscardQueryException,
           IOException {
    // Resolve the name once instead of re-casting and re-reading it for
    // every log statement and for the lookup.
    String targetName = ((ResolverQuery) query).getHandlerName();

    QueryHandler theHandler = getHandler(targetName);
    if (theHandler == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("throwing away " + targetName);
        }
        return null;
    }

    // Logged once, after the lookup succeeded (it used to be emitted both
    // before and after the lookup).
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("handing a query to " + targetName);
    }
    return theHandler.processQuery(query);
}
/**
 * Pushes a response to the handler registered under the response's
 * handler name. A response for which no handler is registered is dropped
 * (with a debug log entry) instead of failing with a
 * {@code NullPointerException}, mirroring what {@code processQuery} does
 * for queries.
 *
 * @param resp the response to deliver
 */
public void processResponse(ResolverResponseMsg resp) {
    String targetName = resp.getHandlerName();
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("pushing a response to " + targetName);
    }

    QueryHandler theHandler = getHandler(targetName);
    if (theHandler == null) {
        // Nobody is registered for this response; nothing to deliver to.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("throwing away response for " + targetName);
        }
        return;
    }
    theHandler.processResponse(resp);
}
/**
 * Send a response to a peer.
 * This is used by resolver services that are handling query/response in
 * an asynchronous way, which is the case of Rendezvous peers.
 *
 * When {@code destPeer} is {@code null} the response is handed to
 * {@code propagateResponse} instead of being sent to a single peer.
 * Sending is best effort: a failure is logged at debug level and not
 * reported to the caller.
 *
 * @param destPeer is the destination of the response, or {@code null}
 *                 to propagate it
 * @param response is the response to be sent
 */
public void sendResponse(String destPeer, ResolverResponseMsg response) {
    if (destPeer == null) {
        propagateResponse(response);
        return;
    }

    try {
        respond(destPeer,
                handlerName,
                inQueName,
                inQueName,
                (InputStream) ((Document)
                    (response.getDocument(new MimeMediaType("text/xml")))).
                    getStream());
    } catch (Exception e) {
        // Still best effort, but leave a trace like propagateResponse does
        // rather than discarding the failure without any sign.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("failure on sendResponse", e);
        }
    }
}
/**
 * Wraps the response's "text/xml" document in a new message, stored under
 * the {@code inQueName} element, and passes that message to
 * {@code rendezvous.propagateInGroup}. Nothing happens when
 * {@code rendezvous} is {@code null}; any failure is only logged at debug
 * level.
 *
 * @param response the response to propagate
 */
private void propagateResponse(ResolverResponseMsg response) {
    if (rendezvous != null) {
        Message outgoing = endpoint.newMessage();
        try {
            // Built in the same order as the nested call they replace.
            MimeMediaType elementType = new MimeMediaType("text/xml");
            Document responseDoc =
                (Document) (response.getDocument(new MimeMediaType("text/xml")));
            InputStream payload = (InputStream) responseDoc.getStream();

            outgoing.addElement(
                outgoing.newMessageElement(inQueName, elementType, payload));
            rendezvous.propagateInGroup(outgoing, handlerName, inQueName, 1, null);
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug( "failure on propagateResponse", e);
            }
        }
    }
}
/**
 * Respond to a single peer with a message carrying the given response
 * stream as one element.
 *
 * @param destPeer the destination peer; passed to {@code mkAddress}
 * @param pName    passed to {@code mkAddress} together with
 *                 {@code destPeer} and {@code pParam} to build the
 *                 destination address
 * @param pParam   passed to {@code mkAddress}, see {@code pName}
 * @param tagName  name of the message element that carries the response
 * @param response the response body
 * @throws RuntimeException if {@code response} is {@code null}, or if the
 *                          messenger fails to send the message
 */
private void respond (String destPeer,
                      String pName,
                      String pParam,
                      String tagName,
                      InputStream response)
    throws RuntimeException {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("destPeer :\n"+destPeer);
        LOG.debug("responding to "+pName+" "+pParam+" "+tagName);
    }
    if (response == null) {
        throw new RuntimeException(
            "Attempting to respond with a empty message");
    }

    Message msg = endpoint.newMessage();
    try {
        msg.addElement(msg.newMessageElement(tagName,
                                             null,
                                             response));
        // FIXME: we should make sure that it will not be re-propagated
        // To do that we'd need to add a propagation header, which
        // the rendezvous service does when propagating.
        // However, at this time there is no unicast method
        // among the rendezvous propagation service...
        //
        // updatePropHeader(msg, 1);
    } catch (Exception ez1) {
        // Not much we can do
        // NOTE(review): the message is still sent below even though the
        // response element could not be added - confirm this is intended.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug ("Exception ", ez1);
        }
    }

    EndpointMessenger messenger = null;
    try {
        messenger = endpoint.getMessenger(mkAddress(destPeer,
                                                    pName,
                                                    pParam));
    } catch (Exception e) {
        // Treated like "no messenger" below, but keep the reason visible
        // instead of discarding it.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("endpoint.getMessenger threw an exception", e);
        }
    }
    if (messenger == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "endpoint.getMessenger failed to get messenger");
        }
        return;
    }

    try {
        messenger.sendMessage(msg);
    } catch (IOException e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "messenger.sendMessage failed to send", e );
        }
        throw new RuntimeException(
            "messenger.sendMessage failed to send" );
    }
}
/**
 * init is called by the group running this instance. It records the group,
 * its endpoint service and the local peer id, derives the handler name
 * from the assigned ID, builds the queue names by prefixing the short
 * queue names with the group's unique value, and creates the
 * {@code RecvDemux} listener.
 *
 * @param g          the peer group this instance runs in
 * @param assignedID the ID assigned to this instance; its string form
 *                   becomes {@code handlerName}
 * @param impl       the implementation advertisement; it is cast to
 *                   {@code ModuleImplAdvertisement}
 * @throws PeerGroupException declared by the signature; the statements in
 *                            this method do not throw it explicitly
 */
public void init(PeerGroup g, ID assignedID, Advertisement impl)
    throws PeerGroupException
{
    implAdvertisement = (ModuleImplAdvertisement) impl;
    myGroup = g;
    endpoint = g.getEndpointService();
    // Assigned once; the same assignment used to be repeated further down.
    localPeerId = g.getPeerID().toString();
    handlerName = assignedID.toString();

    String uniqueStr = (g.getPeerGroupID().getUniqueValue()).toString();
    outQueName = uniqueStr + outQueNameShort;
    inQueName = uniqueStr + inQueNameShort;

    recvMux = new RecvDemux();
}
}
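// Keyboard shortcuts (code-viewer page footer)
// Copy code
// Ctrl + C
// Search code
// Ctrl + F
// Full-screen mode
// F11
// Toggle theme
// Ctrl + Shift + D
// Show shortcuts
// ?
// Increase font size
// Ctrl + =
// Decrease font size
// Ctrl + -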