jdbcmessage.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 866 行 · 第 1/2 页
JAVA
866 行
} case STREAM: { InputStream is = rs.getBinaryStream(5); try { msg = readStreamMessage(is); } finally { if (is != null) is.close(); } break; } case OBJECT: { InputStream is = rs.getBinaryStream(5); try { msg = readObjectMessage(is); } finally { if (is != null) is.close(); } break; } case MAP: { InputStream is = rs.getBinaryStream(5); try { msg = readMapMessage(is); } finally { if (is != null) is.close(); } break; } case MESSAGE: default: { msg = new MessageImpl(); break; } } InputStream is = rs.getBinaryStream(6); if (is != null) { try { readMessageHeader(is, msg); } finally { is.close(); } } msg.setJMSMessageID(msgId); msg.setJMSRedelivered(redelivered); return msg; } /** * Writes the message header for a Resin message. */ private void writeMessageHeader(WriteStream ws, Message msg) throws IOException, JMSException { Enumeration names = msg.getPropertyNames(); CharBuffer cb = new CharBuffer(); while (names.hasMoreElements()) { String name = (String) names.nextElement(); writeValue(ws, cb, name); String value = msg.getStringProperty(name); writeValue(ws, cb, value); } } /** * Writes a value to the output stream. */ private void writeValue(WriteStream ws, CharBuffer cb, Object value) throws IOException { if (value == null) ws.write('N'); else { cb.clear(); cb.append(value); int length = cb.length(); char []buf = cb.getBuffer(); ws.write('S'); ws.write(length >> 24); ws.write(length >> 16); ws.write(length >> 8); ws.write(length); for (int i = 0; i < length; i++) { int ch = buf[i]; ws.write(ch >> 8); ws.write(ch); } } } /** * Writes the bytes message. */ private TempStream writeBytes(BytesMessage bytes) throws IOException, JMSException { TempStream body = new TempStream(); body.openWrite(); WriteStream ws = new WriteStream(body); int data; //bytes.reset(); TempBuffer tb = TempBuffer.allocate(); byte []buffer = tb.getBuffer(); int len; while ((len = bytes.readBytes(buffer, buffer.length)) >= 0) { ws.write(buffer, 0, len); } TempBuffer.free(tb); tb = null; ws.close(); return body; } /** * Writes the stream message. */ private TempStream writeStream(StreamMessage stream) throws IOException, JMSException { TempStream body = new TempStream(); body.openWrite(); WriteStream ws = new WriteStream(body); ObjectOutputStream out = new ObjectOutputStream(ws); try { while (true) { Object data = stream.readObject(); out.writeObject(data); } } catch (MessageEOFException e) { } out.close(); ws.close(); return body; } /** * Writes the object message. */ private TempStream writeObject(ObjectMessage obj) throws IOException, JMSException { TempStream body = new TempStream(); body.openWrite(); WriteStream ws = new WriteStream(body); ObjectOutputStream out = new ObjectOutputStream(ws); out.writeObject(obj.getObject()); out.close(); ws.close(); return body; } /** * Writes the map message. */ private TempStream writeMap(MapMessage map) throws IOException, JMSException { TempStream body = new TempStream(); body.openWrite(); WriteStream ws = new WriteStream(body); ObjectOutputStream out = new ObjectOutputStream(ws); try { Enumeration e = map.getMapNames(); while (e.hasMoreElements()) { String name = (String) e.nextElement(); out.writeUTF(name); Object data = map.getObject(name); out.writeObject(data); } } catch (MessageEOFException e) { } out.close(); ws.close(); return body; } /** * Writes the message header for a Resin message. */ private void readMessageHeader(InputStream is, Message msg) throws IOException, JMSException { CharBuffer cb = new CharBuffer(); int type; while ((type = is.read()) > 0) { String name = (String) readValue(is, type, cb); Object value = readValue(is, is.read(), cb); msg.setObjectProperty(name, value); } } /** * Writes the message header for a Resin message. */ private TextMessageImpl readTextMessage(InputStream is) throws IOException, JMSException { TextMessageImpl text = new TextMessageImpl(); if (is == null) return text; ByteToChar byteToChar = ByteToChar.create(); int ch; byteToChar.setEncoding("UTF-8"); while ((ch = is.read()) >= 0) { byteToChar.addByte(ch); } text.setText(byteToChar.getConvertedString()); return text; } /** * Reads a bytes message. */ private BytesMessageImpl readBytesMessage(InputStream is) throws IOException, JMSException { BytesMessageImpl bytes = new BytesMessageImpl(); if (is == null) { bytes.reset(); return bytes; } int data; while ((data = is.read()) >= 0) { bytes.writeByte((byte) data); } bytes.reset(); return bytes; } /** * Reads a stream message. */ private StreamMessageImpl readStreamMessage(InputStream is) throws IOException, JMSException { StreamMessageImpl stream = new StreamMessageImpl(); if (is == null) return stream; ObjectInputStream in = new ContextLoaderObjectInputStream(is); try { while (true) { Object obj = in.readObject(); stream.writeObject(obj); } } catch (EOFException e) { } catch (Exception e) { throw new JMSExceptionWrapper(e); } in.close(); stream.reset(); return stream; } /** * Reads a map message. */ private MapMessageImpl readMapMessage(InputStream is) throws IOException, JMSException { MapMessageImpl map = new MapMessageImpl(); if (is == null) return map; ObjectInputStream in = new ContextLoaderObjectInputStream(is); try { while (true) { String name = in.readUTF(); Object obj = in.readObject(); map.setObject(name, obj); } } catch (EOFException e) { } catch (Exception e) { throw new JMSExceptionWrapper(e); } in.close(); return map; } /** * Reads an object message. */ private ObjectMessageImpl readObjectMessage(InputStream is) throws IOException, JMSException { ObjectMessageImpl msg = new ObjectMessageImpl(); if (is == null) return msg; ObjectInputStream in = new ContextLoaderObjectInputStream(is); try { Object obj = in.readObject(); msg.setObject((java.io.Serializable) obj); } catch (IOException e) { throw e; } catch (Exception e) { throw new JMSExceptionWrapper(e); } in.close(); return msg; } /** * Writes a value to the output stream. */ private Object readValue(InputStream is, int type, CharBuffer cb) throws IOException { switch (type) { case 'N': return null; case 'S': { cb.clear(); int length = readInt(is); for (int i = 0; i < length; i++) { char ch = (char) ((is.read() << 8) + is.read()); cb.append(ch); } return cb.toString(); } default: throw new IOException(L.l("unknown header type")); } } /** * Reads an integer value. */ private int readInt(InputStream is) throws IOException { return ((is.read() << 24) + (is.read() << 16) + (is.read() << 8) + (is.read())); } /** * Removes the first message matching the selector. */ private boolean hasMessage(Selector selector) throws JMSException { return false; } /** * Returns a printable view of the queue. */ public String toString() { return "JdbcMessage[" + _messageTable + "]"; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?