⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractpollingioprocessor.java

📁 mina是以Java实现的一个开源的网络程序框架
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.core.polling;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.file.FileRegion;import org.apache.mina.core.filterchain.IoFilterChain;import org.apache.mina.core.future.DefaultIoFuture;import org.apache.mina.core.service.AbstractIoService;import org.apache.mina.core.service.IoProcessor;import org.apache.mina.core.session.AbstractIoSession;import org.apache.mina.core.session.IoSession;import org.apache.mina.core.session.IoSessionConfig;import org.apache.mina.core.write.WriteRequest;import org.apache.mina.core.write.WriteRequestQueue;import org.apache.mina.core.write.WriteToClosedSessionException;import org.apache.mina.util.ExceptionMonitor;import org.apache.mina.util.NamePreservingRunnable;/** * An abstract implementation of {@link IoProcessor} which helps * transport developers to write an {@link IoProcessor} easily. 
* This class is in charge of actively polling a set of {@link IoSession}s
 * and of triggering events when some I/O operation is possible.
 *
 * @param <T> the concrete {@link AbstractIoSession} type handled by this processor
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev: 751744 $, $Date: 2009-03-09 17:53:13 +0100 (Mon, 09 Mar 2009) $
 */
public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
    /**
     * The maximum loop count for a write operation until
     * {@link #write(AbstractIoSession, IoBuffer, int)} returns a non-zero value.
     * It is similar to what a spin lock is for in concurrent programming.
     * It improves memory utilization and write throughput significantly.
     */
    private static final int WRITE_SPIN_COUNT = 256;

    /**
     * A timeout used for the select, as we need to get out of it to deal with
     * idle sessions. Presumably expressed in milliseconds, like the argument
     * of {@link #select(long)} - confirm at the call site.
     */
    private static final long SELECT_TIMEOUT = 1000L;

    /**
     * A map containing the last thread ID assigned for each concrete class.
     * It is a plain {@link HashMap}: {@link #nextThreadName()} synchronizes on
     * it for every access.
     */
    private static final Map<Class<?>, AtomicInteger> threadIds =
        new HashMap<Class<?>, AtomicInteger>();

    // NOTE(review): the code guarded by this lock is outside this part of the
    // file; presumably it protects the creation of the processor - confirm.
    private final Object lock = new Object();

    /** The thread name computed once, at construction, by {@link #nextThreadName()} */
    private final String threadName;

    /** The {@link Executor} received by the constructor */
    private final Executor executor;

    /** A Session queue containing the newly created sessions */
    private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();

    // NOTE(review): the three queues below are filled and drained outside this
    // part of the file; going by their names they hold the sessions waiting to
    // be removed, flushed and to have their traffic mask updated - confirm.
    private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
    private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
    private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();

    /** The processor thread : it handles the incoming messages */
    private Processor processor;

    // NOTE(review): presumably the time of the last idle-session check; it is
    // neither read nor written in this part of the file - confirm.
    private long lastIdleCheckTime;

    /** The monitor held by {@link #dispose()} while it flags the start of the disposal */
    private final Object disposalLock = new Object();

    /** Set to true by {@link #dispose()} as soon as a disposal has been requested */
    private volatile boolean disposing;

    /** Set to true by {@link #dispose()} once the wait on the disposal future has returned */
    private volatile boolean disposed;

    /** The future {@link #dispose()} waits on; it is completed outside this part of the file */
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

    /**
     * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor}
     * for handling I/Os 
events.     *      * @param executor the {@link Executor} for handling I/O events     */    protected AbstractPollingIoProcessor(Executor executor) {        if (executor == null) {            throw new NullPointerException("executor");        }        this.threadName = nextThreadName();        this.executor = executor;    }    /**     * Compute the thread ID for this class instance. As we may have different     * classes, we store the last ID number into a Map associating the class     * name to the last assigned ID.     *        * @return a name for the current thread, based on the class name and     * an incremental value, starting at 1.      */    private String nextThreadName() {        Class<?> cls = getClass();        int newThreadId;                // We synchronize this block to avoid a concurrent access to         // the actomicInteger (it can be modified by another thread, while        // being seen as null by another thread)        synchronized( threadIds ) {            // Get the current ID associated to this class' name            AtomicInteger threadId = threadIds.get(cls);                        if (threadId == null) {                // We never have seen this class before, just create a                // new ID starting at 1 for it, and associate this ID                // with the class name in the map.                newThreadId = 1;                threadIds.put(cls, new AtomicInteger(newThreadId));            } else {                // Just increment the lat ID, and get it.                
newThreadId = threadId.incrementAndGet();            }        }                // Now we can compute the name for this thread        return cls.getSimpleName() + '-' + newThreadId;    }    /**     * {@inheritDoc}     */    public final boolean isDisposing() {        return disposing;    }    /**     * {@inheritDoc}     */    public final boolean isDisposed() {        return disposed;    }        /**     * {@inheritDoc}     */    public final void dispose() {        if (disposed) {            return;        }        synchronized (disposalLock) {            if (!disposing) {                disposing = true;                startupProcessor();            }        }        disposalFuture.awaitUninterruptibly();        disposed = true;    }    /**     * Dispose the resources used by this {@link IoProcessor} for polling      * the client connections     * @throws Exception if some low level IO error occurs     */    protected abstract void dispose0() throws Exception;    /**     * poll those sessions for the given timeout     * @param timeout milliseconds before the call timeout if no event appear     * @return The number of session ready for read or for write     * @throws Exception if some low level IO error occurs     */    protected abstract int select(long timeout) throws Exception;        /**     * poll those sessions forever     * @return The number of session ready for read or for write     * @throws Exception if some low level IO error occurs     */    protected abstract int select() throws Exception;        /**     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}      * is empty     * @return true if at least a session is managed by this {@link IoProcessor}     */    protected abstract boolean isSelectorEmpty();        /**     * Interrupt the {@link AbstractPollingIoProcessor#select(int) call.     
*/
    protected abstract void wakeup();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
     * {@link IoProcessor}.
     *
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<T> allSessions();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link AbstractPollingIoProcessor#select(long)}.
     *
     * @return {@link Iterator} of {@link IoSession} ready for I/O operations
     */
    protected abstract Iterator<T> selectedSessions();

    /**
     * Get the state of a session (preparing, open, closed).
     *
     * @param session the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState state(T session);

    /**
     * Is the session ready for writing?
     *
     * @param session the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isWritable(T session);

    /**
     * Is the session ready for reading?
     *
     * @param session the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isReadable(T session);

    /**
     * Register a session for writing.
     *
     * @param session the session registered
     * @param interested true for registering, false for removing
     * @throws Exception any exception thrown by the implementation
     */
    protected abstract void setInterestedInWrite(T session, boolean interested)
            throws Exception;

    /**
     * Register a session for reading.
     *
     * @param session the session registered
     * @param interested true for registering, false for removing
     * @throws Exception any exception thrown by the implementation
     */
    protected abstract void setInterestedInRead(T session, boolean interested)
            throws Exception;

    /**
     * Is this session registered for reading?
     *
     * @param session the session queried
     * @return true if registered for reading
     */
    protected abstract boolean isInterestedInRead(T session);

    /**
     * Is this session registered for writing?
     *
     * @param session the session queried
     * @return true if registered for writing
     */
    protected abstract boolean isInterestedInWrite(T session);

    /**
     * Initialize the polling of a session. Add it to the polling process.
     *
     * @param session the {@link IoSession} to add to the polling
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init(T session) throws Exception;

    /**
     * Destroy the underlying client socket handle.
     *
     * @param session the {@link IoSession}
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void destroy(T session) throws Exception;

    /**
     * Read a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}.
     * It is called when the session was found ready for reading.
     *
     * @param session the session to read
     * @param buf the buffer to fill
     * @return the number of bytes read
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int read(T session, IoBuffer buf) throws Exception;

    /**
     * Write a sequence of bytes to a {@link IoSession}. It is meant to be called when a session
     * was found ready for writing.
     *
     * @param session the session to write
     * @param buf the buffer to write
     * @param length the number of bytes to write; it can be greater than the number of bytes
     * remaining in the buffer
     * @return the number of bytes written
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int write(T session, IoBuffer buf, int length) throws Exception;

    /**
     * Write a part of a file to a {@link IoSession}. If the underlying API does not support
     * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so
     * the file will be sent using the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
* @param session the session to write     * @param region the file region to write

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -