📄 tcptransport.java
字号:
} allowMulticast = false; } if (allowMulticast) { multicastThread = new Thread(myThreadGroup, this, "TCP Multicast Server Listener"); multicastThread.start(); } } // We're fully ready to function. messengerEventListener = endpoint.addMessageTransport(this); } catch (Exception e) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Initialization exception", e); } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("FIXME: there may be threads that need killing."); } throw new PeerGroupException("Initialization exception", e); } if (messengerEventListener == null) { throw new PeerGroupException("Transport registration refused"); } // Cannot start before registration, we could be announcing new messengers while we // do not exist yet ! (And get an NPE because we do not have the messenger listener set). if (unicastServer != null) { if (!unicastServer.start(myThreadGroup)) { throw new PeerGroupException("Unable to start TCP Unicast Server"); } } // Tell tell the world about our configuration. if (LOG.isEnabledFor(Level.INFO)) { StringBuffer configInfo = new StringBuffer("Configuring TCP Transport : " + assignedID); configInfo.append("\n\tGroup Params:"); configInfo.append("\n\t\tGroup: " + group.getPeerGroupName()); configInfo.append("\n\t\tGroup ID: " + group.getPeerGroupID()); configInfo.append("\n\t\tPeer ID: " + group.getPeerID()); configInfo.append("\n\tFrom Adv:"); configInfo.append("\n\t\tProtocol: " + protocolName); configInfo.append("\n\t\tPublic address: " + (serverName == null ? "(unspecified)" : serverName)); configInfo.append("\n\t\tInterface address: " + (interfaceAddressStr == null ? "(unspecified)" : interfaceAddressStr)); configInfo.append("\n\t\tMulticast State: " + (allowMulticast ? "Enabled" : "Disabled")); if (allowMulticast) { configInfo.append("\n\t\t\tMulticastAddr: " + multicastAddress); configInfo.append("\n\t\t\tMulticastPort: " + multicastPortNb); configInfo.append("\n\t\t\tMulticastPacketSize: " + multicastPacketSize); } configInfo.append("\n\tConfiguration :"); if (null != unicastServer) { if (-1 == unicastServer.getStartPort()) { configInfo.append("\n\t\tUnicast Server Bind Addr: " + usingInterface.getHostAddress() + ":" + serverSocketPort); } else { configInfo.append( "\n\t\tUnicast Server Bind Addr: " + usingInterface.getHostAddress() + ":" + serverSocketPort + " [" + unicastServer.getStartPort() + "-" + unicastServer.getEndPort() + "]"); } configInfo.append("\n\t\tUnicast Server Bound Addr: " + unicastServer.getLocalSocketAddress()); } else { configInfo.append("\n\t\tUnicast Server : disabled"); } if (allowMulticast) { configInfo.append("\n\t\tMulticast Server Bind Addr: " + multicastSocket.getLocalSocketAddress()); } configInfo.append("\n\t\tPublic Addresses: "); configInfo.append("\n\t\t\tDefault Endpoint Addr : " + publicAddress); Iterator eachPublic = publicAddresses.iterator(); while (eachPublic.hasNext()) { EndpointAddress anAddr = (EndpointAddress) eachPublic.next(); configInfo.append("\n\t\t\tEndpoint Addr : " + anAddr); } if (LOG.isEnabledFor(Level.INFO)) { LOG.info(configInfo); } } } /** * {@inheritDoc} **/ public synchronized int startApp(String[] arg) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { TransportServiceMonitor transportServiceMonitor = (TransportServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.transportServiceMonitorClassID); if (transportServiceMonitor != null) { unicastTransportMeter = transportServiceMonitor.createTransportMeter("TCP", publicAddress.toString()); } } if (allowMulticast) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { TransportServiceMonitor transportServiceMonitor = (TransportServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.transportServiceMonitorClassID); if (transportServiceMonitor != null) { multicastTransportMeter = transportServiceMonitor.createTransportMeter("Multicast", mAddress); multicastTransportBindingMeter = getMulticastTransportBindingMeter(mAddress); multicastTransportBindingMeter.connectionEstablished(true, 0); // Since multicast is connectionless, force it to appear outbound connected multicastTransportBindingMeter.connectionEstablished(false, 0); // Since multicast is connectionless, force it to appear inbound connected } } } isClosed = false; return 0; } /** * {@inheritDoc} **/ public synchronized void stopApp() { if (isClosed) { return; } isClosed = true; endpoint.removeMessageTransport(this); if (unicastServer != null) { unicastServer.stop(); unicastServer = null; } if (multicastSocket != null) { multicastSocket.close(); multicastSocket = null; multicastThread = null; } connectionWatchTimer.cancel(); // Close all watched streams. The input threads will kill the // connections. WatchedStream[] allStreams = (WatchedStream[]) ShortCycle.toArray(new WatchedStream[0]); for (int i = 0, len = allStreams.length; i < len; i++) { try { allStreams[i].close(); } catch (IOException ignored) {} } allStreams = (WatchedStream[]) LongCycle.toArray(new WatchedStream[0]); for (int i = 0, len = allStreams.length; i < len; i++) { try { allStreams[i].close(); } catch (IOException ignored) {} } // There should be nothing left, but just for completeness... ShortCycle.clear(); LongCycle.clear(); // Accelerated GC (or so some say). endpoint = null; group = null; } /** * {@inheritDoc} **/ public String getProtocolName() { return protocolName; } /** * {@inheritDoc} **/ public EndpointAddress getPublicAddress() { return (EndpointAddress) publicAddress.clone(); } /** * {@inheritDoc} **/ public EndpointService getEndpointService() { return (EndpointService) endpoint.getInterface(); } /** * {@inheritDoc} **/ public Object transportControl(Object operation, Object Value) { return null; } /** * {@inheritDoc} **/ public Iterator getPublicAddresses() { return Collections.unmodifiableList(publicAddresses).iterator(); } /** * {@inheritDoc} **/ public boolean isConnectionOriented() { return true; } /** * {@inheritDoc} **/ public boolean allowsRouting() { return true; } /** * {@inheritDoc} **/ public Messenger getMessenger(EndpointAddress dst, Object hintIgnored) { if (null == dst) { throw new IllegalArgumentException("Null addr"); } EndpointAddress plainAddr = new EndpointAddress(dst, null, null); if (!plainAddr.getProtocolName().equals(getProtocolName())) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("getMessenger: cannot make messenger for protocol: " + plainAddr.getProtocolName()); } return null; } // XXX: the following is a work around in order to // strip out peers that advertise their loopback IP address (127.0.0.1) // as an EndpointAddress // lomax@jxta.org // jice@jxta.org: make an exception if we're configured with *only* the // loopback address, in order to allow interface-less machines // to be used for devellopment. During boot the loopback address // is allowed to be one of the public addresses, only if it is the // only address we have. // So, if the destination is one of our addresses // including loopback, it is okay to return a loopback messenger. // Else, if the address is a loopback address connection will be // denied by TcpConnection if it turns out to be a loopback // address and local addresses are not the loopback singleton. // check for loopback addresses if (publicAddresses.contains(plainAddr)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("getMessenger: return LoopbackMessenger for addr : " + dst); } return new LoopbackMessenger(endpoint, getPublicAddress(), dst, new EndpointAddress("jxta", group.getPeerID().getUniqueValue().toString(), null, null)); } // Not an *authorized* connection to self, then TcpConnection will // check that this is indeed a connection to non-self. It is more // efficient to test it there, where the address is converted from // its string form already. try { // Right now we do not want to "announce" outgoing messengers because they get pooled and so must // not be grabbed by a listener. If "announcing" is to be done, that should be by the endpoint // and probably with a subtely different interface. TcpMessenger m = new TcpMessenger(dst, this); m.start(); return m; } catch (Throwable caught) { if (LOG.isEnabledFor(Level.WARN)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("getMessenger: could not get messenger for " + dst, caught); } else { LOG.warn("getMessenger: could not get messenger for " + dst + "/" + caught.getMessage()); } } return null; } } /** * Handles incoming multicasts. **/ public void run() { if (!allowMulticast) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Multicast disabled"); } return; } try { byte[] buffer; while (true) { if (isClosed) { return; } buffer = new byte[propagateSize]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); try { multicastSocket.receive(packet); if (isClosed) { return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("multicast message received from :" + packet.getAddress().getHostAddress()); } processMulticast(buffer, packet.getLength());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -