📄 discoveryserviceimpl.java
字号:
* @NoResponseException thrown when no response is generated
* @throws NoResponseException if no Responses were received
*/
public ResolverResponseMsg processQuery(ResolverQueryMsg query)
throws NoResponseException,
DiscardQueryException,
ResendQueryException {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("processQuery starts");
}
ResolverResponse response;
Enumeration enum = null;
Vector results = null;
Vector expirations = new Vector();
DiscoveryQuery dq;
try {
dq = new DiscoveryQuery(new ByteArrayInputStream
((query.getQuery()).getBytes()));
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.WARN))
LOG.warn( "Malformed query : ", e);
throw new DiscardQueryException( "Malformed query : " + e.getMessage() );
}
if( (dq.getThreshold() < 0) ||
((null == dq.getAttr()) && (null != dq.getValue()) ) ||
((null != dq.getAttr()) && (null == dq.getValue()) ) ||
(dq.getDiscoveryType() < PEER) ||
(dq.getDiscoveryType() > ADV) )
{
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn( "Malformed query" );
}
throw new DiscardQueryException( "Malformed query" );
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("Got a " + dirname[dq.getDiscoveryType()] + " query " + dq.getAttr() + " = " + dq.getValue());
}
// Get the Peer Adv from the query and publish it.
PeerAdvertisement padv = null;
try {
padv = (PeerAdvertisement)
AdvertisementFactory.newAdvertisement(textXml,
new ByteArrayInputStream(
((String) dq.getPeerAdv()).getBytes()));
if (!(padv.getPeerID().toString()).equals(localPeerId)) {
// publish others only. Since this one comes from outside,
// we must not keep it beyond its expiration time.
// FIXME: [jice@jxta.org 20011112] In theory there should
// be an expiration time associated with it in the msg, like
// all other items.
publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION);
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Bad Peer Adv in Discovery Query", e);
}
}
/*
* threshold==0 and type==PEER is a special case. In this case we are
* responding for the purpose of providing our own adv only.
*/
if( (dq.getDiscoveryType() == PEER) && (0 == dq.getThreshold()) ) {
results = new Vector();
expirations.clear();
}
else {
results = search(dq.getDiscoveryType(),
dq.getAttr(),
dq.getValue(),
dq.getThreshold(),
true,
expirations);
}
/*
* threshold==0 and type==PEER is a special case. In this case we are
* responding for the purpose of providing our own adv only.
*/
if ( (dq.getDiscoveryType() != PEER) && results.isEmpty() ) {
// If this peer is a rendezvous, simply let the resolver
// re-propagate the query.
// If this peer is not a rendez, just discard the query.
if (group.isRendezvous()) {
throw new NoResponseException("No results matching the query");
} else {
throw new DiscardQueryException("No results matching the query, no need to repropagate");
}
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("Responding to " + dirname[dq.getDiscoveryType()] + " Query : " + dq.getAttr() + " = " + dq.getValue());
}
updatePeerAdvStr();
DiscoveryResponse dresponse = new DiscoveryResponse(results.size(),
dq.getDiscoveryType(),
localPeerAdvStr,
dq.getAttr(),
dq.getValue(),
results,
expirations);
response = new ResolverResponse(handlerName,
credentialDoc,
query.getQueryId(),
dresponse.toString());
return (response);
}
/**
 * Registers a discovery listener by appending it to the listener list.
 *
 * @param listener the DiscoveryListener to register
 */
public synchronized void addDiscoveryListener(DiscoveryListener listener) {
    listeners.add(listener);
}
/**
 * Removes a discovery listener.
 *
 * Every entry of the listener table whose value is this very listener
 * instance (identity comparison) is dropped, and the listener is removed
 * from the listener list.
 *
 * @param listener DiscoveryListener to remove
 * @return the result of removing the listener from the listener list:
 *         true if it was present there; false otherwise
 */
public synchronized
boolean removeDiscoveryListener(DiscoveryListener listener) {
    // Collect the matching keys first. Structurally modifying the table
    // while its keys() enumeration is still in use gives undetermined
    // results, so removal happens in a second pass.
    Vector staleKeys = new Vector();
    Enumeration keys = listenerTable.keys();
    while (keys.hasMoreElements()) {
        Object key = keys.nextElement();
        if (listenerTable.get(key) == listener) {
            staleKeys.addElement(key);
        }
    }

    for (int i = 0; i < staleKeys.size(); i++) {
        listenerTable.remove(staleKeys.elementAt(i));
    }

    return listeners.removeElement(listener);
}
/**
 * Remote publishes an advertisement by wrapping it in a discovery response
 * and handing that response to the resolver with a null destination.
 * The advertisement carries an expiration of <i>timeout</i>.
 *
 * Nothing is sent when the resolver is not yet available, when the type is
 * PEER, or when the discovery response cannot be built.
 *
 * @param advString advertisement, as a string, to publish
 * @param type      Discovery type PEER, GROUP, ADV
 * @param timeout   the amount of time to advise other nodes to hold this
 *                  advertisement in their caches.
 */
void remotePublish(String advString, int type, long timeout) {
    // In case this is invoked before startApp().
    if (resolver == null) {
        return;
    }

    // Only groups and advs are remotely published.
    if (type == PEER) {
        return;
    }

    Vector advert = new Vector(1);
    Vector expirations = new Vector(1);
    advert.addElement(advString);
    expirations.addElement(new Long(timeout));

    // Refresh localPeerAdvStr before embedding it in the response.
    updatePeerAdvStr();

    DiscoveryResponse dresponse;
    try {
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Remote publishing Group");
        dresponse = new DiscoveryResponse(1,
                                          type,
                                          localPeerAdvStr,
                                          null,
                                          null,
                                          advert, // This is the thing published.
                                          expirations);
    } catch (Exception e) {
        // Without a response there is nothing to send; bail out instead of
        // dereferencing a null response below.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Could not build discovery response for remote publish", e);
        }
        return;
    }

    // Unsolicited push: the response is sent with query id 0.
    ResolverResponse pushRes =
        new ResolverResponse(handlerName,
                             credentialDoc,
                             0,
                             dresponse.toString());

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("remote publishing");
    }
    resolver.sendResponse(null, pushRes);
}
/**
* Search for a doc, that matches attr, and value
* bytes is set to true if the caller wants wire format of the
* advertisement, or set to false if caller wants Advertisement
* objects.
* @param type Discovery type PEER, GROUP, ADV
*@param attribute attribute name to narrow disocvery to Valid values for
* this parameter are null (don't care), or exact element name in the
* advertisement of interest (e.g. "Name")
* @param @param value value of attribute to narrow disocvery to valid values for
* @param threshold the upper limit of responses from one peer
* @param bytes flag to indicate how the results are returned docs, or bytes
* @param expirations vector containing the expirations associated with is returned
* @return vector of results either as docs, or Strings
* @since
*/
private Vector search(int type,
String attr,
String value,
int threshold,
boolean bytes,
Vector expirations) {
FileInputStream is = null;
Vector results = new Vector();
if( threshold <= 0 )
throw new IllegalArgumentException( "threshold must be greater than zero" );
if( null != expirations )
expirations.clear();
if (attr != null || value != null ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("narrowing down the response.");
// a discovery query with a specific search criteria.
Enumeration enum = cm.search(dirname[type], attr, value);
while (enum.hasMoreElements() && (results.size() < threshold) ) {
String fn = (String) enum.nextElement();
try {
long cacheTime = cm.getCacheLifetime(dirname[type], fn);
if (!fn.equals(localPeerId) && cacheTime > 0 ) {
if (bytes) {
results.addElement(new String(cm.restoreBytes(dirname[type], fn)));
expirations.addElement( new Long(cacheTime) );
} else {
try {
is = cm.getInputStream(dirname[type], fn);
Advertisement adv =
AdvertisementFactory.newAdvertisement(
textXml, is, cacheTime );
results.addElement(adv);
} finally {
if (is != null) {
try { is.close(); }
catch (IOException e) { }
}
}
}
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("search failed for item " + fn, e );
}
} // while
} else {
// remote does not care about narrowing down the response
String[] fn = cm.getFileNames(dirname[type]);
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("remote does not care about narrowing down the response.");
if (fn == null || fn.length == 0) {
// return an empty Vector
return results;
}
for (int i = 0; (i < fn.length) && (results.size() < threshold); i++) {
try {
long cacheTime = cm.getCacheLifetime(dirname[type], fn[i] );
if (!fn[i].equals(localPeerId) && cacheTime > 0) {
if (bytes) {
results.addElement(new String(cm.restoreBytes(dirname[type], fn[i])));
expirations.addElement(new Long(cacheTime));
} else {
try {
is = cm.getInputStream(dirname[type], fn[i]);
Advertisement adv =
AdvertisementFactory.newAdvertisement(
textXml, is, cacheTime );
results.addElement(adv);
} finally {
if (is != null) {
try { is.close(); }
catch (IOException ignored) { }
}
}
}
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(e);
}
}
}
return results;
}
/**
 * Converts an advertisement to its string form by rendering its
 * <code>textXml</code> document into a string.
 *
 * @param adv Advertisement to convert
 * @return String representation of the adv, or null if the conversion
 *         failed for any reason
 */
private String advToString(Advertisement adv) {
    StringWriter out = new StringWriter();
    try {
        StructuredTextDocument doc = (StructuredTextDocument) adv.getDocument(textXml);
        doc.sendToWriter(out);
        return out.toString();
    } catch (Exception all) {
        // Callers get null on failure; record the cause instead of
        // dropping it silently.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Could not convert advertisement to string", all);
        }
        return null;
    }
}
// Hands out the current query id and advances the counter for the next caller.
private synchronized int nextQid() {
    final int current = qid;
    qid++;
    return current;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -