scheduledthreadpool.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 855 行 · 第 1/2 页
JAVA
855 行
* Called when the environment starts. */ public void environmentStart(EnvironmentClassLoader loader) { } /** * Called when the environment stops. */ public void environmentStop(EnvironmentClassLoader loader) { stop(); } /** * Serialize to a webbeans handle */ public Object writeReplace() { return new WebBeansHandle(ScheduledExecutorService.class); } @Override public String toString() { if (_loader instanceof EnvironmentClassLoader) return "ScheduledThreadPool[" + ((EnvironmentClassLoader) _loader).getId() + "]"; else return "ScheduledThreadPool[" + _loader + "]"; } class TaskFuture<T> implements Future<T>, Runnable { private final ClassLoader _loader; private final Callable<T> _callable; private final Runnable _runnable; private Thread _thread; private boolean _isCancelled; private boolean _isDone; private Exception _exception; private T _value; TaskFuture(ClassLoader loader, Callable<T> callable) { _loader = loader; _callable = callable; _runnable = null; } TaskFuture(ClassLoader loader, Runnable runnable, T result) { _loader = loader; _callable = null; _runnable = runnable; _value = result; } public boolean isCancelled() { return _isCancelled; } public boolean isDone() { return _isDone; } public boolean cancel(boolean mayInterrupt) { synchronized (this) { removeFuture(this); if (_isCancelled || _isDone) return false; _isCancelled = true; notifyAll(); } Thread thread = _thread; if (mayInterrupt && thread != null) thread.interrupt(); return true; } public T get() throws InterruptedException, ExecutionException { try { return get(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { throw new IllegalStateException(e); } } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { long expire = Alarm.getCurrentTime() + unit.toMillis(timeout); synchronized (this) { while (! _isDone && ! _isCancelled && Alarm.getCurrentTime() < expire && ! Thread.currentThread().isInterrupted()) { if (! Alarm.isTest()) wait(expire - Alarm.getCurrentTime()); else { wait(1000); break; } } } if (_exception != null) throw new ExecutionException(_exception); else if (_isDone) return _value; else if (_isCancelled) throw new CancellationException(); else throw new TimeoutException(); } public void run() { _thread = Thread.currentThread(); ClassLoader oldLoader = _thread.getContextClassLoader(); try { if (_isCancelled || _isDone || _isShutdown) return; _thread.setContextClassLoader(_loader); if (_callable != null) _value = _callable.call(); else _runnable.run(); } catch (RuntimeException e) { throw e; } catch (Exception e) { _exception = e; } finally { _thread.setContextClassLoader(oldLoader); _thread = null; _isDone = true; _threadPool.completeExecutorTask(); // alarm removeFuture(this); synchronized (this) { notifyAll(); } } } public String toString() { Object task = _callable != null ? _callable : _runnable; if (_isDone) return "TaskFuture[" + task + ",done]"; else if (_thread != null) { if (Alarm.isTest()) return "TaskFuture[" + task + ",active]"; else return "TaskFuture[" + task + "," + _thread + "]"; } else if (_isCancelled) return "TaskFuture[" + task + ",cancelled]"; else return "TaskFuture[" + task + ",pending]"; } } class AlarmFuture<T> implements ScheduledFuture<T>, AlarmListener { private final String _name; private final ClassLoader _loader; private final Callable<T> _callable; private final Runnable _runnable; private final Alarm _alarm; private final long _initialExpires; private final long _period; private final long _delay; private long _nextTime; private Thread _thread; private boolean _isCancelled; private boolean _isDone; private int _alarmCount; private Exception _exception; private T _value; AlarmFuture(ClassLoader loader, Callable<T> callable, long initialExpires, long period, long delay) { _name = "Scheduled[" + callable + "]"; _loader = loader; _callable = callable; _runnable = null; _initialExpires = initialExpires; _period = period; _delay = delay; _nextTime = initialExpires; _alarm = new Alarm(_name, this, loader); } AlarmFuture(ClassLoader loader, Runnable runnable, long initialExpires, long period, long delay) { _name = "Scheduled[" + runnable + "]"; _loader = loader; _callable = null; _runnable = runnable; _initialExpires = initialExpires; _period = period; _delay = delay; _alarm = new Alarm(_name, this, loader); } void queue() { _alarm.queueAt(_initialExpires); } public boolean isCancelled() { return _isCancelled; } public boolean isDone() { return _isDone; } public long getDelay(TimeUnit unit) { long delay = _nextTime - Alarm.getCurrentTime(); return TimeUnit.MILLISECONDS.convert(delay, unit); } public int compareTo(Delayed b) { long delta = (getDelay(TimeUnit.MILLISECONDS) - b.getDelay(TimeUnit.MILLISECONDS)); if (delta < 0) return -1; else if (delta > 0) return 1; else return 0; } public boolean cancel(boolean mayInterrupt) { synchronized (this) { if (_isCancelled || _isDone) return false; _isCancelled = true; _alarm.dequeue(); notifyAll(); } removeFuture(this); Thread thread = _thread; if (mayInterrupt && thread != null) thread.interrupt(); return true; } public T get() throws InterruptedException, ExecutionException { try { return get(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { throw new IllegalStateException(e); } } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { long expire = Alarm.getCurrentTime() + unit.toMillis(timeout); int count = _alarmCount; while (! _isDone && ! _isCancelled && count == _alarmCount && Alarm.getCurrentTime() < expire && ! Thread.currentThread().isInterrupted()) { synchronized (this) { wait(expire - Alarm.getCurrentTime()); } } if (_exception != null) throw new ExecutionException(_exception); else if (_isDone || count != _alarmCount) return _value; else if (_isCancelled) throw new CancellationException(); else throw new TimeoutException(); } public void handleAlarm(Alarm alarm) { if (_isCancelled || _isDone || _isShutdown) return; _thread = Thread.currentThread(); ClassLoader oldLoader = _thread.getContextClassLoader(); String oldName = _thread.getName(); try { _thread.setContextClassLoader(_loader); _thread.setName(_name); if (_callable != null) _value = _callable.call(); else _runnable.run(); } catch (Exception e) { log.log(Level.FINE, e.toString(), e); _exception = e; _isCancelled = true; } finally { _thread.setContextClassLoader(oldLoader); _thread.setName(oldName); _thread = null; synchronized (this) { _alarmCount++; if (_isCancelled || _isDone) { removeFuture(this); } else if (_delay > 0) { _nextTime = Alarm.getCurrentTime() + _delay; _alarm.queue(_delay); } else if (_period > 0) { long now = Alarm.getCurrentTime(); long next; do { next = _initialExpires + _alarmCount * _period; if (next < now) _alarmCount++; } while (next < now); _alarm.queueAt(next); } else { _isDone = true; removeFuture(this); } notifyAll(); } } } public String toString() { Object task = _callable != null ? _callable : _runnable; if (_isDone) return "AlarmFuture[" + task + ",done]"; else if (_thread != null) { if (Alarm.isTest()) return "AlarmFuture[" + task + ",active]"; else return "AlarmFuture[" + task + "," + _thread + "]"; } else if (_isCancelled) return "AlarmFuture[" + task + ",cancelled]"; else return "AlarmFuture[" + task + ",pending]"; } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?