jdbcmessage.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 866 行 · 第 1/2 页

JAVA
866
字号
      }	        case STREAM:      {	InputStream is = rs.getBinaryStream(5);	try {	  msg = readStreamMessage(is);	} finally {	  if (is != null)	    is.close();	}	break;      }	        case OBJECT:      {	InputStream is = rs.getBinaryStream(5);	try {	  msg = readObjectMessage(is);	} finally {	  if (is != null)	    is.close();	}	break;      }	        case MAP:      {	InputStream is = rs.getBinaryStream(5);	try {	  msg = readMapMessage(is);	} finally {	  if (is != null)	    is.close();	}	break;      }	        case MESSAGE:    default:      {	msg = new MessageImpl();	break;      }    }	      InputStream is = rs.getBinaryStream(6);        if (is != null) {      try {	readMessageHeader(is, msg);      } finally {	is.close();      }    }    msg.setJMSMessageID(msgId);    msg.setJMSRedelivered(redelivered);    return msg;  }  /**   * Writes the message header for a Resin message.   */  private void writeMessageHeader(WriteStream ws, Message msg)    throws IOException, JMSException  {    Enumeration names = msg.getPropertyNames();    CharBuffer cb = new CharBuffer();    while (names.hasMoreElements()) {      String name = (String) names.nextElement();      writeValue(ws, cb, name);            String value = msg.getStringProperty(name);      writeValue(ws, cb, value);    }  }  /**   * Writes a value to the output stream.   */  private void writeValue(WriteStream ws, CharBuffer cb, Object value)    throws IOException  {    if (value == null)      ws.write('N');    else {      cb.clear();      cb.append(value);      int length = cb.length();      char []buf = cb.getBuffer();      ws.write('S');      ws.write(length >> 24);      ws.write(length >> 16);      ws.write(length >> 8);      ws.write(length);      for (int i = 0; i < length; i++) {	int ch = buf[i];	ws.write(ch >> 8);	ws.write(ch);      }    }  }  /**   * Writes the bytes message.   */  private TempStream writeBytes(BytesMessage bytes)    throws IOException, JMSException  {    TempStream body = new TempStream();    body.openWrite();	    WriteStream ws = new WriteStream(body);	    int data;    //bytes.reset();    TempBuffer tb = TempBuffer.allocate();    byte []buffer = tb.getBuffer();    int len;        while ((len = bytes.readBytes(buffer, buffer.length)) >= 0) {      ws.write(buffer, 0, len);    }    TempBuffer.free(tb);    tb = null;        ws.close();    return body;  }  /**   * Writes the stream message.   */  private TempStream writeStream(StreamMessage stream)    throws IOException, JMSException  {    TempStream body = new TempStream();    body.openWrite();	    WriteStream ws = new WriteStream(body);    ObjectOutputStream out = new ObjectOutputStream(ws);    try {      while (true) {	Object data = stream.readObject();	out.writeObject(data);      }    } catch (MessageEOFException e) {    }    out.close();    ws.close();    return body;  }  /**   * Writes the object message.   */  private TempStream writeObject(ObjectMessage obj)    throws IOException, JMSException  {    TempStream body = new TempStream();    body.openWrite();	    WriteStream ws = new WriteStream(body);    ObjectOutputStream out = new ObjectOutputStream(ws);    out.writeObject(obj.getObject());    out.close();    ws.close();    return body;  }  /**   * Writes the map message.   */  private TempStream writeMap(MapMessage map)    throws IOException, JMSException  {    TempStream body = new TempStream();    body.openWrite();	    WriteStream ws = new WriteStream(body);    ObjectOutputStream out = new ObjectOutputStream(ws);    try {      Enumeration e = map.getMapNames();      while (e.hasMoreElements()) {	String name = (String) e.nextElement();	out.writeUTF(name);		Object data = map.getObject(name);	out.writeObject(data);      }    } catch (MessageEOFException e) {    }    out.close();    ws.close();    return body;  }  /**   * Writes the message header for a Resin message.   */  private void readMessageHeader(InputStream is, Message msg)    throws IOException, JMSException  {    CharBuffer cb = new CharBuffer();    int type;    while ((type = is.read()) > 0) {      String name = (String) readValue(is, type, cb);      Object value = readValue(is, is.read(), cb);      msg.setObjectProperty(name, value);    }  }  /**   * Writes the message header for a Resin message.   */  private TextMessageImpl readTextMessage(InputStream is)    throws IOException, JMSException  {    TextMessageImpl text = new TextMessageImpl();    if (is == null)      return text;    ByteToChar byteToChar = ByteToChar.create();    int ch;    byteToChar.setEncoding("UTF-8");    while ((ch = is.read()) >= 0) {      byteToChar.addByte(ch);    }    text.setText(byteToChar.getConvertedString());        return text;  }  /**   * Reads a bytes message.   */  private BytesMessageImpl readBytesMessage(InputStream is)    throws IOException, JMSException  {    BytesMessageImpl bytes = new BytesMessageImpl();    if (is == null) {      bytes.reset();            return bytes;    }    int data;    while ((data = is.read()) >= 0) {      bytes.writeByte((byte) data);    }    bytes.reset();        return bytes;  }  /**   * Reads a stream message.   */  private StreamMessageImpl readStreamMessage(InputStream is)    throws IOException, JMSException  {    StreamMessageImpl stream = new StreamMessageImpl();    if (is == null)      return stream;    ObjectInputStream in = new ContextLoaderObjectInputStream(is);    try {      while (true) {	Object obj = in.readObject();	stream.writeObject(obj);      }    } catch (EOFException e) {    } catch (Exception e) {      throw new JMSExceptionWrapper(e);    }    in.close();    stream.reset();        return stream;  }  /**   * Reads a map message.   */  private MapMessageImpl readMapMessage(InputStream is)    throws IOException, JMSException  {    MapMessageImpl map = new MapMessageImpl();    if (is == null)      return map;    ObjectInputStream in = new ContextLoaderObjectInputStream(is);    try {      while (true) {	String name = in.readUTF();	Object obj = in.readObject();	map.setObject(name, obj);      }    } catch (EOFException e) {    } catch (Exception e) {      throw new JMSExceptionWrapper(e);    }    in.close();    return map;  }  /**   * Reads an object message.   */  private ObjectMessageImpl readObjectMessage(InputStream is)    throws IOException, JMSException  {    ObjectMessageImpl msg = new ObjectMessageImpl();    if (is == null)      return msg;    ObjectInputStream in = new ContextLoaderObjectInputStream(is);    try {      Object obj = in.readObject();      msg.setObject((java.io.Serializable) obj);    } catch (IOException e) {      throw e;    } catch (Exception e) {      throw new JMSExceptionWrapper(e);    }    in.close();        return msg;  }  /**   * Writes a value to the output stream.   */  private Object readValue(InputStream is, int type, CharBuffer cb)    throws IOException  {    switch (type) {    case 'N':      return null;    case 'S':      {	cb.clear();	int length = readInt(is);	for (int i = 0; i < length; i++) {	  char ch = (char) ((is.read() << 8) + is.read());	  cb.append(ch);	}	return cb.toString();      }    default:      throw new IOException(L.l("unknown header type"));    }  }  /**   * Reads an integer value.   */  private int readInt(InputStream is)    throws IOException  {    return ((is.read() << 24) +	    (is.read() << 16) +	    (is.read() << 8) +	    (is.read()));  }  /**   * Removes the first message matching the selector.   */  private boolean hasMessage(Selector selector)    throws JMSException  {    return false;  }  /**   * Returns a printable view of the queue.   */  public String toString()  {    return "JdbcMessage[" + _messageTable + "]";  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?