⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dbtransaction.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * Copyright (C) 2006 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies * Contributor(s):  */package fr.dyade.aaa.util;import java.io.*;import java.util.*;import java.sql.Connection;import java.sql.DriverManager;import java.sql.Statement;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.util.monolog.api.Logger;import fr.dyade.aaa.agent.Debug;public final class DBTransaction implements Transaction, DBTransactionMBean {  // Logging monitor  private static Logger logmon = null;  File dir = null;  /** Log context associated with each Thread using DBTransaction. */  private class Context {    Hashtable log = null;    ByteArrayOutputStream bos = null;    ObjectOutputStream oos = null;    Context() {      log = new Hashtable(15);      bos = new ByteArrayOutputStream(256);    }  }  /**   *  ThreadLocal variable used to get the log to associate state with each   * thread. The log contains all operations do by the current thread since   * the last <code>commit</code>. On commit, its content is added to current   * log (memory + disk), then it is freed.   */  private ThreadLocal perThreadContext = null;  /**   *  Number of pooled operation, by default 1000.   *  This value can be adjusted for a particular server by setting   * <code>DBLogThresholdOperation</code> specific property.   * <p>   *  These property can be fixed either from <code>java</code> launching   * command, or in <code>a3servers.xml</code> configuration file.   */  static int LogThresholdOperation = 1000;  /**   * Returns the pool size for <code>operation</code> objects, by default 1000.   *   * @return The pool size for <code>operation</code> objects.   */  public int getLogThresholdOperation() {    return LogThresholdOperation;  }  long startTime = 0L;  /**   * Returns the starting time.   *   * @return The starting time.   */  public long getStartTime() {    return startTime;  }  String driver = "org.apache.derby.jdbc.EmbeddedDriver";  String connurl = "jdbc:derby:";  Connection conn = null;  PreparedStatement insertStmt = null;  PreparedStatement updateStmt = null;  PreparedStatement deleteStmt = null;  public DBTransaction() {}  public void init(String path) throws IOException {    phase = INIT;    logmon = Debug.getLogger(Debug.A3Debug + ".Transaction");    if (logmon.isLoggable(BasicLevel.INFO))      logmon.log(BasicLevel.INFO, "DBTransaction, init()");    dir = new File(path);    if (!dir.exists()) dir.mkdir();    if (!dir.isDirectory())      throw new FileNotFoundException(path + " is not a directory.");    // Saves the transaction classname in order to prevent use of a    // different one after restart (see AgentServer.init).    DataOutputStream ldos = null;    try {      File tfc = new File(dir, "TFC");      if (! tfc.exists()) {        ldos = new DataOutputStream(new FileOutputStream(tfc));        ldos.writeUTF(getClass().getName());        ldos.flush();      }    } finally {      if (ldos != null) ldos.close();    }    try {      Class.forName(driver).newInstance();      Properties props = new Properties();      props.put("user", "user1");      props.put("password", "user1");      conn = DriverManager.getConnection(connurl + new File(dir, "JoramDB").getPath() + ";create=true", props);      conn.setAutoCommit(false);//         break;    } catch (IllegalAccessException exc) {        throw new IOException(exc.getMessage());      } catch (ClassNotFoundException exc) {        throw new IOException(exc.getMessage());      } catch (InstantiationException exc) {        throw new IOException(exc.getMessage());      } catch (SQLException sqle) {        throw new IOException(sqle.getMessage());    }    try {        // Creating a statement lets us issue commands against the connection.        Statement s = conn.createStatement();        // We create the table.        s.execute("CREATE TABLE JoramDB (name VARCHAR(256), content LONG VARCHAR FOR BIT DATA, PRIMARY KEY(name))");        s.close();        conn.commit();    } catch (SQLException sqle) {      if (logmon.isLoggable(BasicLevel.INFO))        logmon.log(BasicLevel.INFO, "DBTransaction, init() DB already exists");    }    try {      insertStmt = conn.prepareStatement("INSERT INTO JoramDB VALUES (?, ?)");      updateStmt = conn.prepareStatement("UPDATE JoramDB SET content=? WHERE name=?");      deleteStmt = conn.prepareStatement("DELETE FROM JoramDB WHERE name=?");    } catch (SQLException sqle) {      sqle.printStackTrace();      throw new IOException(sqle.getMessage());    }    perThreadContext = new ThreadLocal() {        protected synchronized Object initialValue() {          return new Context();        }      };    startTime = System.currentTimeMillis();    if (logmon.isLoggable(BasicLevel.INFO))      logmon.log(BasicLevel.INFO, "DBTransaction, initialized " + startTime);    /* The Transaction subsystem is ready */    setPhase(FREE);  }  public final File getDir() {    return dir;  }  /**   * Returns the path of persistence directory.   *   * @return The path of persistence directory.   */  public String getPersistenceDir() {    return dir.getPath();  }  // State of the transaction monitor.  private int phase = INIT;  String phaseInfo = PhaseInfo[phase];  /**   *   */  public int getPhase() {    return phase;  }  public String getPhaseInfo() {    return phaseInfo;  }  private final void setPhase(int newPhase) {    phase = newPhase;    phaseInfo = PhaseInfo[phase];  }  public final synchronized void begin() throws IOException {    while (phase != FREE) {      try {	wait();      } catch (InterruptedException exc) {      }    }    // Change the transaction state.    setPhase(RUN);  }  /**   *  Returns an array of strings naming the persistent objects denoted by   * a name that satisfy the specified prefix. Each string is an object name.   *    * @param prefix	the prefix   * @return		An array of strings naming the persistent objects   *		 denoted by a name that satisfy the specified prefix. The   *		 array will be empty if no names match.   */  public final synchronized String[] getList(String prefix) {    try {      // Creating a statement lets us issue commands against the connection.      Statement s = conn.createStatement();      ResultSet rs = s.executeQuery("SELECT name FROM JoramDB WHERE name LIKE '" + prefix + "%'");      Vector v = new Vector();      while (rs.next()) {        v.add(rs.getString(1));      }      rs.close();      s.close();      String[] result = new String[v.size()];      result = (String[]) v.toArray(result);      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, "DBTransaction, getList: " + v);      return result;    } catch (SQLException sqle) {      // TODO: AF    }    return null;  }  /**   * Tests if the Transaction component is persistent.   *   * @return true.   */  public boolean isPersistent() {    return true;  }  final String fname(String dirName, String name) {    if (dirName == null) {      return name;    } else {      return new StringBuffer(dirName).append('/').append(name).toString();    }  }  static private final byte[] OOS_STREAM_HEADER = {    (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF),    (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF),    (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF),    (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF)  };  public void save(Serializable obj,                   String dirName, String name) throws IOException {    save(obj, fname(dirName, name));  }  public void save(Serializable obj, String name) throws IOException{    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, "DBTransaction, save(" + name + ")");    Context ctx = (Context) perThreadContext.get();    if (ctx.oos == null) {      ctx.bos.reset();      ctx.oos = new ObjectOutputStream(ctx.bos);    } else {      ctx.oos.reset();      ctx.bos.reset();      ctx.bos.write(OOS_STREAM_HEADER, 0, 4);    }    ctx.oos.writeObject(obj);    ctx.oos.flush();    saveInLog(ctx.bos.toByteArray(), name, ctx.log, false);  }  public void saveByteArray(byte[] buf,                            String dirName, String name) throws IOException {    save(buf, fname(dirName, name));  }  public void saveByteArray(byte[] buf, String name) throws IOException{    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, "DBTransaction, saveByteArray(" + name + ")");    Context ctx = (Context) perThreadContext.get();    saveInLog(buf, name, ((Context) perThreadContext.get()).log, true);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -