📄 orderedthreadpoolexecutor.java
字号:
Thread.yield(); // Let others take the signal. continue; } Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE); synchronized (tasksQueue) { for (Runnable task: tasksQueue) { getQueueHandler().polled(this, (IoEvent) task); answer.add(task); } tasksQueue.clear(); } } return answer; } /** * {@inheritDoc} */ @Override public void execute(Runnable task) { if (shutdown) { rejectTask(task); } // Check that it's a IoEvent task checkTaskType(task); IoEvent event = (IoEvent) task; IoSession session = event.getSession(); // Get the session's queue of events Queue<Runnable> tasksQueue = getTasksQueue(session); boolean offerSession; boolean offerEvent = true; // propose the new event to the event queue handler. If we // use a throttle queue handler, the message may be rejected // if the maximum size has been reached. if (eventQueueHandler != null) { offerEvent = eventQueueHandler.accept(this, event); } if (offerEvent) { // Ok, the message has been accepted synchronized (tasksQueue) { offerSession = tasksQueue.isEmpty(); // Inject the event into the executor taskQueue tasksQueue.offer(event); } } else { offerSession = false; } if (offerSession) { waitingSessions.offer(session); } addWorkerIfNecessary(); if (offerEvent) { if (eventQueueHandler != null) { eventQueueHandler.offered(this, event); } } } private void rejectTask(Runnable task) { getRejectedExecutionHandler().rejectedExecution(task, this); } private void checkTaskType(Runnable task) { if (!(task instanceof IoEvent)) { throw new IllegalArgumentException("task must be an IoEvent or its subclass."); } } /** * {@inheritDoc} */ @Override public int getActiveCount() { synchronized (workers) { return workers.size() - idleWorkers.get(); } } /** * {@inheritDoc} */ @Override public long getCompletedTaskCount() { synchronized (workers) { long answer = completedTaskCount; for (Worker w: workers) { answer += w.completedTaskCount; } return answer; } } /** * {@inheritDoc} */ @Override public int getLargestPoolSize() { return largestPoolSize; } /** * {@inheritDoc} */ @Override public int getPoolSize() { synchronized (workers) { return workers.size(); } } /** * {@inheritDoc} */ @Override public long getTaskCount() { return getCompletedTaskCount(); } /** * {@inheritDoc} */ @Override public boolean isTerminating() { synchronized (workers) { return isShutdown() && !isTerminated(); } } /** * {@inheritDoc} */ @Override public int prestartAllCoreThreads() { int answer = 0; synchronized (workers) { for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) { addWorker(); answer ++; } } return answer; } /** * {@inheritDoc} */ @Override public boolean prestartCoreThread() { synchronized (workers) { if (workers.size() < super.getCorePoolSize()) { addWorker(); return true; } else { return false; } } } /** * {@inheritDoc} */ @Override public BlockingQueue<Runnable> getQueue() { throw new UnsupportedOperationException(); } /** * {@inheritDoc} */ @Override public void purge() { // Nothing to purge in this implementation. } /** * {@inheritDoc} */ @Override public boolean remove(Runnable task) { checkTaskType(task); IoEvent event = (IoEvent) task; IoSession session = event.getSession(); Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE); if (tasksQueue == null) { return false; } boolean removed; synchronized (tasksQueue) { removed = tasksQueue.remove(task); } if (removed) { getQueueHandler().polled(this, event); } return removed; } /** * {@inheritDoc} */ @Override public int getCorePoolSize() { return super.getCorePoolSize(); } /** * {@inheritDoc} */ @Override public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) { throw new IllegalArgumentException("corePoolSize: " + corePoolSize); } if (corePoolSize > super.getMaximumPoolSize()) { throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); } synchronized (workers) { if (super.getCorePoolSize()> corePoolSize) { for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) { removeWorker(); } } super.setCorePoolSize(corePoolSize); } } private Queue<Runnable> getTasksQueue(IoSession session) { Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE); if (tasksQueue == null) { tasksQueue = new ConcurrentLinkedQueue<Runnable>(); Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue); if (oldTasksQueue != null) { tasksQueue = oldTasksQueue; } } return tasksQueue; } private class Worker implements Runnable { private volatile long completedTaskCount; private Thread thread; public void run() { thread = Thread.currentThread(); try { for (;;) { IoSession session = fetchSession(); idleWorkers.decrementAndGet(); if (session == null) { synchronized (workers) { if (workers.size() > getCorePoolSize()) { // Remove now to prevent duplicate exit. workers.remove(this); break; } } } if (session == EXIT_SIGNAL) { break; } try { if (session != null) { runTasks(getTasksQueue(session)); } } finally { idleWorkers.incrementAndGet(); } } } finally { synchronized (workers) { workers.remove(this); OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount; workers.notifyAll(); } } } private IoSession fetchSession() { IoSession session = null; long currentTime = System.currentTimeMillis(); long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); for (;;) { try { long waitTime = deadline - currentTime; if (waitTime <= 0) { break; } try { session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS); break; } finally { if (session == null) { currentTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { // Ignore. continue; } } return session; } private void runTasks(Queue<Runnable> tasksQueue) { for (;;) { Runnable task; synchronized (tasksQueue) { if ( tasksQueue.isEmpty()) { break; } task = tasksQueue.poll(); if (task == null) { break; } } if (eventQueueHandler != null) { eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task); } runTask(task); } } private void runTask(Runnable task) { beforeExecute(thread, task); boolean ran = false; try { task.run(); ran = true; afterExecute(task, null); completedTaskCount ++; } catch (RuntimeException e) { if (!ran) { afterExecute(task, e); } throw e; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -