📄 rdvpeerrdvservice.java
字号:
rendezvousMeter.propagateToGroup(); } } } /** * @inheritDoc */ @Override public PeerConnection getPeerConnection(ID peer) { return clients.get(peer); } /** * @inheritDoc */ @Override protected PeerConnection[] getPeerConnections() { return clients.values().toArray(new PeerConnection[0]); } /** * Add a client to our collection of clients. * * @param padv The advertisement of the peer to be added. * @param lease The lease duration in relative milliseconds. * @return the ClientConnection */ private ClientConnection addClient(PeerAdvertisement padv, long lease) { ClientConnectionMeter clientConnectionMeter = null; int eventType; ClientConnection pConn; synchronized (clients) { pConn = clients.get(padv.getPeerID()); // Check if the peer is already registered. if (null != pConn) { eventType = RendezvousEvent.CLIENTRECONNECT; } else { eventType = RendezvousEvent.CLIENTCONNECT; pConn = new ClientConnection(group, rdvService, padv.getPeerID()); clients.put(padv.getPeerID(), pConn); } } if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousServiceMonitor != null)) { clientConnectionMeter = rendezvousServiceMonitor.getClientConnectionMeter(padv.getPeerID()); } if (RendezvousEvent.CLIENTCONNECT == eventType) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (clientConnectionMeter != null)) { clientConnectionMeter.clientConnectionEstablished(lease); } } else { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (clientConnectionMeter != null)) { clientConnectionMeter.clientLeaseRenewed(lease); } } rdvService.generateEvent(eventType, padv.getPeerID()); pConn.connect(padv, lease); return pConn; } /** * Removes the specified client from the clients collections. * * @param pConn The connection object to remove. * @param requested If <code>true</code> then the disconnection was * requested by the remote peer. * @return the ClientConnection object of the client or <code>null</code> * if the client was not known. */ private ClientConnection removeClient(PeerConnection pConn, boolean requested) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Disconnecting client " + pConn); } if (pConn.isConnected()) { pConn.setConnected(false); sendDisconnect(pConn); } rdvService.generateEvent(requested ? RendezvousEvent.CLIENTDISCONNECT : RendezvousEvent.CLIENTFAILED, pConn.getPeerID()); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousServiceMonitor != null)) { ClientConnectionMeter clientConnectionMeter = rendezvousServiceMonitor.getClientConnectionMeter( (PeerID) pConn.getPeerID()); clientConnectionMeter.clientConnectionDisconnected(requested); } return clients.remove(pConn.getPeerID()); } private void disconnectAllClients() { for (Object o : Arrays.asList(clients.values().toArray())) { ClientConnection pConn = (ClientConnection) o; try { removeClient(pConn, false); } catch (Exception ez1) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "disconnectClient failed for" + pConn, ez1); } } } } /** * Handle a disconnection request * * @param msg Message containing the disconnection request. */ private void processDisconnectRequest(Message msg) { PeerAdvertisement adv; try { MessageElement elem = msg.getMessageElement("jxta", DisconnectRequest); XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(elem); adv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(asDoc); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not process disconnect request", e); } return; } ClientConnection pConn = clients.get(adv.getPeerID()); if (null != pConn) { pConn.setConnected(false); // Make sure we don't send a disconnect removeClient(pConn, true); } } /** * Handles a lease request message * * @param msg Message containing the lease request */ private void processLeaseRequest(Message msg) { PeerAdvertisement padv; try { MessageElement elem = msg.getMessageElement("jxta", ConnectRequest); XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(elem); padv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(asDoc); msg.removeMessageElement(elem); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Cannot retrieve advertisment from lease request", e); } return; } // Publish the client's peer advertisement try { // This is not our own peer adv so we must not keep it longer than // its expiration time. DiscoveryService discovery = group.getDiscoveryService(); if (null != discovery) { discovery.publish(padv, LEASE_DURATION * 2, 0); } } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Client peer advertisement publish failed", e); } } long lease; ClientConnection pConn = clients.get(padv.getPeerID()); if (null != pConn) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Renewing client lease to " + pConn); } lease = LEASE_DURATION; } else { if (clients.size() < MAX_CLIENTS) { lease = LEASE_DURATION; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Offering new client lease to " + padv.getName() + " [" + padv.getPeerID() + "]"); } } else { lease = 0; if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning( "Max clients exceeded, declining lease request from: " + padv.getName() + " [" + padv.getPeerID() + "]"); } } } if (lease > 0) { pConn = addClient(padv, lease); // FIXME 20041015 bondolo We're supposed to send a lease 0 if we can't accept new clients. sendLease(pConn, lease); } } /** * Sends a Connected lease reply message to the specified peer * * @param pConn The client peer. * @param lease lease duration. * @return Description of the Returned Value */ private boolean sendLease(ClientConnection pConn, long lease) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending lease (" + lease + ") to " + pConn.getPeerName()); } Message msg = new Message(); msg.addMessageElement("jxta", new TextDocumentMessageElement(ConnectedRdvAdvReply, getPeerAdvertisementDoc(), null)); msg.addMessageElement("jxta", new StringMessageElement(ConnectedPeerReply, group.getPeerID().toString(), null)); msg.addMessageElement("jxta", new StringMessageElement(ConnectedLeaseReply, Long.toString(lease), null)); return pConn.sendMessage(msg, pName, pParam); } /** * {@inheritDoc} */ @Override public void walk(Message msg, String serviceName, String serviceParam, int initialTTL) throws IOException { if (closed) { return; } msg = msg.clone(); int useTTL = Math.min(initialTTL, MAX_TTL); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Undirected walk of " + msg + "(TTL=" + useTTL + ") to :" + "\n\tsvc name:" + serviceName + "\tsvc params:" + serviceParam); } msg.replaceMessageElement("jxta", new StringMessageElement(RDV_WALK_SVC_NAME, serviceName, null)); msg.replaceMessageElement("jxta", new StringMessageElement(RDV_WALK_SVC_PARAM, serviceParam, null)); try { walker.walkMessage(null, msg, pName, pParam, useTTL); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walk(); } } catch (IOException failure) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkFailed(); } if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Cannot send message with Walker", failure); } throw failure; } } /** * {@inheritDoc} */ @Override public void walk(Vector<? extends ID> destPeerIDs, Message msg, String serviceName, String serviceParam, int initialTTL) throws IOException { if (closed) { return; } msg = msg.clone(); int useTTL = Math.min(initialTTL, MAX_TTL); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Directed walk of " + msg + "(TTL=" + useTTL + ") to :" + "\n\tsvc name:" + serviceName + "\tsvc params:" + serviceParam); } msg.replaceMessageElement("jxta", new StringMessageElement(RDV_WALK_SVC_NAME, serviceName, null)); msg.replaceMessageElement("jxta", new StringMessageElement(RDV_WALK_SVC_PARAM, serviceParam, null)); for (ID destPeerID : destPeerIDs) { try { walker.walkMessage((PeerID) destPeerID, msg.clone(), pName, pParam, useTTL); } catch (IOException failed) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkToPeersFailed(); } if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Cannot send message with Walker to: " + destPeerID, failed); } IOException failure = new IOException("Cannot send message with Walker to: " + destPeerID); failure.initCause(failed); throw failure; } } if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.walkToPeers(destPeerIDs.size()); } } /** * Periodic cleanup task */ private class GCTask extends TimerTask { /** * {@inheritDoc */ @Override public void run() { try { long gcStart = TimeUtils.timeNow(); int gcedClients = 0; List allClients = Arrays.asList(clients.values().toArray()); for (Object allClient : allClients) { ClientConnection pConn = (ClientConnection) allClient; try { long now = TimeUtils.timeNow(); if (!pConn.isConnected() || (pConn.getLeaseEnd() < now)) { // This client has dropped out or the lease is over. // remove it. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("GC CLIENT: dropping " + pConn); } pConn.setConnected(false); removeClient(pConn, false); gcedClients++; } } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "GCTask failed for " + pConn, e); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Client GC " + gcedClients + " of " + allClients.size() + " clients completed in " + TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), gcStart) + "ms."); } } catch (Throwable all) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } } } /** * @inheritDoc */ private class WalkListener implements EndpointListener { /** * {@inheritDoc} */ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { MessageElement serviceME = msg.getMessageElement("jxta", RDV_WALK_SVC_NAME); if (null == serviceME) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Discarding " + msg + " because its missing service name element"); } return; } msg.removeMessageElement(serviceME); String sName = serviceME.toString(); MessageElement paramME = msg.getMessageElement("jxta", RDV_WALK_SVC_PARAM); String sParam; if (null == paramME) { sParam = null; } else { msg.removeMessageElement(paramME); sParam = paramME.toString(); } EndpointAddress realDest = new EndpointAddress(dstAddr, sName, sParam); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Calling local listener for [" + realDest.getServiceName() + " / " + realDest.getServiceParameter() + "] with " + msg); } rdvService.endpoint.processIncomingMessage(msg, srcAddr, realDest); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.receivedMessageProcessedLocally(); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -