📄 xbfparser.java
字号:
/*------------------------------------------------------------------------------Name: XbfParser.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: XbfParser class for raw socket messages------------------------------------------------------------------------------*/package org.xmlBlaster.util.xbformat;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.plugin.I_PluginConfig;import org.xmlBlaster.util.MsgUnitRaw;import java.io.IOException;import java.io.InputStream;/** * XbfParser class for raw socket/email messages. * <br /> * This class creates and parses raw byte[] messages which can be used * to transfer over a socket connection. * <br /> * XbfParser instances may be reused, but are NOT reentrant (there are many 'global' variables) * <br /> * Please read the requirement specification * <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/protocol.socket.html">protocol.socket</a> * * <pre> * msgLen[10] flag[6] requestId methodName sessionId lenUnzipped userData checkSum[10] * +---------+-------+------ -*----------*-----------*-----------*-----------+----------+ * * * The 'userData' consists of 0-n of these: * * qos key len content * +-----*---------*-----*----------+ * * * Examples, '*' marks a null byte and '|' is just to show the boundary (is not part of the message): * * Testing qos/key/content * | 83**I**17711*publish*oxf6hZs**<qos></qos>*<key oid='hello'/>*11*Hello world| * * Testing qos/key * | 70**I**17711*get*oxf6hZs**<qos></qos>*<key oid='ooo'></key>*0*| * * Testing publish return with qos * | 48**R**17711*publish*oxf6hZs**<qos/>**0*| * * Testing nothing * | 38**I**17711*get*oxf6hZs****0*| * * Testing ping: * | 29**I**11*ping*****0*| * * Testing XmlBlasterException * | 76**E**17711*get*oxf6hZs**XbfParser*An XmlBlasterException test only*0*| * * Testing qos/key/content return value * | 85**R**17711*publish***<qos></qos>*<key oid='hello'/>*20*Hello world response| * * Testing a QoS return value * | 58**R**17711*get***<qos><state id='OK'/></qos>**0*| * * Testing two qos/key/content * | 100**I**17711*publish*oxf6hZs**<qos/>*<key oid='x1'/>*6*Hello1<qos/>*<key oid='x2'/>*6*Hello2| * </pre> * * @author xmlBlaster@marcelruff.info * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/protocol.socket.html">The protocol.socket requirement</a> */public class XbfParser implements I_MsgInfoParser{ /* TODO: Try performance with b[i*2] = (byte)(c & 0xff); b[i*2 + 1] = (byte)((c >> 8) & 0xff); to cast char[] into byte[] */ private static final String ME = "xbformat.XbfParser"; private Global glob; private static Logger log = Logger.getLogger(XbfParser.class.getName()); public static final int NUM_FIELD_LEN = 10; public static final int FLAG_FIELD_LEN = 6; public static final int MAX_STRING_LEN = Integer.MAX_VALUE; public static final String EMPTY_STRING = ""; public static final byte CHECKSUM_ADLER_BYTE = (byte)65; // 'A' public static final byte COMPRESSED_GZIP_BYTE = (byte)90; // 'Z' public static final byte VERSION_1_BYTE = (byte)49; // '1' private static final byte[] EMPTY10 = new String(" ").getBytes(); private static final byte NULL_BYTE = (byte)0; public static final String XBFORMAT_EXTENSION = ".xbf"; public static final String XBFORMAT_ZLIB_EXTENSION = ".xbfz"; public static final String XBFORMAT_MIMETYPE = "application/xmlBlaster-xbf"; public static final String XBFORMAT_ZLIB_MIMETYPE = "application/xmlBlaster-xbfz"; // create only once, for low level parsing //private ByteArray byteArray = new ByteArray(256); private Buf buf; private byte[] first10; private long lenUnzipped; private long checkSumResult; private int maxMsgLength = Integer.MAX_VALUE; // TODO: Set by environment or calulate by LowMemoryDetector.java physical memory (expects JDK 1.5) /** If not null somebody wants to be notified about the current bytes send over socket */ private I_ProgressListener progressListener; static { MsgInfoParserFactory.instance().register(XBFORMAT_EXTENSION, XbfParser.class.getName()); MsgInfoParserFactory.instance().register(XBFORMAT_ZLIB_EXTENSION, XbfParser.class.getName()); MsgInfoParserFactory.instance().register(XBFORMAT_MIMETYPE, XbfParser.class.getName()); MsgInfoParserFactory.instance().register(XBFORMAT_ZLIB_MIMETYPE, XbfParser.class.getName()); } public XbfParser() { //initialize(); } public void init(Global glob, I_ProgressListener progressListener, I_PluginConfig pluginConfig) { this.glob = glob; this.progressListener = progressListener; //this.someConfig = glob.get("someConfig", (String)null, null, pluginConfig); } private void initialize() { this.buf = new Buf(); this.first10 = new byte[NUM_FIELD_LEN]; } /** * @param isCompressed true/false * @return XBFORMAT_MIMETYPE = "application/xmlBlaster-xbf"; * XBFORMAT_ZLIB_MIMETYPE = "application/xmlBlaster-xbfz"; */ public final String getMimetype(boolean isCompressed) { return (isCompressed) ? XBFORMAT_ZLIB_MIMETYPE : XBFORMAT_MIMETYPE; } /** * @param isCompressed true/false * @return XBFORMAT_EXTENSION = ".xbf"; * XBFORMAT_ZLIB_EXTENSION = ".xbfz"; */ public final String getExtension(boolean isCompressed) { return (isCompressed) ? XBFORMAT_ZLIB_EXTENSION : XBFORMAT_EXTENSION; } /** * Blocks on socket until a complete message is read. * @return A complete message in a byte[]. * NOTE: The first 10 bytes are not initialized.<br /> * null: An empty message which only contains the header 10 bytes */ private final Buf readOneMsg(MsgInfo msgInfo, InputStream in) throws IOException { if (log.isLoggable(Level.FINE)) log.fine("Entering readOneMsg(), waiting on inputStream"); // First we extract the first 10 bytes to get the msgLength ... int remainLength = NUM_FIELD_LEN; int lenRead; int msgLength = 0; I_ProgressListener listener = null; synchronized (in) { { int off = 0; while ((lenRead = in.read(first10, off, remainLength)) != -1) { remainLength -= lenRead; if (remainLength == 0) break; off += lenRead; //log.info(ME, "Receive: lenRead=" + lenRead + " off=" + off + " remainLength=" + remainLength); } } if (lenRead == -1) // if (sock.isClosed()) // since JDK 1.4 // throw new IOException("Can't read message header (first 10 bytes) from socket, message is corrupted"); throw new IOException(ME + ": Got EOF, lost socket connection"); try { msgLength = Integer.parseInt((new String(first10, 0, NUM_FIELD_LEN)).trim()); } catch (NumberFormatException e) { throw new IOException(ME + ": Format of xbf-message header is corrupted '" + new String(first10) + "', expected integral value"); } listener = this.progressListener; if (listener != null) { listener.progressRead("", 10, msgLength); } if (log.isLoggable(Level.FINE)) log.fine("Got first 10 bytes of total length=" + msgLength); if (msgLength == NUM_FIELD_LEN) return null; // An empty message only contains the header 10 bytes else if (msgLength < (NUM_FIELD_LEN+FLAG_FIELD_LEN)) throw new IOException(ME + ": Message format is corrupted, the given message length=" + msgLength + " is invalid"); if (msgLength > maxMsgLength) { throw new IOException(ME + ": Message format is corrupted, the given message length=" + msgLength + " would produce an OutOfMemory"); } // Now we know the msgLength, lets extract the complete message ... if (buf.buf == null || buf.buf.length != msgLength) { buf.buf = null; try { buf.buf = new byte[msgLength]; } catch (OutOfMemoryError e) { throw new IOException(ME + ": Message format is corrupted, the given message length=" + msgLength + " produces:" + e.toString()); } buf.offset = 0; } buf.offset = NUM_FIELD_LEN; remainLength = msgLength - buf.offset; while ((lenRead = in.read(buf.buf, buf.offset, remainLength)) != -1) { remainLength -= lenRead; listener = this.progressListener; if (listener != null) { listener.progressRead("", msgLength-remainLength, msgLength); } if (remainLength == 0) break; buf.offset += lenRead; //log.info(ME, "Receive: lenRead=" + lenRead + " buf.offset=" + buf.offset + " remainLength=" + remainLength); } } if (lenRead == -1) throw new IOException(ME + ": Can't read complete message (" + msgLength + " bytes) from socket, only " + remainLength + " received, message is corrupted"); if (remainLength != 0) // assert throw new IOException(ME + ": Internal error, can't read complete message (" + msgLength + " bytes) from socket, only " + remainLength + " received, message is corrupted"); return buf; } /** * This parses the raw message from an InputStream (typically from a socket). * Use the get...() methods to access the data. * <p /> * This method blocks until a message arrives * @return Guaranteed to be always an array of length=1 */ public final MsgInfo[] parse(InputStream in) throws IOException, IllegalArgumentException { if (log.isLoggable(Level.FINER)) log.finer("Entering parse()"); MsgInfo msgInfo = new MsgInfo(this.glob); msgInfo.setMsgInfoParser(this); initialize(); Buf buf = readOneMsg(msgInfo, in); // blocks until one message is read if (buf == null) { msgInfo.setMethodName(MethodName.PING); return new MsgInfo[] { msgInfo }; // The shortest ping ever } if (log.isLoggable(Level.FINEST)) log.finest("Raw message of length " + buf.buf.length + " received >" + toLiteral(buf.buf) + "<"); msgInfo.setChecksum(buf.buf[NUM_FIELD_LEN] > 0); if (msgInfo.isChecksum()) { log.warning("Ignoring checksum flag"); } msgInfo.setCompressed(buf.buf[NUM_FIELD_LEN+1] > 0); if (msgInfo.isCompressed()) { log.warning("Ignoring compress flag"); } msgInfo.setType(buf.buf[NUM_FIELD_LEN+2]); msgInfo.setByte4(buf.buf[NUM_FIELD_LEN+3]); msgInfo.setByte5(buf.buf[NUM_FIELD_LEN+4]); msgInfo.setVersion(buf.buf[NUM_FIELD_LEN+5] - 48); if (msgInfo.getVersion() != 1) { log.warning("Ignoring version=" + msgInfo.getVersion() + " on 1 is supported"); } buf.offset = NUM_FIELD_LEN+FLAG_FIELD_LEN; msgInfo.setRequestId(toString(buf)); msgInfo.setMethodName(MethodName.toMethodName(toString(buf))); msgInfo.setSecretSessionId(toString(buf)); lenUnzipped = toInt0(buf, -1); if (lenUnzipped != -1) { if (log.isLoggable(Level.FINE)) log.fine("Ignoring given unzipped message length of size " + lenUnzipped); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -