📄 datagramacceptordelegate.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.transport.socket.nio.support;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.channels.DatagramChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.ExceptionMonitor;import org.apache.mina.common.IoAcceptor;import org.apache.mina.common.IoFilter.WriteRequest;import org.apache.mina.common.IoHandler;import org.apache.mina.common.IoServiceConfig;import org.apache.mina.common.IoSession;import org.apache.mina.common.IoSessionRecycler;import org.apache.mina.common.RuntimeIOException;import org.apache.mina.common.support.BaseIoAcceptor;import org.apache.mina.common.support.IoServiceListenerSupport;import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;import org.apache.mina.transport.socket.nio.DatagramServiceConfig;import org.apache.mina.transport.socket.nio.DatagramSessionConfig;import org.apache.mina.util.NamePreservingRunnable;/** * {@link IoAcceptor} for datagram transport (UDP/IP). * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13 7월 2007) $ */public class DatagramAcceptorDelegate extends BaseIoAcceptor implements IoAcceptor, DatagramService { private static final AtomicInteger nextId = new AtomicInteger(); private final Object lock = new Object(); private final IoAcceptor wrapper; private final Executor executor; private final int id = nextId.getAndIncrement(); private Selector selector; private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig(); private final Map<SocketAddress, DatagramChannel> channels = new ConcurrentHashMap<SocketAddress, DatagramChannel>(); private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>(); private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>(); private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>(); private Worker worker; /** * Creates a new instance. */ public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor executor) { this.wrapper = wrapper; this.executor = executor; // The default reuseAddress of an accepted socket should be 'true'. defaultConfig.getSessionConfig().setReuseAddress(true); } public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException { if (handler == null) throw new NullPointerException("handler"); if (config == null) { config = getDefaultConfig(); } if (address != null && !(address instanceof InetSocketAddress)) throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); RegistrationRequest request = new RegistrationRequest(address, handler, config); registerQueue.add(request); startupWorker(); selector.wakeup(); synchronized (request) { while (!request.done) { try { request.wait(); } catch (InterruptedException e) { throw new RuntimeIOException(e); } } } if (request.exception != null) { throw (IOException) new IOException("Failed to bind") .initCause(request.exception); } } public void unbind(SocketAddress address) { if (address == null) throw new NullPointerException("address"); CancellationRequest request = new CancellationRequest(address); try { startupWorker(); } catch (IOException e) { // IOException is thrown only when Worker thread is not // running and failed to open a selector. We simply throw // IllegalArgumentException here because we can simply // conclude that nothing is bound to the selector. throw new IllegalArgumentException("Address not bound: " + address); } cancelQueue.add(request); selector.wakeup(); synchronized (request) { while (!request.done) { try { request.wait(); } catch (InterruptedException e) { throw new RuntimeIOException(e); } } } if (request.exception != null) { throw new RuntimeException("Failed to unbind", request.exception); } } public void unbindAll() { List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels .keySet()); for (SocketAddress address : addresses) { unbind(address); } } @Override public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } if (localAddress == null) { throw new NullPointerException("localAddress"); } Selector selector = this.selector; DatagramChannel ch = channels.get(localAddress); if (selector == null || ch == null) { throw new IllegalArgumentException("Unknown localAddress: " + localAddress); } SelectionKey key = ch.keyFor(selector); if (key == null) { throw new IllegalArgumentException("Unknown localAddress: " + localAddress); } RegistrationRequest req = (RegistrationRequest) key.attachment(); IoSession session; IoSessionRecycler sessionRecycler = getSessionRecycler(req); synchronized (sessionRecycler) { session = sessionRecycler.recycle(localAddress, remoteAddress); if (session != null) { return session; } // If a new session needs to be created. // Note that the local address is the service address in the // acceptor side, and I didn't call getLocalSocketAddress(). // This avoids strange cases where getLocalSocketAddress() on the // underlying socket would return an IPv6 address while the // specified service address is an IPv4 address. DatagramSessionImpl datagramSession = new DatagramSessionImpl( wrapper, this, req.config, ch, req.handler, req.address, req.address); datagramSession.setRemoteAddress(remoteAddress); datagramSession.setSelectionKey(key); getSessionRecycler(req).put(datagramSession); session = datagramSession; } try { buildFilterChain(req, session); getListeners().fireSessionCreated(session); } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } return session; } private IoSessionRecycler getSessionRecycler(RegistrationRequest req) { IoSessionRecycler sessionRecycler; if (req.config instanceof DatagramServiceConfig) { sessionRecycler = ((DatagramServiceConfig) req.config) .getSessionRecycler(); } else { sessionRecycler = defaultConfig.getSessionRecycler(); } return sessionRecycler; } @Override public IoServiceListenerSupport getListeners() { return super.getListeners(); } private void buildFilterChain(RegistrationRequest req, IoSession session) throws Exception { this.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain()); req.config.getThreadModel().buildFilterChain(session.getFilterChain()); } public DatagramAcceptorConfig getDefaultConfig() { return defaultConfig; } /** * Sets the config this acceptor will use by default. * * @param defaultConfig the default config. * @throws NullPointerException if the specified value is <code>null</code>. */ public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) { if (defaultConfig == null) { throw new NullPointerException("defaultConfig"); } this.defaultConfig = defaultConfig; } private void startupWorker() throws IOException { synchronized (lock) { if (worker == null) { selector = Selector.open(); worker = new Worker(); executor.execute(new NamePreservingRunnable(worker)); } } } public void flushSession(DatagramSessionImpl session) { scheduleFlush(session); Selector selector = this.selector; if (selector != null) { selector.wakeup(); } } public void closeSession(DatagramSessionImpl session) { } private void scheduleFlush(DatagramSessionImpl session) { flushingSessions.add(session); } private class Worker implements Runnable { public void run() { Thread.currentThread().setName("DatagramAcceptor-" + id); for (;;) { try { int nKeys = selector.select(); registerNew();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -