📄 socketioprocessor.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.transport.socket.nio;import java.io.IOException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.ExceptionMonitor;import org.apache.mina.common.IdleStatus;import org.apache.mina.common.IoFilter.WriteRequest;import org.apache.mina.common.WriteTimeoutException;import org.apache.mina.util.NamePreservingRunnable;/** * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev: 556539 $, $Date: 2007-07-16 16:48:36 +0900 (월, 16 7월 2007) $, */class SocketIoProcessor { private final Object lock = new Object(); private final String threadName; private final Executor executor; private Selector selector; private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>(); private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>(); private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>(); private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>(); private Worker worker; private long lastIdleCheckTime = System.currentTimeMillis(); SocketIoProcessor(String threadName, Executor executor) { this.threadName = threadName; this.executor = executor; } void addNew(SocketSessionImpl session) throws IOException { newSessions.add(session); startupWorker(); } void remove(SocketSessionImpl session) throws IOException { scheduleRemove(session); startupWorker(); } private void startupWorker() throws IOException { synchronized (lock) { if (worker == null) { selector = Selector.open(); worker = new Worker(); executor.execute(new NamePreservingRunnable(worker)); } selector.wakeup(); } } void flush(SocketSessionImpl session) { scheduleFlush(session); Selector selector = this.selector; if (selector != null) { selector.wakeup(); } } void updateTrafficMask(SocketSessionImpl session) { scheduleTrafficControl(session); Selector selector = this.selector; if (selector != null) { selector.wakeup(); } } private void scheduleRemove(SocketSessionImpl session) { removingSessions.add(session); } private void scheduleFlush(SocketSessionImpl session) { flushingSessions.add(session); } private void scheduleTrafficControl(SocketSessionImpl session) { trafficControllingSessions.add(session); } private void doAddNew() { for (;;) { SocketSessionImpl session = newSessions.poll(); if (session == null) break; SocketChannel ch = session.getChannel(); try { ch.configureBlocking(false); session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). session.getServiceListeners().fireSessionCreated(session); } catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). session.getFilterChain().fireExceptionCaught(session, e); } } } private void doRemove() { for (;;) { SocketSessionImpl session = removingSessions.poll(); if (session == null) break; SocketChannel ch = session.getChannel(); SelectionKey key = session.getSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.close() is called before addSession() is processed) if (key == null) { scheduleRemove(session); break; } // skip if channel is already closed if (!key.isValid()) { continue; } try { key.cancel(); ch.close(); } catch (IOException e) { session.getFilterChain().fireExceptionCaught(session, e); } finally { releaseWriteBuffers(session); session.getServiceListeners().fireSessionDestroyed(session); } } } private void process(Set<SelectionKey> selectedKeys) { for (SelectionKey key : selectedKeys) { SocketSessionImpl session = (SocketSessionImpl) key.attachment(); if (key.isReadable() && session.getTrafficMask().isReadable()) { read(session); } if (key.isWritable() && session.getTrafficMask().isWritable()) { scheduleFlush(session); } } selectedKeys.clear(); } private void read(SocketSessionImpl session) { ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); SocketChannel ch = session.getChannel(); try { int readBytes = 0; int ret; try { while ((ret = ch.read(buf.buf())) > 0) { readBytes += ret; } } finally { buf.flip(); } session.increaseReadBytes(readBytes); if (readBytes > 0) { session.getFilterChain().fireMessageReceived(session, buf); buf = null; if (readBytes * 2 < session.getReadBufferSize()) { if (session.getReadBufferSize() > 64) { session.setReadBufferSize(session.getReadBufferSize() >>> 1); } } else if (readBytes == session.getReadBufferSize()) { session.setReadBufferSize(session.getReadBufferSize() << 1); } } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) scheduleRemove(session); session.getFilterChain().fireExceptionCaught(session, e); } finally { if (buf != null) buf.release(); } } private void notifyIdleness() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -