📄 abstractpollingioprocessor.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.core.polling;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.file.FileRegion;import org.apache.mina.core.filterchain.IoFilterChain;import org.apache.mina.core.future.DefaultIoFuture;import org.apache.mina.core.service.AbstractIoService;import org.apache.mina.core.service.IoProcessor;import org.apache.mina.core.session.AbstractIoSession;import org.apache.mina.core.session.IoSession;import org.apache.mina.core.session.IoSessionConfig;import org.apache.mina.core.write.WriteRequest;import org.apache.mina.core.write.WriteRequestQueue;import org.apache.mina.core.write.WriteToClosedSessionException;import org.apache.mina.util.ExceptionMonitor;import org.apache.mina.util.NamePreservingRunnable;/** * An abstract implementation of {@link IoProcessor} which helps * transport developers to write an {@link IoProcessor} easily. 
* This class is in charge of active polling a set of {@link IoSession} * and trigger events when some I/O operation is possible. * * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 751744 $, $Date: 2009-03-09 17:53:13 +0100 (Mon, 09 Mar 2009) $ */public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> { /** * The maximum loop count for a write operation until * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value. * It is similar to what a spin lock is for in concurrency programming. * It improves memory utilization and write throughput significantly. */ private static final int WRITE_SPIN_COUNT = 256; /** A timeout used for the select, as we need to get out to deal with idle sessions */ private static final long SELECT_TIMEOUT = 1000L; /** A map containing the last Thread ID for each class */ private static final Map<Class<?>, AtomicInteger> threadIds = new HashMap<Class<?>, AtomicInteger>(); private final Object lock = new Object(); private final String threadName; private final Executor executor; /** A Session queue containing the newly created sessions */ private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>(); private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>(); private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>(); private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>(); /** The processor thread : it handles the incoming messages */ private Processor processor; private long lastIdleCheckTime; private final Object disposalLock = new Object(); private volatile boolean disposing; private volatile boolean disposed; private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null); /** * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor} * for handling I/Os events. 
* * @param executor the {@link Executor} for handling I/O events */ protected AbstractPollingIoProcessor(Executor executor) { if (executor == null) { throw new NullPointerException("executor"); } this.threadName = nextThreadName(); this.executor = executor; } /** * Compute the thread ID for this class instance. As we may have different * classes, we store the last ID number into a Map associating the class * name to the last assigned ID. * * @return a name for the current thread, based on the class name and * an incremental value, starting at 1. */ private String nextThreadName() { Class<?> cls = getClass(); int newThreadId; // We synchronize this block to avoid a concurrent access to // the actomicInteger (it can be modified by another thread, while // being seen as null by another thread) synchronized( threadIds ) { // Get the current ID associated to this class' name AtomicInteger threadId = threadIds.get(cls); if (threadId == null) { // We never have seen this class before, just create a // new ID starting at 1 for it, and associate this ID // with the class name in the map. newThreadId = 1; threadIds.put(cls, new AtomicInteger(newThreadId)); } else { // Just increment the lat ID, and get it. 
newThreadId = threadId.incrementAndGet(); } } // Now we can compute the name for this thread return cls.getSimpleName() + '-' + newThreadId; } /** * {@inheritDoc} */ public final boolean isDisposing() { return disposing; } /** * {@inheritDoc} */ public final boolean isDisposed() { return disposed; } /** * {@inheritDoc} */ public final void dispose() { if (disposed) { return; } synchronized (disposalLock) { if (!disposing) { disposing = true; startupProcessor(); } } disposalFuture.awaitUninterruptibly(); disposed = true; } /** * Dispose the resources used by this {@link IoProcessor} for polling * the client connections * @throws Exception if some low level IO error occurs */ protected abstract void dispose0() throws Exception; /** * poll those sessions for the given timeout * @param timeout milliseconds before the call timeout if no event appear * @return The number of session ready for read or for write * @throws Exception if some low level IO error occurs */ protected abstract int select(long timeout) throws Exception; /** * poll those sessions forever * @return The number of session ready for read or for write * @throws Exception if some low level IO error occurs */ protected abstract int select() throws Exception; /** * Say if the list of {@link IoSession} polled by this {@link IoProcessor} * is empty * @return true if at least a session is managed by this {@link IoProcessor} */ protected abstract boolean isSelectorEmpty(); /** * Interrupt the {@link AbstractPollingIoProcessor#select(int) call. 
*/
    protected abstract void wakeup();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
     * {@link IoProcessor}.
     *
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<T> allSessions();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link AbstractPollingIoProcessor#select(long)}.
     *
     * @return {@link Iterator} of {@link IoSession} ready for I/O operations
     */
    protected abstract Iterator<T> selectedSessions();

    /**
     * Get the state of a session (preparing, open, closed).
     *
     * @param session the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState state(T session);

    /**
     * Is the session ready for writing?
     *
     * @param session the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isWritable(T session);

    /**
     * Is the session ready for reading?
     *
     * @param session the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isReadable(T session);

    /**
     * Register or unregister a session for writing.
     *
     * @param session the session registered
     * @param interested true for registering, false for removing
     * @throws Exception declared by the signature; the exact causes are left
     * to the implementation
     */
    protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception;

    /**
     * Register or unregister a session for reading.
     *
     * @param session the session registered
     * @param interested true for registering, false for removing
     * @throws Exception declared by the signature; the exact causes are left
     * to the implementation
     */
    protected abstract void setInterestedInRead(T session, boolean interested) throws Exception;

    /**
     * Is this session registered for reading?
     *
     * @param session the session queried
     * @return true if registered for reading
     */
    protected abstract boolean isInterestedInRead(T session);

    /**
     * Is this session registered for writing?
     *
     * @param session the session queried
     * @return true if registered for writing
     */
    protected abstract boolean isInterestedInWrite(T session);

    /**
     * Initialize the polling of a session. Add it to the polling process.
     *
     * @param session the {@link IoSession} to add to the polling
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init(T session) throws Exception;

    /**
     * Destroy the underlying client socket handle.
     *
     * @param session the {@link IoSession}
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void destroy(T session) throws Exception;

    /**
     * Reads a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}.
     * Is called when the session was found ready for reading.
     *
     * @param session the session to read
     * @param buf the buffer to fill
     * @return the number of bytes read
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int read(T session, IoBuffer buf) throws Exception;

    /**
     * Write a sequence of bytes to a {@link IoSession}; meant to be called when a session
     * was found ready for writing.
     *
     * @param session the session to write
     * @param buf the buffer to write
     * @param length the number of bytes to write; it can be greater than the number of
     * bytes remaining in the buffer
     * @return the number of bytes written
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int write(T session, IoBuffer buf, int length) throws Exception;

    /**
     * Write a part of a file to a {@link IoSession}, if the underlying API isn't supporting
     * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so
     * the file will be send using usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     * @param session the session to write
     * @param region the file region to write
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -