message.java

来自「JGRoups源码」· Java 代码 · 共 765 行 · 第 1/2 页

JAVA
765
字号
        }
    }

    /**
     * Returns the size of the marshalled message as computed from its parts: the leading byte, the
     * buffer length (plus 4 bytes for the length field if a buffer is set), the size of the source
     * address (if set), a short for the number of headers and, for each header, the key length + 2,
     * 5 bytes for the magic-number flag and magic number, and the value of the header's size() method.
     * The destination address is not included (the code for it is commented out).
     * <p>
     * The per-key contribution (key.length() + 2) is only an approximation of what writeUTF() emits.
     * The result also depends on every Header subclass implementing size() correctly
     * (see org.jgroups.tests.SizeTest).
     * @return The number of bytes for the marshalled message
     */
    public long size() {
        long retval=Global.BYTE_SIZE                  // leading byte
                + length                              // buffer
                + (buf != null? Global.INT_SIZE : 0); // if buf != null 4 bytes for length

        // if(dest_addr != null)
        //     retval+=dest_addr.size();
        if(src_addr != null)
            retval+=src_addr.size();

        // NOTE: the header accounting below is unconditional; it does not depend on src_addr
        Map.Entry entry;
        String key;
        Header hdr;
        retval+=Global.SHORT_SIZE; // size (short)
        for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            key=(String)entry.getKey();
            retval+=key.length() +2; // not the same as writeUTF(), but almost
            hdr=(Header)entry.getValue();
            retval+=5; // 1 for presence of magic number, 4 for magic number
            retval+=hdr.size();
        }
        return retval;
    }

    /**
     * Returns a string with one "key: value" line per header, using the toString() of each header.
     * @return The printed headers, or an empty string if the headers map is null or empty
     */
    public String printObjectHeaders() {
        StringBuffer sb=new StringBuffer();
        Map.Entry entry;
        if(headers != null) {
            for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
                entry=(Map.Entry)it.next();
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
            }
        }
        return sb.toString();
    }

    /* ----------------------------------- Interface Externalizable ------------------------------- */

    /**
     * Writes the message in this order: presence flag + destination address, presence flag + source
     * address, buffer length (0 if there is no buffer) + buffer contents, number of headers, and then
     * each header as (key, value).
     * @param out The stream to write to
     * @throws IOException If writing to the stream fails
     */
    public void writeExternal(ObjectOutput out) throws IOException {
        int             len;
        Externalizable  hdr;
        Map.Entry       entry;

        if(dest_addr != null) {
            out.writeBoolean(true);
            Marshaller.write(dest_addr, out);
        }
        else {
            out.writeBoolean(false);
        }

        if(src_addr != null) {
            out.writeBoolean(true);
            Marshaller.write(src_addr, out);
        }
        else {
            out.writeBoolean(false);
        }

        if(buf == null)
            out.writeInt(0);
        else {
            // only the [offset, offset+length) window of buf is written
            out.writeInt(length);
            out.write(buf, offset, length);
        }

        len=headers.size();
        out.writeInt(len);
        for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            out.writeUTF((String)entry.getKey());
            hdr=(Externalizable)entry.getValue();
            Marshaller.write(hdr, out);
        }
    }

    /**
     * Reads the message in the same order in which {@link #writeExternal(ObjectOutput)} wrote it.
     * Addresses are canonicalized unless DISABLE_CANONICALIZATION is set. Headers read from the
     * stream are added to the existing headers map.
     * @param in The stream to read from
     * @throws IOException If reading from the stream fails
     * @throws ClassNotFoundException Declared by the Externalizable contract
     */
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        boolean  destAddressExist=in.readBoolean();
        if(destAddressExist) {
            dest_addr=(Address)Marshaller.read(in);
            if(!DISABLE_CANONICALIZATION)
                dest_addr=canonicalAddress(dest_addr);
        }

        boolean srcAddressExist=in.readBoolean();
        if(srcAddressExist) {
            src_addr=(Address)Marshaller.read(in);
            if(!DISABLE_CANONICALIZATION)
                src_addr=canonicalAddress(src_addr);
        }

        // a length of 0 means "no buffer" (or an empty one); buf is left untouched in that case
        int i=in.readInt();
        if(i != 0) {
            buf=new byte[i];
            in.readFully(buf);
            offset=0;
            length=buf.length;
        }

        int len=in.readInt();
        while(len-- > 0) {
            Object key=in.readUTF();
            Object value=Marshaller.read(in);
            headers.put(key, value);
        }
    }

    /* --------------------------------- End of Interface Externalizable ----------------------------- */

    /* ----------------------------------- Interface Streamable  ------------------------------- */

    /**
     * Streams the source address, buffer and headers to the output stream, preceded by a leading
     * byte whose flags tell the reader which of these parts are present. The destination address
     * is not written (the code for it is commented out below).
     * @param out The stream to write to
     * @throws IOException If writing to the stream fails
     */
    public void writeTo(DataOutputStream out) throws IOException {
        byte leading=0;

//        if(dest_addr != null) {
//            leading+=DEST_SET;
//            if(dest_addr instanceof IpAddress)
//                leading+=IPADDR_DEST;
//        }

        if(src_addr != null) {
            leading+=SRC_SET;
            if(src_addr instanceof IpAddress) {
                leading+=IPADDR_SRC;
                if(((IpAddress)src_addr).getIpAddress() == null) {
                    leading+=SRC_HOST_NULL;
                }
            }
        }
        if(buf != null)
            leading+=BUF_SET;

        // 1. write the leading byte first
        out.write(leading);

        // 2. dest_addr
//        if(dest_addr != null) {
//            if(dest_addr instanceof IpAddress)
//                dest_addr.writeTo(out);
//            else
//                Util.writeAddress(dest_addr, out);
//        }

        // 3. src_addr
        if(src_addr != null) {
            if(src_addr instanceof IpAddress) {
                src_addr.writeTo(out);
            }
            else {
                Util.writeAddress(src_addr, out);
            }
        }

        // 4. buf
        if(buf != null) {
            out.writeInt(length);
            out.write(buf, offset, length);
        }

        // 5. headers
        int size=headers.size();
        out.writeShort(size);
        Map.Entry        entry;
        for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            out.writeUTF((String)entry.getKey());
            writeHeader((Header)entry.getValue(), out);
        }
    }

    /**
     * Reads a message written by {@link #writeTo(DataOutputStream)}: the leading byte, then (depending
     * on the flags in the leading byte) the source address and the buffer, and finally the headers.
     * The headers map is replaced by a new map holding the headers read from the stream.
     * @param in The stream to read from
     * @throws IOException If reading fails, including an EOFException if the stream ends before the
     *                     announced number of buffer bytes has been read
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
        int len, leading;
        String hdr_name;
        Header hdr;

        // 1. read the leading byte first
        leading=in.readByte();

        // 1. dest_addr
//        if((leading & DEST_SET) == DEST_SET) {
//            if((leading & IPADDR_DEST) == IPADDR_DEST) {
//                dest_addr=new IpAddress();
//                dest_addr.readFrom(in);
//            }
//            else {
//                dest_addr=Util.readAddress(in);
//            }
//        }

        // 2. src_addr
        if((leading & SRC_SET) == SRC_SET) {
            if((leading & IPADDR_SRC) == IPADDR_SRC) {
                src_addr=new IpAddress();
                src_addr.readFrom(in);
            }
            else {
                src_addr=Util.readAddress(in);
            }
            if(!DISABLE_CANONICALIZATION)
                src_addr=canonicalAddress(src_addr);
        }

        // 3. buf
        if((leading & BUF_SET) == BUF_SET) {
            len=in.readInt();
            buf=new byte[len];
            // read() may return after fewer than len bytes, which would leave the buffer partially
            // filled and the stream misaligned for the headers; readFully() reads exactly len bytes
            in.readFully(buf, 0, len);
            offset=0; // the new buffer is used from its start, same as in readExternal()
            length=len;
        }

        // 4. headers
        len=in.readShort();
        headers=createHeaders(len);
        for(int i=0; i < len; i++) {
            hdr_name=in.readUTF();
            hdr=readHeader(in);
            headers.put(hdr_name, hdr);
        }
    }

    /* --------------------------------- End of Interface Streamable ----------------------------- */

    /* ----------------------------------- Private methods ------------------------------- */

    /**
     * Writes a single header: first a boolean telling whether a magic number (true) or a class name
     * (false) follows, then the magic number or class name, then the header contents. Streamable
     * headers are written with writeTo(); all others are written with writeExternal() through an
     * ObjectOutputStream and their class is recorded in nonStreamableHeaders.
     * @param value The header to write
     * @param out The stream to write to
     * @throws IOException If writing fails, or if a ChannelException occurs (attached as cause)
     */
    private static void writeHeader(Header value, DataOutputStream out) throws IOException {
        int magic_number;
        String classname;
        ObjectOutputStream oos=null;
        try {
            magic_number=ClassConfigurator.getInstance(false).getMagicNumber(value.getClass());
            // write the magic number or the class name
            if(magic_number == -1) {
                out.writeBoolean(false);
                classname=value.getClass().getName();
                out.writeUTF(classname);
            }
            else {
                out.writeBoolean(true);
                out.writeInt(magic_number);
            }

            // write the contents
            if(value instanceof Streamable) {
                ((Streamable)value).writeTo(out);
            }
            else {
                oos=new ObjectOutputStream(out);
                value.writeExternal(oos);
                if(!nonStreamableHeaders.contains(value.getClass())) {
                    nonStreamableHeaders.add(value.getClass());
                    if(log.isTraceEnabled())
                        log.trace("encountered non-Streamable header: " + value.getClass());
                }
            }
        }
        catch(ChannelException e) {
            IOException io_ex=new IOException("failed writing header");
            io_ex.initCause(e);
            throw io_ex;
        }
        finally {
            // NOTE(review): closing oos flushes it, but also closes the wrapped 'out'; this is only
            // harmless if the underlying sink ignores close() - confirm against callers
            if(oos != null)
                oos.close(); // this is a no-op on ByteArrayOutputStream
        }
    }

    /**
     * Reads a single header written by writeHeader(): resolves the header class from the magic number
     * or class name, creates an instance through its no-arg constructor and lets it read its contents,
     * using readFrom() for Streamable headers and readExternal() through an ObjectInputStream otherwise.
     * @param in The stream to read from
     * @return The header read from the stream
     * @throws IOException If anything goes wrong; the original exception is attached as cause
     */
    private static Header readHeader(DataInputStream in) throws IOException {
        Header            hdr;
        boolean           use_magic_number=in.readBoolean();
        int               magic_number;
        String            classname;
        Class             clazz;
        ObjectInputStream ois;

        try {
            if(use_magic_number) {
                magic_number=in.readInt();
                clazz=ClassConfigurator.getInstance(false).get(magic_number);
                if(clazz == null) {
                    log.error("magic number " + magic_number + " is not available in magic map");
                    // fail with a descriptive cause instead of a NullPointerException further down
                    throw new ClassNotFoundException("magic number " + magic_number + " is not available in magic map");
                }
            }
            else {
                classname=in.readUTF();
                clazz=ClassConfigurator.getInstance(false).get(classname);
                if(clazz == null)
                    throw new ClassNotFoundException(classname);
            }
            hdr=(Header)clazz.newInstance();
            if(hdr instanceof Streamable) {
               ((Streamable)hdr).readFrom(in);
            }
            else {
                // ois is deliberately not closed here: closing it would close 'in' as well, and the
                // caller continues to read from 'in'
                ois=new ObjectInputStream(in);
                hdr.readExternal(ois);
            }
        }
        catch(Exception ex) {
            IOException io_ex=new IOException("failed reading header");
            io_ex.initCause(ex);
            throw io_ex;
        }
        return hdr;
    }

    /**
     * Creates an empty headers map.
     * @param size The initial capacity; if not greater than 0, the map's default capacity is used
     * @return A new ConcurrentReaderHashMap
     */
    private static Map createHeaders(int size) {
        return size > 0? new ConcurrentReaderHashMap(size) : new ConcurrentReaderHashMap();
    }

    /**
     * Creates a headers map initialized with the contents of m.
     * @param m The map to copy
     * @return A new ConcurrentReaderHashMap containing the mappings of m
     */
    private static Map createHeaders(Map m) {
        return new ConcurrentReaderHashMap(m);
    }

    /** canonicalize addresses to some extent.  There are race conditions
     * allowed in this method, so it may not fully canonicalize an address
     * @param nonCanonicalAddress
     * @return canonical representation of the address, or null if nonCanonicalAddress is null
     */
    private static Address canonicalAddress(Address nonCanonicalAddress) {
        Address result=null;
        if(nonCanonicalAddress == null) {
            return null;
        }
        // do not synchronize between get/put on the canonical map to avoid cost of contention
        // this can allow multiple equivalent addresses to leak out, but it's worth the cost savings
        try {
            result=(Address)canonicalAddresses.get(nonCanonicalAddress);
        }
        catch(NullPointerException npe) {
            // no action needed: the lookup is treated as a miss and the address is registered below
        }
        if(result == null) {
            result=nonCanonicalAddress;
            canonicalAddresses.put(nonCanonicalAddress, result);
        }
        return result;
    }

    /* ------------------------------- End of Private methods ---------------------------- */
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?