📄 clientcachedatabase.java
字号:
nameTable.addForKey( id, name );
CacheObjectContainer container = fetch0( id, Lock.LEVEL_READ );
return container.ozoneProxy();
} else {
return null;
}
}
}
public Object invoke( OzoneProxy proxy, String methodName, String sig, Object[] args, int lockLevel )
throws Exception {
throw new RuntimeException( "invoke(): Method not implemented." );
}
public Object invoke( OzoneProxy proxy, int methodIndex, Object[] args, int lockLevel ) throws Exception {
throw new RuntimeException( "invoke(): Method not implemented." );
}
// fetch/sync cache ***********************************
public OzoneCompatible fetch(OzoneProxy proxy,int lockLevel) throws Exception,org.ozoneDB.ObjectNotFoundException, java.io.IOException, java.lang.ClassNotFoundException, org.ozoneDB.TransactionException, org.ozoneDB.core.TransactionError {
if (debug) {
System.out.println( "[debug] fetch(): id:" + proxy.remoteID() );
}
CacheObjectContainer container = fetch0(proxy.remoteID(),lockLevel);
OzoneCompatible target = container.target();
if (target == null) {
if (debug) {
System.out.println( "[debug] fetch(): id:" + proxy.remoteID() );
}
throw new ObjectNotFoundException( "Target is null." );
}
return target;
}
/**
* Fetch the object with the specified ObjectID from the server.
*
*
* @param id The id that specifies the object to fetch.
* @param lockLevel the type of lock requested (Lock.READ, Lock.WRITE etc.)
* @return The transaction for the current thread.
*/
protected CacheObjectContainer fetch0( ObjectID id, int lockLevel ) throws Exception,ObjectNotFoundException,TransactionException {
AbstractTransaction tx = delegate.txForThread( Thread.currentThread() );
if (tx == null) {
throw new TransactionException( "Thread has not yet joined a transaction.", TransactionException.STATE );
}
CacheObjectContainer container = (CacheObjectContainer)idTable.elementForKey( id );
if (container == null) {
container = fetchChunk(id,100000);
}
if (container == null) {
throw new ObjectNotFoundException( "No object for the given ID." );
}
// we are going to change the proxy so we have to synchronize it;
// afterwards the value of container.tx, which in fact is a lock,
// prevents other threads from changing the container
synchronized (container) {
// check if the target is member of our tx; because there is no
// detection on the client, we may end up in a real, hard deadlock!
while (container.tx != null && container.tx != tx) {
try {
// we have to call wait on the container to let the JVM
// release the lock that the synchronized statement put on it
container.wait();
} catch (InterruptedException e) {
}
}
// make this proxy/object a member of the transaction; in fact,
// this puts a lock on it
if (container.tx == null) {
container.tx = tx;
}
if (lockLevel == Lock.LEVEL_READ) {
container.raiseState( CacheObjectContainer.STATE_READ );
} else {
container.raiseState( CacheObjectContainer.STATE_MODIFIED );
container.setDirty( true );
}
}
return container;
}
protected synchronized CacheObjectContainer fetchChunk( ObjectID rootID, int size ) throws Exception {
ensureSpace(size);
DxArrayBag chunk = (DxArrayBag) delegate.sendCommand(new DbCacheChunk(rootID,size),true);
for (int i = 0; i < chunk.count(); i++) {
CacheObjectContainer container = (CacheObjectContainer)chunk.elementAtIndex( i );
if (debug) {
System.out.println( "[debug] fetchChunk(): container:" + container.id() );
}
// set the container link of this client side copy of the target
container.setTarget( container.target() );
container.setDatabase( this );
if (idTable.containsKey( container.id() )) {
System.out.print( "[debug] fetchChunk(): container already registered... " );
// use the newly fetched container instead of the old one if
// the old isn't currently locked
CacheObjectContainer c = (CacheObjectContainer)idTable.elementForKey( container.id() );
if (c.tx != null) {
System.out.println( "and locked - using old one." );
} else {
System.out.println( "not locked - using new one." );
idTable.removeForKey( container.id() );
idTable.addForKey( container, container.id() );
}
} else {
idTable.addForKey( container, container.id() );
}
}
return (CacheObjectContainer)idTable.elementForKey( rootID );
}
/**
* Synchronize all changed objects with the server. This does not commits
* the transaction but only transfers all changed objects back to the
* server.
*/
protected synchronized void syncCache() throws Exception {
if (debug) {
System.out.println( "[debug] syncCache()" );
}
// although we send a byte[] the proxy links are handled correctly -
// see DbCacheChunk for details
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream( bout );
int count = 0;
DxIterator it = idTable.iterator();
while (it.next() != null) {
CacheObjectContainer container = (CacheObjectContainer)it.object();
if (debug) {
System.out.println( "[debug] id:" + container.id() + ", state:" + container.state() + ", dirty:"
+ container.dirty() );
}
if (container.dirty()) {
out.writeObject( container );
// if the sendCommand() fails, which would make the container
// dirty again, then the entire transaction is aborted the
// container is thrown away
container.setDirty( false );
count++;
}
if (bout.size() > 500000) {
if (debug) {
System.out.println( "[debug] syncCache(): writing " + count + " objects" );
}
out.close();
delegate.sendCommand( new DbCacheChunk( bout.toByteArray() ), true );
bout = new ByteArrayOutputStream();
out = new ObjectOutputStream( bout );
count = 0;
}
}
if (debug) {
System.out.println( "[debug] syncCache(): writing " + count + " objects" );
}
out.close();
delegate.sendCommand( new DbCacheChunk( bout.toByteArray() ), true );
}
/**
* Throws away all modified containers. Clears idTable and nameTable.
*/
protected synchronized void abortCache() {
try {
DxIterator it = idTable.iterator();
while (it.next() != null) {
CacheObjectContainer container = (CacheObjectContainer)it.object();
if (container.state() >= ObjectContainer.STATE_MODIFIED) {
idTable.removeForKey( container.id() );
if (container.name() != null) {
nameTable.removeForKey( container.name() );
}
}
}
} catch (Exception e) {
// this should never happen; all we can do here is to signal a
// strong error
throw new RuntimeException( e.toString() );
}
}
/**
* Update the modification times of all currently modified containers. This
* method should be called after prepareTX only.
*/
protected synchronized void updateModTimes() {
try {
if (debug) {
System.out.println( "[debug] updateModTimes()" );
}
DbModTimes command = new DbModTimes();
int count = 0;
DxIterator it = idTable.iterator();
while (it.next() != null) {
CacheObjectContainer container = (CacheObjectContainer)it.object();
if (container.state() == ObjectContainer.STATE_MODIFIED || container.state()
== ObjectContainer.STATE_CREATED) {
if (debug) {
System.out.println( "[debug] send: id:" + container.id() );
}
command.addObjectID( container.id() );
count++;
}
}
// avoid sending the command if nothing has changed anyway
if (count > 0) {
DxMap map = (DxMap)delegate.sendCommand( command, true );
it = map.iterator();
Long modTime = null;
while ((modTime = (Long)it.next()) != null) {
ObjectID id = (ObjectID)it.key();
if (debug) {
System.out.println( "[debug] receive: id:" + id + ", modTime:" + modTime );
}
CacheObjectContainer container = (CacheObjectContainer)idTable.elementForKey( id );
container.setModTime( modTime.longValue() );
}
}
} catch (Exception e) {
// this should never happen; all we can do here is to signal a
// strong error
throw new RuntimeException( e.toString() );
}
}
/**
* Try to free the specified amount of cache space. This will remove
* containers from the cache that are currently not used by a transaction
*/
protected synchronized void ensureSpace( long neededSpace ) {
if (freeMemory() < neededSpace) {
// build priority queue for all currently cached objects
DxMap priorityQueue = new DxTreeMap();
DxIterator it = idTable.iterator();
while (it.next() != null) {
CacheObjectContainer container = (CacheObjectContainer)it.object();
priorityQueue.addForKey( container, new Long( container.lastTouched() ) );
}
// free 100 objects at once until there is enough free memory or
// no unlocked container left, lowest priority first
it = priorityQueue.iterator();
CacheObjectContainer container = (CacheObjectContainer)it.next();
while (freeMemory() < neededSpace && container != null) {
for (int i = 0; i < 100 && container != null; i++) {
if (container.tx == null) {
idTable.removeForKey( container.id() );
container = (CacheObjectContainer)it.next();
} else {
System.out.println( "[debug] ensureSpace(): trying to free locked container." );
}
}
System.gc();
}
}
}
// free memory ****************************************
/**
* Initialize the internal memory counter so that freeMemory() returns
* correct results.
*/
protected void calcMemory() {
Runtime rt = Runtime.getRuntime();
try {
DxBag bag = new DxArrayBag();
for (;;) {
bag.add( new byte[100000] );
}
} catch (OutOfMemoryError e) {
totalMemory = rt.totalMemory();
rt.gc();
}
}
/**
* Return the amount of *total* free memory in the system. The results
* returned by Runtime.freeMemory() may change overtime and so its
* useless for ozone.<p>
*
*
* Note: this will not work properly if some kind of weal references are
* used in this VM. In case of empty space we need to force teh GC to also
* free weak references.
*/
public long freeMemory() {
Runtime rt = Runtime.getRuntime();
long hiddenMemory = totalMemory - rt.totalMemory();
// keep 2MB free at least
return Math.max( rt.freeMemory() + hiddenMemory - 2000000L, 0 );
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -