⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketconnector.java

📁 apache 的一个socket框架
💻 JAVA
字号:
/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.transport.socket.nio;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoConnectorConfig;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.common.support.BaseIoConnector;
import org.apache.mina.common.support.DefaultConnectFuture;
import org.apache.mina.util.NamePreservingRunnable;
import org.apache.mina.util.NewThreadExecutor;

/**
 * {@link IoConnector} for socket transport (TCP/IP).
 * <p>
 * Connections that cannot be completed immediately are queued and finished by
 * a single background {@link Worker} that is started on demand and terminates
 * itself after {@link #getWorkerTimeout()} seconds without pending work.
 * Start-up, shutdown and request hand-off of that worker are all guarded by
 * one private lock.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
 */
public class SocketConnector extends BaseIoConnector {
    private static final AtomicInteger nextId = new AtomicInteger();

    /** Guards {@link #worker} and {@link #selector} life cycle and request hand-off. */
    private final Object lock = new Object();

    private final int id = nextId.getAndIncrement();

    private final String threadName = "SocketConnector-" + id;

    private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();

    /** Requests waiting to be registered with {@link #selector} by the worker. */
    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();

    private final SocketIoProcessor[] ioProcessors;

    private final int processorCount;

    private final Executor executor;

    private Selector selector;

    private Worker worker;

    /**
     * Round-robin ticket counter. Atomic because {@link #nextProcessor()} is
     * reached both from threads calling {@code connect} and from the worker.
     */
    private final AtomicInteger processorDistributor = new AtomicInteger();

    private int workerTimeout = 60; // 1 min.

    /**
     * Create a connector with a single processing thread using a NewThreadExecutor
     */
    public SocketConnector() {
        this(1, new NewThreadExecutor());
    }

    /**
     * Create a connector with the desired number of processing threads
     *
     * @param processorCount Number of processing threads
     * @param executor Executor to use for launching threads
     * @throws IllegalArgumentException if <code>processorCount</code> is less than 1
     */
    public SocketConnector(int processorCount, Executor executor) {
        if (processorCount < 1) {
            throw new IllegalArgumentException(
                    "Must have at least one processor");
        }

        this.executor = executor;
        this.processorCount = processorCount;
        ioProcessors = new SocketIoProcessor[processorCount];

        for (int i = 0; i < processorCount; i++) {
            ioProcessors[i] = new SocketIoProcessor(
                    "SocketConnectorIoProcessor-" + id + "." + i, executor);
        }
    }

    /**
     * How many seconds to keep the connection thread alive between connection requests
     *
     * @return Number of seconds to keep connection thread alive
     */
    public int getWorkerTimeout() {
        return workerTimeout;
    }

    /**
     * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
     *
     * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
     * @throws IllegalArgumentException if <code>workerTimeout</code> is negative
     */
    public void setWorkerTimeout(int workerTimeout) {
        if (workerTimeout < 0) {
            throw new IllegalArgumentException("Must be >= 0");
        }
        this.workerTimeout = workerTimeout;
    }

    /**
     * Connects to <code>address</code> without binding to a specific local
     * address; delegates to the four-argument overload with a
     * <code>null</code> local address.
     */
    public ConnectFuture connect(SocketAddress address, IoHandler handler,
            IoServiceConfig config) {
        return connect(address, null, handler, config);
    }

    /**
     * Starts a non-blocking connect to <code>address</code>.
     * <p>
     * If the channel connects immediately, the session is created on the
     * calling thread. Otherwise the request is handed to the background
     * worker, which completes it or fails it once its deadline passes.
     * I/O failures while opening or configuring the channel are reported
     * through a failed future rather than thrown.
     *
     * @param address      remote address; must be an {@link InetSocketAddress}
     * @param localAddress local address to bind to, or <code>null</code>;
     *                     must be an {@link InetSocketAddress} when given
     * @param handler      handler for the new session
     * @param config       configuration to use, or <code>null</code> to use
     *                     {@link #getDefaultConfig()}
     * @return a future that is notified when the attempt finishes
     * @throws NullPointerException     if <code>address</code> or <code>handler</code> is <code>null</code>
     * @throws IllegalArgumentException if an address is not an {@link InetSocketAddress}
     */
    public ConnectFuture connect(SocketAddress address,
            SocketAddress localAddress, IoHandler handler,
            IoServiceConfig config) {
        if (address == null) {
            throw new NullPointerException("address");
        }
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        if (!(address instanceof InetSocketAddress)) {
            throw new IllegalArgumentException("Unexpected address type: "
                    + address.getClass());
        }

        if (localAddress != null
                && !(localAddress instanceof InetSocketAddress)) {
            throw new IllegalArgumentException(
                    "Unexpected local address type: " + localAddress.getClass());
        }

        if (config == null) {
            config = getDefaultConfig();
        }

        SocketChannel ch = null;
        boolean success = false;
        try {
            ch = SocketChannel.open();
            ch.socket().setReuseAddress(true);
            if (localAddress != null) {
                ch.socket().bind(localAddress);
            }

            ch.configureBlocking(false);

            if (ch.connect(address)) {
                // Connected straight away: no need to involve the worker.
                DefaultConnectFuture future = new DefaultConnectFuture();
                newSession(ch, handler, config, future);
                success = true;
                return future;
            }

            success = true;
        } catch (IOException e) {
            return DefaultConnectFuture.newFailedFuture(e);
        } finally {
            if (!success && ch != null) {
                try {
                    ch.close();
                } catch (IOException e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }

        ConnectionRequest request = new ConnectionRequest(ch, handler, config);
        synchronized (lock) {
            try {
                startupWorker();
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    ExceptionMonitor.getInstance().exceptionCaught(e2);
                }

                return DefaultConnectFuture.newFailedFuture(e);
            }

            // Enqueue and wake up while still holding the lock. The worker
            // re-checks the queue under the same lock before shutting down,
            // so it can neither null out 'selector' between these calls nor
            // exit while this request is still waiting in the queue.
            connectQueue.add(request);
            selector.wakeup();
        }

        return request;
    }

    public SocketConnectorConfig getDefaultConfig() {
        return defaultConfig;
    }

    /**
     * Sets the config this connector will use by default.
     *
     * @param defaultConfig the default config.
     * @throws NullPointerException if the specified value is <code>null</code>.
     */
    public void setDefaultConfig(SocketConnectorConfig defaultConfig) {
        if (defaultConfig == null) {
            throw new NullPointerException("defaultConfig");
        }
        this.defaultConfig = defaultConfig;
    }

    /**
     * Opens the selector and launches the worker if none is running.
     * The caller must hold {@link #lock}.
     *
     * @throws IOException if the selector cannot be opened
     */
    private void startupWorker() throws IOException {
        if (worker == null) {
            selector = Selector.open();
            worker = new Worker();
            executor.execute(new NamePreservingRunnable(worker));
        }
    }

    /**
     * Drains {@link #connectQueue}, registering each pending channel with the
     * selector for <code>OP_CONNECT</code>. A request whose registration
     * fails is completed exceptionally and its channel is closed.
     */
    private void registerNew() {
        if (connectQueue.isEmpty()) {
            return;
        }

        for (;;) {
            ConnectionRequest req = connectQueue.poll();

            if (req == null) {
                break;
            }

            SocketChannel ch = req.channel;
            try {
                ch.register(selector, SelectionKey.OP_CONNECT, req);
            } catch (IOException e) {
                req.setException(e);
                // The request is dead; do not leak its socket.
                try {
                    ch.close();
                } catch (IOException e2) {
                    ExceptionMonitor.getInstance().exceptionCaught(e2);
                }
            }
        }
    }

    /**
     * Finishes the connection of every connectable key in <code>keys</code>
     * and clears the set afterwards.
     *
     * @param keys the selector's selected-key set
     */
    private void processSessions(Set<SelectionKey> keys) {
        for (SelectionKey key : keys) {
            if (!key.isConnectable()) {
                continue;
            }

            SocketChannel ch = (SocketChannel) key.channel();
            ConnectionRequest entry = (ConnectionRequest) key.attachment();

            boolean success = false;
            try {
                // finishConnect() returns false on a non-blocking channel
                // whose connection is not complete yet; in that case keep the
                // key registered so a later select (or the timeout) handles it.
                if (ch.finishConnect()) {
                    key.cancel();
                    newSession(ch, entry.handler, entry.config, entry);
                }
                success = true;
            } catch (Throwable e) {
                entry.setException(e);
            } finally {
                if (!success) {
                    key.cancel();
                    try {
                        ch.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            }
        }

        keys.clear();
    }

    /**
     * Fails every still-valid pending request whose deadline has passed,
     * closing its channel and cancelling its key.
     *
     * @param keys all keys currently registered with the selector
     */
    private void processTimedOutSessions(Set<SelectionKey> keys) {
        long currentTime = System.currentTimeMillis();

        for (SelectionKey key : keys) {
            if (!key.isValid()) {
                continue;
            }

            ConnectionRequest entry = (ConnectionRequest) key.attachment();

            if (currentTime >= entry.deadline) {
                entry.setException(new ConnectException());
                try {
                    key.channel().close();
                } catch (IOException e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    key.cancel();
                }
            }
        }
    }

    /**
     * Creates a session for a connected channel, builds its filter chain and
     * hands it to the next I/O processor.
     *
     * @throws IOException if building the filter chain fails; the original
     *                     failure is attached as the cause
     */
    private void newSession(SocketChannel ch, IoHandler handler,
            IoServiceConfig config, ConnectFuture connectFuture)
            throws IOException {
        SocketSessionImpl session = new SocketSessionImpl(this,
                nextProcessor(), getListeners(), config, ch, handler, ch
                        .socket().getRemoteSocketAddress());
        try {
            getFilterChainBuilder().buildFilterChain(session.getFilterChain());
            config.getFilterChainBuilder().buildFilterChain(
                    session.getFilterChain());
            config.getThreadModel().buildFilterChain(session.getFilterChain());
        } catch (Throwable e) {
            throw (IOException) new IOException("Failed to create a session.")
                    .initCause(e);
        }

        // Set the ConnectFuture of the specified session, which will be
        // removed and notified by AbstractIoFilterChain eventually.
        session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
                connectFuture);

        // Forward the remaining process to the SocketIoProcessor.
        session.getIoProcessor().addNew(session);
    }

    /**
     * Picks the next I/O processor in round-robin order.
     */
    private SocketIoProcessor nextProcessor() {
        // Mask off the sign bit so that counter wrap-around can never
        // produce a negative array index.
        int ticket = processorDistributor.getAndIncrement() & Integer.MAX_VALUE;
        return ioProcessors[ticket % processorCount];
    }

    /**
     * Background loop that registers queued requests, completes connections
     * and fails timed-out ones. It ends itself once the selector has had no
     * keys for longer than the worker timeout and no request is queued.
     */
    private class Worker implements Runnable {
        private long lastActive = System.currentTimeMillis();

        public void run() {
            Thread.currentThread().setName(SocketConnector.this.threadName);

            for (;;) {
                try {
                    int nKeys = selector.select(1000);

                    registerNew();

                    if (nKeys > 0) {
                        processSessions(selector.selectedKeys());
                    }

                    processTimedOutSessions(selector.keys());

                    if (selector.keys().isEmpty()) {
                        if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) {
                            synchronized (lock) {
                                // Re-check under the lock: connect() enqueues
                                // under the same lock, so nothing can slip in
                                // between this test and the shutdown below.
                                if (selector.keys().isEmpty()
                                        && connectQueue.isEmpty()) {
                                    worker = null;
                                    try {
                                        selector.close();
                                    } catch (IOException e) {
                                        ExceptionMonitor.getInstance()
                                                .exceptionCaught(e);
                                    } finally {
                                        selector = null;
                                    }
                                    break;
                                }
                            }
                        }
                    } else {
                        lastActive = System.currentTimeMillis();
                    }
                } catch (IOException e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Back off briefly before retrying; an interruption is
                    // reported and the loop simply continues.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }
        }
    }

    /**
     * A pending connection attempt; doubles as the future returned to the
     * caller. The deadline is computed at construction time from the connect
     * timeout of the supplied config, falling back to the default config when
     * the supplied one is not an {@link IoConnectorConfig}.
     */
    private class ConnectionRequest extends DefaultConnectFuture {
        private final SocketChannel channel;

        private final long deadline;

        private final IoHandler handler;

        private final IoServiceConfig config;

        private ConnectionRequest(SocketChannel channel, IoHandler handler,
                IoServiceConfig config) {
            this.channel = channel;
            long timeout;
            if (config instanceof IoConnectorConfig) {
                timeout = ((IoConnectorConfig) config)
                        .getConnectTimeoutMillis();
            } else {
                timeout = ((IoConnectorConfig) getDefaultConfig())
                        .getConnectTimeoutMillis();
            }
            this.deadline = System.currentTimeMillis() + timeout;
            this.handler = handler;
            this.config = config;
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -