⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 orderedthreadpoolexecutor.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                Thread.yield(); // Let others take the signal.                continue;            }            Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);                        synchronized (tasksQueue) {                                for (Runnable task: tasksQueue) {                    getQueueHandler().polled(this, (IoEvent) task);                    answer.add(task);                }                                tasksQueue.clear();            }        }        return answer;    }    /**     * {@inheritDoc}     */    @Override    public void execute(Runnable task) {        if (shutdown) {            rejectTask(task);        }        // Check that it's a IoEvent task        checkTaskType(task);        IoEvent event = (IoEvent) task;        IoSession session = event.getSession();                // Get the session's queue of events        Queue<Runnable> tasksQueue = getTasksQueue(session);        boolean offerSession;        boolean offerEvent = true;                // propose the new event to the event queue handler. If we        // use a throttle queue handler, the message may be rejected        // if the maximum size has been reached.        if (eventQueueHandler != null) {            offerEvent = eventQueueHandler.accept(this, event);        }                if (offerEvent) {            // Ok, the message has been accepted            synchronized (tasksQueue) {                offerSession = tasksQueue.isEmpty();                // Inject the event into the executor taskQueue                tasksQueue.offer(event);            }        } else {            offerSession = false;        }        if (offerSession) {            waitingSessions.offer(session);        }        addWorkerIfNecessary();        if (offerEvent) {            if (eventQueueHandler != null) {                eventQueueHandler.offered(this, event);            }        }    }    private void rejectTask(Runnable task) {        getRejectedExecutionHandler().rejectedExecution(task, this);    }    private void checkTaskType(Runnable task) {        if (!(task instanceof IoEvent)) {            throw new IllegalArgumentException("task must be an IoEvent or its subclass.");        }    }    /**     * {@inheritDoc}     */    @Override    public int getActiveCount() {        synchronized (workers) {            return workers.size() - idleWorkers.get();        }    }    /**     * {@inheritDoc}     */    @Override    public long getCompletedTaskCount() {        synchronized (workers) {            long answer = completedTaskCount;            for (Worker w: workers) {                answer += w.completedTaskCount;            }            return answer;        }    }    /**     * {@inheritDoc}     */    @Override    public int getLargestPoolSize() {        return largestPoolSize;    }    /**     * {@inheritDoc}     */    @Override    public int getPoolSize() {        synchronized (workers) {            return workers.size();        }    }    /**     * {@inheritDoc}     */    @Override    public long getTaskCount() {        return getCompletedTaskCount();    }    /**     * {@inheritDoc}     */    @Override    public boolean isTerminating() {        synchronized (workers) {            return isShutdown() && !isTerminated();        }    }    /**     * {@inheritDoc}     */    @Override    public int prestartAllCoreThreads() {        int answer = 0;        synchronized (workers) {            for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) {                addWorker();                answer ++;            }        }        return answer;    }    /**     * {@inheritDoc}     */    @Override    public boolean prestartCoreThread() {        synchronized (workers) {            if (workers.size() < super.getCorePoolSize()) {                addWorker();                return true;            } else {                return false;            }        }    }    /**     * {@inheritDoc}     */    @Override    public BlockingQueue<Runnable> getQueue() {        throw new UnsupportedOperationException();    }    /**     * {@inheritDoc}     */    @Override    public void purge() {        // Nothing to purge in this implementation.    }    /**     * {@inheritDoc}     */    @Override    public boolean remove(Runnable task) {        checkTaskType(task);        IoEvent event = (IoEvent) task;        IoSession session = event.getSession();        Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE);                if (tasksQueue == null) {            return false;        }        boolean removed;                synchronized (tasksQueue) {            removed = tasksQueue.remove(task);        }        if (removed) {            getQueueHandler().polled(this, event);        }        return removed;    }    /**     * {@inheritDoc}     */    @Override    public int getCorePoolSize() {        return super.getCorePoolSize();    }    /**     * {@inheritDoc}     */    @Override    public void setCorePoolSize(int corePoolSize) {        if (corePoolSize < 0) {            throw new IllegalArgumentException("corePoolSize: " + corePoolSize);        }        if (corePoolSize > super.getMaximumPoolSize()) {            throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");        }        synchronized (workers) {            if (super.getCorePoolSize()> corePoolSize) {                for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) {                    removeWorker();                }            }            super.setCorePoolSize(corePoolSize);        }    }    private Queue<Runnable> getTasksQueue(IoSession session) {        Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);                if (tasksQueue == null) {            tasksQueue = new ConcurrentLinkedQueue<Runnable>();            Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue);                    if (oldTasksQueue != null) {                tasksQueue = oldTasksQueue;            }        }                return tasksQueue;    }    private class Worker implements Runnable {        private volatile long completedTaskCount;        private Thread thread;        public void run() {            thread = Thread.currentThread();            try {                for (;;) {                    IoSession session = fetchSession();                    idleWorkers.decrementAndGet();                    if (session == null) {                        synchronized (workers) {                            if (workers.size() > getCorePoolSize()) {                                // Remove now to prevent duplicate exit.                                workers.remove(this);                                break;                            }                        }                    }                    if (session == EXIT_SIGNAL) {                        break;                    }                    try {                        if (session != null) {                            runTasks(getTasksQueue(session));                        }                    } finally {                        idleWorkers.incrementAndGet();                    }                }            } finally {                synchronized (workers) {                    workers.remove(this);                    OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;                    workers.notifyAll();                }            }        }        private IoSession fetchSession() {            IoSession session = null;            long currentTime = System.currentTimeMillis();            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);            for (;;) {                try {                    long waitTime = deadline - currentTime;                    if (waitTime <= 0) {                        break;                    }                    try {                        session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);                        break;                    } finally {                        if (session == null) {                            currentTime = System.currentTimeMillis();                        }                    }                } catch (InterruptedException e) {                    // Ignore.                    continue;                }            }            return session;        }        private void runTasks(Queue<Runnable> tasksQueue) {            for (;;) {                Runnable task;                                synchronized (tasksQueue) {                    if ( tasksQueue.isEmpty()) {                        break;                    }                    task = tasksQueue.poll();                    if (task == null) {                        break;                    }                }                if (eventQueueHandler != null) {                    eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);                }                runTask(task);            }        }        private void runTask(Runnable task) {            beforeExecute(thread, task);            boolean ran = false;            try {                task.run();                ran = true;                afterExecute(task, null);                completedTaskCount ++;            } catch (RuntimeException e) {                if (!ran) {                    afterExecute(task, e);                }                throw e;            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -