📄 rdvpeerrdvservice.java
字号:
if( clients.size() < maxNbOfClients ) { lease = leaseDuration; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Offering new client lease to " + padv.getName() + " [" + padv.getPeerID() + "]"); } } else { lease = 0; if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Max clients exceeded, declining lease request from: " + padv.getName() + " [" + padv.getPeerID() + "]"); } } } if (lease > 0) { pConn = addClient(padv, lease); // FIXME 20041015 bondolo We're supposed to send a lease 0 if we can't accept new clients. sendLease(pConn, leaseDuration); } } /** * Sends a Connected lease reply message to the specified peer * * @param pConn The client peer. * @param lease lease duration. * @return Description of the Returned Value */ private boolean sendLease(ClientConnection pConn, long lease) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending lease (" + lease + ") to " + pConn.getPeerName()); } Message msg = new Message(); msg.addMessageElement("jxta", new TextDocumentMessageElement(ConnectedRdvAdvReply, getPeerAdvertisementDoc(), null)); msg.addMessageElement("jxta", new StringMessageElement(ConnectedPeerReply, group.getPeerID().toString(), null)); msg.addMessageElement("jxta", new StringMessageElement(ConnectedLeaseReply, Long.toString(lease), null)); return pConn.sendMessage(msg, pName, pParam); } /** * {@inheritDoc} **/ public void walk(Message msg, String serviceName, String serviceParam, int ttl) throws IOException { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Walk initiated for " + msg + " [" + serviceName + "/" + serviceParam + "]"); } if (walker == null) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkFailed(); } // The walker is not yet initialized. Fail. IOException failure = new IOException("Cannot walk message : no walker"); if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Cannot walk message : no walker", failure); } throw failure; } msg.replaceMessageElement("jxta", new StringMessageElement(RDV_SVC_NAME, serviceName, null)); msg.replaceMessageElement("jxta", new StringMessageElement(RDV_SVC_PARAM, serviceParam, null)); try { walker.sendMessage( null, msg, pName, pParam, ttl, null ); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walk(); } } catch (IOException failure) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkFailed(); } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Cannot send message with Walker", failure); } IOException failed = new IOException("Cannot send message with Walker"); failed.initCause(failure); throw failed; } } /** * {@inheritDoc} **/ public void walk(Vector destPeerIDs, Message msg, String serviceName, String serviceParam, int defaultTTL) throws IOException { if ((destPeerIDs == null) || (destPeerIDs.size() == 0)) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("No destination"); } throw new IOException("no destination"); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("walk initiated for :" + "\n\tsvc name:" + serviceName + "\tsvc params:" + serviceParam); } if (walker == null) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkFailed(); } // The walker is not yet initialized. Fail. IOException failure = new IOException("Cannot walk message : no walker"); if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Cannot walk message : no walker", failure); } throw failure; } msg.replaceMessageElement("jxta", new StringMessageElement(RDV_SVC_NAME, serviceName, null)); msg.replaceMessageElement("jxta", new StringMessageElement(RDV_SVC_PARAM, serviceParam, null)); PeerID dest = null; for (int i = 0; i < destPeerIDs.size(); ++i) { try { dest = (PeerID) destPeerIDs.elementAt(i); Message tmpMsg = (Message) msg.clone(); walker.sendMessage( dest, tmpMsg, pName, pParam, defaultTTL, null ); } catch (Exception failed) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkToPeersFailed(); } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Cannot send message with Walker to: " + dest, failed); } IOException failure = new IOException("Cannot send message with Walker to: " + dest ); failure.initCause( failed ); } } if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkToPeers(destPeerIDs.size()); } } private void walkInit() { // Create a LimitedRange Walk walk = new LimitedRangeWalk(group, pName, pParam, rdvService.rpv); // Get a Greeter greeter = walk.getGreeter(); if (greeter == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Cannot get Greeter"); } return; } // Start the Greeter greeter.start(); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Greeter listening on " + pName + "/" + pParam); } // We need to use a Walker in order to propagate the request // when when have no answer. walker = walk.getWalker(); if (walker == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Cannot get Walker"); } return; } // Set our Endpoint Listener walkListener = new WalkListener(); greeter.setEndpointListener(walkListener); } /** * Periodic cleanup task **/ private class GCTask extends TimerTask { /** * {@inheritDoc **/ public void run() { try { long gcStart = TimeUtils.timeNow(); int gcedClients = 0; List allClients = Arrays.asList(clients.values().toArray()); Iterator eachClient = allClients.iterator(); while (eachClient.hasNext()) { ClientConnection pConn = (ClientConnection) eachClient.next(); try { long now = TimeUtils.timeNow(); if (!pConn.isConnected() || (pConn.getLeaseEnd() < now)) { // This client has dropped out or the lease is over. // remove it. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("GC CLIENT: dropping " + pConn); } pConn.setConnected(false); removeClient(pConn, false); gcedClients++; } } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("GCTask failed for " + pConn, e); } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Client GC " + gcedClients + " of " + allClients.size() + " clients completed in " + TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), gcStart) + "ms." ); } } catch (Throwable all) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } } } /** * @inheritDoc **/ private class WalkListener implements EndpointListener { /** * {@inheritDoc} **/ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { MessageElement serviceME = msg.getMessageElement("jxta", RDV_SVC_NAME); if (null == serviceME) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Discarding " + msg + " because its missing service name element"); } return; } msg.removeMessageElement(serviceME); String sName = serviceME.toString(); MessageElement paramME = msg.getMessageElement("jxta", RDV_SVC_PARAM); String sParam; if (null == paramME) { sParam = null; } else { msg.removeMessageElement(paramME); sParam = paramME.toString(); } EndpointAddress realDest = new EndpointAddress(dstAddr, sName, sParam); EndpointListener listener = rdvService.getListener(sName + sParam); if (listener != null) { // We have a local listener for this message. // Deliver it. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Calling local listener for [" + sName + "/" + sParam + "] with " + msg); } try { listener.processIncomingMessage(msg, srcAddr, realDest); } catch (Throwable ignored) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable during callback of (" + listener + ") to " + sName + "/" + sParam, ignored); } } if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.receivedMessageProcessedLocally(); } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -