📄 proxyservice.java
字号:
}
}
if (gid == null) {
gid = IDFactory.newPeerGroupID();
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("newPeerGroupAdvertisement name="+name+" id="+gid.toString());
}
try {
// Create a pipe advertisement for this pipe.
adv = (PeerGroupAdvertisement) AdvertisementFactory.newAdvertisement(
PeerGroupAdvertisement.getAdvertisementType());
adv.setName(name);
adv.setPeerGroupID(gid);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn("newPeerGroupAdvertisement Exception", e);
}
}
return adv;
}
/**
 * Builds a new pipe advertisement from the supplied name, id and type.
 * <p>
 * A {@code null} or empty {@code pipeType} falls back to
 * {@link PipeService#UnicastType}. A {@code null} {@code pipeId} is replaced
 * by the string form of a freshly generated pipe id in this service's
 * peer group.
 *
 * @param pipeName name stored in the advertisement
 * @param pipeId   textual pipe id (converted through
 *                 {@code IDFactory.jxtaURL} / {@code IDFactory.fromURL}),
 *                 or {@code null} to generate a new one
 * @param pipeType pipe type, or {@code null}/empty for the unicast default
 * @return the advertisement, or {@code null} when the factory call itself
 *         failed. NOTE(review): when a later step fails (for example the
 *         id conversion) the exception is only logged and the partially
 *         populated advertisement is still returned -- confirm that
 *         callers expect this.
 */
private PipeAdvertisement createPipeAdvertisement(String pipeName,
                                                  String pipeId,
                                                  String pipeType) {
    PipeAdvertisement adv = null;

    // Guarded like the other debug statements in this file, so the message
    // is only concatenated when it is going to be logged.
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("newPipeAdvertisement name="+pipeName+" pipeId="+pipeId+" pipeType="+pipeType);
    }

    if (pipeType == null || pipeType.length() == 0) {
        pipeType = PipeService.UnicastType;
    }

    if (pipeId == null) {
        pipeId = IDFactory.newPipeID(group.getPeerGroupID()).toString();
    }

    try {
        // Create a pipe advertisement for this pipe.
        adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(
            PipeAdvertisement.getAdvertisementType());

        adv.setName(pipeName);
        adv.setPipeID(IDFactory.fromURL(IDFactory.jxtaURL(pipeId)));
        adv.setType(pipeType);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("newPipeAdvertisement Exception", e);
        }
    }

    return adv;
}
private PipeAdvertisement findPipeAdvertisement(String name,
String id,
String arg) {
String attribute, value;
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("findPipeAdvertisement name=" + name +
" id=" + id + " arg=" + arg);
}
if (id != null) {
attribute = PipeAdvertisement.IdTag;
value = id;
} else if (name != null) {
attribute = PipeAdvertisement.NameTag;
value = name;
} else {
// the id or the name must be specified
return null;
}
if (arg == null) {
// the default pipe type
arg = PipeService.UnicastType;
}
Enumeration enum = null;
try {
enum = discovery.getLocalAdvertisements(DiscoveryService.ADV,
attribute,
value);
} catch (IOException e) {
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn("IOException in getLocalAdvertisements()", e);
}
return null;
}
PipeAdvertisement pipeAdv = null;
Advertisement adv = null;
while (enum != null && enum.hasMoreElements()) {
adv = (Advertisement) enum.nextElement();
// take the first match
if (adv instanceof PipeAdvertisement) {
pipeAdv = (PipeAdvertisement)adv;
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("found PipeAdvertisement = " + pipeAdv);
}
break;
}
}
if (pipeAdv == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("create PipeAdvertisement");
}
// create the pipe
pipeAdv = createPipeAdvertisement(name, id, arg);
}
return pipeAdv;
}
public synchronized void discoveryEvent(DiscoveryEvent event) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("discoveryEvent " + event);
}
Requestor requestor;
requestor = (Requestor) searchRequests.get(new Integer(event.getQueryID()));
if (requestor == null) {
return;
}
DiscoveryResponseMsg response = event.getResponse();
if (response == null) {
return;
}
Enumeration enum = response.getResponses();
if (enum == null || !enum.hasMoreElements()) {
return;
}
while (enum.hasMoreElements()) {
try {
String str = (String) enum.nextElement();
// Create Advertisement from response.
Advertisement adv = (Advertisement)
AdvertisementFactory.newAdvertisement(new MimeMediaType("text/xml"),
new ByteArrayInputStream(str.getBytes()));
// notify the requestor of the result
requestor.send(adv, RESPONSE_RESULT);
} catch (Exception e) {
// this should not happen unless a bad result is returned
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn("Bad result returned by DiscoveryService", e);
}
}
}
}
/**
 * Delivers a message received on an input pipe to the requestors listening
 * on that pipe.
 * <p>
 * The listener list is looked up in {@code pipeListeners} by the string
 * form of the pipe id. When a list exists it is handed a clone of the
 * message; when none exists the input pipe that produced the event is
 * closed.
 *
 * @param event the pipe message event; its source is cast to
 *              {@link InputPipe} when the pipe has to be closed
 */
public synchronized void pipeMsgEvent(PipeMsgEvent event) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("pipeMsgEvent " + event.getPipeID());
    }

    String id = event.getPipeID().toString();
    PipeListenerList list = (PipeListenerList) pipeListeners.get(id);

    if (list == null) {
        // there are no listeners, close the input pipe
        ((InputPipe) event.getSource()).close();
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("close pipe id=" + id);
        }
        return;
    }

    Message message = event.getMessage();
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("pipeMsgEvent: start sending to each requestor");
    }

    list.send((Message) message.clone(), id);

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("pipeMsgEvent: end sending to each requestor");
    }
}
/**
 * Handles the resolution of an output pipe.
 * <p>
 * If a {@code PendingPipe} is registered under the event's pipe id, the
 * resolved pipe is stored in {@code resolvedPipes} and handed to the
 * pending entry via {@code sendPending}. Otherwise the pipe is closed
 * straight away.
 *
 * @param event the event carrying the pipe id and the resolved output pipe
 */
public synchronized void outputPipeEvent(OutputPipeEvent event) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("outputPipeEvent " + event);
    }

    PendingPipe waiting = (PendingPipe) pendingPipes.remove(event.getPipeID());

    if (waiting != null) {
        resolvedPipes.put(event.getPipeID(), event.getOutputPipe());
        waiting.sendPending(event.getOutputPipe());
        return;
    }

    // Nobody is waiting for this pipe (any longer), so release it.
    event.getOutputPipe().close();
}
/**
 * Reads the string stored under {@code name} in the message and then
 * removes that element from the message.
 *
 * @param name    name of the element to read and remove
 * @param message message to read from; it is modified by this call
 * @return whatever {@code message.getString(name)} returned before the
 *         element was removed
 */
private static String popString(String name, Message message) {
    final String popped = message.getString(name);
    message.removeElement(name);
    return popped;
}
/**
 * The requestors listening on a single input pipe.
 * <p>
 * {@link #send(Message, String)} fans a message out to every requestor.
 * When the last requestor is removed, the input pipe is closed and the
 * entry stored under {@code id} is removed from the {@code pipeListeners}
 * map passed to the constructor.
 * <p>
 * This class does no synchronization itself.
 */
static class PipeListenerList {
    /**
     * Name prefixes of message elements that are known not to be needed by
     * requestors and are stripped before a message is forwarded.
     */
    private static final String[] STRIPPED_PREFIXES = {
        "RendezVousPropagate",
        "JxtaWireHeader",
        "JxtaEndpointRouter",
        "jxta:EndpointHeaderSrcPeer"
    };

    LinkedList list = new LinkedList();
    InputPipe inputPipe = null;
    Map pipeListeners = null;
    String id = null;

    /**
     * @param inputPipe     pipe to close once the last requestor is gone;
     *                      may be {@code null}
     * @param pipeListeners map from which the entry stored under
     *                      {@code id} is removed once the list is empty;
     *                      may be {@code null}
     * @param id            key of this list in {@code pipeListeners};
     *                      may be {@code null}
     */
    PipeListenerList(InputPipe inputPipe,
                     Map pipeListeners,
                     String id) {
        this.inputPipe = inputPipe;
        this.pipeListeners = pipeListeners;
        this.id = id;

        if (pipeListeners != null) {
            if (LOG.isEnabledFor(Priority.INFO)) {
                LOG.info("number of pipeListeners = " + pipeListeners.size());
            }
        }
    }

    /**
     * Adds a requestor unless it is already in the list.
     *
     * @param requestor the requestor to add
     */
    void add(Requestor requestor) {
        if (LOG.isEnabledFor(Priority.INFO)) {
            LOG.info("add " + requestor + " from " + toString());
        }

        if (!list.contains(requestor)) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("requestor add");
            }
            list.add(requestor);
        } else {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("requestor exits already");
            }
        }
    }

    /**
     * Removes a requestor. When the list is empty afterwards, the input
     * pipe is closed and this list's entry is removed from the
     * {@code pipeListeners} map.
     *
     * @param requestor the requestor to remove
     */
    void remove(Requestor requestor) {
        if (LOG.isEnabledFor(Priority.INFO)) {
            LOG.info("remove " + requestor + " from " + toString());
        }

        // The removal must happen regardless of the log level; only the
        // report of its outcome is conditional.
        boolean removed = list.remove(requestor);
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("removed = " + removed);
        }

        if (list.size() == 0) {
            // close the pipe and remove from the listenerList
            if (inputPipe != null) {
                inputPipe.close();
            }
            if (id != null && pipeListeners != null) {
                pipeListeners.remove(id);
            }
        }
    }

    /**
     * @return the number of requestors currently in the list
     */
    int size() {
        int size = list.size();
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("size " + size);
        }
        return size;
    }

    /**
     * Tags the message as a pipe response, strips the elements whose names
     * start with one of {@link #STRIPPED_PREFIXES}, and sends a clone of
     * the result to every requestor. A requestor whose {@code send}
     * returns {@code false} is removed from the list.
     *
     * @param message the message to forward; it is modified by this call
     * @param id      value stored in the message under {@code ID_TAG}
     */
    void send(Message message, String id) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("send list.size = " + list.size());
        }

        message.setString(RESPONSE_TAG, RESPONSE_MESSAGE);
        message.setString(ID_TAG, id);

        // Remove all elements that are known to be not needed. The names
        // are collected first and removed afterwards, so the outcome does
        // not depend on whether getNames() is a live view of the message
        // (not visible from here).
        LinkedList unwanted = new LinkedList();
        StringEnumeration names = message.getNames();
        while (names.hasMoreStrings()) {
            String name = names.nextString();
            if (isStripped(name)) {
                unwanted.add(name);
            }
        }
        for (Iterator it = unwanted.iterator(); it.hasNext();) {
            String name = (String) it.next();
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("removeElement " + name);
            }
            message.removeElement(name);
        }

        // Iterate over a snapshot: remove() modifies the list, which would
        // make a live iterator throw ConcurrentModificationException.
        Object[] snapshot = list.toArray();
        for (int i = 0; i < snapshot.length; i++) {
            Requestor requestor = (Requestor) snapshot[i];
            if (!requestor.send((Message) message.clone())) {
                // could not send to listener, remove them from the list
                remove(requestor);
            }
        }
    }

    /** Tells whether an element name starts with one of the stripped prefixes. */
    private static boolean isStripped(String name) {
        for (int i = 0; i < STRIPPED_PREFIXES.length; i++) {
            if (name.startsWith(STRIPPED_PREFIXES[i])) {
                return true;
            }
        }
        return false;
    }

    public String toString() {
        String str = "PipeListenerList size=" + list.size();
        return str;
    }
}
/**
 * Writes every element of a message to the given log at DEBUG level, one
 * {@code [name]=value} line per element, framed by begin/end marker lines.
 * <p>
 * Nothing is read from the message when DEBUG is not enabled for
 * {@code log}.
 *
 * @param message the message to dump
 * @param log     the log category to write to
 */
protected static void logMessage(Message message, Category log) {
    // Skip building the dump altogether when it would be discarded.
    if (!log.isEnabledFor(Priority.DEBUG)) {
        return;
    }

    // StringBuffer instead of repeated String concatenation in the loop
    // (StringBuffer rather than StringBuilder, which needs Java 5).
    StringBuffer out =
        new StringBuffer("\n**************** begin ****************\n");

    StringEnumeration names = message.getNames();
    while (names.hasMoreStrings()) {
        String name = names.nextString();
        String data = message.getString(name);
        out.append("[").append(name).append("]=").append(data).append("\n");
    }

    out.append("**************** end ****************\n");
    log.debug(out.toString());
}
// The net peer group created by main(). This has to be a static so that
// it does not get garbage collected after main() exits.
static PeerGroup netPeerGroup = null;
/**
 * Stand-alone entry point: creates the net peer group, keeps it in
 * {@code netPeerGroup}, and switches the {@code net.jxta.impl.proxy} log
 * category to DEBUG.
 *
 * @param args command-line arguments; not used
 * @throws PeerGroupException declared for the peer group creation
 */
static public void main(String args[]) throws PeerGroupException {
    netPeerGroup = PeerGroupFactory.newNetPeerGroup();

    // Turn on DEBUG output for the proxy package.
    Category.getInstance("net.jxta.impl.proxy").setPriority(Priority.DEBUG);

    // To trace the HTTP relay servlet as well, do the same for the category
    // "net.jxta.impl.endpoint.servlethttp.HttpRelayServlet".
}
/****************************************************************
* Implement the CacheEntryListener *
***************************************************************/
/**
 * Called when an entry is purged from the cache; closes the output pipe
 * held by that entry.
 *
 * @param ce the purged entry; its value is cast to {@link OutputPipe}
 */
public void purged(CacheEntry ce) {
    // A resolved pipe was purged from the cache because too many
    // pre-resolved pipes are hanging around. Close it explicitly: it may
    // hold critical resources that the GC is not sensitive to.
    OutputPipe evicted = (OutputPipe) ce.getValue();
    evicted.close();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -