📄 tcptransport.java
字号:
} catch (Exception e) { if (LOG.isEnabledFor(Level.ERROR) && (!isClosed)) { LOG.error("failure during multicast receive", e); } if (isClosed) { return; } break; } packet = null; } } catch (Throwable all) { if (isClosed) { return; } if (LOG.isEnabledFor(Level.FATAL)) { LOG.fatal("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } } /** * Handle a byte buffer from a multi-cast. This assumes that processing of * the buffer is lightweight. Formerly there used to be a delegation to * worker threads. The way queuing works has changed though and it should * be ok to do the receiver right on the server thread. * * @param buffer the buffer to process. **/ public void processMulticast(byte[] buffer, int size) { if (!allowMulticast) { return; } long messageReceiveBeginTime = 0; try { if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { messageReceiveBeginTime = System.currentTimeMillis(); } if (size < 4) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processMulticast : damaged multicast discarded"); } return; } if (('J' != buffer[0]) || ('X' != buffer[1]) || ('T' != buffer[2]) || ('A' != buffer[3])) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processMulticast : damaged multicast discarded"); } return; } InputStream inputStream = new ByteArrayInputStream(buffer, 4, size - 4); MessagePackageHeader header = new MessagePackageHeader(inputStream); /* * DVT KLUDGE TO MAKE THE ROUTER'S LIFE MISERABLE. * to be removed after new ad-hoc routing dvt. */ if (restrictionPort != -1) { Header srcEAHeader = (Header) header.getHeader("srcEA").next(); EndpointAddress srcAddr = null; try { srcAddr = new EndpointAddress(new String(srcEAHeader.getValue(), "UTF-8")); } catch (UnsupportedEncodingException never) { // utf-8 is a required encoding. throw new IllegalStateException("utf-8 encoding support missing!"); } String strAddr = srcAddr.getProtocolAddress(); int srcPort = Integer.parseInt(strAddr.substring(strAddr.lastIndexOf(':') + 1)); if (srcPort < serverSocketPort - 1 || srcPort > serverSocketPort + 1) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processMulticast : simulated separate networks killed multicast message."); } return; } } MimeMediaType msgMime = header.getContentTypeHeader(); long msglength = header.getContentLengthHeader(); // FIXME 20020730 bondolo@jxta.org Do something with content-coding here. // read the message! Message msg = WireFormatMessageFactory.fromWire(new LimitInputStream(inputStream, msglength), msgMime, (MimeMediaType) null); // Give the message to the EndpointService Manager if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processMulticast : handing multicast message to EndpointService"); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { multicastTransportBindingMeter.messageReceived(false, msg, messageReceiveBeginTime - System.currentTimeMillis(), size); } // Demux the message for the upper layers. endpoint.demux(msg); } catch (Throwable e) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { multicastTransportBindingMeter.receiveFailure(false, messageReceiveBeginTime - System.currentTimeMillis(), size); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processMulticast : discard incoming multicast message - exception ", e); } // Just discard the message. Multicast are not reliable return; } } /** *(@inheritdoc} */ public boolean isPropagateEnabled() { return allowMulticast; } /** *(@inheritdoc} */ public boolean isPropagationSupported() { return true; } /** * Propagates a TransportMessage on this EndpointProtocol * * <p/>Synchronizing to not allow concurrent IP multicast: this * naturally bounds the usage of ip-multicast boolean be linear and not * exponantial. * * @param message the TransportMessage to be propagated * @param pName the name of a service * @param pParams parameters for this message. * @param prunePeer (ignored) * @exception IOException thrown if the message could not be sent for some reason. */ public synchronized void propagate(Message message, String pName, String pParams, String prunePeer) throws IOException { if (!allowMulticast) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Multicast disabled, returning"); } return; } long sendStartTime = 0; int numBytesInPacket = 0; try { if (TransportMeterBuildSettings.TRANSPORT_METERING) { sendStartTime = System.currentTimeMillis(); } message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, msgSrcAddrElement); // First build the destination and source addresses EndpointAddress destAddr = new EndpointAddress(mAddress, pName, pParams); MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddr.toString(), (MessageElement) null); message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement); WireFormatMessage serialed = WireFormatMessageFactory.toWire(message, WireFormatMessageFactory.DEFAULT_WIRE_MIME, (MimeMediaType[]) null); MessagePackageHeader header = new MessagePackageHeader(); header.setContentTypeHeader(serialed.getMimeType()); header.setContentLengthHeader(serialed.getByteLength()); try { header.replaceHeader("srcEA", getPublicAddress().toString().getBytes("UTF-8")); } catch (UnsupportedEncodingException never) { // utf-8 is a required encoding. throw new IllegalStateException("utf-8 encoding support missing!"); } // 20020730 bondolo@jxta.org Do something with content-coding here // Write the header and the message. CountingOutputStream count = new CountingOutputStream(new DevNullOutputStream()); count.write('J'); count.write('X'); count.write('T'); count.write('A'); header.sendToStream(count); if (serialed.getByteLength() >= (multicastPacketSize - count.getBytesWritten())) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Message discarded : larger than multicast packet size (" + multicastPacketSize + "<" + (serialed.getByteLength() + count.getBytesWritten()) + ")"); } throw new IOException("Message discarded : larger than multicast packet size"); } ByteArrayOutputStream buffer = new ByteArrayOutputStream(multicastPacketSize); buffer.write('J'); buffer.write('X'); buffer.write('T'); buffer.write('A'); header.sendToStream(buffer); serialed.sendToStream(buffer); buffer.flush(); buffer.close(); numBytesInPacket = (int) (count.getBytesWritten() + serialed.getByteLength()); DatagramPacket packet = new DatagramPacket(buffer.toByteArray(), numBytesInPacket, propagateInetAddress, propagatePort); multicastSocket.send(packet); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sent Multicast message to :"+pName+"/"+pParams); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { multicastTransportBindingMeter.messageSent(true, message, System.currentTimeMillis() - sendStartTime, numBytesInPacket); } } catch (IOException e) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { multicastTransportBindingMeter.sendFailure(true, message, System.currentTimeMillis() - sendStartTime, numBytesInPacket); } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Multicast socket send failed", e); } } } /** * Ping a remote host. * * This implementation tries to open a connection, and after tests the * result. Note if there is already an open connection, no new connection * is actually created. * * @param addr the endpoint addresss to ping * @return true if the address is reachable, otherwise false. **/ public boolean ping(EndpointAddress addr) { boolean result = false; EndpointAddress endpointAddress = null; long pingStartTime = 0; if (TransportMeterBuildSettings.TRANSPORT_METERING) { pingStartTime = System.currentTimeMillis(); } try { // Too bad that this one will not get pooled. On the other hand ping is // not here too stay. // Note the connection receive thread is not started so that messenger goes away real quick. endpointAddress = new EndpointAddress(addr, null, null); TcpMessenger tcpMessenger = new TcpMessenger(endpointAddress, this); if (TransportMeterBuildSettings.TRANSPORT_METERING) { TransportBindingMeter transportBindingMeter = tcpMessenger.getTransportBindingMeter(); if (transportBindingMeter != null) { transportBindingMeter.ping(System.currentTimeMillis() - pingStartTime); } } result = true; } catch (Throwable e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("failure pinging " + addr.toString(), e); } if (TransportMeterBuildSettings.TRANSPORT_METERING) { TransportBindingMeter transportBindingMeter = getUnicastTransportBindingMeter(null, endpointAddress); if (transportBindingMeter != null) { transportBindingMeter.pingFailed(System.currentTimeMillis() - pingStartTime); } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("ping to " + addr.toString() + " == " + result); } return result; } int getRestrictionPort() { return restrictionPort; } TransportBindingMeter getUnicastTransportBindingMeter(PeerID peerID, EndpointAddress destinationAddress) { if (unicastTransportMeter != null) { return unicastTransportMeter.getTransportBindingMeter((peerID != null) ? peerID.toString() : TransportMeter.UNKNOWN_PEER, destinationAddress); } else { return null; } } TransportBindingMeter getMulticastTransportBindingMeter(EndpointAddress destinationAddress) { if (multicastTransportMeter != null) { return multicastTransportMeter.getTransportBindingMeter(group.getPeerID(), destinationAddress); } else { return null; } } void messengerReadyEvent(Messenger newMessenger, EndpointAddress connAddr) { messengerEventListener.messengerReady(new MessengerEvent(this, newMessenger, connAddr)); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -