📄 endpointrouter.java
字号:
} catch (Exception e1) {
continue;
}
}
}
}
StringWriter dumped = new StringWriter();
doc.sendToWriter( dumped );
ResolverResponse res = new ResolverResponse(routerSName,
null,
queryId,
dumped.toString() );
return res;
} catch (Exception ee) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: error while processing query: ", ee);
throw new IOException();
}
}
/**
 * Messenger wrapper that adds routing information to outgoing messages.
 * <p>
 * Each call to {@link #sendMessage(Message)} builds or updates an
 * {@code EndpointRouterMessage} header, attaches it to the message under
 * {@code EndpointRouterMessage.Name}, and then hands the message to the
 * wrapped transport messenger.
 */
public class RouterMessenger implements EndpointMessenger {
// Transport messenger that actually sends the message. sendMessage() resets
// it to null after a failed send on a persistent messenger.
protected EndpointMessenger messenger = null;
// Destination of this messenger; written into the router header on every send.
protected EndpointAddress destAddress = null;
// Router that supplies the local peer address and forward-route lookups.
protected EndpointRouter router = null;
// When true, sendMessage() retries with a new transport messenger on failure.
protected boolean persistent = false;
// destAddress rendered as "protocol://address". Only assigned by the
// five-argument constructor, and only when persistent is true.
protected String dstPAddr = null;
/**
 * Creates a non-persistent (disposable) messenger around an existing
 * transport messenger.
 *
 * @param m transport messenger to wrap; must not be null
 * @param destAddress destination address of this messenger
 * @param e endpoint service; not used by this constructor
 * @param r router used for route lookups and for the local peer address
 * @throws IllegalArgumentException if m is null
 */
public RouterMessenger(EndpointMessenger m,
EndpointAddress destAddress,
EndpointService e,
EndpointRouter r) {
if( m == null ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "null messenger!" );
throw new IllegalArgumentException( "null messenger!");
}
this.messenger = m;
this.destAddress = destAddress;
this.router = r;
}
// If created with the persistent flag, then this messenger
// is not disposable; it must try to re-establish a transport messenger
// when the one it has fails.
// Routed messages go through a disposable messenger, and much less
// effort is put in establishing a transport messenger.
// Local messages go through a messenger that has been given to
// the application layer; therefore it has to try hard.
// A disposable messenger will not tolerate being created with a null
// transport messenger. A persistent one will.
// NOTE(review): this constructor itself performs no null check on m,
// whatever the value of persistent; only the four-argument constructor
// rejects a null messenger.
/**
 * Creates a messenger that is either disposable or persistent.
 *
 * @param m transport messenger to wrap
 * @param destAddress destination address of this messenger
 * @param e endpoint service; not used by this constructor
 * @param r router used for route lookups and for the local peer address
 * @param persistent true to make sendMessage() retry with a new transport
 *        messenger when a send fails
 */
public RouterMessenger(EndpointMessenger m,
EndpointAddress destAddress,
EndpointService e,
EndpointRouter r,
boolean persistent) {
this.messenger = m;
this.destAddress = destAddress;
this.router = r;
this.persistent = persistent;
// getTransportAddress wants it in this format and we may have
// to call it a number of times.
if (persistent) {
this.dstPAddr = destAddress.getProtocolName() + "://"
+ destAddress.getProtocolAddress();
}
}
/**
 * Attaches the router header to the message and sends it through the
 * transport messenger, making at most two attempts.
 *
 * @param message message to send; its EndpointRouterMessage element is
 *        read if present and a (new or updated) one is added before sending
 * @throws IOException if the send fails on a non-persistent messenger, if
 *         getTransportMessenger() fails while replacing a missing transport
 *         messenger, or if both attempts fail
 */
public void sendMessage(Message message) throws IOException {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("EndpointRouterMessenger.sendMessage starts");
// We'll try this more than once:
// First if we do not have a transport messenger; which could
// happen because the connection got broken the last time
// we tried and we failed a retry, then we'll try again; but
// really the caller is supposed to stop using this messenger
// when it fails.
// Second, if we have a transport messenger
// we'll try to send the message. If it fails, we'll make
// a new one and if that works, retry the message.
// Third, if that fails again, then we discard the transport
// messenger; assuming we got one, and give up. So, that's
// two attempts at most. Only one if we can't make a transport
// messenger -> IOException.
int triesLeft = 2;
while (triesLeft-->0) {
if (messenger == null) {
// This case exists only for persistent messengers.
// An IOException thrown here is not caught and ends sendMessage().
messenger = getTransportMessenger(dstPAddr, false);
}
// Try to reuse a router header already carried by the message.
InputStream ip = null;
EndpointRouterMessage routerMsg = null;
MessageElement elem = message.getElement(EndpointRouterMessage.Name);
if ((elem != null) && ((ip = elem.getStream())!= null)) {
try {
routerMsg = new EndpointRouterMessage(ip);
} catch (Exception ez1) {
// Can't read the element. Ignore it.
routerMsg = null;
}
}
if (routerMsg == null) {
// We need to create one.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Create a new EndpointRouterMessage");
routerMsg = new EndpointRouterMessage();
routerMsg.setSrcAddress(router.localPeerAddr);
routerMsg.setNbOfHops("1");
try {
Route route = router.getRoute(destAddress.getProtocolName() + "://" +
destAddress.getProtocolAddress());
if (route != null) {
if (route.gateways != null) {
routerMsg.setForwardGateways(route.gateways);
}
}
} catch (Exception ez1) {
// Not much we can do
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Cannot set forward gateways");
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Using EndpointRouterMessage");
// If we want to modify the routing header of forwarded
// messages, HERE is the best place to do it.
// Check if the destination within the message is the same
// as the destination peer of this messenger.
String inMsgDest = routerMsg.getDestAddress();
if ((inMsgDest != null) && (!inMsgDest.equals(destAddress.toString()))) {
// The destination address is not the same. It is very likely
// that the router message embedded within the message is
// invalid. Update it.
routerMsg.setForwardGateways(null);
try {
Route route = router.getRoute(destAddress.toString());
if (route != null) {
if (route.gateways != null) {
routerMsg.setForwardGateways(route.gateways);
// Hop count is the number of gateways plus one.
int nbOfHops = route.gateways.size() + 1;
routerMsg.setNbOfHops(String.valueOf(nbOfHops));
}
}
} catch (Exception ez1) {
// Not much we can do
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Cannot set forward gateways");
}
}
// If the routerMsg still contains a potentially correct reverse route,
// check it, and if it is really correct, keep it. Otherwise, try to see
// if we have a reverse route anyway.
boolean reverse = false;
if (routerMsg.getReverseGateways() != null) {
reverse = isLocalRoute(routerMsg.getLastHop());
if (!reverse) {
routerMsg.setReverseGateways(null);
}
}
if (!reverse) {
// Fine, this peer does not have a reverse route to the
// last hop, but maybe it has a route to the source of the message.
// NOTE(review): getRoute and isLocalRoute are called unqualified here,
// unlike router.getRoute above; presumably they resolve to the
// enclosing router instance -- confirm.
if (getRoute(routerMsg.getSrcAddress()) != null) {
reverse = true;
} else {
reverse = isLocalRoute(routerMsg.getSrcAddress());
}
}
if (reverse) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" setting reverse remote route");
// Add this local peer as a hop
Vector v = routerMsg.getReverseGateways();
if (v == null) {
v = new Vector(1);
}
try {
// The local peer goes at the head of the reverse gateway list.
v.add(0, localPeerAddr);
} catch (Exception ez1) {
// Not much can be done here.
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Cannot insert local peer id in the reverse route");
}
try {
// Increment the hop count carried as a decimal string.
routerMsg.setNbOfHops
(String.valueOf(Integer.parseInt(routerMsg.getNbOfHops()) + 1));
} catch (Exception ez1) {
// Not much we can do here
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Got exception while trying to set the nbofHops " + ez1);
// Fall back to "0", the same value the no-reverse-route branch uses.
routerMsg.setNbOfHops("0");
}
routerMsg.setReverseGateways(v);
} else {
// We really have no reverse route. Sorry.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no reverse route");
routerMsg.setNbOfHops("0"); //No reverse route
routerMsg.setReverseGateways(null);
}
}
// Both a new and a reused header always get this messenger's destination
// and the local peer as last hop.
routerMsg.setDestAddress(destAddress.toString());
routerMsg.setLastHop(router.localPeerAddr);
// Push the router header onto the message.
// That's all we have to do for now.
try {
message.addElement(
message.newMessageElement(EndpointRouterMessage.Name,
null,
routerMsg.getInputStream()));
} catch (Exception e) {
// A failure to attach the header is only logged; the send below still runs.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Can't push header");
}
// give the message to the real messenger
try {
messenger.sendMessage(message);
return;
} catch (IOException ee) {
// Disposable messengers do not retry: the failure goes to the caller.
if (!persistent) throw(ee);
// persistent messengers cannot be used for routed messages
// they reset the router message for the next attempt.
message.removeElement(EndpointRouterMessage.Name);
messenger = null;
// May be it is not a case of broken route. It could be
// just a broken connection, but if the address is now wrong
// it is not 100% sure that all protocols will detect it
// when the transport messenger is created. It could be
// that it is always detected when first sending.
// So we have to expect the worse and call brokenRoute,
// otherwise we'd never look for the new address.
brokenRoute(dstPAddr);
// Back to sq 1; we'll retry if appropriate.
}
}
// If we reached this point, we retried too many times.
throw(new IOException("RouterMessenger: Could not find " +
"a working transport for destination."));
}
/**
 * Does nothing; this implementation has an empty body and does not touch
 * the wrapped transport messenger.
 */
public void close() {
}
}
/**
 * Returns a messenger for a peer-id based destination. This is what the
 * upper layers reach through the endpoint API.
 * <p>
 * A destination equal to the local peer address is served by a
 * {@code LoopbackMessenger} and never goes through the router. Any other
 * destination gets a persistent {@code RouterMessenger} wrapped around a
 * transport messenger obtained from {@code getTransportMessenger}.
 *
 * @param addr destination address
 * @return a loopback messenger for the local peer, otherwise a persistent
 *         router messenger
 * @throws IOException if no transport messenger can be obtained for the
 *         destination
 */
public EndpointMessenger getMessenger(EndpointAddress addr)
        throws IOException {
    // Reduce the address to its "protocol://address" form.
    final String peerUri = addr.getProtocolName() + "://"
            + addr.getProtocolAddress();

    if (!peerUri.equals(localPeerAddr)) {
        // Remote destination: locate a real transport messenger, then wrap
        // it in a persistent RouterMessenger that will add our header.
        EndpointMessenger transport = getTransportMessenger(peerUri, false);
        return new RouterMessenger(transport, addr, endpoint, this, true);
    }

    // Destination is the local peer: loop the message back directly.
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("getMessenger: return LoopbackMessenger");
    }
    return new LoopbackMessenger(endpoint,
                                 endpoint.newEndpointAddress(localPeerAddr),
                                 addr);
}
/**
 * Finds the transport address and endpoint protocol to use for the next
 * hop towards a peer, and returns a messenger for it.
 * <p>
 * Up to two attempts are made. After each failed attempt
 * {@code brokenRoute(dstPAddr)} is called so that the failing route is
 * purged before the address is looked up again.
 * <p>
 * This method never returns {@code null}: every failure, including a
 * transport that hands back a null messenger, is reported as an
 * {@code IOException}.
 *
 * @param dstPAddr destination peer in "protocol://address" form
 * @param local if true, only a local route is used (no route lookup);
 *        if false, the address comes from {@code getAddress}
 * @return a non-null transport messenger addressed to the router service
 *         of the next hop
 * @throws IOException if no transport address is known for the peer, or if
 *         no messenger could be obtained after two attempts
 */
private EndpointMessenger getTransportMessenger(String dstPAddr,
                                                boolean local)
        throws IOException {
    int triesLeft = 2;
    while (triesLeft-- > 0) {
        // Find a raw transport address for this peer.
        Address naddr;
        if (local) {
            naddr = (Address) getLocalRoute(dstPAddr);
        } else {
            naddr = (Address) getAddress(dstPAddr);
        }
        if (naddr == null) {
            // A null address has no protocol name. This is equivalent to a
            // non-existent protocol, which the endpoint service would answer
            // with a null messenger, which in turn becomes an exception
            // below. So throw here as well instead of returning null.
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("getTransportMessenger: no xport address for " + dstPAddr);
            }
            throw new IOException("getTransportMessenger: no address");
        }

        // Clone it. The original is in our tables and must not be
        // messed with.
        naddr = (Address) naddr.clone();
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("getMessenger(). " +
                      "dest peer= " + dstPAddr +
                      " using = " + naddr.toString());
        }

        // Complete this address with the router's service queue name.
        naddr.setServiceName(routerSName);
        naddr.setServiceParameter(routerSParam);

        // Find the appropriate raw messenger to use. Some transports return
        // null rather than throwing; both outcomes count as a failed attempt.
        EndpointMessenger messenger = null;
        try {
            messenger = endpoint.getMessenger(naddr);
        } catch (IOException noCheese) {
            // Keep a trace of the transport failure instead of dropping it.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("getTransportMessenger: transport failed for " + dstPAddr, noCheese);
            }
        }
        if (messenger != null) {
            return messenger;
        }

        // Find which local route is broken and purge it, then maybe retry.
        brokenRoute(dstPAddr);
    }

    // Both attempts failed to produce a messenger.
    if (LOG.isEnabledFor(Priority.INFO)) {
        LOG.info("getMessenger: no messenger to "
                 + dstPAddr);
    }
    throw new IOException("getTransportMessenger: no messenger");
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -