📄 peerinfoserviceimpl.java
字号:
PeerInfoQueryMessage.getMessageType());
Element e = null;
e = doc.createElement("sourcePid", localPeerId);
doc.appendChild(e);
e = doc.createElement("targetPid", peerid);
doc.appendChild(e);
ResolverQuery query = null;
String ds = docToString(doc);
// protect increment of the query id
synchronized (this) {
query = new ResolverQuery
(handlerName, "JXTACRED",
localPeerId.toString(), ds, qid++);
}
if (listener != null) {
listenerTable.put(new Integer(query.getQueryId()), listener);
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("sending query " + query.getQueryId() +
" from " + localPeerId + " to " + peerid );
}
resolver.sendQuery( peerid.toString(), query);
}
/**
 * Returns the peer info entries held in the local cache.
 *
 * <p>Entries that cannot be read or parsed are reported at debug level and
 * skipped; they never fail the lookup as a whole.
 *
 *@param peerid  id of the peer whose cached entry is wanted. Can be null,
 *      in which case the cached entries of all peers are returned
 *@return An Enumeration of the locally cached entries; empty when the
 *      cache manager is not set yet or when nothing could be restored
 *@exception IOException only a failure raised while listing the cache
 *      directory can propagate; per-entry failures are logged and skipped
 */
public Enumeration getLocalPeerInfo(ID peerid) throws IOException {
    Vector result = new Vector();
    // If this is invoked before startApp(), just pretend.
    if (cm == null) {
        return result.elements();
    }
    if (peerid == null) {
        // since no peer is specified, return advts for all peers
        String[] fn = cm.getFileNames(dirname);
        for (int i = 0; i < fn.length; i++) {
            FileInputStream is = null;
            try {
                is = cm.getInputStream(dirname, fn[i]);
                Advertisement adv = (Advertisement)
                    AdvertisementFactory.newAdvertisement(
                        new MimeMediaType("text/xml"), is);
                result.addElement(adv);
            } catch (Exception e) {
                // One unreadable entry must not hide the others: log and
                // fall through to the next file.
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug(e);
                }
            } finally {
                if (is != null) {
                    try {
                        is.close();
                    } catch (IOException closeFailure) {
                        // Nothing to recover here, but leave a trace.
                        if (LOG.isEnabledFor(Priority.DEBUG)) {
                            LOG.debug("failed to close cached entry " + fn[i],
                                closeFailure);
                        }
                    }
                }
            }
        }
    } else {
        try {
            // peer is specified, get this peer's advt
            String fn = peerid.getUniqueValue().toString();
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("restoring " + fn + " from " + dirname);
            }
            StructuredDocument doc = cm.restore(dirname, fn);
            PeerInfoResponseMessage adv = new PeerInfoResponseMsg(doc);
            result.addElement(adv);
        } catch (Exception e) {
            // Still best effort (an empty result is returned), but report
            // the failure the same way the all-peers branch does instead
            // of dropping it silently.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("failed to restore cached peer info for " + peerid, e);
            }
        }
    }
    return result.elements();
}
/*
* methods we must implement by implementing Service, QueryHandler
*/
/**
 * Accessor for the peer group this service holds in its {@code pg} field.
 *
 *@return PeerGroup the group
 */
public PeerGroup getGroup() {
    return this.pg;
}
/**
 * Handler API method: answers a peer info query.
 *
 * <p>The query payload is parsed as a text/xml document. When the target
 * peer id of the query is not the local peer id, the parsed query document
 * is passed along unchanged. Otherwise a response is built from the local
 * uptime, the current timestamp, the last incoming/outgoing message times
 * and the traffic recorded per channel.
 *
 *@param query  ResolverQueryMsg to handle
 *@return ResolverResponseMsg wrapping the resulting document, or null
 *      when the payload cannot be parsed or the query document cannot be
 *      regenerated
 *@throws NoResponseException declared by the handler API; the failures
 *      handled in this method are reported by returning null instead
 */
public ResolverResponseMsg processQuery(ResolverQueryMsg query)
        throws NoResponseException {
    StructuredTextDocument payload = null;
    try {
        byte[] rawQuery = (query.getQuery()).getBytes();
        payload = (StructuredTextDocument)
            StructuredDocumentFactory.newStructuredDocument(
                new MimeMediaType("text/xml"),
                new ByteArrayInputStream(rawQuery));
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("PeerInfoService.processQuery got a bad adv", e);
        }
        return null;
    }

    PeerInfoQueryMessage peerQuery = new PeerInfoQueryMsg(payload);
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processing query " + query.getQueryId() +
            " at peer " + localPeerId + " from " +
            peerQuery.getSourcePid() + " to " + peerQuery.getTargetPid() );
    }

    StructuredTextDocument replyDoc;
    if (!localPeerId.equals(peerQuery.getTargetPid())) {
        // Not addressed to us: pass the message through untouched.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("forwarding query " + query.getQueryId());
        }
        try {
            replyDoc = (StructuredTextDocument)
                peerQuery.getDocument(new MimeMediaType("text/xml"));
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug( "Failure in gettting pip query", e);
            }
            return null;
        }
    } else {
        // Addressed to us: describe the local peer.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("responding to query " + query.getQueryId());
        }
        PeerInfoResponseMessage info = new PeerInfoResponseMsg();
        info.setSourcePid(peerQuery.getSourcePid());
        info.setTargetPid(localPeerId);

        // A single clock reading feeds both the uptime and the timestamp.
        long now = System.currentTimeMillis();
        info.setUptime(now - startTime);
        info.setTimestamp(now);
        info.setLastIncomingMessageTime(incomingStats.getLastMessageTime());
        info.setLastOutgoingMessageTime(outgoingStats.getLastMessageTime());

        for (Enumeration in = incomingStats.getChannelNames();
                in.hasMoreElements();) {
            String channel = (String) in.nextElement();
            long traffic = incomingStats.getTrafficOnChannel(channel);
            info.setIncomingTrafficElement(channel, traffic);
        }
        for (Enumeration out = outgoingStats.getChannelNames();
                out.hasMoreElements();) {
            String channel = (String) out.nextElement();
            long traffic = outgoingStats.getTrafficOnChannel(channel);
            info.setOutgoingTrafficElement(channel, traffic);
        }
        replyDoc = (StructuredTextDocument)
            info.getDocument(new MimeMediaType("text/xml"));
    }

    ResolverResponse reply = new ResolverResponse(
        handlerName, "JXTACRED",
        query.getQueryId(), docToString(replyDoc));
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("sending response to " + reply.getQueryId());
    }
    return reply;
}
/**
 * Handler API method: deals with an incoming peer info response.
 *
 * <p>The payload is parsed as a text/xml document. The listener registered
 * in the listener table for the response's query id (if any) is notified
 * first, then every listener currently held in the listeners vector.
 * Finally the response is saved in the local cache under a file name
 * derived from its target peer id. A payload that cannot be parsed and a
 * failure to save are logged at debug level and otherwise ignored. Nothing
 * is done while the cache manager is not set.
 *
 *@param response  the resolver response to process
 */
public void processResponse(ResolverResponseMsg response) {
    // If this is invoked before startApp(), just pretend.
    if (cm == null) {
        return;
    }
    PeerInfoResponseMessage resp = null;
    try {
        StructuredDocument doc = StructuredDocumentFactory.newStructuredDocument(
            new MimeMediaType("text/xml"),
            new ByteArrayInputStream(response.getResponse().getBytes()) );
        resp = new PeerInfoResponseMsg( doc );
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("PeerInfoService.processResponse got a bad adv",e);
        }
        return;
    }
    // are there any registered peer info listeners,
    // generate the event and callback.
    //XXX FIXME response should be immutable
    PeerInfoEvent newevent =
        new PeerInfoEvent(this, resp, response.getQueryId());
    PeerInfoListener listener = (PeerInfoListener)
        listenerTable.get(new Integer(response.getQueryId()));
    if (listener != null) {
        listener.peerInfoResponse(newevent);
    }

    // Notify from a snapshot rather than from the live vector: a listener
    // that removes itself (or another listener) during its callback would
    // otherwise shift the remaining elements and cause one to be skipped,
    // and a concurrent removal between size() and elementAt() could throw
    // ArrayIndexOutOfBoundsException. The copy is taken under the monitor
    // that addPeerInfoListener/removePeerInfoListener hold; the callbacks
    // themselves run outside of it.
    PeerInfoListener[] registered;
    synchronized (this) {
        registered = new PeerInfoListener[listeners.size()];
        for (int i = 0; i < registered.length; i++) {
            registered[i] = (PeerInfoListener) listeners.elementAt(i);
        }
    }
    for (int i = 0; i < registered.length; i++) {
        registered[i].peerInfoResponse(newevent);
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processing response " + response.getQueryId() +
            " at peer " +
            Integer.toHexString(localPeerId.hashCode()) +
            " from " +
            Integer.toHexString(resp.getSourcePid().hashCode()) +
            " to " +
            Integer.toHexString(resp.getTargetPid().hashCode()));
    }
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("publishing response " + response.getQueryId());
    }
    try {
        StructuredDocument doc = (StructuredDocument)
            resp.getDocument(new MimeMediaType("text/xml"));
        /*
         * overwrite the status of the peer. As long as the
         * target pid is the same, it should hash to the same
         * filename
         */
        String fn = resp.getTargetPid().getUniqueValue().toString();
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("publishing response " + response.getQueryId() +
                " in " + fn);
        }
        cm.save(dirname, fn, doc);
    } catch (Exception e) {
        // Caching is best effort: the listeners have already been served.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(e);
        }
    }
}
/**
 * Removes cached peer info entries from the local cache.
 *
 *@param id  peerId of the peer whose locally cached entry is to be
 *      removed. If null, every file listed in the cache directory is
 *      removed.
 *@exception IOException
 */
public void flushAdvertisements(ID id) throws IOException {
    // If this is invoked before startApp(), just pretend.
    if (cm == null) {
        return;
    }

    if (id == null) {
        // No peer given: wipe every cached entry.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("flushing all advertisements");
        }
        String[] cached = cm.getFileNames(dirname);
        int index = 0;
        while (index < cached.length) {
            cm.remove(dirname, cached[index]);
            index++;
        }
        return;
    }

    // A peer was given: its unique value is the cache file name.
    String fileName = id.getUniqueValue().toString();
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("flushing advertisements for peer " +
            Integer.toHexString(id.hashCode()) +
            " fn " + fileName);
    }
    cm.remove(dirname, fileName);
}
/**
 * Registers a peer info listener by appending it to the listeners vector.
 * The update is performed while holding this service's monitor.
 *
 *@param listener  the PeerInfoListener to add
 */
public void addPeerInfoListener(PeerInfoListener listener) {
    synchronized (this) {
        listeners.addElement(listener);
    }
}
/**
 * Unregisters a peer info listener by removing it from the listeners
 * vector. The update is performed while holding this service's monitor.
 *
 *@param listener  the PeerInfoListener to remove
 *@return true if the argument was a component of the listeners
 *      vector; false otherwise
 */
public boolean removePeerInfoListener(PeerInfoListener listener) {
    synchronized (this) {
        boolean wasRegistered = listeners.removeElement(listener);
        return wasRegistered;
    }
}
/**
 * Serializes a structured text document into a String.
 *
 *@param doc  the document to serialize
 *@return the text the document wrote to the writer, or the empty string
 *      when serialization fails for any reason (the failure is logged at
 *      debug level)
 */
private String docToString(StructuredTextDocument doc) {
    String text = "";
    try {
        StringWriter buffer = new StringWriter();
        doc.sendToWriter(buffer);
        text = buffer.toString();
    } catch (Exception e) {
        // Best effort: callers get an empty payload rather than an exception.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("docToString failed", e);
        }
    }
    return text;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -