📄 proxyservice.java
字号:
discoveryType = DiscoveryService.GROUP;
} else {
discoveryType = DiscoveryService.ADV;
}
Enumeration enum = null;
try {
enum = discovery.getLocalAdvertisements(discoveryType,
attribute,
value);
} catch (IOException e) {
requestor.notifyError("could not search locally");
}
Advertisement adv = null;
while (enum != null && enum.hasMoreElements()) {
adv = (Advertisement) enum.nextElement();
// notify the requestor of the result
requestor.send(adv, RESPONSE_RESULT);
}
// start the query
int queryId = discovery.getRemoteAdvertisements(null, // peerId
discoveryType,
attribute,
value,
DEFAULT_THRESHOLD);
// register the query
// FIXME: jice@jxta.org - 20020515
// Right now the client API and client-proxy protocol lack a way
// to cancel a discovery request, so we have absolutely no way to
// remove these requests from the list. On top of that, the above
// does not use the listener interface at all, so we never ever get
// an event to report to the client, thus the searchRequests list is
// not useful. Finally, we have no way to detect redundant queries,
// which puts us at the mercy of silly clients. So, it is better
// to just not enqueue the request right now. From the client's point
// of view the difference is not noticeable; all responses look
// asynchronous, even if we respond immediately with a locally found adv.
// However, if we do not have a local response, the client will
// never get a response until it retries, and every time it does
// it costs us a remote discovery :-(
// Suppressing the following line neither improves nor worsens that,
// but it avoids leaking resources.
// searchRequests.put(new Integer(queryId), requestor);
}
/**
 * Handles a client's request to listen on a pipe.
 *
 * <p>When {@code id} is non-empty the pipe advertisement is built with
 * {@code createPipeAdvertisement}; otherwise it is looked up with
 * {@code findPipeAdvertisement} and, if found, a copy is sent back to the
 * requestor as {@code RESPONSE_INFO}. The requestor is then added to the
 * listener list registered under the pipe id (the input pipe is created
 * when no list exists yet), the advertisement is published through the
 * discovery service, and the requestor is notified of success.
 *
 * <p>If no advertisement can be obtained, or the input pipe cannot be
 * created, the requestor is notified of the error and nothing is
 * registered. A failure to publish the advertisement is only logged.
 *
 * @param requestor the client that issued the request
 * @param name      pipe name
 * @param id        pipe id; when null or empty the pipe is looked up
 * @param arg       forwarded to the advertisement helpers
 *                  (presumably the pipe type; confirm against
 *                  {@code createPipeAdvertisement})
 */
private void handleListenRequest(Requestor requestor,
                                 String name,
                                 String id,
                                 String arg) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("handleListenRequest name=" + name +
                  " id=" + id +
                  " arg=" + arg);
    }

    final boolean lookedUp = (id == null || id.length() == 0);
    PipeAdvertisement pipeAdv;
    if (lookedUp) {
        pipeAdv = findPipeAdvertisement(name, id, arg);
    } else {
        pipeAdv = createPipeAdvertisement(name, id, arg);
    }

    // Check for failure BEFORE touching the advertisement: a failed lookup
    // must produce an error, not a null advertisement sent to the client.
    if (pipeAdv == null) {
        requestor.notifyError("Could not find pipe");
        return;
    }

    if (lookedUp) {
        // send a copy of the pipe to the requestor
        requestor.send(pipeAdv, RESPONSE_INFO);
    }

    String pipeId = pipeAdv.getPipeID().toString();

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("listen to pipe name=" + pipeAdv.getName() +
                  " id=" + pipeAdv.getPipeID().toString() +
                  " arg=" + pipeAdv.getType());
    }

    // check to see if the input pipe already exists
    PipeListenerList list = (PipeListenerList) pipeListeners.get(pipeId);
    if (list == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("first listener, create input pipe");
        }

        // create an input pipe
        try {
            list = new PipeListenerList(pipe.createInputPipe(pipeAdv, this),
                                        pipeListeners, pipeId);
        } catch (IOException e) {
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("could not listen to pipe", e);
            }
            requestor.notifyError("could not listen to pipe");
            return;
        }
        pipeListeners.put(pipeId, list);
    }

    // add requestor to list
    list.add(requestor);

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("add requestor=" + requestor + " id=" + pipeId +
                  " list=" + list);
        LOG.debug("publish PipeAdvertisement");
    }

    // advertise the pipe locally; failure here is not fatal to the request
    try {
        discovery.publish(pipeAdv, DiscoveryService.ADV,
                          DEFAULT_LIFETIME, DEFAULT_LIFETIME);
    } catch (IOException e) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Could not publish pipe advertisement", e);
        }
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("done with listen request");
    }

    // notify requestor of success
    requestor.notifySuccess();
}
/**
 * Handles a client's request to stop listening on a pipe.
 *
 * <p>The requestor is removed from the listener list registered under
 * {@code id}, if there is one; when that leaves the list empty the list
 * itself is dropped from {@code pipeListeners}. Success is reported to the
 * requestor whether or not a list was found.
 *
 * @param requestor the client that issued the request
 * @param id        key under which the listener list was registered
 */
private void handleCloseRequest(Requestor requestor,
                                String id) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("handleCloseRequest id=" + id);
    }

    PipeListenerList listeners = (PipeListenerList) pipeListeners.get(id);

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("handleCloseRequest list = " + listeners);
    }

    if (listeners != null) {
        listeners.remove(requestor);

        // last listener gone: forget the list
        boolean nobodyLeft = (listeners.size() == 0);
        if (nobodyLeft) {
            pipeListeners.remove(id);
        }
    }

    // notify requestor of success
    requestor.notifySuccess();
}
/**
 * Sends the given message on the given output pipe and reports the
 * outcome to the requestor: success once the send has returned, or the
 * error "could not send to pipe" when an {@code IOException} is caught.
 *
 * @param req  requestor to notify of the outcome
 * @param mess message to send
 * @param out  output pipe to send on
 */
private void sendToPipe(Requestor req, Message mess,
                        OutputPipe out) {
    try {
        out.send(mess);

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("output pipe send end");
        }

        // notify requestor of success
        req.notifySuccess();
    } catch (IOException sendFailure) {
        // tell the client first, then leave a trace for debugging
        req.notifyError("could not send to pipe");

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("could not send to pipe", sendFailure);
        }
    }
}
/**
 * A message waiting to be sent, paired with the requestor that is to be
 * notified of the outcome of the send.
 */
class ClientMessage {

    /** Requestor notified when the message is eventually sent. */
    private final Requestor requestor;

    /** The message to send. */
    private final Message message;

    /**
     * @param req  requestor on whose behalf the message is sent
     * @param mess the message to send
     */
    public ClientMessage(Requestor req, Message mess) {
        this.requestor = req;
        this.message = mess;
    }

    /**
     * Sends this (pending) message on the given pipe by delegating to the
     * enclosing service's {@code sendToPipe}.
     *
     * @param out output pipe to send on
     */
    public void send(OutputPipe out) {
        sendToPipe(this.requestor, this.message, out);
    }
}
/**
 * Holds the message waiting for an output pipe that is not yet available.
 * At most one message is kept; further messages enqueued while one is
 * already waiting are discarded.
 */
class PendingPipe {

    /** The waiting message, or null when nothing is waiting. */
    private ClientMessage pending;

    public PendingPipe() {
        pending = null;
    }

    /**
     * Just got resolved! Sends the pending message, if any, on the given
     * pipe and then clears it. Does nothing when no message is waiting
     * (for example on a repeated call), instead of failing with a
     * {@code NullPointerException}.
     *
     * @param out the output pipe to send on
     */
    public void sendPending(OutputPipe out) {
        ClientMessage toSend = pending;
        if (toSend == null) {
            return;
        }
        toSend.send(out);
        // Cleared only after the send, exactly as before, so anything
        // enqueued while the send is in progress is still discarded.
        pending = null;
    }

    /**
     * Enqueues a new pending message.
     * (For now only one is kept; others get trashed.)
     *
     * @param req  requestor on whose behalf the message is sent
     * @param mess the message to hold
     */
    public void enqueue(Requestor req, Message mess) {
        if (pending != null) {
            return;
        }
        pending = new ClientMessage(req, mess);
    }
}
/**
 * Handles a client's request to send a message to a pipe.
 *
 * <p>When {@code id} is non-empty the pipe advertisement is built with
 * {@code createPipeAdvertisement}; otherwise it is looked up with
 * {@code findPipeAdvertisement} and, if found, a copy is sent back to the
 * requestor as {@code RESPONSE_INFO}. If no advertisement can be obtained
 * the requestor is notified of the error.
 *
 * <p>Delivery then takes the first applicable route:
 * <ol>
 *   <li>a listener list is registered for the pipe and the pipe is of
 *       unicast type: a clone of the message is handed to that list and
 *       success is reported;</li>
 *   <li>an output pipe is already in {@code resolvedPipes}: the message
 *       is sent on it;</li>
 *   <li>an entry exists in {@code pendingPipes}: the message is enqueued
 *       there;</li>
 *   <li>otherwise a new pending entry is registered and creation of the
 *       output pipe is requested.</li>
 * </ol>
 *
 * @param requestor the client that issued the request
 * @param name      pipe name
 * @param id        pipe id; when null or empty the pipe is looked up
 * @param arg       forwarded to the advertisement helpers
 *                  (presumably the pipe type; confirm against
 *                  {@code createPipeAdvertisement})
 * @param message   the message to send
 */
private void handleSendRequest(Requestor requestor,
                               String name,
                               String id,
                               String arg,
                               Message message) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("handleSendRequest name=" + name +
                  " id=" + id +
                  " arg=" + arg);
    }

    final boolean lookedUp = (id == null || id.length() == 0);
    PipeAdvertisement pipeAdv;
    if (lookedUp) {
        pipeAdv = findPipeAdvertisement(name, id, arg);
    } else {
        pipeAdv = createPipeAdvertisement(name, id, arg);
    }

    // Check for failure BEFORE touching the advertisement: a failed lookup
    // must produce an error, not a null advertisement sent to the client.
    if (pipeAdv == null) {
        requestor.notifyError("Could not find pipe");
        return;
    }

    if (lookedUp) {
        // send a copy of the pipe to the requestor
        requestor.send(pipeAdv, RESPONSE_INFO);
    }

    String pipeId = pipeAdv.getPipeID().toString();

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("send to pipe name=" + pipeAdv.getName() +
                  " id=" + pipeAdv.getPipeID().toString() +
                  " arg=" + pipeAdv.getType());
    }

    // check if there are local listeners
    PipeListenerList list = (PipeListenerList) pipeListeners.get(pipeId);

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("local listener list " + list);
    }

    if (list != null && PipeService.UnicastType.equals(pipeAdv.getType())) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("start sending to each requestor");
        }

        list.send((Message) message.clone(), pipeId);

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("end sending to each requestor");
        }

        // notify requestor of success
        requestor.notifySuccess();
        return;
    }

    // NOTE: This part is NOT exercised by the load test because all
    // clients are local. To exercise this part, comment out the
    // optimization above.

    // This is not a unicast pipe with at least one local listener,
    // so we need to figure out where the message should go.
    // This may take a while and has to be done asynchronously...
    // Careful: the resolution can occur synchronously on this
    // very thread, and a Java lock will not prevent re-entry.

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("output pipe creation begin");
    }

    // Look for the pipe in the resolved list. If not found
    // look in the pending list or add it there.
    OutputPipe out = (OutputPipe) resolvedPipes.get(pipeId);
    if (out != null) {
        sendToPipe(requestor, message, out);
        return;
    }

    PendingPipe p = (PendingPipe) pendingPipes.get(pipeId);
    if (p != null) {
        p.enqueue(requestor, message);
        return;
    }

    try {
        // Register the pending entry before asking for the output pipe,
        // since the pipe may be resolved before createOutputPipe returns.
        p = new PendingPipe();
        p.enqueue(requestor, message);
        pendingPipes.put(pipeId, p);
        pipe.createOutputPipe(pipeAdv, this);
    } catch (IOException e) {
        pendingPipes.remove(pipeId);
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("could not create output pipe", e);
        }
        requestor.notifyError("could not create output pipe");
    }
}
/**
 * Builds a peer advertisement carrying the given name and a peer id.
 *
 * <p>When {@code id} is non-empty it is parsed as a JXTA id URL and used
 * if it turns out to be a {@code PeerID}. When it is absent, empty, not a
 * peer id, or cannot be parsed, a new peer id is generated in this
 * service's peer group instead.
 *
 * @param name name to set on the advertisement
 * @param id   textual peer id, or null/empty to have one generated
 * @return the advertisement; {@code null} if it could not be
 *         instantiated. If setting the name or peer id fails, the
 *         partially filled advertisement is returned and the failure
 *         is logged.
 */
private PeerAdvertisement createPeerAdvertisement(String name,
                                                  String id) {
    PeerAdvertisement adv = null;

    PeerID pid = null;
    // An empty id means "none supplied", as in the pipe request handlers;
    // do not hand it to the parser just to log a spurious warning.
    if (id != null && id.length() > 0) {
        try {
            ID tempId = IDFactory.fromURL(IDFactory.jxtaURL(id));
            if (tempId instanceof PeerID) {
                pid = (PeerID) tempId;
            }
        } catch (UnknownServiceException e) {
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("Invalid peerId", e);
            }
        } catch (MalformedURLException e) {
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("Could not parse peerId from url", e);
            }
        }
    }

    // no usable id supplied: generate one in our group
    if (pid == null) {
        pid = IDFactory.newPeerID(group.getPeerGroupID());
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("newPeerAdvertisement name="+name+" id="+pid.toString());
    }

    try {
        // Create a peer advertisement for this peer.
        adv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(
                  PeerAdvertisement.getAdvertisementType());

        adv.setName(name);
        adv.setPeerID(pid);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("newPeerAdvertisement Exception", e);
        }
    }

    return adv;
}
private PeerGroupAdvertisement createGroupAdvertisement(String name,
String id) {
PeerGroupAdvertisement adv = null;
PeerGroupID gid = null;
if (id != null) {
try {
ID tempId = IDFactory.fromURL(IDFactory.jxtaURL(id));
if (tempId instanceof PeerGroupID) {
gid = (PeerGroupID)tempId;
}
} catch (UnknownServiceException e) {
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn("Invalid peergroupId", e);
}
} catch (MalformedURLException e) {
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn("Could not parse peergroupId from url", e);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -