📄 ntransaction.java
字号:
ctx.bos.reset(); ctx.bos.write(OOS_STREAM_HEADER, 0, 4); } ctx.oos.writeObject(obj); ctx.oos.flush(); saveInLog(ctx.bos.toByteArray(), dirName, name, ctx.log, false); } /** * Save an object state already serialized. The byte array keeped in log is * a copy, so the original one may be modified. */ public final void saveByteArray(byte[] buf, String name) throws IOException { saveByteArray(buf, null, name); } /** * Save an object state already serialized. The byte array keeped in log is * a copy, so the original one may be modified. */ public final void saveByteArray(byte[] buf, String dirName, String name) throws IOException { Context ctx = (Context) perThreadContext.get(); saveInLog(buf, dirName, name, ((Context) perThreadContext.get()).log, true); } private final void saveInLog(byte[] buf, String dirName, String name, Hashtable log, boolean copy) throws IOException { Object key = OperationKey.newKey(dirName, name); Operation op = Operation.alloc(Operation.SAVE, dirName, name, buf); Operation old = (Operation) log.put(key, op); if (copy) { if ((old != null) && (old.type == Operation.SAVE) && (old.value.length == buf.length)) { // reuse old buffer op.value = old.value; } else { // alloc a new one op.value = new byte[buf.length]; } System.arraycopy(buf, 0, op.value, 0, buf.length); } if (old != null) old.free(); } private final byte[] getFromLog(Hashtable log, Object key) throws IOException { // Searchs in the log a new value for the object. Operation op = (Operation) log.get(key); if (op != null) { if (op.type == Operation.SAVE) { return op.value; } else if (op.type == Operation.DELETE) { // The object was deleted. throw new FileNotFoundException(); } } return null; } private final byte[] getFromLog(String dirName, String name) throws IOException { // First searchs in the logs a new value for the object. Object key = OperationKey.newKey(dirName, name); byte[] buf = getFromLog(((Context) perThreadContext.get()).log, key); if (buf != null) return buf; if ((buf = getFromLog(logFile.log, key)) != null) { return buf; } return null; } public final Object load(String name) throws IOException, ClassNotFoundException { return load(null, name); } public Object load(String dirName, String name) throws IOException, ClassNotFoundException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, load(" + dirName + ", " + name + ")"); // First searchs in the logs a new value for the object. try { byte[] buf = getFromLog(dirName, name); if (buf != null) { ByteArrayInputStream bis = new ByteArrayInputStream(buf); ObjectInputStream ois = new ObjectInputStream(bis); return ois.readObject(); } // Gets it from disk. return repository.loadobj(dirName, name); } catch (FileNotFoundException exc) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, load(" + dirName + ", " + name + ") NOT FOUND"); return null; } } public final byte[] loadByteArray(String name) throws IOException { return loadByteArray(null, name); } public byte[] loadByteArray(String dirName, String name) throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, loadByteArray(" + dirName + ", " + name + ")"); // First searchs in the logs a new value for the object. try { byte[] buf = getFromLog(dirName, name); if (buf != null) return buf; // Gets it from disk. return repository.load(dirName, name); } catch (FileNotFoundException exc) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, loadByteArray(" + dirName + ", " + name + ") NOT FOUND"); return null; } } public final void delete(String name) { delete(null, name); } public final void delete(String dirName, String name) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, delete(" + dirName + ", " + name + ")"); Object key = OperationKey.newKey(dirName, name); Hashtable log = ((Context) perThreadContext.get()).log; Operation op = Operation.alloc(Operation.DELETE, dirName, name); op = (Operation) log.put(key, op); if (op != null) op.free(); } public final synchronized void commit() throws IOException { if (phase != RUN) throw new IllegalStateException("Can not commit."); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, commit"); Hashtable log = ((Context) perThreadContext.get()).log; if (! log.isEmpty()) { logFile.commit(log); log.clear(); } if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, committed"); setPhase(COMMIT); } public final synchronized void rollback() throws IOException { if (phase != RUN) throw new IllegalStateException("Can not rollback."); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction, rollback"); setPhase(ROLLBACK); ((Context) perThreadContext.get()).log.clear(); } public final synchronized void release() throws IOException { if ((phase != RUN) && (phase != COMMIT) && (phase != ROLLBACK)) throw new IllegalStateException("Can not release transaction."); // Change the transaction state. setPhase(FREE); // wake-up an eventually user's thread in begin notify(); } /** * Stops the transaction module. * It waits all transactions termination, then the module is kept * in a FREE 'ready to use' state. * The log file is garbaged, all operations are reported to disk. */ public synchronized void stop() { if (logmon.isLoggable(BasicLevel.INFO)) logmon.log(BasicLevel.INFO, "NTransaction, stops"); while (phase != FREE) { // Wait for the transaction subsystem to be free try { wait(); } catch (InterruptedException exc) { } } setPhase(FINALIZE); try { logFile.garbage(); } catch (IOException exc) { logmon.log(BasicLevel.WARN, "NTransaction, can't garbage logfile", exc); } setPhase(FREE); if (logmon.isLoggable(BasicLevel.INFO)) { logmon.log(BasicLevel.INFO, "NTransaction, stopped: " + "garbage=" + logFile.garbageCount + ", " + "commit=" + logFile.commitCount + ", " + "ratio=" + getGarbageRatio()); } } /** * Close the transaction module. * It waits all transactions termination, the module will be initialized * anew before reusing it. * The log file is garbaged then closed. */ public synchronized void close() { if (logmon.isLoggable(BasicLevel.INFO)) logmon.log(BasicLevel.INFO, "NTransaction, stops"); if (phase == INIT) return; while (phase != FREE) { // Wait for the transaction subsystem to be free try { wait(); } catch (InterruptedException exc) { } } setPhase(FINALIZE); logFile.stop(); setPhase(INIT); if (logmon.isLoggable(BasicLevel.INFO)) { logmon.log(BasicLevel.INFO, "NTransaction, closed: " + "garbage=" + logFile.garbageCount + ", " + "commit=" + logFile.commitCount + ", " + "ratio=" + getGarbageRatio()); } } /** * */ static final class LogFile extends ByteArrayOutputStream { /** * Log of all operations already commited but not reported on disk. */ Hashtable log = null; /** log file */ RandomAccessFile logFile = null; int current = -1; /** * Number of commit operation since starting up. */ int commitCount = 0; /** * Number of garbage operation since starting up. */ int garbageCount = 0; /** * Cumulated time of garbage operations since starting up. */ long garbageTime = 0l; /** Root directory of transaction storage */ private File dir = null; /** Coherency lock filename */ static private final String LockPathname = "lock"; /** Coherency lock file */ private File lockFile = null; private Repository repository = null; LogFile(File dir, Repository repository) throws IOException { super(4 * Kb); this.dir = dir; this.repository = repository; boolean nolock = Boolean.getBoolean("NTNoLockFile"); if (! nolock) { lockFile = new File(dir, LockPathname); if (! lockFile.createNewFile()) { logmon.log(BasicLevel.FATAL, "NTransaction.init(): " + "Either the server is already running, " + "either you have to remove lock file: " + lockFile.getAbsolutePath()); throw new IOException("Transaction already running."); } lockFile.deleteOnExit(); } // Search for old log file, then apply all committed operation, // finally cleans it. log = new Hashtable(LogMemoryCapacity); File logFilePN = new File(dir, "log"); if ((logFilePN.exists()) && (logFilePN.length() > 0)) { logFile = new RandomAccessFile(logFilePN, "r"); try { int optype = logFile.read(); while (optype == Operation.COMMIT) { String dirName; String name; optype = logFile.read(); while ((optype == Operation.SAVE) || (optype == Operation.DELETE)) { // Gets all operations of one committed transaction then // adds them to specified log. dirName = logFile.readUTF(); if (dirName.length() == 0) dirName = null; name = logFile.readUTF(); Object key = OperationKey.newKey(dirName, name); if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction.init(), OPERATION=" + optype + ", " + name); Operation op = null; if (optype == Operation.SAVE) { byte buf[] = new byte[logFile.readInt()]; logFile.readFully(buf); op = Operation.alloc(optype, dirName, name, buf); op = (Operation) log.put(key, op); } else { op = Operation.alloc(optype, dirName, name); op = (Operation) log.put(key, op); } if (op != null) op.free(); optype = logFile.read(); } if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction.init(), COMMIT=" + optype); }; if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction.init(), END=" + optype + ", " + logFile.getFilePointer()); if (optype != Operation.END) System.exit(-1); } catch (IOException exc) { throw exc; } finally { logFile.close(); } logFile = new RandomAccessFile(logFilePN, "rwd"); garbage(); } else { logFile = new RandomAccessFile(logFilePN, "rwd"); logFile.setLength(LogFileSize); current = 1; // Cleans log file logFile.seek(0); logFile.write(Operation.END); } } static private final byte[] emptyUTFString = {0, 0}; void writeUTF(String str) { int strlen = str.length() ; int newcount = count + strlen +2; if (newcount > buf.length) { byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; System.arraycopy(buf, 0, newbuf, 0, count); buf = newbuf; } buf[count++] = (byte) ((strlen >>> 8) & 0xFF); buf[count++] = (byte) ((strlen >>> 0) & 0xFF); str.getBytes(0, strlen, buf, count); count = newcount; } void writeInt(int v) throws IOException { int newcount = count +4; if (newcount > buf.length) { byte newbuf[] = new byte[buf.length << 1]; System.arraycopy(buf, 0, newbuf, 0, count); buf = newbuf; } buf[count++] = (byte) ((v >>> 24) & 0xFF); buf[count++] = (byte) ((v >>> 16) & 0xFF); buf[count++] = (byte) ((v >>> 8) & 0xFF); buf[count++] = (byte) ((v >>> 0) & 0xFF); } int logMemorySize = 0; /** * Reports all buffered operations in logs. */ void commit(Hashtable ctxlog) throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, "NTransaction.LogFile.commit()"); commitCount += 1; Operation op = null; for (Enumeration e = ctxlog.elements(); e.hasMoreElements(); ) { op = (Operation) e.nextElement(); // Save the log to disk write(op.type); if (op.dirName != null) { writeUTF(op.dirName); } else { write(emptyUTFString); } writeUTF(op.name); if (op.type == Operation.SAVE) { logMemorySize += op.value.length; writeInt(op.value.length); write(op.value); } // Reports all committed operation in clog op = (Operation) log.put(OperationKey.newKey(op.dirName, op.name), op); if (op != null) { if (op.type == Operation.SAVE) logMemorySize -= op.value.length; op.free(); } } write(Operation.END); // AF: Is it needed? Normally the file pointer should already set at // the right position... To be verified, but not really a big cost. logFile.seek(current); logFile.write(buf, 0, count); // AF: May be we can avoid this second synchronous write, using a // marker: determination d'un marqueur li
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -