⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udp.java.txt

📁 JGRoups源码
💻 TXT
📖 第 1 页 / 共 5 页
字号:
        if(use_outgoing_packet_handler)            outgoing_packet_handler.start();        if(use_incoming_packet_handler)            incoming_packet_handler.start();    }    /**     * Stops unicast and multicast receiver threads     */    void stopThreads() {        Thread tmp;        // 1. Stop the multicast receiver thread        if(mcast_receiver != null) {            if(mcast_receiver.isAlive()) {                tmp=mcast_receiver;                mcast_receiver=null;                closeMulticastSocket();  // will cause the multicast thread to terminate                tmp.interrupt();                try {                    tmp.join(100);                }                catch(Exception e) {                }                tmp=null;            }            mcast_receiver=null;        }        // 2. Stop the unicast receiver thread        if(ucast_receiver != null) {            ucast_receiver.stop();            ucast_receiver=null;        }        // 3. Stop the in_packet_handler thread        if(incoming_packet_handler != null)            incoming_packet_handler.stop();        // 4. Stop the outgoing packet handler thread        if(outgoing_packet_handler != null)            outgoing_packet_handler.stop();    }    void handleDownEvent(Event evt) {        switch(evt.getType()) {        case Event.TMP_VIEW:        case Event.VIEW_CHANGE:            synchronized(members) {                members.removeAllElements();                Vector tmpvec=((View)evt.getArg()).getMembers();                for(int i=0; i < tmpvec.size(); i++)                    members.addElement(tmpvec.elementAt(i));            }            break;        case Event.GET_LOCAL_ADDRESS:   // return local address -> Event(SET_LOCAL_ADDRESS, local)            passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));            break;        case Event.CONNECT:            channel_name=(String)evt.getArg();            udp_hdr=new UdpHeader(channel_name);            // removed March 18 2003 (bela), not needed (handled by GMS)            // changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might            // be needed if we run without GMS though            passUp(new Event(Event.CONNECT_OK));            break;        case Event.DISCONNECT:            passUp(new Event(Event.DISCONNECT_OK));            break;        case Event.CONFIG:            if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());            handleConfigEvent((HashMap)evt.getArg());            break;        }    }    void handleConfigEvent(HashMap map) {        if(map == null) return;        if(map.containsKey("additional_data"))            additional_data=(byte[])map.get("additional_data");        if(map.containsKey("send_buf_size")) {            mcast_send_buf_size=((Integer)map.get("send_buf_size")).intValue();            ucast_send_buf_size=mcast_send_buf_size;        }        if(map.containsKey("recv_buf_size")) {            mcast_recv_buf_size=((Integer)map.get("recv_buf_size")).intValue();            ucast_recv_buf_size=mcast_recv_buf_size;        }        setBufferSizes();    }    /* ----------------------------- End of Private Methods ---------------------------------------- */    /* ----------------------------- Inner Classes ---------------------------------------- */    class IncomingQueueEntry {        IpAddress   dest=null;        InetAddress sender=null;        int         port=-1;        byte[]      buf;        public IncomingQueueEntry(IpAddress dest, InetAddress sender, int port, byte[] buf) {            this.dest=dest;            this.sender=sender;            this.port=port;            this.buf=buf;        }        public IncomingQueueEntry(byte[] buf) {            this.buf=buf;        }    }    public class UcastReceiver implements Runnable {        boolean running=true;        Thread thread=null;        public void start() {            if(thread == null) {                thread=new Thread(this, "UDP.UcastReceiverThread");                thread.setDaemon(true);                running=true;                thread.start();            }        }        public void stop() {            Thread tmp;            if(thread != null && thread.isAlive()) {                running=false;                tmp=thread;                thread=null;                closeSocket(); // this will cause the thread to break out of its loop                tmp.interrupt();                tmp=null;            }            thread=null;        }        public void run() {            DatagramPacket  packet;            byte            receive_buf[]=new byte[65535];            int             len;            byte[]          data, tmp;            InetAddress     sender_addr;            int             sender_port;            // moved out of loop to avoid excessive object creations (bela March 8 2001)            packet=new DatagramPacket(receive_buf, receive_buf.length);            while(running && thread != null && sock != null) {                try {                    packet.setData(receive_buf, 0, receive_buf.length);                    sock.receive(packet);                    sender_addr=packet.getAddress();                    sender_port=packet.getPort();                    len=packet.getLength();                    data=packet.getData();                    if(log.isTraceEnabled())                        log.trace(new StringBuffer("received (ucast) ").append(len).append(" bytes from ").                                  append(sender_addr).append(':').append(sender_port));                    if(len > receive_buf.length) {                        if(log.isErrorEnabled())                            log.error("size of the received packet (" + len + ") is bigger than allocated buffer (" +                                      receive_buf.length + "): will not be able to handle packet. " +                                      "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);                    }                    if(use_incoming_packet_handler) {                        tmp=new byte[len];                        System.arraycopy(data, 0, tmp, 0, len);                        incoming_queue.add(new IncomingQueueEntry(local_addr, sender_addr, sender_port, tmp));                    }                    else                        handleIncomingUdpPacket(local_addr, sender_addr, sender_port, data);                }                catch(SocketException sock_ex) {                    if(log.isDebugEnabled()) log.debug("unicast receiver socket is closed, exception=" + sock_ex);                    break;                }                catch(InterruptedIOException io_ex) { // thread was interrupted                    ; // go back to top of loop, where we will terminate loop                }                catch(Throwable ex) {                    if(log.isErrorEnabled())                        log.error("[" + local_addr + "] failed receiving unicast packet", ex);                    Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)                }            }            if(log.isDebugEnabled()) log.debug("unicast receiver thread terminated");        }    }    /**     * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up     * to the higher layer (done in handleIncomingUdpPacket()).     */    class IncomingPacketHandler implements Runnable {        Thread t=null;        public void run() {            byte[] data;            IncomingQueueEntry entry;            while(incoming_queue != null && incoming_packet_handler != null) {                try {                    entry=(IncomingQueueEntry)incoming_queue.remove();                    data=entry.buf;                }                catch(QueueClosedException closed_ex) {                    if(log.isDebugEnabled()) log.debug("packet_handler thread terminating");                    break;                }                handleIncomingUdpPacket(entry.dest, entry.sender, entry.port, data);            }        }        void start() {            if(t == null || !t.isAlive()) {                t=new Thread(this, "UDP.IncomingPacketHandler thread");                t.setDaemon(true);                t.start();            }        }        void stop() {            if(incoming_queue != null)                incoming_queue.close(false); // should terminate the packet_handler thread too            t=null;            incoming_queue=null;        }    }    /**     * This thread fetches byte buffers from the outgoing_packet_queue, converts them into messages and sends them     * using the unicast or multicast socket     */    class OutgoingPacketHandler implements Runnable {        Thread             t=null;        byte[]             buf;        DatagramPacket     packet;        IpAddress          dest;        public void run() {            Message msg;            while(outgoing_queue != null && outgoing_packet_handler != null) {                try {                    msg=(Message)outgoing_queue.remove();                    handleMessage(msg);                }                catch(QueueClosedException closed_ex) {                    break;                }                catch(Throwable th) {                    if(log.isErrorEnabled()) log.error("exception sending packet", th);                }                msg=null; // let's give the poor garbage collector a hand...            }            if(log.isTraceEnabled()) log.trace("packet_handler thread terminating");        }        protected void handleMessage(Message msg) throws Exception {            send(msg);        }        void start() {            if(t == null || !t.isAlive()) {                t=new Thread(this, "UDP.OutgoingPacketHandler thread");                t.setDaemon(true);                t.start();            }        }        void stop() {            if(outgoing_queue != null)                outgoing_queue.close(false); // should terminate the packet_handler thread too            t=null;            // outgoing_queue=null;        }    }    /**     * Bundles smaller messages into bigger ones. Collects messages in a list until     * messages of a total of <tt>max_bundle_size bytes</tt> have accumulated, or until     * <tt>max_bundle_timeout</tt> milliseconds have elapsed, whichever is first. Messages     * are unbundled at the receiver.     */    class BundlingOutgoingPacketHandler extends OutgoingPacketHandler {        long                total_bytes=0;        /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */        final HashMap       msgs=new HashMap(11);        void start() {            super.start();            t.setName("UDP.BundlingOutgoingPacketHandler thread");        }        public void run() {            Message msg=null, leftover=null;            long start=0;            while(outgoing_queue != null) {                try {                    total_bytes=0;                    msg=leftover != null? leftover : (Message)outgoing_queue.remove(); // blocks until message is available                    start=System.currentTimeMillis();                    leftover=waitForMessagesToAccumulate(msg, outgoing_queue, max_bundle_size, start, max_bundle_timeout);                    bundleAndSend(start);                }                catch(QueueClosedException closed_ex) {                    break;                }                catch(Throwable th) {                    if(log.isErrorEnabled()) log.error("exception sending packet", th);                }            }            bundleAndSend(start);            if(log.isTraceEnabled()) log.trace("packet_handler thread terminating");        }        /**         * Waits until max_size bytes have accumulated in the queue, or max_time milliseconds have elapsed.         * When a message cannot be added to the ready-to-send bundle, it is returned, so the caller can         * re-submit it again next time.         * @param m         * @param q         * @param max_size         * @param max_time       

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -