📄 proxyservice.java
字号:
} // set non-deft passwd passwords.put(grpId, passwd); proxiedGroups.put(grpId, g); requestor.notifySuccess(); } private void handleCreateRequest(Requestor requestor, String type, String name, String id, String arg) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("handleCreateRequest type=" + type + " name=" + name + " id=" + id + " arg=" + arg); } if (name == null) { name = ""; // default name } if (TYPE_PEER.equals(type)) { PeerAdvertisement adv = createPeerAdvertisement(name, id); if (adv != null) { try { discovery.publish(adv); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not publish peer advertisement", e); } } requestor.send(adv, RESPONSE_SUCCESS); } else { requestor.notifyError("could not create advertisement"); } } else if (TYPE_GROUP.equals(type)) { PeerGroupAdvertisement adv = createGroupAdvertisement(name, id); if (adv != null) { requestor.send(adv, RESPONSE_SUCCESS); } else { requestor.notifyError("could not create advertisement"); } } else if (TYPE_PIPE.equals(type)) { if (arg == null) { arg = PipeService.UnicastType; // default pipe type } PipeAdvertisement adv = createPipeAdvertisement(name, id, arg); if (adv != null) { try { discovery.publish(adv); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not publish pipe advertisement", e); } } requestor.send(adv, RESPONSE_SUCCESS); } else { requestor.notifyError("could not create advertisement"); } } else { requestor.notifyError("unsupported type"); } } private void handleSearchRequest(Requestor requestor, String type, String attribute, String value, String threshold) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("handleSearchRequest type=" + type + " attribute=" + attribute + " value=" + value + " threshold=" + threshold); } int discoveryType; int thr = DEFAULT_THRESHOLD; try { thr = Integer.parseInt(threshold); } catch (NumberFormatException nex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("handleSearchRequest failed to parse threshold " + threshold + ", using default " + DEFAULT_THRESHOLD); } } requestor.setThreshold(thr); if (TYPE_PEER.equals(type)) { discoveryType = DiscoveryService.PEER; } else if (TYPE_GROUP.equals(type)) { discoveryType = DiscoveryService.GROUP; } else { discoveryType = DiscoveryService.ADV; } Enumeration<Advertisement> each = null; try { each = discovery.getLocalAdvertisements(discoveryType, attribute, value); } catch (IOException e) { requestor.notifyError("could not search locally"); } int i = 0; while (each.hasMoreElements() && i < thr) { Advertisement adv = each.nextElement(); // notify the requestor of the result // FIXME this can be optimized by sending all adv's in a // single message requestor.send(adv, RESPONSE_RESULT); i++; } // start the query int queryId = discovery.getRemoteAdvertisements(null, discoveryType, attribute, value, thr); // register the query searchRequests.put(queryId, requestor); } /** * Finds a JXTA Pipe and starts listening to it. * * @param requestor the peer sending listen request. * @param id the id of the Pipe. */ private void handleListenRequest(Requestor requestor, String id) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("handleListenRequest id=" + id); } if (id == null) { requestor.notifyError("Pipe ID not specified"); return; } PipeAdvertisement pipeAdv = findPipeAdvertisement(null, id, null); if (pipeAdv == null) { requestor.notifyError("Pipe Advertisement not found"); return; } String pipeId = pipeAdv.getPipeID().toString(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("listen to pipe name=" + pipeAdv.getName() + " id=" + pipeAdv.getPipeID() + " type=" + pipeAdv.getType()); } // check to see if the input pipe already exist PipeListenerList list = pipeListeners.get(pipeId); if (list == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("first listener, create input pipe"); } // create an input pipe try { list = new PipeListenerList(pipe.createInputPipe(pipeAdv, this), pipeListeners, pipeId); } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "could not listen to pipe", e); } requestor.notifyError("could not listen to pipe"); return; } pipeListeners.put(pipeId, list); } // add requestor to list list.add(requestor); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("add requestor=" + requestor + " id=" + pipeId + " list=" + list); LOG.fine("publish PipeAdvertisement"); } // advertise the pipe locally try { discovery.publish(pipeAdv); } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not publish pipe advertisement", e); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("done with listen request"); } // notify requestor of success requestor.notifySuccess(); } private void handleCloseRequest(Requestor requestor, String id) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("handleCloseRequest id=" + id); } PipeListenerList list = pipeListeners.get(id); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("handleCloseRequest list = " + list); } if (list != null) { list.remove(requestor); if (list.size() == 0) { pipeListeners.remove(id); } } // notify requestor of success requestor.notifySuccess(); } // Send the given message to the given pipe. private void sendToPipe(Requestor req, Message mess, OutputPipe out) { try { out.send(mess); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("output pipe send end"); } // notify requestor of success req.notifySuccess(); } catch (IOException e) { req.notifyError("could not send to pipe"); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "could not send to pipe", e); } } } class ClientMessage { private Requestor requestor; private Message message; public ClientMessage(Requestor req, Message mess) { requestor = req; message = mess; } // Send this (pending) message public void send(OutputPipe out) { sendToPipe(requestor, message, out); } } class PendingPipe { private ClientMessage pending; public PendingPipe() { pending = null; } // Just got resolved ! Will send the pending message(s). public void sendPending(OutputPipe out) { pending.send(out); pending = null; } // Enqueue a new pending message. // (for now we only enqueue 1; others get trashed) public void enqueue(Requestor req, Message mess) { if (pending != null) { return; } pending = new ClientMessage(req, mess); } } private void handleSendRequest(Requestor requestor, String id, Message message) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("handleSendRequest id=" + id); } PipeAdvertisement pipeAdv = findPipeAdvertisement(null, id, null); if (pipeAdv == null) { requestor.notifyError("Could not find pipe"); return; } String pipeId = pipeAdv.getPipeID().toString(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "send to pipe name=" + pipeAdv.getName() + " id=" + pipeAdv.getPipeID().toString() + " arg=" + pipeAdv.getType()); } // check if there are local listeners PipeListenerList list = pipeListeners.get(pipeId); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("local listener list " + list); } if (list != null && PipeService.UnicastType.equals(pipeAdv.getType())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("start sending to each requestor"); } list.send(message, pipeId); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("end sending to each requestor"); } // notify requestor of success requestor.notifySuccess(); return; } // NOTE: This part is NOT exercised by the load test because all // clients are local. To exercise this part, comment out the // optimization above. // This is not a unicast pipe with at least one local listener // so we need to fingure out where the message should go. // This may take a while and has to be done asynchronously... // Carefull that the resolution can occur synchronously by this // very thread, and java lock will not prevent re-entry. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("output pipe creation begin"); } // Look for the pipe in the resolved list. If not found // look in the pending list or add it there. OutputPipe out = (OutputPipe) resolvedPipes.get(pipeId); if (out != null) { sendToPipe(requestor, message, out); return; } PendingPipe p = (PendingPipe) pendingPipes.get(pipeId); if (p != null) { p.enqueue(requestor, message); return; } try { p = new PendingPipe(); p.enqueue(requestor, message); pendingPipes.put(pipeId, p); pipe.createOutputPipe(pipeAdv, this); } catch (IOException e) { pendingPipes.remove(pipeId); requestor.notifyError("could not create output pipe"); } } // TBD: DO WE NEED THIS FUNCTIONALITY FOR JXME? private PeerAdvertisement createPeerAdvertisement(String name, String id) { PeerAdvertisement adv = null; PeerID pid = null; if (id != null) { try { ID tempId = IDFactory.fromURI(new URI(id)); pid = (PeerID) tempId; } catch (URISyntaxException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not parse peerId from url", e); } } catch (ClassCastException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "id was not a peerid", e); } } } if (pid == null) { pid = IDFactory.newPeerID(group.getPeerGroupID()); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("newPeerAdvertisement name=" + name + " id=" + pid.toString()); } try { // Create a pipe advertisement for this pipe. adv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(PeerAdvertisement.getAdvertisementType()); adv.setPeerID(pid); adv.setPeerGroupID(group.getPeerGroupID()); adv.setName(name); adv.setDescription("Peer Advertisement created for a jxme device"); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "newPeerAdvertisement Exception", e); } } return adv; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -