⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketioprocessor.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.transport.socket.nio;import java.io.IOException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.ExceptionMonitor;import org.apache.mina.common.IdleStatus;import org.apache.mina.common.IoFilter.WriteRequest;import org.apache.mina.common.WriteTimeoutException;import org.apache.mina.util.NamePreservingRunnable;/** * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev: 556539 $, $Date: 2007-07-16 16:48:36 +0900 (월, 16  7월 2007) $, */class SocketIoProcessor {    private final Object lock = new Object();    private final String threadName;    private final Executor executor;    private Selector selector;    private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();    private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();    private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();    private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();    private Worker worker;    private long lastIdleCheckTime = System.currentTimeMillis();    SocketIoProcessor(String threadName, Executor executor) {        this.threadName = threadName;        this.executor = executor;    }    void addNew(SocketSessionImpl session) throws IOException {        newSessions.add(session);        startupWorker();    }    void remove(SocketSessionImpl session) throws IOException {        scheduleRemove(session);        startupWorker();    }    private void startupWorker() throws IOException {        synchronized (lock) {            if (worker == null) {                selector = Selector.open();                worker = new Worker();                executor.execute(new NamePreservingRunnable(worker));            }            selector.wakeup();        }    }    void flush(SocketSessionImpl session) {        scheduleFlush(session);        Selector selector = this.selector;        if (selector != null) {            selector.wakeup();        }    }    void updateTrafficMask(SocketSessionImpl session) {        scheduleTrafficControl(session);        Selector selector = this.selector;        if (selector != null) {            selector.wakeup();        }    }    private void scheduleRemove(SocketSessionImpl session) {        removingSessions.add(session);    }    private void scheduleFlush(SocketSessionImpl session) {        flushingSessions.add(session);    }    private void scheduleTrafficControl(SocketSessionImpl session) {        trafficControllingSessions.add(session);    }    private void doAddNew() {        for (;;) {            SocketSessionImpl session = newSessions.poll();            if (session == null)                break;            SocketChannel ch = session.getChannel();            try {                ch.configureBlocking(false);                session.setSelectionKey(ch.register(selector,                        SelectionKey.OP_READ, session));                // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here                // in AbstractIoFilterChain.fireSessionOpened().                session.getServiceListeners().fireSessionCreated(session);            } catch (IOException e) {                // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute                // and call ConnectFuture.setException().                session.getFilterChain().fireExceptionCaught(session, e);            }        }    }    private void doRemove() {        for (;;) {            SocketSessionImpl session = removingSessions.poll();            if (session == null)                break;            SocketChannel ch = session.getChannel();            SelectionKey key = session.getSelectionKey();            // Retry later if session is not yet fully initialized.            // (In case that Session.close() is called before addSession() is processed)            if (key == null) {                scheduleRemove(session);                break;            }            // skip if channel is already closed            if (!key.isValid()) {                continue;            }            try {                key.cancel();                ch.close();            } catch (IOException e) {                session.getFilterChain().fireExceptionCaught(session, e);            } finally {                releaseWriteBuffers(session);                session.getServiceListeners().fireSessionDestroyed(session);            }        }    }    private void process(Set<SelectionKey> selectedKeys) {        for (SelectionKey key : selectedKeys) {            SocketSessionImpl session = (SocketSessionImpl) key.attachment();            if (key.isReadable() && session.getTrafficMask().isReadable()) {                read(session);            }            if (key.isWritable() && session.getTrafficMask().isWritable()) {                scheduleFlush(session);            }        }        selectedKeys.clear();    }    private void read(SocketSessionImpl session) {        ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());        SocketChannel ch = session.getChannel();        try {            int readBytes = 0;            int ret;            try {                while ((ret = ch.read(buf.buf())) > 0) {                    readBytes += ret;                }            } finally {                buf.flip();            }            session.increaseReadBytes(readBytes);            if (readBytes > 0) {                session.getFilterChain().fireMessageReceived(session, buf);                buf = null;                                if (readBytes * 2 < session.getReadBufferSize()) {                    if (session.getReadBufferSize() > 64) {                        session.setReadBufferSize(session.getReadBufferSize() >>> 1);                    }                } else if (readBytes == session.getReadBufferSize()) {                    session.setReadBufferSize(session.getReadBufferSize() << 1);                }            }            if (ret < 0) {                scheduleRemove(session);            }        } catch (Throwable e) {            if (e instanceof IOException)                scheduleRemove(session);            session.getFilterChain().fireExceptionCaught(session, e);        } finally {            if (buf != null)                buf.release();        }    }    private void notifyIdleness() {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -