📄 tcptransport.java
字号:
StringBuilder configInfo = new StringBuilder("Configuring TCP Message Transport : " + assignedID); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params:"); configInfo.append("\n\t\tGroup : ").append(group); configInfo.append("\n\t\tPeer ID: ").append(group.getPeerID()); configInfo.append("\n\tConfiguration:"); configInfo.append("\n\t\tProtocol: ").append(protocolName); configInfo.append("\n\t\tPublic address: ").append(serverName == null ? "(unspecified)" : serverName); configInfo.append("\n\t\tInterface address: ").append( interfaceAddressStr == null ? "(unspecified)" : interfaceAddressStr); configInfo.append("\n\tConfiguration :"); configInfo.append("\n\t\tUsing Interface: ").append(usingInterface.getHostAddress()); if (null != unicastServer) { if (-1 == unicastServer.getStartPort()) { configInfo.append("\n\t\tUnicast Server Bind Addr: ").append(usingInterface.getHostAddress()).append(":").append( serverSocketPort); } else { configInfo.append("\n\t\tUnicast Server Bind Addr: ").append(usingInterface.getHostAddress()).append(":").append(serverSocketPort).append(" [").append(unicastServer.getStartPort()).append("-").append(unicastServer.getEndPort()).append( "]"); } configInfo.append("\n\t\tUnicast Server Bound Addr: ").append(unicastServer.getLocalSocketAddress()); } else { configInfo.append("\n\t\tUnicast Server : disabled"); } configInfo.append("\n\t\tPublic Addresses: "); configInfo.append("\n\t\t\tDefault Endpoint Addr : ").append(publicAddress); for (EndpointAddress anAddr : publicAddresses) { configInfo.append("\n\t\t\tEndpoint Addr : ").append(anAddr); } 
LOG.config(configInfo.toString()); } }

    /**
     * {@inheritDoc}
     * <p/>
     * Start-up sequence of this implementation:
     * <ol>
     * <li>obtain the endpoint service from the group;</li>
     * <li>open the messenger selector and launch its daemon polling thread;</li>
     * <li>register this transport with the endpoint service;</li>
     * <li>start the unicast server, when one is configured;</li>
     * <li>optionally create the transport meter.</li>
     * </ol>
     *
     * @param arg start-up arguments; not used by this implementation.
     * @return {@code Module.START_AGAIN_STALLED} when the group has no endpoint
     *         service yet, {@code -1} when the messenger selector cannot be
     *         opened, when the endpoint refuses the registration or when the
     *         unicast server fails to start, and {@code Module.START_OK} otherwise.
     */
    public synchronized int startApp(String[] arg) {
        endpoint = group.getEndpointService();

        if (null == endpoint) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Stalled until there is an endpoint service");
            }
            return Module.START_AGAIN_STALLED;
        }

        try {
            messengerSelector = SelectorProvider.provider().openSelector();
        } catch (IOException e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Could not create a messenger selector", e);
            }
            // Without a selector the polling thread has nothing to poll and
            // stopApp() would have nothing to close, so fail the start here
            // instead of carrying on with a null selector.
            return -1;
        }

        messengerSelectorThread = new Thread(group.getHomeThreadGroup(), new MessengerSelectorThread(),
                "TCP Transport MessengerSelectorThread for " + this);
        messengerSelectorThread.setDaemon(true);
        messengerSelectorThread.start();

        // We're fully ready to function.
        messengerEventListener = endpoint.addMessageTransport(this);

        if (messengerEventListener == null) {
            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                LOG.severe("Transport registration refused");
            }
            // NOTE(review): the selector and its polling thread are left running
            // on this failure path (and on the one below) -- confirm whether the
            // caller is expected to invoke stopApp() after a failed start.
            return -1;
        }

        // Cannot start before registration, we could be announcing new messengers while we
        // do not exist yet ! (And get an NPE because we do not have the messenger listener set).
        if (unicastServer != null) {
            if (!unicastServer.start()) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.severe("Unable to start TCP Unicast Server");
                }
                return -1;
            }
        }

        if (TransportMeterBuildSettings.TRANSPORT_METERING) {
            TransportServiceMonitor transportServiceMonitor = (TransportServiceMonitor) MonitorManager.getServiceMonitor(group,
                    MonitorResources.transportServiceMonitorClassID);

            if (transportServiceMonitor != null) {
                unicastTransportMeter = transportServiceMonitor.createTransportMeter("TCP", publicAddress);
            }
        }

        isClosed = false;

        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("TCP Message Transport started.");
        }

        return Module.START_OK;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Does nothing when the transport is already closed. Otherwise stops the
     * unicast server, interrupts the messenger selector thread, closes the
     * messenger selector, unregisters this transport from the endpoint
     * service and logs the traffic totals.
     */
    public synchronized void stopApp() {
        if (isClosed) {
            return;
        }

        isClosed = true;

        if (unicastServer != null) {
            unicastServer.stop();
            unicastServer = null;
        }

        Thread temp = messengerSelectorThread;

        if (null != temp) {
            temp.interrupt();
        }

        // The selector is null when startApp() never managed to open it.
        if (null != messengerSelector) {
            try {
                messengerSelector.close();
            } catch (IOException failed) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.log(Level.SEVERE, "IO error occured while closing server socket", failed);
                }
            }
        }

        // Inform the pool that we don't need as many write selectors.
        synchronized (writeSelectorCache) {
            extraWriteSelectors += MAX_WRITE_SELECTORS;
        }

        // The endpoint is null when startApp() stalled before obtaining it.
        if (null != endpoint) {
            endpoint.removeMessageTransport(this);
            endpoint = null;
        }
        group = null;

        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info(MessageFormat.format("Total bytes sent : {0}", getBytesSent()));
            LOG.info(MessageFormat.format("Total Messages sent : {0}", getMessagesSent()));
            LOG.info(MessageFormat.format("Total bytes received : {0}", getBytesReceived()));
            LOG.info(MessageFormat.format("Total Messages received : {0}", getMessagesReceived()));
            LOG.info(MessageFormat.format("Total connections accepted : {0}", getConnectionsAccepted()));

            LOG.info("TCP Message Transport shut down.");
        }
    }

    /**
     * {@inheritDoc}
     */
    public String getProtocolName() {
        return protocolName;
    }

    /**
     * {@inheritDoc}
     */
    public EndpointAddress getPublicAddress() {
        return publicAddress;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Dereferences the endpoint field directly; {@code stopApp()} sets that
     * field to {@code null}.
     */
    public EndpointService getEndpointService() {
        return (EndpointService) endpoint.getInterface();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * This implementation ignores both arguments and always returns {@code null}.
     */
    public Object transportControl(Object operation, Object Value) {
        return null;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * The returned iterator is backed by an unmodifiable view of the public
     * address list, so it does not support removal.
     */
    public Iterator<EndpointAddress> getPublicAddresses() {
        return Collections.unmodifiableList(publicAddresses).iterator();
    }

    /**
     * {@inheritDoc}
     */
    public boolean isConnectionOriented() {
        return true;
    }

    /**
     * {@inheritDoc}
     */
    public boolean allowsRouting() {
        return true;
    }

    /**
     * Convenience overload that delegates to
     * {@link #getMessenger(EndpointAddress, Object, boolean)} with
     * {@code selfDestruct} set to {@code true}.
     *
     * @param dst         the destination address.
     * @param hintIgnored passed through unchanged; this transport does not use it.
     * @return the messenger produced by the three-argument overload, which
     *         may be {@code null}.
     */
    public Messenger getMessenger(EndpointAddress dst, Object hintIgnored) {
        return getMessenger(dst, hintIgnored, true);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns a {@code LoopbackMessenger} when {@code dst}, stripped of its
     * service name and parameter, is one of this transport's public
     * addresses; otherwise attempts to build a new {@code TcpMessenger}.
     *
     * @param dst          the destination address.
     * @param hintIgnored  not used by this implementation.
     * @param selfDestruct forwarded to the {@code TcpMessenger} constructor.
     * @return a messenger for {@code dst}, or {@code null} when the protocol
     *         of {@code dst} is not this transport's protocol or when creating
     *         the messenger fails with a checked exception.
     * @throws RuntimeException any runtime exception raised while creating
     *                          the messenger is logged and rethrown.
     */
    public Messenger getMessenger(EndpointAddress dst, Object hintIgnored, boolean selfDestruct) {
        if (!dst.getProtocolName().equalsIgnoreCase(getProtocolName())) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Cannot make messenger for protocol: " + dst.getProtocolName());
            }
            return null;
        }

        EndpointAddress plainAddr = new EndpointAddress(dst, null, null);

        // If the destination is one of our addresses including loopback, we
        // return a loopback messenger.
        if (publicAddresses.contains(plainAddr)) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("return LoopbackMessenger for addr : " + dst);
            }
            return new LoopbackMessenger(group, endpoint, getPublicAddress(), dst,
                    new EndpointAddress("jxta", group.getPeerID().getUniqueValue().toString(), null, null));
        }

        try {
            // Right now we do not want to "announce" outgoing messengers because they get pooled and so must
            // not be grabbed by a listener. If "announcing" is to be done, that should be by the endpoint
            // and probably with a subtly different interface.
            return new TcpMessenger(dst, this, selfDestruct);
        } catch (Exception caught) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                // The full stack trace is only worth logging at FINER; otherwise
                // a one-line warning with the exception message is emitted.
                if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                    LOG.log(Level.FINER, "Could not get messenger for " + dst, caught);
                } else {
                    LOG.warning("Could not get messenger for " + dst + " : " + caught.getMessage());
                }
            }
            if (caught instanceof RuntimeException) {
                throw (RuntimeException) caught;
            }
            return null;
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * This implementation tries to open a connection, and after tests the
     * result. When transport metering is compiled in, the elapsed time is
     * reported to the relevant binding meter as a ping or a failed ping.
     *
     * @param addr the address to probe; its service name and parameter are
     *             dropped before connecting.
     * @return {@code true} when a {@code TcpMessenger} could be created for
     *         the address, {@code false} when the attempt threw.
     */
    public boolean ping(EndpointAddress addr) {
        boolean result = false;
        EndpointAddress endpointAddress;
        long pingStartTime = 0;

        if (TransportMeterBuildSettings.TRANSPORT_METERING) {
            pingStartTime = System.currentTimeMillis();
        }

        endpointAddress = new EndpointAddress(addr, null, null);

        try {
            // Too bad that this one will not get pooled. On the other hand ping is
            // not here to stay.
            TcpMessenger tcpMessenger = new TcpMessenger(endpointAddress, this);

            if (TransportMeterBuildSettings.TRANSPORT_METERING) {
                TransportBindingMeter transportBindingMeter = tcpMessenger.getTransportBindingMeter();

                if (transportBindingMeter != null) {
                    transportBindingMeter.ping(System.currentTimeMillis() - pingStartTime);
                }
            }
            result = true;
        } catch (Throwable e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "failure pinging " + addr.toString(), e);
            }
            if (TransportMeterBuildSettings.TRANSPORT_METERING) {
                TransportBindingMeter transportBindingMeter = getUnicastTransportBindingMeter(null, endpointAddress);

                if (transportBindingMeter != null) {
                    transportBindingMeter.pingFailed(System.currentTimeMillis() - pingStartTime);
                }
            }
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("ping to " + addr.toString() + " == " + result);
        }
        return result;
    }

    /**
     * Getter for property 'restrictionPort'.
     *
     * @return Value for property 'restrictionPort'.
     */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -