📄 mcasttransport.java
字号:
// enough. multicastProcessor.put(packet); } catch (InterruptedException woken) { Thread.interrupted(); } catch (InterruptedIOException woken) { Thread.interrupted(); } catch (Exception e) { if (isClosed) { return; } if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE) && (!isClosed)) { LOG.log(Level.SEVERE, "failure during multicast receive", e); } break; } } } catch (Throwable all) { if (isClosed) { return; } if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } finally { multicastThread = null; } } /** * {@inheritDoc} * <p/> * Synchronized to not allow concurrent IP multicast: this naturally bounds * the usage of ip-multicast boolean be linear and not exponential. */ public synchronized boolean propagate(Message message, String pName, String pParams, int initalTTL) { long sendStartTime = System.currentTimeMillis(); int numBytesInPacket = 0; try { message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, msgSrcAddrElement); // First build the destination and source addresses EndpointAddress destAddr = new EndpointAddress(publicAddress, pName, pParams); MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddr.toString(), null); message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement); WireFormatMessage serialed = WireFormatMessageFactory.toWire(message, WireFormatMessageFactory.DEFAULT_WIRE_MIME, null); MessagePackageHeader header = new MessagePackageHeader(); header.setContentTypeHeader(serialed.getMimeType()); header.setContentLengthHeader(serialed.getByteLength()); ByteArrayOutputStream buffer = new ByteArrayOutputStream(multicastPacketSize); buffer.write('J'); buffer.write('X'); buffer.write('T'); buffer.write('A'); header.sendToStream(buffer); serialed.sendToStream(buffer); buffer.flush(); buffer.close(); numBytesInPacket = buffer.size(); if 
((buffer.size() > multicastPacketSize) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Multicast datagram exceeds multicast size."); } DatagramPacket packet = new DatagramPacket(buffer.toByteArray(), numBytesInPacket, multicastInetAddress, multicastPort); multicastSocket.send(packet); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sent Multicast message to :" + pName + "/" + pParams); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { multicastTransportBindingMeter.messageSent(true, message, System.currentTimeMillis() - sendStartTime, numBytesInPacket); } return true; } catch (IOException e) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) { multicastTransportBindingMeter.sendFailure(true, message, System.currentTimeMillis() - sendStartTime, numBytesInPacket); } if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Multicast socket send failed", e); } return false; } } /** * Handle a byte buffer from a multi-cast. * * @param packet the message packet. 
*/
    void processMulticast(DatagramPacket packet) {
        // Expected datagram layout: the four signature bytes 'J' 'X' 'T' 'A',
        // a framing header, then the serialized message. Every failure is
        // caught below, metered as a receive failure and logged at FINE; this
        // method never throws an Exception to its caller.
        //
        // NOTE(review): the buffer is indexed from 0, i.e. this assumes
        // packet.getOffset() == 0 - confirm against the receive loop.
        int size = packet.getLength();
        byte[] buffer = packet.getData();

        long messageReceiveBeginTime = System.currentTimeMillis();

        try {
            if (size < 4) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("damaged multicast discarded");
                }
                throw new IOException("damaged multicast discarded : too short");
            }

            if (('J' != buffer[0]) || ('X' != buffer[1]) || ('T' != buffer[2]) || ('A' != buffer[3])) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("damaged multicast discarded");
                }
                throw new IOException("damaged multicast discarded : incorrect signature");
            }

            // Everything after the signature: framing header followed by the message.
            ByteBuffer bbuffer = ByteBuffer.wrap(buffer, 4, size - 4);

            MessagePackageHeader header = new MessagePackageHeader();

            if (!header.readHeader(bbuffer)) {
                throw new IOException("Failed to read framing header");
            }

            MimeMediaType msgMime = header.getContentTypeHeader();

            // TODO 20020730 bondolo@jxta.org Do something with content-coding here.

            // read the message!
            Message msg = WireFormatMessageFactory.fromBuffer(bbuffer, msgMime, null);

            // Extract the source and destination
            MessageElement srcAddrElem = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS,
                    EndpointServiceImpl.MESSAGE_SOURCE_NAME);
            if (null == srcAddrElem) {
                throw new IOException("No Source Address in " + msg);
            }
            msg.removeMessageElement(srcAddrElem);

            EndpointAddress srcAddr = new EndpointAddress(srcAddrElem.toString());

            // Messages carrying our own source address are dropped without
            // being metered or dispatched.
            if (srcAddr.equals(ourSrcAddr)) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.log(Level.FINE, "Discard loopback multicast message");
                }
                return;
            }

            MessageElement dstAddrElem = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS,
                    EndpointServiceImpl.MESSAGE_DESTINATION_NAME);
            if (null == dstAddrElem) {
                throw new IOException("No Destination Address in " + msg);
            }
            msg.removeMessageElement(dstAddrElem);

            EndpointAddress dstAddr = new EndpointAddress(dstAddrElem.toString());

            // Handoff the message to the EndpointService Manager
            endpoint.processIncomingMessage(msg, srcAddr, dstAddr);

            if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) {
                // Elapsed time is "now - start"; the reverse order would
                // report a value that is never positive.
                multicastTransportBindingMeter.messageReceived(false, msg,
                        System.currentTimeMillis() - messageReceiveBeginTime, size);
            }
        } catch (Exception e) {
            if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) {
                // Same elapsed-time convention as the success path.
                multicastTransportBindingMeter.receiveFailure(false,
                        System.currentTimeMillis() - messageReceiveBeginTime, size);
            }

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.log(Level.FINE, "Discard incoming multicast message", e);
            }
        }
    }

    /**
     * Looks up the binding meter for a destination.
     *
     * @param destinationAddress the destination whose meter is wanted
     * @return the meter obtained from {@code multicastTransportMeter} for this
     *         group's peer ID and the given destination, or {@code null} when
     *         {@code multicastTransportMeter} is {@code null}
     */
    TransportBindingMeter getMulticastTransportBindingMeter(EndpointAddress destinationAddress) {
        if (multicastTransportMeter == null) {
            return null;
        }
        return multicastTransportMeter.getTransportBindingMeter(group.getPeerID(), destinationAddress);
    }

    /**
     * Handles incoming datagram packets. This implementation uses the peer
     * group Executor service to process the datagram packets, but limits the
     * number of concurrent tasks.
     */
    private class DatagramProcessor implements Runnable {

        /**
         * The maximum number of datagrams we will simultaneously process.
         */
        private static final int MAX_SIMULTANEOUS_PROCESSING = 5;

        /**
         * The executor to which we will issue tasks.
         */
        final Executor executor;

        /**
         * Queue of datagrams waiting to be executed. The queue is quite small.
         * The goal is not to cache datagrams in memory. If we can't keep up it
         * is better that we drop messages.
         */
        final BlockingQueue<DatagramPacket> queue =
                new ArrayBlockingQueue<DatagramPacket>(MAX_SIMULTANEOUS_PROCESSING + 1);

        /**
         * The number of executor tasks we are currently using. Only read or
         * written while synchronized on this processor.
         */
        int currentTasks = 0;

        /**
         * If {@code true} then this processor has been stopped.
         */
        volatile boolean stopped = false;

        /**
         * Default constructor
         *
         * @param executor the threadpool
         */
        DatagramProcessor(Executor executor) {
            this.executor = executor;
        }

        /**
         * Stops this processor: later {@link #put(DatagramPacket)} calls return
         * without queuing, running tasks leave their loop at the next check,
         * and any datagrams still queued are discarded.
         */
        void stop() {
            // Raise the flag before emptying the queue so that a put() that
            // begins after this point returns early instead of refilling it.
            stopped = true;
            queue.clear();
        }

        /**
         * Puts a datagram on the queue. The enqueue operation is blocking and
         * may take a significant amount of time. After the datagram is queued,
         * a new processing task is submitted to the executor unless
         * {@link #MAX_SIMULTANEOUS_PROCESSING} tasks are already accounted for
         * or the processor has been stopped.
         *
         * @param packet the datagram
         * @throws InterruptedException if interrupted
         */
        void put(DatagramPacket packet) throws InterruptedException {
            boolean execute = false;

            if (stopped) {
                return;
            }

            if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                LOG.log(Level.FINER, "Queuing incoming datagram packet : " + packet);
            }

            // push the datagram
            queue.put(packet);

            // See if we can start a new executor.
            synchronized (this) {
                if (!stopped && (currentTasks < MAX_SIMULTANEOUS_PROCESSING)) {
                    currentTasks++;
                    execute = true;
                }
            }

            // If it's ok, start a new executor outside of the synchronization.
            if (execute) {
                // The guard matches the level the record is emitted at.
                if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                    LOG.log(Level.FINER, "Starting new executor datagram processing task");
                }
                executor.execute(this);
            }
        }

        /**
         * {@inheritDoc}
         * <p/>
         * Drains the queue, passing each datagram to
         * {@code processMulticast}, until the queue is empty or the processor
         * is stopped. The task count is decremented on every exit path.
         */
        public void run() {
            try {
                DatagramPacket packet;
                while (!stopped && (null != (packet = queue.poll()))) {
                    if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                        LOG.log(Level.FINER, "Processing incoming datagram packet : " + packet);
                    }
                    processMulticast(packet);
                }
            } catch (Throwable all) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.log(Level.SEVERE, "Uncaught Throwable", all);
                }
            } finally {
                synchronized (this) {
                    currentTasks--;
                }
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -