📄 resolverserviceimpl.java
字号:
catch(IOException e) {
if (LOG.isEnabledFor(Priority.DEBUG) ) {
LOG.debug("Error propagating query",e);
}
}
}
/**
 * Rebuilds the resolver queries carried by {@code msg} with
 * {@code srcPeer} as their source and propagates them in the group
 * through the rendezvous service.
 *
 * Nothing is sent when no rendezvous service is set, when the group
 * reports that this peer is not a rendezvous, or when the message holds
 * no usable query. Every query element that is successfully rebuilt is
 * removed from {@code msg} as a side effect.
 *
 * @param msg message whose elements named with the outgoing-query suffix
 *            are rebuilt and re-sent
 * @param srcPeer value stored as the source of each rebuilt query
 */
private void resendQuery(Message msg, String srcPeer) {
    if (rendezvous == null) {
        return;
    }
    if (!myGroup.isRendezvous()) {
        // We are not a rendezvous peer. Just drop the message.
        return;
    }

    // Rebuild each query from the existing message, changing its source peer.
    Vector queries = new Vector();
    MessageElementEnumeration elements = msg.getElements();
    while (elements.hasMoreMessageElements()) {
        MessageElement el = elements.nextMessageElement();
        String tagString = el.getName();
        if (!tagString.endsWith(outQueNameShort)) {
            // Tag for something else - just ignore
            continue;
        }
        try {
            ResolverQuery query = new ResolverQuery(el.getStream());
            // Remove the element only once the query has been parsed
            msg.removeElement(el);
            query.setSrc(srcPeer);
            queries.addElement(query);
        } catch (Exception e) {
            // Best effort: skip this element, but leave a trace of why.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("Skipping resolver query that could not be rebuilt", e);
            }
        }
    }

    // At this point, all the resolver queries from the message have been
    // removed, updated, and stored into queries. Put them into a new message.
    if (queries.size() == 0) {
        // There was no query. Discard.
        return;
    }

    Message newMsg = endpoint.newMessage();
    for (int i = 0; i < queries.size(); ++i) {
        try {
            ResolverQuery query = (ResolverQuery) queries.elementAt(i);
            newMsg.addElement(newMsg.newMessageElement(outQueName,
                    textXml,
                    (InputStream) ((Document)
                            (query.getDocument(textXml))).getStream()));
        } catch (Exception e) {
            // Something weird happened. Discard the whole resend.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("Error rebuilding query message, discarding resend", e);
            }
            return;
        }
    }

    // Send the message.
    // NOTE(review): the literal 7 is presumably a propagation TTL - confirm
    // against the rendezvous service's propagateInGroup contract.
    try {
        rendezvous.propagateInGroup(newMsg,
                handlerName,
                outQueName,
                7,
                localPeerId);
    } catch (IOException e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Error propagating resending query", e);
        }
    }
}
/**
 * Handles an incoming query message handed over by the endpoint or the
 * rendezvous service.
 *
 * Each element whose name ends with the outgoing-query suffix is removed
 * from the message, parsed as a {@code ResolverQuery} and passed to
 * {@link #processQuery}. A non-null result is sent back to the query's
 * source, and the query is then re-propagated. The exceptions thrown by
 * {@code processQuery} select the alternative paths documented inline.
 * A null message is logged and ignored.
 *
 * @param message the incoming message; query elements are removed from it
 * @param srcAddr source address; not used by this method
 * @param dstAddr destination address; not used by this method
 */
public void processIncomingMessage(Message message,
                                   EndpointAddress srcAddr,
                                   EndpointAddress dstAddr) {
    if (message == null) {
        // Same guard as the response listener: cloning a null message
        // below would otherwise fail with a NullPointerException.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("processIncomingMessage: got a null message");
        }
        return;
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("demuxing a query");
    }

    // Keep an untouched copy: elements are removed from message below,
    // while the copy is what gets propagated or resent.
    Message srcMsg = (Message) message.clone();

    MessageElementEnumeration elements = message.getElements();
    while (elements.hasMoreElements()) {
        MessageElement element = elements.nextMessageElement();
        String tagString = element.getName();
        if (!tagString.endsWith(outQueNameShort)) {
            // Tag for something else - just ignore
            continue;
        }

        try {
            // Remove the element from the message
            message.removeElement(element);

            ResolverQuery doc = new ResolverQuery(element.getStream());
            ResolverResponseMsg res =
                (ResolverResponseMsg) processQuery((ResolverQueryMsg) doc);

            if (res != null) {
                // There is a response to be sent
                respond(doc.getSrc(),
                        handlerName,
                        inQueName,
                        inQueName,
                        (InputStream) ((Document) (res.getDocument(textXml))).getStream());
            }
            // This resolver service allows re-propagating a request even
            // if a local answer has been found.
            propagateQuery(srcMsg);
        } catch (ResendQueryException e2) {
            // The resolver service has no response, but is interested in
            // getting a response itself. Resend the query as if this peer
            // were its source. The resolver service will be responsible
            // for resending the response to the original sender.
            // A clone is propagated because resendQuery removes the query
            // elements from the message it is given.
            propagateQuery((Message) srcMsg.clone());
            resendQuery(srcMsg, localPeerId);
        } catch (NoResponseException e1) {
            // The resolver service has no response. Propagate the query
            // to see if there is something else to do.
            propagateQuery(srcMsg);
        } catch (DiscardQueryException e3) {
            // We are asked to discard this query.
            return;
        } catch (IOException e4) {
            // The query was invalid somehow. Just discard.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("query was malformed", e4);
            }
            return;
        }
    }
}
/**
 * Endpoint listener for the response queue: extracts resolver responses
 * from incoming messages and hands them to {@code processResponse}.
 */
class RecvDemux implements EndpointListener {

    /**
     * Scans the message for elements whose name ends with the
     * incoming-response suffix, removes each one from the message,
     * parses it as a {@code ResolverResponse} and dispatches it.
     * A null message is logged and ignored; a response that fails to
     * parse with an {@code IOException} is logged and skipped.
     *
     * @param message the incoming message, may be null
     * @param srcAddr source address; not used by this method
     * @param dstAddr destination address; not used by this method
     */
    public void processIncomingMessage(Message message,
                                       EndpointAddress srcAddr,
                                       EndpointAddress dstAddr) {
        if (message == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("RecvDemux.demux: got a null message");
            }
            return;
        }

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("demuxing a response ");
        }

        for (MessageElementEnumeration pending = message.getElements();
                pending.hasMoreElements(); ) {
            MessageElement candidate = pending.nextMessageElement();
            String elementName = candidate.getName();

            // Only elements tagged for the response queue are handled here;
            // anything else belongs to someone else and is left in place.
            if (elementName.endsWith(inQueNameShort)) {
                try {
                    // Take the element out of the message before parsing it
                    message.removeElement(candidate);
                    ResolverResponse parsed = new ResolverResponse(candidate.getStream());
                    processResponse(parsed);
                } catch (IOException ioe) {
                    if (LOG.isEnabledFor(Priority.DEBUG)) {
                        LOG.debug("Ill formatted resolver response, ignoring.", ioe);
                    }
                }
            }
        }
    }
}
/**
 * Hands a query to the handler registered under the query's handler name
 * and returns that handler's response.
 *
 * @param query the query to process; it is cast to {@code ResolverQuery}
 *              to read its handler name
 * @return the response produced by the handler, or {@code null} when no
 *         handler is registered under the query's handler name
 * @throws NoResponseException propagated from the handler
 * @throws ResendQueryException propagated from the handler
 * @throws DiscardQueryException propagated from the handler
 * @throws IOException propagated from the handler
 */
public ResolverResponseMsg processQuery(ResolverQueryMsg query)
    throws NoResponseException,
           ResendQueryException,
           DiscardQueryException,
           IOException {

    // Resolve the handler name once instead of re-casting for every use.
    String queryHandlerName = ((ResolverQuery) query).getHandlerName();

    QueryHandler theHandler = getHandler(queryHandlerName);
    if (theHandler == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("throwing away " + queryHandlerName);
        }
        return null;
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("handing a query to " + queryHandlerName);
    }
    return theHandler.processQuery(query);
}
/**
 * Pushes a response to the handler registered under the response's
 * handler name. A response whose handler name has no registered handler
 * is logged and dropped.
 *
 * @param resp the response to dispatch
 */
public void processResponse(ResolverResponseMsg resp) {
    String respHandlerName = resp.getHandlerName();

    QueryHandler theHandler = getHandler(respHandlerName);
    if (theHandler == null) {
        // getHandler can return null (processQuery checks for it too);
        // without this guard an unknown handler name would cause a
        // NullPointerException in the caller.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("throwing away response for " + respHandlerName);
        }
        return;
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("pushing a response to " + respHandlerName);
    }
    theHandler.processResponse(resp);
}
/**
 * Sends a response to a peer, or propagates it when no destination is
 * given.
 * Intended for resolver services that handle query/response
 * asynchronously, which is the case of rendezvous peers.
 * Any exception raised while sending to a specific peer is logged at
 * debug level and not rethrown.
 *
 * @param destPeer destination of the response; when {@code null} the
 *                 response is propagated instead
 * @param response the response to be sent
 */
public void sendResponse(String destPeer, ResolverResponseMsg response) {
    // No explicit destination: fall back to propagation.
    if (destPeer == null) {
        propagateResponse(response);
        return;
    }

    try {
        Document responseDoc = (Document) (response.getDocument(textXml));
        InputStream payload = (InputStream) responseDoc.getStream();
        respond(destPeer, handlerName, inQueName, inQueName, payload);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("error in sending response", e );
        }
    }
}
/**
 * Wraps the response in a new message, under the response queue name,
 * and propagates it in the group through the rendezvous service.
 * Does nothing when no rendezvous service is set. Any exception raised
 * while building or propagating the message is logged at debug level and
 * not rethrown.
 *
 * @param response the response to propagate
 */
private void propagateResponse(ResolverResponseMsg response) {
    if (rendezvous == null) {
        return;
    }

    Message outgoing = endpoint.newMessage();
    try {
        Document responseDoc = (Document) (response.getDocument(textXml));
        InputStream payload = (InputStream) responseDoc.getStream();

        outgoing.addElement(outgoing.newMessageElement(inQueName, textXml, payload));
        rendezvous.propagateInGroup(outgoing, handlerName, inQueName, 1, null);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "failure on propagateResponse", e);
        }
    }
}
/**
 * Sends a response stream to a single peer.
 *
 * A new message is created with one element named {@code tagName}
 * holding {@code response}, and is sent through a messenger obtained for
 * the address built from {@code destPeer}, {@code pName} and
 * {@code pParam}. Failure to obtain a messenger is logged and the method
 * returns without sending.
 *
 * @param destPeer peer the response is addressed to
 * @param pName    second argument passed to {@code mkAddress}
 * @param pParam   third argument passed to {@code mkAddress}
 * @param tagName  name of the message element carrying the response
 * @param response the response content
 * @throws RuntimeException if {@code response} is null, or if the
 *                          messenger fails to send the message
 */
private void respond(String destPeer,
                     String pName,
                     String pParam,
                     String tagName,
                     InputStream response) throws RuntimeException {

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("destPeer :\n"+destPeer);
    }
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("responding to "+pName+" "+pParam+" "+tagName);
    }

    if (response == null) {
        throw new RuntimeException("Attempting to respond with a empty message");
    }

    Message reply = endpoint.newMessage();
    try {
        reply.addElement(reply.newMessageElement(tagName, null, response));

        // FIXME: we should make sure that the reply will not be
        //        re-propagated. That would need a propagation header,
        //        which the rendezvous service adds when propagating.
        //        However, at this time there is no unicast method
        //        among the rendezvous propagation service...
        //
        // updatePropHeader(msg, 1);
    } catch (Exception ez1) {
        // Not much we can do
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Exception ", ez1);
        }
    }

    EndpointMessenger channel;
    try {
        channel = endpoint.getMessenger(mkAddress(destPeer, pName, pParam));
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "endpoint.getMessenger failed to get messenger", e );
        }
        return;
    }

    if (channel == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "endpoint.getMessenger failed to get messenger");
        }
        return;
    }

    try {
        channel.sendMessage(reply);
    } catch (IOException e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug( "messenger.sendMessage failed to send", e );
        }
        throw new RuntimeException( "messenger.sendMessage failed to send" );
    }
}
/**
 * Called by the group running this instance to initialize the service.
 *
 * Records the group, its endpoint service and the local peer ID, uses
 * the string form of {@code assignedID} as the handler name, builds the
 * outgoing and incoming queue names by prefixing the short queue names
 * with the group ID's unique value, and creates the response listener.
 *
 * @param g          the peer group running this instance
 * @param assignedID ID whose string form becomes the handler name
 * @param impl       implementation advertisement; it is cast to
 *                   {@code ModuleImplAdvertisement}
 * @throws PeerGroupException declared by the signature; nothing in this
 *                            method body throws it directly
 */
public void init(PeerGroup g, ID assignedID, Advertisement impl)
    throws PeerGroupException {

    implAdvertisement = (ModuleImplAdvertisement) impl;
    myGroup = g;
    endpoint = g.getEndpointService();
    // Assigned once; the original repeated this same assignment further down.
    localPeerId = g.getPeerID().toString();
    handlerName = assignedID.toString();

    // Queue names are made group-specific by prefixing the group's unique value.
    String uniqueStr = (g.getPeerGroupID().getUniqueValue()).toString();
    outQueName = uniqueStr + outQueNameShort;
    inQueName = uniqueStr + inQueNameShort;

    recvMux = new RecvDemux();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -