⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ntransaction.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      ctx.bos.reset();      ctx.bos.write(OOS_STREAM_HEADER, 0, 4);    }    ctx.oos.writeObject(obj);    ctx.oos.flush();    saveInLog(ctx.bos.toByteArray(), dirName, name, ctx.log, false);  }  /**   *  Save an object state already serialized. The byte array keeped in log is   * a copy, so the original one may be modified.   */  public final void saveByteArray(byte[] buf, String name) throws IOException {    saveByteArray(buf, null, name);  }  /**   *  Save an object state already serialized. The byte array keeped in log is   * a copy, so the original one may be modified.   */  public final void saveByteArray(byte[] buf,                                  String dirName, String name) throws IOException {    Context ctx = (Context) perThreadContext.get();    saveInLog(buf,              dirName, name,              ((Context) perThreadContext.get()).log, true);  }  private final void saveInLog(byte[] buf,                               String dirName, String name,                               Hashtable log,                               boolean copy) throws IOException {    Object key = OperationKey.newKey(dirName, name);    Operation op = Operation.alloc(Operation.SAVE, dirName, name, buf);    Operation old = (Operation) log.put(key, op);    if (copy) {      if ((old != null) &&          (old.type == Operation.SAVE) &&          (old.value.length == buf.length)) {        // reuse old buffer        op.value = old.value;      } else {        // alloc a new one        op.value = new byte[buf.length];      }      System.arraycopy(buf, 0, op.value, 0, buf.length);    }    if (old != null) old.free();  }  private final byte[] getFromLog(Hashtable log, Object key) throws IOException {    // Searchs in the log a new value for the object.    Operation op = (Operation) log.get(key);    if (op != null) {      if (op.type == Operation.SAVE) {	return op.value;      } else if (op.type == Operation.DELETE) {	// The object was deleted.	throw new FileNotFoundException();      }    }    return null;  }  private final byte[] getFromLog(String dirName, String name) throws IOException {    // First searchs in the logs a new value for the object.    Object key = OperationKey.newKey(dirName, name);    byte[] buf = getFromLog(((Context) perThreadContext.get()).log, key);    if (buf != null) return buf;        if ((buf = getFromLog(logFile.log, key)) != null) {      return buf;    }    return null;    }  public final Object load(String name) throws IOException, ClassNotFoundException {    return load(null, name);  }  public Object load(String dirName, String name) throws IOException, ClassNotFoundException {    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, "NTransaction, load(" + dirName + ", " + name + ")");    // First searchs in the logs a new value for the object.    try {      byte[] buf = getFromLog(dirName, name);      if (buf != null) {      	ByteArrayInputStream bis = new ByteArrayInputStream(buf);	ObjectInputStream ois = new ObjectInputStream(bis);	  	return ois.readObject();      }            // Gets it from disk.            return repository.loadobj(dirName, name);    } catch (FileNotFoundException exc) {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, "NTransaction, load(" + dirName + ", " + name + ") NOT FOUND");      return null;    }  }  public final byte[] loadByteArray(String name) throws IOException {    return loadByteArray(null, name);  }  public byte[] loadByteArray(String dirName, String name) throws IOException {    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, "NTransaction, loadByteArray(" + dirName + ", " + name + ")");    // First searchs in the logs a new value for the object.    try {      byte[] buf = getFromLog(dirName, name);      if (buf != null) return buf;      // Gets it from disk.            return repository.load(dirName, name);    } catch (FileNotFoundException exc) {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, "NTransaction, loadByteArray(" + dirName + ", " + name + ") NOT FOUND");      return null;    }  }  public final void delete(String name) {    delete(null, name);  }    public final void delete(String dirName, String name) {    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG,                 "NTransaction, delete(" + dirName + ", " + name + ")");    Object key = OperationKey.newKey(dirName, name);    Hashtable log = ((Context) perThreadContext.get()).log;    Operation op = Operation.alloc(Operation.DELETE, dirName, name);    op = (Operation) log.put(key, op);    if (op != null) op.free();  }  public final synchronized void commit() throws IOException {    if (phase != RUN)      throw new IllegalStateException("Can not commit.");    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, "NTransaction, commit");        Hashtable log = ((Context) perThreadContext.get()).log;    if (! log.isEmpty()) {      logFile.commit(log);      log.clear();    }    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, "NTransaction, committed");    setPhase(COMMIT);  }  public final synchronized void rollback() throws IOException {    if (phase != RUN)      throw new IllegalStateException("Can not rollback.");    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG, "NTransaction, rollback");    setPhase(ROLLBACK);    ((Context) perThreadContext.get()).log.clear();  }  public final synchronized void release() throws IOException {    if ((phase != RUN) && (phase != COMMIT) && (phase != ROLLBACK))      throw new IllegalStateException("Can not release transaction.");    // Change the transaction state.    setPhase(FREE);    // wake-up an eventually user's thread in begin    notify();  }  /**   * Stops the transaction module.   * It waits all transactions termination, then the module is kept   * in a FREE 'ready to use' state.   * The log file is garbaged, all operations are reported to disk.   */  public synchronized void stop() {    if (logmon.isLoggable(BasicLevel.INFO))      logmon.log(BasicLevel.INFO, "NTransaction, stops");    while (phase != FREE) {      // Wait for the transaction subsystem to be free      try {        wait();      } catch (InterruptedException exc) {      }    }    setPhase(FINALIZE);    try {      logFile.garbage();    } catch (IOException exc) {      logmon.log(BasicLevel.WARN, "NTransaction, can't garbage logfile", exc);    }    setPhase(FREE);    if (logmon.isLoggable(BasicLevel.INFO)) {      logmon.log(BasicLevel.INFO,                 "NTransaction, stopped: " +                 "garbage=" + logFile.garbageCount + ", " +                 "commit=" + logFile.commitCount + ", " +                 "ratio=" + getGarbageRatio());    }  }  /**   * Close the transaction module.   * It waits all transactions termination, the module will be initialized   * anew before reusing it.   * The log file is garbaged then closed.   */  public synchronized void close() {    if (logmon.isLoggable(BasicLevel.INFO))      logmon.log(BasicLevel.INFO, "NTransaction, stops");    if (phase == INIT) return;    while (phase != FREE) {      // Wait for the transaction subsystem to be free      try {        wait();      } catch (InterruptedException exc) {      }    }    setPhase(FINALIZE);    logFile.stop();    setPhase(INIT);    if (logmon.isLoggable(BasicLevel.INFO)) {      logmon.log(BasicLevel.INFO,                 "NTransaction, closed: " +                 "garbage=" + logFile.garbageCount + ", " +                 "commit=" + logFile.commitCount + ", " +                 "ratio=" + getGarbageRatio());    }  }  /**   *   */  static final class LogFile extends ByteArrayOutputStream {    /**     * Log of all operations already commited but not reported on disk.     */    Hashtable log = null;    /** log file */    RandomAccessFile logFile = null;     int current = -1;    /**     * Number of commit operation since starting up.     */    int commitCount = 0;    /**     * Number of garbage operation since starting up.     */    int garbageCount = 0;    /**     * Cumulated time of garbage operations since starting up.     */    long garbageTime = 0l;    /** Root directory of transaction storage */    private File dir = null;    /** Coherency lock filename */    static private final String LockPathname = "lock";    /** Coherency lock file */    private File lockFile = null;    private Repository repository = null;    LogFile(File dir, Repository repository) throws IOException {      super(4 * Kb);      this.dir = dir;      this.repository = repository;      boolean nolock = Boolean.getBoolean("NTNoLockFile");      if (! nolock) {        lockFile = new File(dir, LockPathname);        if (! lockFile.createNewFile()) {          logmon.log(BasicLevel.FATAL,                     "NTransaction.init(): " +                     "Either the server is already running, " +                      "either you have to remove lock file: " +                     lockFile.getAbsolutePath());          throw new IOException("Transaction already running.");        }        lockFile.deleteOnExit();      }      //  Search for old log file, then apply all committed operation,      // finally cleans it.      log = new Hashtable(LogMemoryCapacity);            File logFilePN = new File(dir, "log");      if ((logFilePN.exists()) && (logFilePN.length() > 0)) {        logFile = new RandomAccessFile(logFilePN, "r");        try {          int optype = logFile.read();          while (optype == Operation.COMMIT) {            String dirName;            String name;            optype = logFile.read();             while ((optype == Operation.SAVE) ||                   (optype == Operation.DELETE)) {              //  Gets all operations of one committed transaction then              // adds them to specified log.              dirName = logFile.readUTF();              if (dirName.length() == 0) dirName = null;              name = logFile.readUTF();              Object key = OperationKey.newKey(dirName, name);              if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))                logmon.log(BasicLevel.DEBUG,                           "NTransaction.init(), OPERATION=" +                           optype + ", " + name);              Operation op = null;              if (optype == Operation.SAVE) {                byte buf[] = new byte[logFile.readInt()];                logFile.readFully(buf);                op = Operation.alloc(optype, dirName, name, buf);                op = (Operation) log.put(key, op);              } else {                op = Operation.alloc(optype, dirName, name);                op = (Operation) log.put(key, op);              }              if (op != null) op.free();              optype = logFile.read();            }            if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))              logmon.log(BasicLevel.DEBUG,                         "NTransaction.init(), COMMIT=" + optype);          };          if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))            logmon.log(BasicLevel.DEBUG,                       "NTransaction.init(), END=" + optype + ", " +                       logFile.getFilePointer());          if (optype != Operation.END) System.exit(-1);        } catch (IOException exc) {          throw exc;        } finally {          logFile.close();        }        logFile = new RandomAccessFile(logFilePN, "rwd");        garbage();      } else {        logFile = new RandomAccessFile(logFilePN, "rwd");        logFile.setLength(LogFileSize);        current = 1;        // Cleans log file        logFile.seek(0);        logFile.write(Operation.END);      }    }    static private final byte[] emptyUTFString = {0, 0};    void writeUTF(String str)  {      int strlen = str.length() ;      int newcount = count + strlen +2;      if (newcount > buf.length) {        byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];        System.arraycopy(buf, 0, newbuf, 0, count);        buf = newbuf;      }      buf[count++] = (byte) ((strlen >>> 8) & 0xFF);      buf[count++] = (byte) ((strlen >>> 0) & 0xFF);      str.getBytes(0, strlen, buf, count);      count = newcount;    }    void writeInt(int v) throws IOException {      int newcount = count +4;      if (newcount > buf.length) {        byte newbuf[] = new byte[buf.length << 1];        System.arraycopy(buf, 0, newbuf, 0, count);        buf = newbuf;      }      buf[count++] = (byte) ((v >>> 24) & 0xFF);      buf[count++] = (byte) ((v >>> 16) & 0xFF);      buf[count++] = (byte) ((v >>>  8) & 0xFF);      buf[count++] = (byte) ((v >>>  0) & 0xFF);    }    int logMemorySize = 0;    /**     * Reports all buffered operations in logs.     */    void commit(Hashtable ctxlog) throws IOException {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, "NTransaction.LogFile.commit()");      commitCount += 1;            Operation op = null;      for (Enumeration e = ctxlog.elements(); e.hasMoreElements(); ) {        op = (Operation) e.nextElement();        // Save the log to disk        write(op.type);        if (op.dirName != null) {          writeUTF(op.dirName);        } else {          write(emptyUTFString);        }        writeUTF(op.name);        if (op.type == Operation.SAVE) {          logMemorySize += op.value.length;          writeInt(op.value.length);          write(op.value);        }        // Reports all committed operation in clog        op = (Operation) log.put(OperationKey.newKey(op.dirName, op.name), op);        if (op != null) {          if (op.type == Operation.SAVE)            logMemorySize -= op.value.length;          op.free();        }      }      write(Operation.END);      // AF: Is it needed? Normally the file pointer should already set at      // the right position... To be verified, but not really a big cost.      logFile.seek(current);      logFile.write(buf, 0, count);      // AF: May be we can avoid this second synchronous write, using a      // marker: determination d'un marqueur li

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -