📄 asyncexec.java
字号:
// You can redistribute this software and/or modify it under the terms of// the Ozone Core License version 1 published by ozone-db.org.//// Copyright (C) 2003-@year@, Leo Mekenkamp. All rights reserved.//// $Id: AsyncExec.java,v 1.4 2004/02/01 20:55:47 leomekenkamp Exp $package org.ozoneDB.core.storage.gammaStore;import java.io.ByteArrayOutputStream;import java.io.PrintStream;import java.util.LinkedHashMap;import java.util.Map;import java.util.logging.Level;import java.util.logging.Logger;/** * <p>Runs tasks in a separate thread. Tasks are identified by the keys used * to put them in this AsyncExec. When a task has been run, it will be * automatically removed from this instance.</p> * * <p>If a task throws a <code>Throwable</code> in its <code>run()</code> method, * it will be caught and displayed; the thread handling all tasks will continue * normally.</p> * * <p>For every instance of this class 1 thread is created, so keep this in * mind when creating instances...</p> * * * @author leo */public class AsyncExec { private static final Logger log = Logger.getLogger(AsyncExec.class.getName()); private static final int NOT_STARTED = 0; private static final int RUNNING = 1; private static final int STOPPING = 2; private static final int STOPPED = 3; private LinkedHashMap _map = new LinkedHashMap(); private volatile int status = NOT_STARTED; /** * key of the <code>Runnable</code> that is currently being processed, or * has recently finished its <code>run()</code> method. */ private volatile Object currentKey = null; private ProcessorThread thread; public AsyncExec(String threadName, int priority, boolean useDaemonThread) { thread = new ProcessorThread(threadName); thread.setPriority(priority); thread.setDaemon(useDaemonThread); thread.start(); synchronized (this) { while (getStatus() < RUNNING) { try { wait(1000); } catch (InterruptedException ignore) { } } } } /** * Puts a task into this instance. If a task is already present under that * key it will be 'overwritten', just like in <code>java.util.Map</code> * instances. * * @param key key used to identify the given task * @param task task to be run * @return Runnable the task already present under the given key * @throws IllegalStateException if stopped or stopping */ public synchronized Runnable put(Object key, Runnable task) { // first make sure that if the task is running, it can finish first remove(key); checkStatus(); Runnable result = (Runnable) getMap().put(key, task); notifyAll(); return result; } /** * @throws IllegalStateException if stopped or stopping */ private void checkStatus() { switch (getStatus()) { case NOT_STARTED: throw new IllegalStateException("not started"); // break; case RUNNING: break; case STOPPING: throw new IllegalStateException("stopping"); case STOPPED: throw new IllegalStateException("stopped"); // break; default: throw new IllegalStateException("It's worse than that: he's dead Jim."); } } /** * Returns the task that was put into this instance with the specified key. * If the task with corresponding specified key is being run at the very * moment of this call, then this method will block until the tasks * <code>run()</code> method has completed. * * @param key key used to lookup the task * @throws IllegalStateException if stopped or stopping * @return task for specified key (<code>null</code> if no such key) */ public synchronized Runnable get(Object key) { checkStatus(); while (key.equals(getCurrentKey())) { try { wait(); } catch (InterruptedException ignore) { } } return (Runnable) getMap().get(key); } /** Removes the task that was put into this instance with the specified key. * * @param key key used to lookup the task * @throws IllegalStateException if stopped or stopping * @return task for specified key (<code>null</code> if no such key) */ public synchronized Runnable remove(Object key) { if (key == null) { throw new IllegalArgumentException("key must not be null"); } while (key.equals(getCurrentKey())) { if (log.isLoggable(Level.FINE)) log.fine(this + ": key is currentKey: " + getCurrentKey()); try { wait(); } catch (InterruptedException ignore) { } } return (Runnable) getMap().remove(key); } /** * Returns the number of tasks currently in this instance. Note that this * number will decrease over time (if no new tasks are put in) every time * a task has completed. * * @return number of tasks */ public synchronized int size() { return getMap().size(); } /** * Tells this instance to stop; no more tasks can be put in or removed. * Blocks until all tasks have completed. */ public void stopWhenReady() { synchronized (this) { checkStatus(); setStatus(STOPPING); notifyAll(); } for (boolean interrupted = true; interrupted; ) { try { thread.join(); interrupted = false; } catch (InterruptedException ignore) { } } } private void setStatus(int status) { this.status = status; } private int getStatus() { return status; } private Object getCurrentKey() { return currentKey; } private void setCurrentKey(Object currentKey) { this.currentKey = currentKey; } private Map getMap() { return _map; } /** * Handles all execution of tasks. */ private class ProcessorThread extends Thread { public ProcessorThread(String name) { super(name); } public void run() { synchronized (AsyncExec.this) { setStatus(RUNNING); AsyncExec.this.notifyAll(); } if (log.isLoggable(Level.FINE)) log.fine(this + " started"); for (;;) { Runnable task; synchronized (AsyncExec.this) { // note that the first time we pass here the current key is // _never_ in the map! See comment on currentKey. if (log.isLoggable(Level.FINER)) log.finer("key has finished: " + getCurrentKey()); getMap().remove(getCurrentKey()); setCurrentKey(null); AsyncExec.this.notifyAll(); while (getMap().size() == 0) { if (getStatus() == STOPPING) { setStatus(STOPPED); if (log.isLoggable(Level.FINE)) log.fine(this + " almost stopped"); return; } try { // TODO: find out why this next call is needed (lockup if absent) AsyncExec.this.notifyAll(); AsyncExec.this.wait(); } catch (InterruptedException ignore) { } } Map.Entry entry = (Map.Entry) getMap().entrySet().iterator().next(); setCurrentKey(entry.getKey()); task = (Runnable) entry.getValue(); } try { if (log.isLoggable(Level.FINER)) log.finer("running task with key: " + getCurrentKey()); task.run(); } catch (Throwable t) { ByteArrayOutputStream buf = new ByteArrayOutputStream(); PrintStream printStream = new PrintStream(buf); t.printStackTrace(printStream); printStream.close(); log.severe(this + " has caught unhandled Throwable." + " Thread will NOT stop, Stacktrace will follow" + " both on stdout and here.\n" + buf.toString()); t.printStackTrace(); } } } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -