jdbcmessage.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 866 行 · 第 1/2 页

JAVA
866
字号
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT.  See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * *   Free Software Foundation, Inc. *   59 Temple Place, Suite 330 *   Boston, MA 02111-1307  USA * * @author Scott Ferguson */package com.caucho.jms.jdbc;import com.caucho.config.ConfigException;import com.caucho.jdbc.JdbcMetaData;import com.caucho.jdbc.OracleMetaData;import com.caucho.jms.JMSExceptionWrapper;import com.caucho.jms.message.BytesMessageImpl;import com.caucho.jms.message.MapMessageImpl;import com.caucho.jms.message.MessageImpl;import com.caucho.jms.message.ObjectMessageImpl;import com.caucho.jms.message.StreamMessageImpl;import com.caucho.jms.message.TextMessageImpl;import com.caucho.jms.selector.Selector;import com.caucho.util.CharBuffer;import com.caucho.util.L10N;import com.caucho.vfs.*;import javax.jms.*;import javax.sql.DataSource;import java.io.EOFException;import java.io.IOException;import java.io.InputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.sql.Types;import java.util.Enumeration;import java.util.logging.Level;import java.util.logging.Logger;/** * Represents a JDBC message. */public class JdbcMessage{  static final Logger log = Logger.getLogger(JdbcMessage.class.getName());  static final L10N L = new L10N(JdbcMessage.class);  private static final int MESSAGE = 0;  private static final int TEXT = 1;  private static final int BYTES = 2;  private static final int STREAM = 3;  private static final int OBJECT = 4;  private static final int MAP = 5;    private final JdbcManager _jdbcManager;  private DataSource _dataSource;  private String _messageTable;  private String _messageSequence; private boolean _isOracle;  public JdbcMessage(JdbcManager jdbcManager)  {    _jdbcManager = jdbcManager;  }  /**   * Initializes the JdbcMessage   */  public void init()    throws ConfigException, SQLException  {    _messageTable = _jdbcManager.getMessageTable();    _dataSource = _jdbcManager.getDataSource();    JdbcMetaData metaData = _jdbcManager.getMetaData();    _isOracle = metaData instanceof OracleMetaData;    String longType = _jdbcManager.getLongType();    String identity =  longType + " PRIMARY KEY";    if (metaData.supportsIdentity())      identity = metaData.createIdentitySQL(identity);    else      _messageSequence = _messageTable + "_cseq";    Connection conn = _dataSource.getConnection();    try {      Statement stmt = conn.createStatement();      String sql = "SELECT 1 FROM " + _messageTable + " WHERE 1=0";      try {	ResultSet rs = stmt.executeQuery(sql);	rs.next();	rs.close();	stmt.close();	return;      } catch (SQLException e) {	log.finest(e.toString());      }      String blob = _jdbcManager.getBlob();      log.info(L.l("creating JMS message table {0}", _messageTable));            sql = ("CREATE TABLE " + _messageTable + " (" +	     "  m_id " + identity + "," +	     "  queue INTEGER NOT NULL," +	     "  conn VARCHAR(255)," +	     "  consumer " + longType + "," +	     "  delivered INTEGER NOT NULL," +	     "  msg_type INTEGER NOT NULL," +	     "  msg_id VARCHAR(64) NOT NULL," +	     "  priority INTEGER NOT NULL," +	     "  expire " + longType + " NOT NULL," +	     "  header " + blob + "," +	     "  body " + blob +	     ")");      if (_isOracle) {	String extent = "";		if (_jdbcManager.getTablespace() != null) {	  extent = " tablespace " + _jdbcManager.getTablespace();	}	// oracle recommends using retention (over pctversion) for performance	// Oracle will keep deleted lobs for the retention time before	// releasing them (e.g. 900 seconds)	sql += (" LOB(header) STORE AS (cache retention" + extent + ")");	sql += (" LOB(body) STORE AS (cache retention" + extent + ")");      }      stmt.executeUpdate(sql);      if (_messageSequence != null) {	stmt.executeUpdate(metaData.createSequenceSQL(_messageSequence, 1));      }    } finally {      conn.close();    }  }  /**   * Sends the message to the queue.   */  public long send(Message message, int queue, int priority, long expireTime)    throws SQLException, IOException, JMSException  {    if (log.isLoggable(Level.FINE))      log.fine("jms jdbc queue:" + queue + " send message " + message);        String msgId = message.getJMSMessageID();    TempStream header = new TempStream();    header.openWrite();          WriteStream ws = new WriteStream(header);    writeMessageHeader(ws, message);    ws.close();    TempStream body = null;    int type = MESSAGE;    if (message instanceof TextMessage) {      TextMessage text = (TextMessage) message;	      type = TEXT;	      if (text.getText() != null) {	body = new TempStream();	body.openWrite();		ws = new WriteStream(body);	ws.setEncoding("UTF-8");	ws.print(text.getText());	ws.close();      }    }    else if (message instanceof BytesMessage) {      BytesMessage bytes = (BytesMessage) message;      type = BYTES;      body = writeBytes(bytes);    }    else if (message instanceof StreamMessage) {      StreamMessage stream = (StreamMessage) message;      type = STREAM;      body = writeStream(stream);    }    else if (message instanceof ObjectMessage) {      ObjectMessage obj = (ObjectMessage) message;      type = OBJECT;      body = writeObject(obj);    }    else if (message instanceof MapMessage) {      MapMessage obj = (MapMessage) message;      type = MAP;      body = writeMap(obj);    }    Connection conn = _dataSource.getConnection();    try {      String sql;      if (_messageSequence != null) {	sql = _jdbcManager.getMetaData().selectSequenceSQL(_messageSequence);	PreparedStatement pstmt = conn.prepareStatement(sql);;	long mId = -1;		ResultSet rs = pstmt.executeQuery();	if (rs.next())	  mId = rs.getLong(1);	else	  throw new RuntimeException("can't create message");		sql = ("INSERT INTO " + _messageTable +	       "(m_id, queue, msg_type, msg_id, priority, expire, delivered, header, body) " +	       "VALUES (?,?,?,?,?,?,0,?,?)");	pstmt = conn.prepareStatement(sql);	int i = 1;	pstmt.setLong(i++, mId);	pstmt.setInt(i++, queue);	pstmt.setInt(i++, type);	pstmt.setString(i++, msgId);	pstmt.setInt(i++, priority);	pstmt.setLong(i++, expireTime);	if (header.getLength() > 0)	  pstmt.setBinaryStream(i++, header.openRead(), header.getLength());	else	  pstmt.setNull(i++, Types.BINARY);		if (body != null) 	  pstmt.setBinaryStream(i++, body.openRead(), body.getLength());	else	  pstmt.setString(i++, "");	pstmt.executeUpdate();      }      else {	sql = ("INSERT INTO " + _messageTable +	       "(queue, msg_type, msg_id, priority, expire, delivered, header, body) " +	       "VALUES (?,?,?,?,?,0,?,?)");	PreparedStatement pstmt;	pstmt = conn.prepareStatement(sql);	int i = 1;	pstmt.setInt(i++, queue);	pstmt.setInt(i++, type);	pstmt.setString(i++, msgId);	pstmt.setInt(i++, priority);	pstmt.setLong(i++, expireTime);	pstmt.setBinaryStream(i++, header.openRead(), header.getLength());		if (body != null) 	  pstmt.setBinaryStream(i++, body.openRead(), body.getLength());	else	  pstmt.setString(i++, "");	pstmt.executeUpdate();      }      return 0;    } finally {      conn.close();    }  }  /**   * Receives a message from the queue.   */  MessageImpl receive(int queue, int session)    throws SQLException, IOException, JMSException  {    long minId = -1;        Connection conn = _dataSource.getConnection();    try {      String sql = ("SELECT m_id, msg_type, msg_id, delivered, body, header" +		    " FROM " + _messageTable +		    " WHERE ?<id AND queue=? AND consumer IS NULL" +		    " ORDER BY priority DESC, id");      PreparedStatement selectStmt = conn.prepareStatement(sql);            sql = ("UPDATE " + _messageTable + " SET consumer=?, delivered=1 " +	     "WHERE m_id=? AND consumer IS NULL");            PreparedStatement updateStmt = conn.prepareStatement(sql);      long id = -1;      while (true) {	id = -1;		selectStmt.setLong(1, minId);	selectStmt.setInt(2, queue);	MessageImpl msg = null;	ResultSet rs = selectStmt.executeQuery();	while (rs.next()) {	  id = rs.getLong(1);	  minId = id;	  msg = readMessage(rs);	}	rs.close();	if (msg == null)	  return null;	updateStmt.setInt(1, session);	updateStmt.setLong(2, id);	  	int updateCount = updateStmt.executeUpdate();		if (updateCount == 1)	  return msg;	else if (log.isLoggable(Level.FINE)) {	  log.fine("JdbcMessageQueue[" + queue + "] can't update received message " + id + " for session " + session +".");	}      }    } finally {      conn.close();    }  }  /**   * Acknowledges all received messages from the session.   */  void acknowledge(int session)    throws SQLException  {    Connection conn = _dataSource.getConnection();    try {      String sql = ("DELETE FROM " +  _messageTable + " " +		    "WHERE consumer=?");      PreparedStatement pstmt;      pstmt = conn.prepareStatement(sql);      pstmt.setInt(1, session);      pstmt.executeUpdate();      pstmt.close();    } finally {      conn.close();    }  }  /**   * Reads the message from the result stream.   */  MessageImpl readMessage(ResultSet rs)    throws SQLException, IOException, JMSException  {    int msgType = rs.getInt(2);    String msgId = rs.getString(3);    boolean redelivered = rs.getInt(4) == 1;    MessageImpl msg;        switch (msgType) {    case TEXT:      {	InputStream is = rs.getBinaryStream(5);	try {	  msg = readTextMessage(is);	} finally {	  if (is != null)	    is.close();	}	break;      }	        case BYTES:      {	InputStream is = rs.getBinaryStream(5);	try {	  msg = readBytesMessage(is);	} finally {	  if (is != null)	    is.close();	}	break;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?