📄 serializer.java
字号:
// You can redistribute this software and/or modify it under the terms of// the Ozone Core License version 1 published by ozone-db.org.//// Copyright (C) 2003-@year@, Leo Mekenkamp. All rights reserved.//// $Id: Serializer.java,v 1.3 2004/02/01 20:55:47 leomekenkamp Exp $package org.ozoneDB.core.storage.gammaStore;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectOutputStream;import java.util.Collections;import java.util.HashSet;import java.util.Iterator;import java.util.LinkedHashMap;import java.util.LinkedHashSet;import java.util.Map;import java.util.Set;import java.util.Map.Entry;import java.util.logging.Level;import java.util.logging.Logger;import org.ozoneDB.OzoneInternalException;/** * <p>Takes care of <code>Storable</code> implemeting instances, in that it * has different threads for serializing and writing to <code>Storage</code>. * <code>Remove()</code> ensures that if a null is * returned that the <code>Storable</code> for the given key has been * completely written to <code>Storage</code>.</p> * * <p>For every instance of this class 2 threads are created, so keep this in * mind when creating instances; this might be a resource hog...</p> * * @author leo */public class Serializer { private class SerializeTask implements Runnable { private int pass = 0; private Object id; private Storable storable; byte[] image; SerializeTask(Object id, Storable storable) { this.id = id; this.storable = storable; } Storable getStorable() { return storable; } public String toString() { return "id: " + id + ", pass: " + pass; } public void run() { if (log.isLoggable(Level.FINE)) { log.fine("serializeTask for " + storable.getStorageName() + ", pass " + pass); } switch (pass) { case 0: try { ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); ObjectOutputStream objectStream; if (streamFactory == null) { objectStream = new ObjectOutputStream(byteStream); } else { objectStream = new ObjectOutputStream(streamFactory.createOutputStream(byteStream)); } objectStream.writeObject(storable); objectStream.close(); image = byteStream.toByteArray(); pass++; synchronized (storeAsyncExec) { while (storeAsyncExec.size() > 1) { try { storeAsyncExec.wait(); } catch (InterruptedException ignore) { } } if (log.isLoggable(Level.FINE)) { log.fine("finishing serializeTask for " + storable.getStorageName() + ", pass " + (pass - 1)); } // Note that the next statement may cause run() to be called // again immediately, so we should do all important stuff // before the next call (including pass++). // Because this instance is in the two AsyncExecs at the // same time for a short time, we never run into // problems with get() when we first ask the one and // then the other AsyncExec. storeAsyncExec.put(id, this); } } catch (IOException e) { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "could not serialize, id: " + id + "; storable: " + storable, e); } throw new OzoneInternalException(e); } break; case 1: try { Storage storage = storageFactory.createStorage(storable.getStorageName()); storage.write(image); storage.close(); } catch (IOException e) { log.log(Level.SEVERE, "could not write " + storable.getStorageName(), e); throw new OzoneInternalException(e); } finally { synchronized (storeAsyncExec) { storeAsyncExec.notifyAll(); } } if (log.isLoggable(Level.FINE)) { log.fine("finished serializeTask for " + storable.getStorageName() + ", pass " + pass); } pass++; break; default: throw new OzoneInternalException("Pass " + pass + "? Nobody ever informs me about these things."); } } } private static final Logger log = Logger.getLogger(Serializer.class.getName()); private AsyncExec serializeAsyncExec; private AsyncExec storeAsyncExec; private StorageFactory storageFactory; private StreamFactory streamFactory; public Serializer(StorageFactory storageFactory, StreamFactory streamFactory, String name) { this.storageFactory = storageFactory; this.streamFactory = streamFactory; serializeAsyncExec = new AsyncExec(name + " serialize", Thread.MAX_PRIORITY, true); storeAsyncExec = new AsyncExec(name + " store", Thread.MAX_PRIORITY, true); } /** * Places a storable into a write queue. Works like 'Fire and forget': * method returns fast, while other threads take care of serialization and * the actual persisting (writing). */ public void put(Object id, Storable storable) { if (log.isLoggable(Level.FINER)) log.finer("enqueueing " + storable + " under id " + id); serializeAsyncExec.put(id, new SerializeTask(id, storable)); } /** * Removes a storable from the write queue. If it returns <code>null</code> * then the storable has already been written to disk or has never been * entered through a call to <code>put(Object, Storable)</code>. */ public Storable remove(Object id) { // Must remove from both execs to be sure that all data has been // written. If we only check the serializeAsyncExec and if it returns // null then (and only then) check storeAsynchExec we can return before // all data has been written SerializeTask taskA = (SerializeTask) serializeAsyncExec.remove(id); SerializeTask taskB = (SerializeTask) storeAsyncExec.remove(id); if (taskA == null) { taskA = taskB; } return taskA == null ? null : taskA.getStorable(); } public int size() { return serializeAsyncExec.size() + storeAsyncExec.size(); } public void stopWhenReady() { serializeAsyncExec.stopWhenReady(); storeAsyncExec.stopWhenReady(); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -