⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udp.java.txt

📁 JGRoups源码
💻 TXT
📖 第 1 页 / 共 5 页
字号:
    private void setSourceAddress(Message msg) {        if(msg.getSrc() == null)            msg.setSrc(local_addr);    }    /**     * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because     * mcast or unicast socket reads can be concurrent.     * Correction (bela April 19 2005): we acces no instance variables, all vars are allocated on the stack, so     * this method should be reentrant: removed 'synchronized' keyword     */    void handleIncomingUdpPacket(IpAddress dest, InetAddress sender, int port, byte[] data) {        ByteArrayInputStream inp_stream=null;        DataInputStream      inp=null;        Message              msg=null;        List                 l;  // used if bundling is enabled        short                version;        boolean              is_message_list;        try {            // skip the first n bytes (default: 4), this is the version info            inp_stream=new ByteArrayInputStream(data);            inp=new DataInputStream(inp_stream);            version=inp.readShort();            if(Version.compareTo(version) == false) {                if(log.isWarnEnabled()) {                    StringBuffer sb=new StringBuffer();                    sb.append("packet from ").append(sender).append(':').append(port);                    sb.append(" has different version (").append(version);                    sb.append(") from ours (").append(Version.printVersion()).append("). ");                    if(discard_incompatible_packets)                        sb.append("Packet is discarded");                    else                        sb.append("This may cause problems");                    log.warn(sb.toString());                }                if(discard_incompatible_packets)                    return;            }            is_message_list=inp.readBoolean();            if(is_message_list) {                l=bufferToList(inp, dest);                for(Enumeration en=l.elements(); en.hasMoreElements();) {                    msg=(Message)en.nextElement();                    try {                        handleMessage(msg);                    }                    catch(Throwable t) {                        if(log.isErrorEnabled())                            log.error("failed unmarshalling message list", t);                    }                }            }            else {                msg=bufferToMessage(inp, dest, sender, port);                handleMessage(msg);            }        }        catch(Throwable e) {            if(log.isErrorEnabled()) log.error("exception in processing incoming packet", e);        }        finally {            Util.closeInputStream(inp);            Util.closeInputStream(inp_stream);        }    }    void handleMessage(Message msg) {        Event     evt;        UdpHeader hdr;        Address   dst=msg.getDest();        if(dst == null)            dst=mcast_addr;        if(stats) {            num_msgs_received++;            num_bytes_received+=msg.getLength();        }        // discard my own multicast loopback copy        if(loopback) {            Address src=msg.getSrc();            if((dst == null || (dst != null && dst.isMulticastAddress())) && src != null && local_addr.equals(src)) {                if(log.isTraceEnabled())                    log.trace("discarded own loopback multicast packet");                return;            }        }        evt=new Event(Event.MSG, msg);        if(log.isTraceEnabled()) {            StringBuffer sb=new StringBuffer("message is ");            sb.append(msg).append(", headers are ").append(msg.getHeaders());            log.trace(sb.toString());        }        /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.        * This allows e.g. PerfObserver to get the time of reception of a message */        if(observer != null)            observer.up(evt, up_queue.size());        hdr=(UdpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()        if(hdr != null) {            /* Discard all messages destined for a channel with a different name */            String ch_name=hdr.channel_name;            // Discard if message's group name is not the same as our group name unless the            // message is a diagnosis message (special group name DIAG_GROUP)            if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) &&                    !ch_name.equals(Util.DIAG_GROUP)) {                if(log.isWarnEnabled()) log.warn("discarded message from different group (" +                                                 ch_name + "). Sender was " + msg.getSrc());                return;            }        }        else {            if(log.isErrorEnabled()) log.error("message does not have a UDP header");        }        passUp(evt);    }    void sendUdpMessage(Message msg) throws Exception {        sendUdpMessage(msg, false);    }    /** Send a message to the address specified in dest */    void sendUdpMessage(Message msg, boolean copyForOutgoingQueue) throws Exception {        IpAddress           dest;        Message             copy;        Event               evt;        dest=(IpAddress)msg.getDest();  // guaranteed to be non-null        setSourceAddress(msg);        if(log.isTraceEnabled()) {            StringBuffer sb=new StringBuffer("sending msg to ");            sb.append(msg.getDest()).append(" (src=").append(msg.getSrc()).append("), headers are ").append(msg.getHeaders());            log.trace(sb.toString());        }        // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.        // If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,        // we will discard our own multicast message        if(loopback && (dest.equals(local_addr) || dest.isMulticastAddress())) {            copy=msg.copy();            // copy.removeHeader(name); // we don't remove the header            copy.setSrc(local_addr);            // copy.setDest(dest);            evt=new Event(Event.MSG, copy);            /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.               This allows e.g. PerfObserver to get the time of reception of a message */            if(observer != null)                observer.up(evt, up_queue.size());            if(log.isTraceEnabled()) log.trace("looped back local message " + copy);            passUp(evt);            if(dest != null && !dest.isMulticastAddress())                return;        }        if(use_outgoing_packet_handler) {            if(copyForOutgoingQueue)                outgoing_queue.add(msg.copy());            else                outgoing_queue.add(msg);            return;        }        send(msg);    }    /** Internal method to serialize and send a message. This method is not reentrant */    void send(Message msg) throws Exception {        Buffer     buf;        IpAddress  dest=(IpAddress)msg.getDest(); // guaranteed to be non-null        IpAddress  src=(IpAddress)msg.getSrc();        synchronized(out_stream) {            buf=messageToBuffer(msg, dest, src);            doSend(buf, dest.getIpAddress(), dest.getPort());        }    }    void doSend(Buffer buf, InetAddress dest, int port) throws IOException {        DatagramPacket       packet;        // packet=new DatagramPacket(data, data.length, dest, port);        packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), dest, port);        if(stats) {            num_msgs_sent++;            num_bytes_sent+=buf.getLength();        }        if(dest.isMulticastAddress() && mcast_send_sock != null) { // mcast_recv_sock might be null if ip_mcast is false            mcast_send_sock.send(packet);        }        else {            if(sock != null)                sock.send(packet);        }    }    void sendMultipleUdpMessages(Message msg, Vector dests) {        Address dest;        for(int i=0; i < dests.size(); i++) {            dest=(Address)dests.elementAt(i);            msg.setDest(dest);            try {                sendUdpMessage(msg,                               true); // copy for outgoing queue if outgoing queue handler is enabled            }            catch(Exception e) {                if(log.isErrorEnabled()) log.error("failed sending multiple messages", e);            }        }    }    /**     * This method needs to be synchronized on out_stream when it is called     * @param msg     * @param dest     * @param src     * @return     * @throws IOException     */    private Buffer messageToBuffer(Message msg, IpAddress dest, IpAddress src) throws Exception {        Buffer retval;        DataOutputStream out=null;        try {            out_stream.reset();            out=new DataOutputStream(out_stream);            out.writeShort(Version.version); // write the version            out.writeBoolean(false); // single message, *not* a list of messages            nullAddresses(msg, dest, src);            msg.writeTo(out);            revertAddresses(msg, dest, src);            out.flush();            retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());            return retval;        }        finally {            Util.closeOutputStream(out);        }    }    private void nullAddresses(Message msg, IpAddress dest, IpAddress src) {        msg.setDest(null);        if(!dest.isMulticastAddress()) { // unicast            if(src != null) {                if(null_src_addresses)                    msg.setSrc(new IpAddress(src.getPort(), false)); // null the host part, leave the port                if(src.getAdditionalData() != null)                    ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData());            }            else {                msg.setSrc(null);            }        }        else {  // multicast            if(src != null) {                if(null_src_addresses)                    msg.setSrc(new IpAddress(src.getPort(), false));  // null the host part, leave the port                if(src.getAdditionalData() != null)                    ((IpAddress)msg.getSrc()).setAdditionalData(src.getAdditionalData());            }        }    }    private void revertAddresses(Message msg, IpAddress dest, IpAddress src) {        msg.setDest(dest);        msg.setSrc(src);    }    private Message bufferToMessage(DataInputStream instream, IpAddress dest, InetAddress sender, int port)            throws Exception {        Message msg=new Message();        msg.readFrom(instream);        setAddresses(msg, dest, sender, port);        return msg;    }    private void setAddresses(Message msg, IpAddress dest, InetAddress sender, int port) {        // set the destination address        if(msg.getDest() == null && dest != null)            msg.setDest(dest);        // set the source address if not set        IpAddress src_addr=(IpAddress)msg.getSrc();        if(src_addr == null) {            try {msg.setSrc(new IpAddress(sender, port));} catch(Throwable t) {}        }        else {            byte[] tmp_additional_data=src_addr.getAdditionalData();            if(src_addr.getIpAddress() == null) {                try {msg.setSrc(new IpAddress(sender, src_addr.getPort()));} catch(Throwable t) {}            }            if(tmp_additional_data != null)                ((IpAddress)msg.getSrc()).setAdditionalData(tmp_additional_data);        }    }    private Buffer listToBuffer(List l, IpAddress dest) throws Exception {        Buffer retval=null;        IpAddress src;        Message msg;        int len=l != null? l.size() : 0;        boolean src_written=false;        DataOutputStream out=null;        out_stream.reset();        try {            out=new DataOutputStream(out_stream);            out.writeShort(Version.version);            out.writeBoolean(true);            out.writeInt(len);            for(Enumeration en=l.elements(); en.hasMoreElements();) {                msg=(Message)en.nextElement();                src=(IpAddress)msg.getSrc();                if(!src_written) {                    Util.writeAddress(src, out);                    src_written=true;                }                msg.setDest(null);                msg.setSrc(null);                msg.writeTo(out);                revertAddresses(msg, dest, src);            }            out.flush();            retval=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());            return retval;        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -