📄 discoveryserviceimpl.java
字号:
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processQuery starts");
ResolverResponse response;
Enumeration enum = null;
Vector results = null;
Vector expirations = new Vector();
DiscoveryQuery dq;
try {
dq = new DiscoveryQuery(new ByteArrayInputStream
((query.getQuery()).getBytes()));
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Error in query : ", e);
throw new IOException();
}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Got a " + dirname[dq.getDiscoveryType()] + " query " + dq.getAttr() + " = " + dq.getValue());
PeerAdvertisement padv = null;
try {
padv = (PeerAdvertisement)
AdvertisementFactory.newAdvertisement(
new MimeMediaType("text/xml"),
new ByteArrayInputStream(
((String) dq.getPeerAdv()).getBytes()));
if (!(padv.getPeerID().toString()).equals(localPeerId)) {
// publish others only. Since this one comes from outside,
// we must not keep it beyond its expiration time.
// FIXME: [jice@jxta.org 20011112] In theory there should
// be an expiration time associated with it in the msg, like
// all other items.
publish(padv, PEER, DEFAULT_EXPIRATION, DEFAULT_EXPIRATION);
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
}
results = search(dq.getDiscoveryType(),
dq.getAttr(),
dq.getValue(),
dq.getThreshold(),
true,
expirations);
if ((results == null) || (results.size()) == 0) {
// If this peer is a rendezvous, simply let the resolver
// re-propagate the query.
// If this peer is not a rendez, just discard the query.
if (group.isRendezvous()) {
throw new NoResponseException();
} else {
throw new DiscardQueryException();
}
}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Responding to " + dirname[dq.getDiscoveryType()] + " Query : " + dq.getAttr() + " = " + dq.getValue());
updatePeerAdvStr();
DiscoveryResponse dresponse =
new DiscoveryResponse(results.size(),
dq.getDiscoveryType(),
localPeerAdvStr,
dq.getAttr(),
dq.getValue(),
results,
expirations);
if ((dq.getDiscoveryType() > PEER) && results.isEmpty()) {
// remote peer is only interested in something other than peer
//(group, or adv)
// don't answer unless it is peer discovery
throw new DiscardQueryException();
}
response = new ResolverResponse(handlerName,
"JXTACRED",
query.getQueryId(),
dresponse.toString());
return (response);
}
/**
* Add a discovery listener
*
* @param listener The feature to be added to the DiscoveryListener attribute
* @since
*/
public synchronized void addDiscoveryListener(DiscoveryListener listener) {
listeners.addElement(listener);
}
/**
* remove a discovery listener
*
* @param listener Description of Parameter
* @return true if the argument was a component of this vector; false otherwise
* @since
*/
public synchronized
boolean removeDiscoveryListener(DiscoveryListener listener) {
Enumeration e = listenerTable.keys();
while (e.hasMoreElements()) {
Object key = (Object) e.nextElement();
if (listenerTable.get(key) == listener) {
listenerTable.remove(key);
}
}
return (listeners.removeElement(listener));
}
/**
* Description of the Method
*
* @param bits Description of Parameter
* @return Description of the Returned Value
* @since
*/
protected synchronized int next(int bits) {
long nextseed = (seed * multiplier + addend) & mask;
seed = nextseed;
return (int) (nextseed >>> (48 - bits));
}
/**
*Description of the Method
*
* @param advString Description of Parameter
* @param type Description of Parameter
* @param timeout Description of Parameter
* @since
*/
void remotePublish(String advString, int type, long timeout) {
// In case this is invoked before startApp().
if (resolver == null) return;
Vector list = new Vector();
Enumeration peers;
// support groups, and adv
// to remotely publish
if (type == PEER) {
return;
}
Vector advert = new Vector(1);
Vector expirations = new Vector(1);
DiscoveryResponse dresponse = null;
advert.addElement(advString);
expirations.addElement(new Long(timeout));
updatePeerAdvStr();
try {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Remote publishing Group");
dresponse =
new DiscoveryResponse(1,
type,
localPeerAdvStr,
null,
null,
advert, // This is the thing published.
expirations);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
}
ResolverResponse pushRes =
new ResolverResponse(handlerName,
"JXTACRED",
0, dresponse.toString());
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("remote publishing");
resolver.sendResponse(null, pushRes);
}
/**
* Search for a doc, that matches attr, and value
* bytes is set to true if the caller wants wire format of the
* advertisement, or set to false if caller wants Advertisement
* objects.
* @param type Description of Parameter
* @param attr Description of Parameter
* @param value Description of Parameter
* @param threshold Description of Parameter
* @param bytes Description of Parameter
* @param expirations Description of Parameter
* @return Description of the Returned Value
* @since
*/
private Vector search(int type,
String attr,
String value,
int threshold,
boolean bytes,
Vector expirations) {
FileInputStream is = null;
Enumeration enum = null;
Vector results = new Vector();
if (attr != null || value != null ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("narrowing down the response.");
// a discovery query with a specific search
// criteria.
enum = cm.search(dirname[type], attr, value);
if (enum == null) {
// return an empty Vector
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("No hits on query" + attr + " " + value);
return results;
}
int i = 1;
while (enum.hasMoreElements()) {
String fn = (String) enum.nextElement();
try {
if (!fn.equals(localPeerId)
/*&& cm.getExpirationTime (dirname[type], fn) > 0*/){
if (bytes) {
results.addElement(new String(cm.restoreBytes(dirname[type], fn)));
expirations.addElement(new Long(cm.getExpirationTime(dirname[type], fn)));
} else {
try {
is = cm.getInputStream(dirname[type], fn);
Advertisement adv = (Advertisement)
AdvertisementFactory.newAdvertisement
(
new MimeMediaType("text/xml"),
is,
cm.getExpirationTime(dirname[type],
fn));
results.addElement(adv);
} finally {
if (is != null) {
try { is.close(); }
catch (IOException e) { }
}
}
}
i++;
// did we reach the limit
if (i > threshold) {
return results;
}
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("search failed", e );
}
} // while
} else {
// remote does not care about narrowing down the response
String[] fn = cm.getFileNames(dirname[type]);
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("remote does not care about narrowing down the response.");
if (fn == null || fn.length == 0) {
// return an empty Vector
return results;
}
for (int i = 0; i < fn.length; i++) {
try {
if (!fn[i].equals(localPeerId) && cm.getExpirationTime(dirname[type], fn[i]) > 0) {
if (bytes) {
results.addElement(new String(cm.restoreBytes(dirname[type], fn[i])));
expirations.addElement(new Long(cm.getExpirationTime(dirname[type], fn[i])));
} else {
try {
is = cm.getInputStream(dirname[type], fn[i]);
Advertisement adv = (Advertisement)
AdvertisementFactory.newAdvertisement
(new MimeMediaType("text/xml"),
is,
cm.getExpirationTime(dirname[type],
fn[i]));
results.addElement(adv);
} finally {
if (is != null) {
try { is.close(); }
catch (IOException e) { }
}
}
}
}
if (i >= threshold - 1 ) {
return results;
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
}
}
}
return results;
}
/**
* convert a doc to string
*
* @param adv Description of Parameter
* @return Description of the Returned Value
* @since
*/
private String advToString(Advertisement adv) {
StringWriter out = new StringWriter();
MimeMediaType displayAs = new MimeMediaType("text/xml");
try {
StructuredTextDocument doc = (StructuredTextDocument) adv.getDocument(displayAs);
doc.sendToWriter(out);
return out.toString();
} catch (Exception all) {
return null;
}
}
/**
*
*
* @param n Description of Parameter
* @return Description of the Returned Value
* @since
*/
private int nextInt(int n) {
if (n <= 0) {
throw new IllegalArgumentException("n must be positive");
}
if ((n & -n) == n) {
// i.e., n is a power of 2
return (int) ((n * (long) next(31)) >> 31);
}
int bits;
int val;
do {
bits = next(31);
val = bits % n;
} while (bits - val + (n - 1) < 0);
return val;
}
private synchronized int nextQid() {
return qid++;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -