message.java
来自「JGRoups源码」· Java 代码 · 共 765 行 · 第 1/2 页
JAVA
765 行
} } // closes definitions that begin before this excerpt

    /**
     * Returns the size of the marshalled message as produced by {@link #writeTo(DataOutputStream)}.
     * Uses method size() of each header to compute the size, so if a Header subclass doesn't
     * implement size() we will use an approximation. However, most relevant header subclasses
     * have size() implemented correctly. (See org.jgroups.tests.SizeTest).<p>
     * Header keys are counted as <code>key.length() + 2</code>, which is not the same as what
     * writeUTF() emits, but almost. The value is mainly used by FRAG to determine whether to
     * fragment a message or not.
     *
     * @return The number of bytes for the marshalled message
     */
    public long size() {
        long retval=Global.BYTE_SIZE                   // leading byte
                + length                               // buffer
                + (buf != null? Global.INT_SIZE : 0);  // if buf != null 4 bytes for length

        // dest_addr is not marshalled by writeTo(), so it is not counted here either:
        // if(dest_addr != null)
        //    retval+=dest_addr.size();
        if(src_addr != null)
            retval+=(src_addr).size();

        Map.Entry entry;
        String key;
        Header hdr;
        retval+=Global.SHORT_SIZE; // size (short)
        for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            key=(String)entry.getKey();
            retval+=key.length() +2; // not the same as writeUTF(), but almost
            hdr=(Header)entry.getValue();
            retval+=5; // 1 for presence of magic number, 4 for magic number
            retval+=hdr.size();
        }
        return retval;
    }


    /**
     * Renders all headers as text, one <code>key: value</code> pair per line.
     *
     * @return the printed headers, or an empty string if there are none
     */
    public String printObjectHeaders() {
        StringBuffer sb=new StringBuffer();
        Map.Entry entry;

        if(headers != null) {
            for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
                entry=(Map.Entry)it.next();
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
            }
        }
        return sb.toString();
    }


    /* ----------------------------------- Interface Externalizable ------------------------------- */

    /**
     * Writes dest and src addresses (each preceded by a presence flag), the payload
     * (length followed by the bytes; a length of 0 if there is no buffer) and all headers.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    public void writeExternal(ObjectOutput out) throws IOException {
        int             len;
        Externalizable  hdr;
        Map.Entry       entry;

        if(dest_addr != null) {
            out.writeBoolean(true);
            Marshaller.write(dest_addr, out);
        }
        else {
            out.writeBoolean(false);
        }

        if(src_addr != null) {
            out.writeBoolean(true);
            Marshaller.write(src_addr, out);
        }
        else {
            out.writeBoolean(false);
        }

        if(buf == null)
            out.writeInt(0);
        else {
            out.writeInt(length);
            out.write(buf, offset, length);
        }

        len=headers.size();
        out.writeInt(len);
        for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            out.writeUTF((String)entry.getKey());
            hdr=(Externalizable)entry.getValue();
            Marshaller.write(hdr, out);
        }
    }


    /**
     * Reads the state written by {@link #writeExternal(ObjectOutput)}. Addresses are
     * canonicalized unless DISABLE_CANONICALIZATION is set.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException declared by the Externalizable contract
     */
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        boolean destAddressExist=in.readBoolean();
        if(destAddressExist) {
            dest_addr=(Address)Marshaller.read(in);
            if(!DISABLE_CANONICALIZATION)
                dest_addr=canonicalAddress(dest_addr);
        }

        boolean srcAddressExist=in.readBoolean();
        if(srcAddressExist) {
            src_addr=(Address)Marshaller.read(in);
            if(!DISABLE_CANONICALIZATION)
                src_addr=canonicalAddress(src_addr);
        }

        int i=in.readInt();
        if(i != 0) {
            buf=new byte[i];
            in.readFully(buf);
            offset=0;
            length=buf.length;
        }

        int len=in.readInt();
        while(len-- > 0) {
            Object key=in.readUTF();
            Object value=Marshaller.read(in);
            headers.put(key, value);
        }
    }

    /* --------------------------------- End of Interface Externalizable ----------------------------- */


    /* ----------------------------------- Interface Streamable ------------------------------- */

    /**
     * Streams the src address, buffer and headers to the output stream. A leading flag byte
     * records which of the optional parts follow. The dest address is not written (the
     * corresponding code is commented out below).
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    public void writeTo(DataOutputStream out) throws IOException {
        byte leading=0;

        // if(dest_addr != null) {
        //     leading+=DEST_SET;
        //     if(dest_addr instanceof IpAddress)
        //         leading+=IPADDR_DEST;
        // }

        if(src_addr != null) {
            leading+=SRC_SET;
            if(src_addr instanceof IpAddress) {
                leading+=IPADDR_SRC;
                if(((IpAddress)src_addr).getIpAddress() == null) {
                    leading+=SRC_HOST_NULL;
                }
            }
        }
        if(buf != null)
            leading+=BUF_SET;

        // 1. write the leading byte first
        out.write(leading);

        // 2. dest_addr
        // if(dest_addr != null) {
        //     if(dest_addr instanceof IpAddress)
        //         dest_addr.writeTo(out);
        //     else
        //         Util.writeAddress(dest_addr, out);
        // }

        // 3. src_addr
        if(src_addr != null) {
            if(src_addr instanceof IpAddress) {
                src_addr.writeTo(out);
            }
            else {
                Util.writeAddress(src_addr, out);
            }
        }

        // 4. buf
        if(buf != null) {
            out.writeInt(length);
            out.write(buf, offset, length);
        }

        // 5. headers
        int size=headers.size();
        out.writeShort(size);
        Map.Entry entry;
        for(Iterator it=headers.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            out.writeUTF((String)entry.getKey());
            writeHeader((Header)entry.getValue(), out);
        }
    }


    /**
     * Reads the state written by {@link #writeTo(DataOutputStream)}: the leading flag byte,
     * then (if flagged) the src address and the buffer, then the headers.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails, including an EOFException when the stream ends
     *                     before the announced number of payload bytes has been read
     * @throws IllegalAccessException propagated from reading the address
     * @throws InstantiationException propagated from reading the address
     */
    public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
        int len, leading;
        String hdr_name;
        Header hdr;

        // 1. read the leading byte first
        leading=in.readByte();

        // 1. dest_addr
        // if((leading & DEST_SET) == DEST_SET) {
        //     if((leading & IPADDR_DEST) == IPADDR_DEST) {
        //         dest_addr=new IpAddress();
        //         dest_addr.readFrom(in);
        //     }
        //     else {
        //         dest_addr=Util.readAddress(in);
        //     }
        // }

        // 2. src_addr
        if((leading & SRC_SET) == SRC_SET) {
            if((leading & IPADDR_SRC) == IPADDR_SRC) {
                src_addr=new IpAddress();
                src_addr.readFrom(in);
            }
            else {
                src_addr=Util.readAddress(in);
            }
            if(!DISABLE_CANONICALIZATION)
                src_addr=canonicalAddress(src_addr);
        }

        // 3. buf
        if((leading & BUF_SET) == BUF_SET) {
            len=in.readInt();
            buf=new byte[len];
            // read() may return after fewer than len bytes; readFully() blocks until the
            // whole payload has arrived or fails with EOFException
            in.readFully(buf, 0, len);
            offset=0; // the new buffer starts at 0, same as in readExternal()
            length=len;
        }

        // 4. headers
        len=in.readShort();
        headers=createHeaders(len);
        for(int i=0; i < len; i++) {
            hdr_name=in.readUTF();
            hdr=readHeader(in);
            headers.put(hdr_name, hdr);
        }
    }

    /* --------------------------------- End of Interface Streamable ----------------------------- */


    /* ----------------------------------- Private methods ------------------------------- */

    /**
     * Writes a single header: first a boolean plus either the magic number (true) or the
     * class name (false), then the header contents. Streamable headers write themselves
     * directly; all others go through an ObjectOutputStream and are remembered in
     * nonStreamableHeaders.
     *
     * @param value the header to write
     * @param out the stream to write to
     * @throws IOException if writing fails or the ClassConfigurator cannot be obtained
     */
    private static void writeHeader(Header value, DataOutputStream out) throws IOException {
        int magic_number;
        String classname;
        ObjectOutputStream oos=null;
        try {
            magic_number=ClassConfigurator.getInstance(false).getMagicNumber(value.getClass());
            // write the magic number or the class name
            if(magic_number == -1) {
                out.writeBoolean(false);
                classname=value.getClass().getName();
                out.writeUTF(classname);
            }
            else {
                out.writeBoolean(true);
                out.writeInt(magic_number);
            }

            // write the contents
            if(value instanceof Streamable) {
                ((Streamable)value).writeTo(out);
            }
            else {
                oos=new ObjectOutputStream(out);
                value.writeExternal(oos);
                if(!nonStreamableHeaders.contains(value.getClass())) {
                    nonStreamableHeaders.add(value.getClass());
                    if(log.isTraceEnabled())
                        log.trace("encountered non-Streamable header: " + value.getClass());
                }
            }
        }
        catch(ChannelException e) {
            IOException io_ex=new IOException("failed writing header");
            io_ex.initCause(e);
            throw io_ex;
        }
        finally {
            if(oos != null)
                oos.close(); // this is a no-op on ByteArrayOutputStream
        }
    }


    /**
     * Reads a single header written by writeHeader(): resolves the header class from the
     * magic number or the class name, instantiates it and lets it read its contents.
     *
     * @param in the stream to read from
     * @return the header read from the stream
     * @throws IOException with message "failed reading header" (and the original problem as
     *                     cause) if anything goes wrong, including an unresolvable header class
     */
    private static Header readHeader(DataInputStream in) throws IOException {
        Header hdr;
        boolean use_magic_number=in.readBoolean();
        int magic_number;
        String classname;
        Class clazz;

        try {
            if(use_magic_number) {
                magic_number=in.readInt();
                clazz=ClassConfigurator.getInstance(false).get(magic_number);
                if(clazz == null) {
                    String msg="magic number " + magic_number + " is not available in magic map";
                    log.error(msg);
                    // fail with a meaningful cause instead of a NullPointerException below
                    throw new ClassNotFoundException(msg);
                }
            }
            else {
                classname=in.readUTF();
                clazz=ClassConfigurator.getInstance(false).get(classname);
                // NOTE(review): guards against get(classname) returning null - confirm
                // against ClassConfigurator whether it can
                if(clazz == null)
                    throw new ClassNotFoundException(classname);
            }
            hdr=(Header)clazz.newInstance();
            if(hdr instanceof Streamable) {
                ((Streamable)hdr).readFrom(in);
            }
            else {
                // deliberately not closed: closing would also close the caller's stream
                ObjectInputStream ois=new ObjectInputStream(in);
                hdr.readExternal(ois);
            }
        }
        catch(Exception ex) {
            IOException io_ex=new IOException("failed reading header");
            io_ex.initCause(ex);
            throw io_ex;
        }
        return hdr;
    }


    /** Creates an empty header map, presized to <code>size</code> if it is positive. */
    private static Map createHeaders(int size) {
        return size > 0? new ConcurrentReaderHashMap(size) : new ConcurrentReaderHashMap();
    }


    /** Creates a header map holding a copy of the mappings of <code>m</code>. */
    private static Map createHeaders(Map m) {
        return new ConcurrentReaderHashMap(m);
    }


    /** canonicalize addresses to some extent.  There are race conditions
     * allowed in this method, so it may not fully canonicalize an address
     * @param nonCanonicalAddress
     * @return canonical representation of the address
     */
    private static Address canonicalAddress(Address nonCanonicalAddress) {
        Address result=null;
        if(nonCanonicalAddress == null) {
            return null;
        }
        // do not synchronize between get/put on the canonical map to avoid cost of contention
        // this can allow multiple equivalent addresses to leak out, but it's worth the cost savings
        try {
            result=(Address)canonicalAddresses.get(nonCanonicalAddress);
        }
        catch(NullPointerException npe) {
            // no action needed
            // NOTE(review): the reason an NPE can occur in get() is not visible here - confirm
        }
        if(result == null) {
            result=nonCanonicalAddress;
            canonicalAddresses.put(nonCanonicalAddress, result);
        }
        return result;
    }

    /* ------------------------------- End of Private methods ---------------------------- */
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?