⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 AbstractIoSession.java

📁 MINA 是以 Java 实现的一个开源网络程序框架
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.core.session;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.net.SocketAddress;import java.nio.channels.FileChannel;import java.util.Iterator;import java.util.Queue;import java.util.Set;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicLong;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.file.DefaultFileRegion;import org.apache.mina.core.filterchain.IoFilterChain;import org.apache.mina.core.future.CloseFuture;import org.apache.mina.core.future.DefaultCloseFuture;import org.apache.mina.core.future.DefaultReadFuture;import org.apache.mina.core.future.DefaultWriteFuture;import org.apache.mina.core.future.IoFutureListener;import org.apache.mina.core.future.ReadFuture;import org.apache.mina.core.future.WriteFuture;import org.apache.mina.core.service.AbstractIoService;import org.apache.mina.core.service.IoAcceptor;import org.apache.mina.core.service.IoProcessor;import org.apache.mina.core.service.IoService;import org.apache.mina.core.service.TransportMetadata;import 
org.apache.mina.core.write.DefaultWriteRequest;import org.apache.mina.core.write.WriteException;import org.apache.mina.core.write.WriteRequest;import org.apache.mina.core.write.WriteRequestQueue;import org.apache.mina.core.write.WriteTimeoutException;import org.apache.mina.core.write.WriteToClosedSessionException;import org.apache.mina.util.CircularQueue;import org.apache.mina.util.ExceptionMonitor;/** * Base implementation of {@link IoSession}. * * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 756270 $, $Date: 2009-03-20 00:49:57 +0100 (Fri, 20 Mar 2009) $ */public abstract class AbstractIoSession implements IoSession {    private static final AttributeKey READY_READ_FUTURES_KEY =        new AttributeKey(AbstractIoSession.class, "readyReadFutures");        private static final AttributeKey WAITING_READ_FUTURES_KEY =        new AttributeKey(AbstractIoSession.class, "waitingReadFutures");    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =        new IoFutureListener<CloseFuture>() {            public void operationComplete(CloseFuture future) {                AbstractIoSession s = (AbstractIoSession) future.getSession();                s.scheduledWriteBytes.set(0);                s.scheduledWriteMessages.set(0);                s.readBytesThroughput = 0;                s.readMessagesThroughput = 0;                s.writtenBytesThroughput = 0;                s.writtenMessagesThroughput = 0;            }    };    /**     * An internal write request object that triggers session close.     
* @see #writeRequestQueue     */    private static final WriteRequest CLOSE_REQUEST =        new DefaultWriteRequest(new Object());    private final Object lock = new Object();    private IoSessionAttributeMap attributes;    private WriteRequestQueue writeRequestQueue;    private WriteRequest currentWriteRequest;        // The Session creation's time */    private final long creationTime;    /** An id generator guaranteed to generate unique IDs for the session */    private static AtomicLong idGenerator = new AtomicLong(0);        /** The session ID */    private long sessionId;        /**     * A future that will be set 'closed' when the connection is closed.     */    private final CloseFuture closeFuture = new DefaultCloseFuture(this);    private volatile boolean closing;        // traffic control    private boolean readSuspended=false;    private boolean writeSuspended=false;    // Status variables    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();    private final AtomicInteger scheduledWriteMessages = new AtomicInteger();    private long readBytes;    private long writtenBytes;    private long readMessages;    private long writtenMessages;    private long lastReadTime;    private long lastWriteTime;    private long lastThroughputCalculationTime;    private long lastReadBytes;    private long lastWrittenBytes;    private long lastReadMessages;    private long lastWrittenMessages;    private double readBytesThroughput;    private double writtenBytesThroughput;    private double readMessagesThroughput;    private double writtenMessagesThroughput;    private int idleCountForBoth;    private int idleCountForRead;    private int idleCountForWrite;    private long lastIdleTimeForBoth;    private long lastIdleTimeForRead;    private long lastIdleTimeForWrite;    private boolean deferDecreaseReadBuffer = true;    /**     * TODO Add method documentation     */    protected 
AbstractIoSession() {        // Initialize all the Session counters to the current time         long currentTime = System.currentTimeMillis();        creationTime = currentTime;        lastThroughputCalculationTime = currentTime;        lastReadTime = currentTime;        lastWriteTime = currentTime;        lastIdleTimeForBoth = currentTime;        lastIdleTimeForRead = currentTime;        lastIdleTimeForWrite = currentTime;                // TODO add documentation        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);                // Set a new ID for this session        sessionId = idGenerator.incrementAndGet();    }    /**     * {@inheritDoc}     *      * We use an AtomicLong to guarantee that the session ID are     * unique.     */    public final long getId() {        return sessionId;    }    /**     * TODO Add method documentation     */    public abstract IoProcessor getProcessor();    /**     * {@inheritDoc}     */    public final boolean isConnected() {        return !closeFuture.isClosed();    }    /**     * {@inheritDoc}     */    public final boolean isClosing() {        return closing || closeFuture.isClosed();    }    /**     * {@inheritDoc}     */    public final CloseFuture getCloseFuture() {        return closeFuture;    }    /**     * TODO Add method documentation     */    public final boolean isScheduledForFlush() {        return scheduledForFlush.get();    }    /**     * TODO Add method documentation     */    public final boolean setScheduledForFlush(boolean flag) {        if (flag) {            return scheduledForFlush.compareAndSet(false, true);        } else {            scheduledForFlush.set(false);            return true;        }    }    /**     * {@inheritDoc}     */    public final CloseFuture close(boolean rightNow) {        if (rightNow) {            return close();        } else {            return closeOnFlush();        }    }    /**     * {@inheritDoc}     */    public final CloseFuture close() {        synchronized (lock) { 
           if (isClosing()) {                return closeFuture;            } else {                closing = true;            }        }        getFilterChain().fireFilterClose();        return closeFuture;    }    private final CloseFuture closeOnFlush() {        getWriteRequestQueue().offer(this, CLOSE_REQUEST);        getProcessor().flush(this);        return closeFuture;    }    /**     * {@inheritDoc}     */    public final ReadFuture read() {        if (!getConfig().isUseReadOperation()) {            throw new IllegalStateException("useReadOperation is not enabled.");        }        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();        ReadFuture future;        synchronized (readyReadFutures) {            future = readyReadFutures.poll();            if (future != null) {                if (future.isClosed()) {                    // Let other readers get notified.                    readyReadFutures.offer(future);                }            } else {                future = new DefaultReadFuture(this);                getWaitingReadFutures().offer(future);            }        }        return future;    }    /**     * TODO Add method documentation     */    public final void offerReadFuture(Object message) {        newReadFuture().setRead(message);    }    /**     * TODO Add method documentation     */    public final void offerFailedReadFuture(Throwable exception) {        newReadFuture().setException(exception);    }    /**     * TODO Add method documentation     */    public final void offerClosedReadFuture() {        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();        synchronized (readyReadFutures) {            newReadFuture().setClosed();        }    }    /**     * TODO Add method documentation     */    private ReadFuture newReadFuture() {        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();        Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();        ReadFuture future;        synchronized 
(readyReadFutures) {            future = waitingReadFutures.poll();            if (future == null) {                future = new DefaultReadFuture(this);                readyReadFutures.offer(future);            }        }        return future;    }    /**     * TODO Add method documentation     */    private Queue<ReadFuture> getReadyReadFutures() {        Queue<ReadFuture> readyReadFutures =            (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);        if (readyReadFutures == null) {            readyReadFutures = new CircularQueue<ReadFuture>();            Queue<ReadFuture> oldReadyReadFutures =                (Queue<ReadFuture>) setAttributeIfAbsent(                        READY_READ_FUTURES_KEY, readyReadFutures);            if (oldReadyReadFutures != null) {                readyReadFutures = oldReadyReadFutures;            }        }        return readyReadFutures;    }    /**     * TODO Add method documentation     */    private Queue<ReadFuture> getWaitingReadFutures() {        Queue<ReadFuture> waitingReadyReadFutures =            (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);        if (waitingReadyReadFutures == null) {            waitingReadyReadFutures = new CircularQueue<ReadFuture>();            Queue<ReadFuture> oldWaitingReadyReadFutures =                (Queue<ReadFuture>) setAttributeIfAbsent(                        WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);            if (oldWaitingReadyReadFutures != null) {                waitingReadyReadFutures = oldWaitingReadyReadFutures;            }        }        return waitingReadyReadFutures;    }    /**     * {@inheritDoc}     */    public WriteFuture write(Object message) {        return write(message, null);    }    /**     * {@inheritDoc}     */    public WriteFuture write(Object message, SocketAddress remoteAddress) {        if (message == null) {            throw new NullPointerException("message");        }        // We can't send a message to a connected 
session if we don't have         // the remote address        if (!getTransportMetadata().isConnectionless() &&                remoteAddress != null) {            throw new UnsupportedOperationException();        }                // If the session has been closed or is closing, we can't either        // send a message to the remote side. We generate a future        // containing an exception.        if (isClosing() || !isConnected()) {            WriteFuture future = new DefaultWriteFuture(this);            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);            WriteException writeException = new WriteToClosedSessionException(request);            future.setException(writeException);            return future;        }        FileChannel openedFileChannel = null;                // TODO: remove this code as soon as we use InputStream        // instead of Object for the message.        try {            if (message instanceof IoBuffer                    && !((IoBuffer) message).hasRemaining()) {                // Nothing to write : probably an error in the user code                throw new IllegalArgumentException(                "message is empty. Forgot to call flip()?");            } else if (message instanceof FileChannel) {                FileChannel fileChannel = (FileChannel) message;                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());            } else if (message instanceof File) {                File file = (File) message;                openedFileChannel = new FileInputStream(file).getChannel();                message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());            }        } catch (IOException e) {            ExceptionMonitor.getInstance().exceptionCaught(e);            return DefaultWriteFuture.newNotWrittenFuture(this, e);        }        // Now, we can write the message. 
First, create a future        WriteFuture writeFuture = new DefaultWriteFuture(this);        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);                // Then, get the chain and inject the WriteRequest into it        IoFilterChain filterChain = getFilterChain();        filterChain.fireFilterWrite(writeRequest);        // TODO : This is not our business ! The caller has created a FileChannel,        // he has to close it !        if (openedFileChannel != null) {            // If we opened a FileChannel, it needs to be closed when the write has completed            final FileChannel finalChannel = openedFileChannel;            writeFuture.addListener(new IoFutureListener<WriteFuture>() {                public void operationComplete(WriteFuture future) {                    try {                        finalChannel.close();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -