filequeuestore.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 525 行

JAVA
525
字号
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT.  See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * *   Free Software Foundation, Inc. *   59 Temple Place, Suite 330 *   Boston, MA 02111-1307  USA * * @author Scott Ferguson */package com.caucho.jms.file;import java.io.*;import java.util.logging.*;import javax.sql.*;import java.sql.Connection;import java.sql.Statement;import java.sql.PreparedStatement;import java.sql.SQLException;import java.sql.ResultSet;import javax.jms.*;import javax.annotation.*;import com.caucho.jms.queue.*;import com.caucho.jms.message.*;import com.caucho.config.ConfigException;import com.caucho.db.*;import com.caucho.db.jdbc.*;import com.caucho.util.L10N;import com.caucho.java.*;import com.caucho.server.resin.*;import com.caucho.vfs.*;/** * Implements a file queue. */public class FileQueueStore{  private static final L10N L = new L10N(FileQueueStore.class);  private static final Logger log    = Logger.getLogger(FileQueueStore.class.getName());  private static final MessageType []MESSAGE_TYPE = MessageType.values();  private Path _path;  private DataSource _db;  private String _name = "default";  private String _tablePrefix = "jms";  private MessageFactory _messageFactory;  private String _queueTable;  private String _messageTable;  private long _queueId;  private Connection _conn;  private PreparedStatement _sendStmt;  private PreparedStatement _receiveStartStmt;  private PreparedStatement _readStmt;  private PreparedStatement _receiveStmt;  private PreparedStatement _removeStmt;  private PreparedStatement _deleteStmt;  public FileQueueStore(MessageFactory messageFactory)  {    _messageFactory = messageFactory;  }  public void setName(String name)  {    _name = name;  }  public String getName()  {    return _name;  }  /**   * Sets the path to the database   */  public void setPath(Path path)  {    if (! path.exists()) {      try {	path.mkdirs();      } catch (IOException e) {	throw ConfigException.create(e);      }    }        if (! path.isDirectory())      throw new ConfigException(L.l("path '{0}' must be a directory",				    path));    _path = path;  }  /**   * Returns the path to the backing database   */  public Path getPath()  {    return _path;  }  public void setTablePrefix(String prefix)  {    _tablePrefix = prefix;  }  @PostConstruct  public void init()  {    if (_path == null)      _path = WorkDir.getLocalWorkDir();    if (! _path.isDirectory())      throw new ConfigException(L.l("FileQueue requires a valid persistent directory."));    Resin resin = Resin.getLocal();        String serverId = null;    if (resin != null)      serverId = resin.getServerId();    if (serverId == null)      serverId = "anon";    else if ("".equals(serverId))      serverId = "default";    _queueTable = escapeName("jms_queue_" + serverId);    _messageTable = escapeName("jms_message_" + serverId);    try {      DataSourceImpl db = new DataSourceImpl(_path);      db.setRemoveOnError(true);      db.init();      _db = db;      _conn = _db.getConnection();      initDatabase();            initQueue();      initStatements();    } catch (SQLException e) {      throw ConfigException.create(e);    }  }  /**   * Adds a new message to the persistent store.   */  public long send(MessageImpl msg, int priority, long expireTime)  {    synchronized (this) {      try {	_sendStmt.setLong(1, _queueId);	_sendStmt.setInt(2, priority);	_sendStmt.setLong(3, expireTime);	_sendStmt.setString(4, msg.getJMSMessageID());	_sendStmt.setBinaryStream(5, msg.propertiesToInputStream(), 0);	_sendStmt.setInt(6, msg.getType().ordinal());	_sendStmt.setBinaryStream(7, msg.bodyToInputStream(), 0);	_sendStmt.executeUpdate();	if (log.isLoggable(Level.FINE))	  log.fine(this + " send " + msg);	ResultSet rs = _sendStmt.getGeneratedKeys();	if (! rs.next())	  throw new java.lang.IllegalStateException();	long id = rs.getLong(1);	rs.close();	return id;      } catch (Exception e) {	throw new RuntimeException(e);      }    }  }  /**   * Retrieves a message from the persistent store.   */  void receiveStart(FileQueue fileQueue)  {    synchronized (this) {      try {	_receiveStartStmt.setLong(1, _queueId);	ResultSet rs = _receiveStartStmt.executeQuery();	while (rs.next()) {	  long id = rs.getLong(1);	  int priority = rs.getInt(2);	  long expire = rs.getLong(3);	  MessageType type = MESSAGE_TYPE[rs.getInt(4)];	  	  FileQueueEntry entry	    = fileQueue.addEntry(id, -1, priority, expire, type);	}	rs.close();      } catch (Exception e) {	throw new RuntimeException(e);      }    }  }  /**   * Retrieves a message from the persistent store.   */  public MessageImpl readMessage(long id, MessageType type)  {    synchronized (this) {      try {	_readStmt.setLong(1, id);	ResultSet rs = _readStmt.executeQuery();	if (rs.next()) {	  MessageImpl msg;          type = MESSAGE_TYPE[rs.getInt(1)];	  switch (type) {	  case NULL:	    msg = new MessageImpl();	    break;	  case BYTES:	    msg = new BytesMessageImpl();	    break;	  case MAP:	    msg = new MapMessageImpl();	    break;	  case OBJECT:	    msg = new ObjectMessageImpl();	    break;	  case STREAM:	    msg = new StreamMessageImpl();	    break;	  case TEXT:	    msg = new TextMessageImpl();	    break;	  default:	    msg = new MessageImpl();	    break;	  }	  	  String msgId = rs.getString(2);	  msg.setJMSMessageID(msgId);	  InputStream is = rs.getBinaryStream(3);	  if (is != null) {	    msg.readProperties(is);	    is.close();	  }	  is = rs.getBinaryStream(4);	  if (is != null) {	    msg.readBody(is);	    is.close();	  }	  return (MessageImpl) msg;	}	rs.close();      } catch (Exception e) {	throw new RuntimeException(e);      }    }    return null;  }  /**   * Retrieves a message from the persistent store.   */  public MessageImpl receive()  {    synchronized (this) {      try {	_receiveStmt.setLong(1, _queueId);	ResultSet rs = _receiveStmt.executeQuery();	if (rs.next()) {	  long id = rs.getLong(1);	  rs.close();	  Message msg = _messageFactory.createTextMessage("sample");	  _deleteStmt.setLong(1, id);	  _deleteStmt.executeUpdate();	  return (MessageImpl) msg;	}	rs.close();      } catch (Exception e) {	throw new RuntimeException(e);      }    }    return null;  }  /**   * Retrieves a message from the persistent store.   */  public void remove(String id)  {    synchronized (this) {      try {	_removeStmt.setString(1, id);	_removeStmt.executeUpdate();      } catch (Exception e) {	throw new RuntimeException(e);      }    }  }  /**   * Retrieves a message from the persistent store.   */  void delete(long id)  {    synchronized (this) {      try {	_deleteStmt.setLong(1, id);	_deleteStmt.executeUpdate();      } catch (Exception e) {	throw new RuntimeException(e);      }    }  }  private void initDatabase()    throws SQLException  {    String sql = "select id, priority from " + _messageTable + " where 1=0";    Statement stmt = _conn.createStatement();    try {      ResultSet rs = stmt.executeQuery(sql);      rs.close();            return;    } catch (SQLException e) {      log.finer(e.toString());    }    try {      stmt.executeUpdate("drop table " + _queueTable);    } catch (SQLException e) {      log.finer(e.toString());    }    try {      stmt.executeUpdate("drop table " + _messageTable);    } catch (SQLException e) {      log.finer(e.toString());    }    sql = ("create table " + _queueTable + " ("	   + "  id bigint auto_increment,"	   + "  name varchar(128)"	   + ")");    stmt.executeUpdate(sql);    sql = ("create table " + _messageTable + " ("	   + "  id bigint auto_increment,"	   + "  queue bigint,"	   + "  state integer,"	   + "  priority integer,"	   + "  expire datetime,"	   + "  owner_1 bigint,"	   + "  owner_2 bigint,"	   + "  msg_id varchar(64),"	   + "  header blob,"	   + "  type integer,"	   + "  body blob"	   + ")");    stmt.executeUpdate(sql);  }  private void initQueue()    throws SQLException  {    String sql = "select id from " + _queueTable + " where name=?";        PreparedStatement stmt = _conn.prepareStatement(sql);    stmt.setString(1, getName());    ResultSet rs = stmt.executeQuery();    if (rs.next()) {      _queueId = rs.getLong(1);      rs.close();      stmt.close();      return;    }    stmt.close();    sql = "insert into " + _queueTable + " (name) values(?)";    stmt = _conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);    stmt.setString(1, getName());    stmt.executeUpdate();    rs = stmt.getGeneratedKeys();    if (! rs.next())      throw new java.lang.IllegalStateException();    _queueId = rs.getLong(1);    rs.close();    stmt.close();  }  private void initStatements()    throws SQLException  {    String sql = ("insert into " + _messageTable		  + " (queue,priority,expire,msg_id,header,type,body)"		  + " VALUES(?,?,?,?,?,?,?)");        _sendStmt = _conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);        sql = ("select id,msg_id,header,body from " + _messageTable	   + " WHERE queue=? LIMIT 1");        _receiveStmt = _conn.prepareStatement(sql);        sql = ("select type,msg_id,header,body from " + _messageTable	   + " WHERE id=?");        _readStmt = _conn.prepareStatement(sql);        sql = ("select id,priority,expire,type from " + _messageTable	   + " WHERE queue=? AND body is not null ORDER BY id");        _receiveStartStmt = _conn.prepareStatement(sql);        _removeStmt = _conn.prepareStatement(sql);        sql = ("update " + _messageTable	   + " set body=null, expire=now() + 120000"	   + " WHERE id=?");        _removeStmt = _conn.prepareStatement(sql);        sql = ("delete from " + _messageTable	   + " WHERE id=?");        _deleteStmt = _conn.prepareStatement(sql);  }  private static String escapeName(String name)  {    StringBuilder sb = new StringBuilder();    for (int i = 0; i < name.length(); i++) {      char ch = name.charAt(i);      if ('a' <= ch && ch <= 'z'	  || 'A' <= ch && ch <= 'Z'	  || '0' <= ch && ch <= '0'	  || ch == '_') {	sb.append(ch);      }      else	sb.append('_');    }    return sb.toString();  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?