📄 socketacceptor.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.transport.socket.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.CountDownLatch;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.common.ExceptionMonitor;import org.apache.mina.common.IoAcceptor;import org.apache.mina.common.IoHandler;import org.apache.mina.common.IoServiceConfig;import org.apache.mina.common.support.BaseIoAcceptor;import org.apache.mina.util.NamePreservingRunnable;import org.apache.mina.util.NewThreadExecutor;/** * {@link IoAcceptor} for socket transport (TCP/IP). * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $ */public class SocketAcceptor extends BaseIoAcceptor { private static final AtomicInteger nextId = new AtomicInteger(); private final Executor executor; private final Object lock = new Object(); private final int id = nextId.getAndIncrement(); private final String threadName = "SocketAcceptor-" + id; private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig(); private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>(); private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>(); private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>(); private final SocketIoProcessor[] ioProcessors; private final int processorCount; private Selector selector; private Worker worker; private int processorDistributor = 0; /** * Create an acceptor with a single processing thread using a NewThreadExecutor */ public SocketAcceptor() { this(1, new NewThreadExecutor()); } /** * Create an acceptor with the desired number of processing threads * * @param processorCount Number of processing threads * @param executor Executor to use for launching threads */ public SocketAcceptor(int processorCount, Executor executor) { if (processorCount < 1) { throw new IllegalArgumentException( "Must have at least one processor"); } // The default reuseAddress of an accepted socket should be 'true'. defaultConfig.getSessionConfig().setReuseAddress(true); this.executor = executor; this.processorCount = processorCount; ioProcessors = new SocketIoProcessor[processorCount]; for (int i = 0; i < processorCount; i++) { ioProcessors[i] = new SocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor); } } /** * Binds to the specified <code>address</code> and handles incoming connections with the specified * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. * * @throws IOException if failed to bind */ public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException { if (handler == null) { throw new NullPointerException("handler"); } if (address != null && !(address instanceof InetSocketAddress)) { throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); } if (config == null) { config = getDefaultConfig(); } RegistrationRequest request = new RegistrationRequest(address, handler, config); registerQueue.add(request); startupWorker(); selector.wakeup(); try { request.done.await(); } catch (InterruptedException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } if (request.exception != null) { throw request.exception; } } private synchronized void startupWorker() throws IOException { synchronized (lock) { if (worker == null) { selector = Selector.open(); worker = new Worker(); executor.execute(new NamePreservingRunnable(worker)); } } } public void unbind(SocketAddress address) { if (address == null) { throw new NullPointerException("address"); } CancellationRequest request = new CancellationRequest(address); try { startupWorker(); } catch (IOException e) { // IOException is thrown only when Worker thread is not // running and failed to open a selector. We simply throw // IllegalArgumentException here because we can simply // conclude that nothing is bound to the selector. throw new IllegalArgumentException("Address not bound: " + address); } cancelQueue.add(request); selector.wakeup(); try { request.done.await(); } catch (InterruptedException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } if (request.exception != null) { request.exception.fillInStackTrace(); throw request.exception; } } public void unbindAll() { List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels .keySet()); for (SocketAddress address : addresses) { unbind(address); } } private class Worker implements Runnable { public void run() { Thread.currentThread().setName(SocketAcceptor.this.threadName); for (;;) { try { int nKeys = selector.select(); registerNew(); if (nKeys > 0) { processSessions(selector.selectedKeys()); } cancelKeys(); if (selector.keys().isEmpty()) { synchronized (lock) { if (selector.keys().isEmpty() && registerQueue.isEmpty() && cancelQueue.isEmpty()) { worker = null;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -