📄 abstractiosession.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.core.session;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.net.SocketAddress;import java.nio.channels.FileChannel;import java.util.Iterator;import java.util.Queue;import java.util.Set;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicLong;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.file.DefaultFileRegion;import org.apache.mina.core.filterchain.IoFilterChain;import org.apache.mina.core.future.CloseFuture;import org.apache.mina.core.future.DefaultCloseFuture;import org.apache.mina.core.future.DefaultReadFuture;import org.apache.mina.core.future.DefaultWriteFuture;import org.apache.mina.core.future.IoFutureListener;import org.apache.mina.core.future.ReadFuture;import org.apache.mina.core.future.WriteFuture;import org.apache.mina.core.service.AbstractIoService;import org.apache.mina.core.service.IoAcceptor;import org.apache.mina.core.service.IoProcessor;import org.apache.mina.core.service.IoService;import org.apache.mina.core.service.TransportMetadata;import org.apache.mina.core.write.DefaultWriteRequest;import 
org.apache.mina.core.write.WriteException;import org.apache.mina.core.write.WriteRequest;import org.apache.mina.core.write.WriteRequestQueue;import org.apache.mina.core.write.WriteTimeoutException;import org.apache.mina.core.write.WriteToClosedSessionException;import org.apache.mina.util.CircularQueue;import org.apache.mina.util.ExceptionMonitor;/** * Base implementation of {@link IoSession}. * * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 756270 $, $Date: 2009-03-20 00:49:57 +0100 (Fri, 20 Mar 2009) $ */public abstract class AbstractIoSession implements IoSession { private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, "readyReadFutures"); private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, "waitingReadFutures"); private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() { public void operationComplete(CloseFuture future) { AbstractIoSession s = (AbstractIoSession) future.getSession(); s.scheduledWriteBytes.set(0); s.scheduledWriteMessages.set(0); s.readBytesThroughput = 0; s.readMessagesThroughput = 0; s.writtenBytesThroughput = 0; s.writtenMessagesThroughput = 0; } }; /** * An internal write request object that triggers session close. * @see #writeRequestQueue */ private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object()); private final Object lock = new Object(); private IoSessionAttributeMap attributes; private WriteRequestQueue writeRequestQueue; private WriteRequest currentWriteRequest; // The Session creation's time */ private final long creationTime; /** An id generator guaranteed to generate unique IDs for the session */ private static AtomicLong idGenerator = new AtomicLong(0); /** The session ID */ private long sessionId; /** * A future that will be set 'closed' when the connection is closed. 
*/ private final CloseFuture closeFuture = new DefaultCloseFuture(this); private volatile boolean closing; // traffic control private boolean readSuspended=false; private boolean writeSuspended=false; // Status variables private final AtomicBoolean scheduledForFlush = new AtomicBoolean(); private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); private final AtomicInteger scheduledWriteMessages = new AtomicInteger(); private long readBytes; private long writtenBytes; private long readMessages; private long writtenMessages; private long lastReadTime; private long lastWriteTime; private long lastThroughputCalculationTime; private long lastReadBytes; private long lastWrittenBytes; private long lastReadMessages; private long lastWrittenMessages; private double readBytesThroughput; private double writtenBytesThroughput; private double readMessagesThroughput; private double writtenMessagesThroughput; private int idleCountForBoth; private int idleCountForRead; private int idleCountForWrite; private long lastIdleTimeForBoth; private long lastIdleTimeForRead; private long lastIdleTimeForWrite; private boolean deferDecreaseReadBuffer = true; /** * TODO Add method documentation */ protected AbstractIoSession() { // Initialize all the Session counters to the current time long currentTime = System.currentTimeMillis(); creationTime = currentTime; lastThroughputCalculationTime = currentTime; lastReadTime = currentTime; lastWriteTime = currentTime; lastIdleTimeForBoth = currentTime; lastIdleTimeForRead = currentTime; lastIdleTimeForWrite = currentTime; // TODO add documentation closeFuture.addListener(SCHEDULED_COUNTER_RESETTER); // Set a new ID for this session sessionId = idGenerator.incrementAndGet(); } /** * {@inheritDoc} * * We use an AtomicLong to guarantee that the session ID are * unique. 
*/
    public final long getId() {
        return sessionId;
    }

    /**
     * Returns the {@link IoProcessor} of this session. Within this class it is
     * the object asked to flush the session when a close-on-flush is requested.
     */
    public abstract IoProcessor getProcessor();

    /**
     * {@inheritDoc}
     * <p>
     * The session is reported as connected for as long as its close future has
     * not been marked closed.
     */
    public final boolean isConnected() {
        boolean alreadyClosed = closeFuture.isClosed();
        return !alreadyClosed;
    }

    /**
     * {@inheritDoc}
     * <p>
     * True once a close has been requested through {@link #close()} or once the
     * close future has been marked closed.
     */
    public final boolean isClosing() {
        if (closing) {
            return true;
        }
        return closeFuture.isClosed();
    }

    /**
     * {@inheritDoc}
     */
    public final CloseFuture getCloseFuture() {
        return closeFuture;
    }

    /**
     * Tells whether the scheduled-for-flush flag is currently set.
     *
     * @return the current value of the flag
     */
    public final boolean isScheduledForFlush() {
        return scheduledForFlush.get();
    }

    /**
     * Sets or clears the scheduled-for-flush flag.
     *
     * @param flag {@code true} to set the flag, {@code false} to clear it
     * @return when setting: {@code true} only if this call switched the flag
     *         from unset to set; when clearing: always {@code true}
     */
    public final boolean setScheduledForFlush(boolean flag) {
        if (!flag) {
            // Clearing is unconditional.
            scheduledForFlush.set(false);
            return true;
        }

        // Setting succeeds for exactly one caller while the flag is unset.
        return scheduledForFlush.compareAndSet(false, true);
    }

    /**
     * {@inheritDoc}
     * <p>
     * {@code true} closes through {@link #close()}; {@code false} queues the
     * close request behind the writes already scheduled.
     */
    public final CloseFuture close(boolean rightNow) {
        return rightNow ? close() : closeOnFlush();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Only the first call on a session that is not already closing fires the
     * filter-close event on the filter chain; every call returns the same
     * close future.
     */
    public final CloseFuture close() {
        synchronized (lock) {
            if (isClosing()) {
                return closeFuture;
            }
            closing = true;
        }

        // Fired outside the monitor.
        getFilterChain().fireFilterClose();
        return closeFuture;
    }

    /**
     * Appends the internal close request to this session's write request
     * queue and asks the processor to flush the session.
     *
     * @return the close future of this session
     */
    private final CloseFuture closeOnFlush() {
        getWriteRequestQueue().offer(this, CLOSE_REQUEST);
        getProcessor().flush(this);
        return closeFuture;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Hands out the head of the ready queue when there is one; otherwise a
     * new future is created and placed on the waiting queue. A ready future
     * that is already closed is put back on the ready queue so that later
     * readers receive it as well.
     *
     * @throws IllegalStateException if the session configuration does not
     *         have the read operation enabled
     */
    public final ReadFuture read() {
        if (!getConfig().isUseReadOperation()) {
            throw new IllegalStateException("useReadOperation is not enabled.");
        }

        Queue<ReadFuture> ready = getReadyReadFutures();
        synchronized (ready) {
            ReadFuture next = ready.poll();
            if (next == null) {
                // Nothing ready yet: the caller waits on a fresh future.
                next = new DefaultReadFuture(this);
                getWaitingReadFutures().offer(next);
                return next;
            }

            if (next.isClosed()) {
                // Let other readers get notified.
                ready.offer(next);
            }
            return next;
        }
    }

    /**
     * Completes the next read future with the given message.
     *
     * @param message the message passed to the future's {@code setRead}
     */
    public final void offerReadFuture(Object message) {
        ReadFuture target = newReadFuture();
        target.setRead(message);
    }

    /**
     * Completes the next read future with the given exception.
     *
     * @param exception the exception passed to the future's {@code setException}
     */
    public final void offerFailedReadFuture(Throwable exception) {
        ReadFuture target = newReadFuture();
        target.setException(exception);
    }

    /**
     * Marks the next read future as closed.
     * <p>
     * The ready-queue monitor is held across obtaining the future and marking
     * it closed. {@link #read()} takes the same monitor, so it cannot poll the
     * future between those two steps.
     */
    public final void offerClosedReadFuture() {
        Queue<ReadFuture> ready = getReadyReadFutures();
        synchronized (ready) {
            newReadFuture().setClosed();
        }
    }

    /**
     * Supplies the future that the next read result is delivered to: the head
     * of the waiting queue when a reader is waiting, otherwise a new future
     * that is appended to the ready queue. Both queues are accessed under the
     * ready-queue monitor.
     */
    private ReadFuture newReadFuture() {
        Queue<ReadFuture> ready = getReadyReadFutures();
        Queue<ReadFuture> waiting = getWaitingReadFutures();

        synchronized (ready) {
            ReadFuture pending = waiting.poll();
            if (pending != null) {
                return pending;
            }

            ReadFuture created = new DefaultReadFuture(this);
            ready.offer(created);
            return created;
        }
    }

    /**
     * Returns the ready queue, which is kept as a session attribute. On first
     * use a new queue is stored with {@code setAttributeIfAbsent}; if that
     * call reports a queue already present, the existing queue is returned
     * instead of the new one.
     */
    private Queue<ReadFuture> getReadyReadFutures() {
        Queue<ReadFuture> stored =
            (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
        if (stored != null) {
            return stored;
        }

        Queue<ReadFuture> fresh = new CircularQueue<ReadFuture>();
        Queue<ReadFuture> winner =
            (Queue<ReadFuture>) setAttributeIfAbsent(
                    READY_READ_FUTURES_KEY, fresh);
        return winner != null ? winner : fresh;
    }

    /**
     * Returns the waiting queue, which is kept as a session attribute. On
     * first use a new queue is stored with {@code setAttributeIfAbsent}; if
     * that call reports a queue already present, the existing queue is used.
     */
    private Queue<ReadFuture> getWaitingReadFutures() {
        Queue<ReadFuture> waitingReadyReadFutures =
            (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
        if (waitingReadyReadFutures == null) {
            waitingReadyReadFutures = new CircularQueue<ReadFuture>();

            Queue<ReadFuture> oldWaitingReadyReadFutures =
                (Queue<ReadFuture>) setAttributeIfAbsent(
                        WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
            if (oldWaitingReadyReadFutures != null) {
waitingReadyReadFutures = oldWaitingReadyReadFutures; } } return waitingReadyReadFutures; } /** * {@inheritDoc} */ public WriteFuture write(Object message) { return write(message, null); } /** * {@inheritDoc} */ public WriteFuture write(Object message, SocketAddress remoteAddress) { if (message == null) { throw new NullPointerException("message"); } // We can't send a message to a connected session if we don't have // the remote address if (!getTransportMetadata().isConnectionless() && remoteAddress != null) { throw new UnsupportedOperationException(); } // If the session has been closed or is closing, we can't either // send a message to the remote side. We generate a future // containing an exception. if (isClosing() || !isConnected()) { WriteFuture future = new DefaultWriteFuture(this); WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress); WriteException writeException = new WriteToClosedSessionException(request); future.setException(writeException); return future; } FileChannel openedFileChannel = null; // TODO: remove this code as soon as we use InputStream // instead of Object for the message. try { if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) { // Nothing to write : probably an error in the user code throw new IllegalArgumentException( "message is empty. Forgot to call flip()?"); } else if (message instanceof FileChannel) { FileChannel fileChannel = (FileChannel) message; message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); } else if (message instanceof File) { File file = (File) message; openedFileChannel = new FileInputStream(file).getChannel(); message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size()); } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); return DefaultWriteFuture.newNotWrittenFuture(this, e); } // Now, we can write the message. 
First, create a future WriteFuture writeFuture = new DefaultWriteFuture(this); WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); // Then, get the chain and inject the WriteRequest into it IoFilterChain filterChain = getFilterChain(); filterChain.fireFilterWrite(writeRequest); // TODO : This is not our business ! The caller has created a FileChannel, // he has to close it ! if (openedFileChannel != null) { // If we opened a FileChannel, it needs to be closed when the write has completed final FileChannel finalChannel = openedFileChannel; writeFuture.addListener(new IoFutureListener<WriteFuture>() { public void operationComplete(WriteFuture future) { try { finalChannel.close();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -