📄 piperesolver.java
字号:
doc.appendChild(e);
e = doc.createElement(PeerIdTag, peer);
doc.appendChild(e);
e = doc.createElement(CachedTag, "false");
doc.appendChild(e);
StringWriter dumped = new StringWriter();
doc.sendToWriter(dumped);
ResolverQuery query = new ResolverQuery(PipeResolverName,
null,
localPeerId,
dumped.toString(), qid++);
resolver.sendQuery(peer, query);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("sendVerify failed with " + e);
return;
}
}
/**
* processResponse deals with pipe resolver responses
*
* @param response ResolverResponse containing the payload
* @since 1.0
*/
public void processResponse(ResolverResponseMsg response) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processResponse got a response");
Enumeration enum = null;
String peer = null;
MimeMediaType mediaType = null;
String ipId = null;
String type = null;
String peerAdv = null;
try {
mediaType = new MimeMediaType("text/xml");
} catch (RuntimeException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processResponse: cannot get MimeMediaType " + e);
return;
}
ByteArrayInputStream ip =
new ByteArrayInputStream(response.getResponse().getBytes());
StructuredDocument doc = null;
try {
doc = StructuredDocumentFactory.newStructuredDocument(mediaType, ip);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processResponse: malformed response - discard");
return;
}
// XXX: we are doing no document sanity check here.
// Maybe should we be more careful.
// to be fixed. lomax@jxta.org
enum = doc.getChildren();
Vector tmp = new Vector();
boolean found = true;
while (enum.hasMoreElements()) {
TextElement elem = (TextElement) enum.nextElement();
if (elem.getName().equals(PeerIdTag)) {
tmp.addElement(elem.getValue());
continue;
}
if (elem.getName().equals(PipeIdTag)) {
ipId = (String) elem.getValue();
continue;
}
if (elem.getName().equals(TypeTag)) {
type = (String) elem.getValue();
continue;
}
if (elem.getName().equals(FoundTag)) {
found = ((String) elem.getValue()).equals("true");
continue;
}
// let's check whether the responder sent us a adv
if (elem.getName().equals(PeerAdvTag)) {
peerAdv = (String) elem.getValue();
saveAdv(peerAdv);
continue;
}
}
if ((ipId == null) || (tmp.size() == 0)) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processResponse: malformed response - discard.");
// Malformed response. Just discard.
return;
}
if (type == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processResponse: no type - using UnicastType as default");
type = PipeService.UnicastType;
}
enum = tmp.elements();
if (enum == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processResponse: empty response");
return;
}
while (enum.hasMoreElements()) {
peer = (String) enum.nextElement();
if (!found) {
// We have received a NACK.
String cachedPeer = findLocal(ipId);
if ((cachedPeer != null) && cachedPeer.equals(peer)) {
// Remove that entry.
forgetCached(ipId);
return;
// May be we should forward the nacks ?
}
} else {
// put the answer into the cache.
registerCached(ipId, peer);
// call any listener for ipid
if ( listeners.containsKey(ipId) ) {
Event newevent = new Event(this,
peer,
ipId,
type,
response.getQueryId() );
Listener pl = (Listener) listeners.get(ipId);
pl.pipeResolverEvent(newevent);
}
}
}
// FIXME: [jice@jxta.org 20010914] Why do we respond with the
// last peer from the above loop only? Why are there several peers
// though? Should we be responding when we got a Nack?
}
/**
* Process the Query, and genrate response
*
* @param query Description of Parameter
* @return ResolverResponseMsg "Response"
* @exception NoResponseException Description of Exception
* @exception DiscardQueryException Description of Exception
* @exception IOException Description of Exception
* @exception ResendQueryException Description of Exception
* @since 1.0
*/
public ResolverResponseMsg processQuery(ResolverQueryMsg query)
throws NoResponseException,
DiscardQueryException,
IOException,
ResendQueryException {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery starts");
MimeMediaType mediaType = null;
mediaType = new MimeMediaType("text/xml");
ByteArrayInputStream ip =
new ByteArrayInputStream(query.getQuery().getBytes());
StructuredTextDocument doc = null;
try {
doc = (StructuredTextDocument)
StructuredDocumentFactory.newStructuredDocument( mediaType, ip );
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: malformed request ", e);
throw new IOException("processQuery: malformed request ");
}
// XXX: we are doing no document sanity check here.
// Maybe should we be more careful. to be fixed. lomax@jxta.org
String pId = null;
boolean cached = true;
String destPeer = null;
String type = null;
Enumeration enum = doc.getChildren();
while (enum.hasMoreElements()) {
TextElement elem = (TextElement) enum.nextElement();
if (elem.getName().equals(PipeIdTag)) {
pId = (String) elem.getValue();
continue;
}
if (elem.getName().equals(PeerIdTag)) {
destPeer = (String) elem.getValue();
continue;
}
if (elem.getName().equals(TypeTag)) {
type = (String) elem.getValue();
continue;
}
if (elem.getName().equals(CachedTag)) {
cached = ((String) elem.getValue()).equals("true");
continue;
}
}
if (pId == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: malformed request, no PipeId.");
throw new IOException();
}
if (type == null) {
// If type is null, discard the request.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: type is null, discarded.");
throw new DiscardQueryException("processQuery: type is null, discarded.");
}
if ((destPeer != null) && !destPeer.equals(localPeerId)) {
// This request wasn't for this peer.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: request not for this peer, discarded.");
throw new DiscardQueryException("processQuery: request not for this peer, discarded.");
}
int queryId = query.getQueryId();
String peer = null;
// XXX: Only one query at the time...
// Maybe that should be changed later. lomax@jxta.org
if (!cached) {
// The source of the query has requeried not to receive
// cached information.
if (pipes.get(pId) != null) {
peer = localPeerId;
} else {
peer = null;
}
} else {
// We can look into our cache.
peer = findLocal(pId);
}
if ((peer == null) && (destPeer == null)) {
// This request was sent to everyone.
// If this peer is a rendezvous, allow the ResolverService to repropagate
// the query. If this peer is not a rendezvous, just discard the query.
if (myGroup.isRendezvous()) {
throw new NoResponseException();
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: message discarded.");
throw new DiscardQueryException("processQuery: message discarded.");
}
}
try {
// Build the answer
doc = (StructuredTextDocument)
StructuredDocumentFactory.newStructuredDocument(
new MimeMediaType("text/xml"), "jxta:PipeResolver");
Element e = null;
e = doc.createElement(MsgTypeTag, AnswerMsgType);
doc.appendChild(e);
e = doc.createElement(PipeIdTag, pId);
doc.appendChild(e);
e = doc.createElement(TypeTag, type);
doc.appendChild(e);
if (peer == null) {
// We are sending a NACK
e = doc.createElement(PeerIdTag, localPeerId);
doc.appendChild(e);
e = doc.createElement(FoundTag, "false");
doc.appendChild(e);
} else {
e = doc.createElement(PeerIdTag, peer);
doc.appendChild(e);
// this saves the remote peer a big step in resolving pid to endpoint
String advStr = advString(peer);
if (advStr != null) {
e = doc.createElement(PeerAdvTag, advStr);
doc.appendChild(e);
}
}
StringWriter dumped = new StringWriter();
doc.sendToWriter(dumped);
ResolverResponse res = new ResolverResponse(PipeResolverName,
null,
queryId,
dumped.toString());
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: sending answer.");
return res;
} catch (Exception ee) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery: error while processing query:", ee);
throw new IOException( ee.toString() );
}
}
/**
 * Runs a refresh pass over the cached pipes by delegating to
 * {@code refreshCachedPipes()}.
 *
 * @param timer the timer passed in by the caller; not used by this method
 * @since 1.0
 */
public void signal(JxtaTimer timer) {
    this.refreshCachedPipes();
}
/**
 * Restarts the lease of a pipe entry: its lease is set to the current time
 * plus {@code DefaultPipeLease}. A {@code null} entry is ignored.
 *
 * @param entry the pipe entry whose lease is renewed; may be {@code null}
 * @since 1.0
 */
private void resetLease(PipeEntry entry) {
    if (entry != null) {
        entry.lease = System.currentTimeMillis() + DefaultPipeLease;
    }
}
/**
 * Scans the cached pipe entries and removes every entry whose lease
 * time is earlier than the current time.
 *
 * @since 1.0
 */
private synchronized void refreshCachedPipes() {
long currentTime = System.currentTimeMillis();
Enumeration enum = cachedPipes.keys();
if ((enum == null) || (!enum.hasMoreElements())) {
return;
}
PipeEntry entry = null;
String pipe = null;
while (enum.hasMoreElements()) {
try {
pipe = (String) enum.nextElement();
entry = (PipeEntry) cachedPipes.get(pipe);
if (entry == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("refreshCachedPipe: empty entry for pipe " + pipe);
continue;
}
if (entry.lease < currentTime) {
// Remove that entry.
cachedPipes.remove(pipe);
continue;
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("refreshCachedPipe: " + e);
break;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -