⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mcasttransport.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                    // enough.                    multicastProcessor.put(packet);                } catch (InterruptedException woken) {                    Thread.interrupted();                } catch (InterruptedIOException woken) {                    Thread.interrupted();                } catch (Exception e) {                    if (isClosed) {                        return;                    }                    if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE) && (!isClosed)) {                        LOG.log(Level.SEVERE, "failure during multicast receive", e);                    }                    break;                }            }        } catch (Throwable all) {            if (isClosed) {                return;            }            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);            }        } finally {            multicastThread = null;        }    }    /**     * {@inheritDoc}     * <p/>     * Synchronized to not allow concurrent IP multicast: this naturally bounds     * the usage of ip-multicast boolean be linear and not exponential.     */    public synchronized boolean propagate(Message message, String pName, String pParams, int initalTTL) {        long sendStartTime = System.currentTimeMillis();        int numBytesInPacket = 0;        try {            message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, msgSrcAddrElement);            // First build the destination and source addresses            EndpointAddress destAddr = new EndpointAddress(publicAddress, pName, pParams);            MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddr.toString(), null);            message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement);            WireFormatMessage serialed = WireFormatMessageFactory.toWire(message, WireFormatMessageFactory.DEFAULT_WIRE_MIME, null);            MessagePackageHeader header = new MessagePackageHeader();            header.setContentTypeHeader(serialed.getMimeType());            header.setContentLengthHeader(serialed.getByteLength());            ByteArrayOutputStream buffer = new ByteArrayOutputStream(multicastPacketSize);            buffer.write('J');            buffer.write('X');            buffer.write('T');            buffer.write('A');            header.sendToStream(buffer);            serialed.sendToStream(buffer);            buffer.flush();            buffer.close();            numBytesInPacket = buffer.size();            if ((buffer.size() > multicastPacketSize) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.warning("Multicast datagram exceeds multicast size.");            }            DatagramPacket packet = new DatagramPacket(buffer.toByteArray(), numBytesInPacket, multicastInetAddress, multicastPort);            multicastSocket.send(packet);            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Sent Multicast message to :" + pName + "/" + pParams);            }            if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) {                multicastTransportBindingMeter.messageSent(true, message, System.currentTimeMillis() - sendStartTime, numBytesInPacket);            }            return true;        } catch (IOException e) {            if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) {                multicastTransportBindingMeter.sendFailure(true, message, System.currentTimeMillis() - sendStartTime, numBytesInPacket);            }            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Multicast socket send failed", e);            }            return false;        }    }    /**     * Handle a byte buffer from a multi-cast.     *     * @param packet the message packet.     */    void processMulticast(DatagramPacket packet) {        int size = packet.getLength();        byte[] buffer = packet.getData();        long messageReceiveBeginTime = System.currentTimeMillis();        try {            if (size < 4) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("damaged multicast discarded");                }                throw new IOException("damaged multicast discarded : too short");            }            if (('J' != buffer[0]) || ('X' != buffer[1]) || ('T' != buffer[2]) || ('A' != buffer[3])) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("damaged multicast discarded");                }                throw new IOException("damaged multicast discarded : incorrect signature");            }            ByteBuffer bbuffer = ByteBuffer.wrap(buffer, 4, size - 4);            MessagePackageHeader header = new MessagePackageHeader();            if (!header.readHeader(bbuffer)) {                throw new IOException("Failed to read framing header");            }            MimeMediaType msgMime = header.getContentTypeHeader();            // TODO 20020730 bondolo@jxta.org Do something with content-coding here.            // read the message!            Message msg = WireFormatMessageFactory.fromBuffer(bbuffer, msgMime, null);            // Extract the source and destination            MessageElement srcAddrElem = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, EndpointServiceImpl.MESSAGE_SOURCE_NAME);            if (null == srcAddrElem) {                throw new IOException("No Source Address in " + msg);            }            msg.removeMessageElement(srcAddrElem);            EndpointAddress srcAddr = new EndpointAddress(srcAddrElem.toString());            if (srcAddr.equals(ourSrcAddr)) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.log(Level.FINE, "Discard loopback multicast message");                }                return;            }            MessageElement dstAddrElem = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, EndpointServiceImpl.MESSAGE_DESTINATION_NAME);            if (null == dstAddrElem) {                throw new IOException("No Destination Address in " + msg);            }            msg.removeMessageElement(dstAddrElem);            EndpointAddress dstAddr = new EndpointAddress(dstAddrElem.toString());            // Handoff the message to the EndpointService Manager            endpoint.processIncomingMessage(msg, srcAddr, dstAddr);            if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) {                multicastTransportBindingMeter.messageReceived(false, msg, messageReceiveBeginTime - System.currentTimeMillis(), size);            }        } catch (Exception e) {            if (TransportMeterBuildSettings.TRANSPORT_METERING && (multicastTransportBindingMeter != null)) {                multicastTransportBindingMeter.receiveFailure(false, messageReceiveBeginTime - System.currentTimeMillis(), size);            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "Discard incoming multicast message", e);            }        }    }    TransportBindingMeter getMulticastTransportBindingMeter(EndpointAddress destinationAddress) {        if (multicastTransportMeter != null) {            return multicastTransportMeter.getTransportBindingMeter(group.getPeerID(), destinationAddress);        } else {            return null;        }    }    /**     * Handles incoming datagram packets. This implementation uses the peer     * group Executor service to process the datagram packets, but limits the     * number of concurrent tasks.     */    private class DatagramProcessor implements Runnable {        /**         * The maximum number of datagrams we will simultaneously process.         */        private static final int MAX_SIMULTANEOUS_PROCESSING = 5;        /**         * The executor to which we will issue tasks.         */        final Executor executor;        /**         * Queue of datagrams waiting to be executed. The queue is quite small.         * The goal is not to cache datagrams in memory. If we can't keep up it         * is better that we drop messages.         */        final BlockingQueue<DatagramPacket> queue = new ArrayBlockingQueue<DatagramPacket>(MAX_SIMULTANEOUS_PROCESSING + 1);        /**         * The number of executor tasks we are currently using.         */        int currentTasks = 0;        /**         * If {@code true} then this processor has been stopped.         */        volatile boolean stopped = false;        /**         * Default constructor         * @param executor the threadpool         */        DatagramProcessor(Executor executor) {            this.executor = executor;        }        /**         * Stops this thread         */        void stop() {            queue.clear();            stopped = true;        }        /**         * Puts a datagram on the queue. The enqueue operation is blocking and         * may take a significant amount of time.         *         * @param packet the datagram         * @throws InterruptedException if interrupted         */        void put(DatagramPacket packet) throws InterruptedException {            boolean execute = false;            if (stopped) {                return;            }            if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {                LOG.log(Level.FINER, "Queuing incoming datagram packet : " + packet);            }            // push the datagram            queue.put(packet);            // See if we can start a new executor.            synchronized (this) {                if (!stopped && (currentTasks < MAX_SIMULTANEOUS_PROCESSING)) {                    currentTasks++;                    execute = true;                }            }            // If it's ok, start a new executor outside of the synchronization.            if (execute) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.log(Level.FINER, "Starting new executor datagram processing task");                }                executor.execute(this);            }        }        /**         * {@inheritDoc}         */        public void run() {            try {                DatagramPacket packet;                while (!stopped && (null != (packet = queue.poll()))) {                    if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {                        LOG.log(Level.FINER, "Processing incoming datagram packet : " + packet);                    }                    processMulticast(packet);                }            } catch (Throwable all) {                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                    LOG.log(Level.SEVERE, "Uncaught Throwable", all);                }            } finally {                synchronized (this) {                    currentTasks--;                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -