📄 abstractpollingconnectionlessioacceptor.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */package org.apache.mina.core.polling;import java.net.SocketAddress;import java.util.Collections;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import org.apache.mina.core.RuntimeIoException;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.future.IoFuture;import org.apache.mina.core.service.AbstractIoAcceptor;import org.apache.mina.core.service.IoAcceptor;import org.apache.mina.core.service.IoProcessor;import org.apache.mina.core.session.AbstractIoSession;import org.apache.mina.core.session.ExpiringSessionRecycler;import org.apache.mina.core.session.IoSession;import org.apache.mina.core.session.IoSessionConfig;import org.apache.mina.core.session.IoSessionRecycler;import org.apache.mina.core.write.WriteRequest;import org.apache.mina.core.write.WriteRequestQueue;import org.apache.mina.util.ExceptionMonitor;/** * TODO Add documentation * {@link IoAcceptor} for datagram transport (UDP/IP). * * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 751504 $, $Date: 2009-03-08 20:24:58 +0100 (Sun, 08 Mar 2009) $ * @org.apache.xbean.XBean */public abstract class AbstractPollingConnectionlessIoAcceptor<T extends AbstractIoSession, H> extends AbstractIoAcceptor { private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler(); private final Object lock = new Object(); private final IoProcessor<T> processor = new ConnectionlessAcceptorProcessor(); private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>(); private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>()); private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER; private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); private volatile boolean selectable; /** The thread responsible of accepting incoming requests */ private Acceptor acceptor; private long lastIdleCheckTime; /** * Creates a new instance. */ protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) { this(sessionConfig, null); } /** * Creates a new instance. */ protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) { super(sessionConfig, executor); try { init(); selectable = true; } catch (RuntimeException e){ throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to initialize.", e); } finally { if (!selectable) { try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } } protected abstract void init() throws Exception; protected abstract void destroy() throws Exception; protected abstract int select() throws Exception; protected abstract int select(int timeout) throws Exception; protected abstract void wakeup(); protected abstract Iterator<H> selectedHandles(); protected abstract H open(SocketAddress localAddress) throws Exception; protected abstract void close(H handle) throws Exception; protected abstract SocketAddress localAddress(H handle) throws Exception; protected abstract boolean isReadable(H handle); protected abstract boolean isWritable(H handle); protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception; protected abstract int send(T session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception; protected abstract T newSession(IoProcessor<T> processor, H handle, SocketAddress remoteAddress) throws Exception; protected abstract void setInterestedInWrite(T session, boolean interested) throws Exception; /** * {@inheritDoc} */ @Override protected IoFuture dispose0() throws Exception { unbind(); if (!disposalFuture.isDone()) { startupAcceptor(); wakeup(); } return disposalFuture; } /** * {@inheritDoc} */ @Override protected final Set<SocketAddress> bindInternal( List<? extends SocketAddress> localAddresses) throws Exception { // Create a bind request as a Future operation. When the selector // have handled the registration, it will signal this future. AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); // adds the Registration request to the queue for the Workers // to handle registerQueue.add(request); // creates the Acceptor instance and has the local // executor kick it off. startupAcceptor(); // As we just started the acceptor, we have to unblock the select() // in order to process the bind request we just have added to the // registerQueue. wakeup(); // Now, we wait until this request is completed. request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } // Update the local addresses. // setLocalAddresses() shouldn't be called from the worker thread // because of deadlock. Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>(); for (H handle:boundHandles.values()) { newLocalAddresses.add(localAddress(handle)); } return newLocalAddresses; } /** * {@inheritDoc} */ @Override protected final void unbind0( List<? extends SocketAddress> localAddresses) throws Exception { AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); cancelQueue.add(request); startupAcceptor(); wakeup(); request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } } /** * {@inheritDoc} */ public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { if (isDisposing()) { throw new IllegalStateException("Already disposed."); } if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } synchronized (bindLock) { if (!isActive()) { throw new IllegalStateException( "Can't create a session from a unbound service."); } try { return newSessionWithoutLock(remoteAddress, localAddress); } catch (RuntimeException e) { throw e; } catch (Error e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to create a session.", e); } } } private IoSession newSessionWithoutLock( SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { H handle = boundHandles.get(localAddress); if (handle == null) { throw new IllegalArgumentException("Unknown local address: " + localAddress); } IoSession session; IoSessionRecycler sessionRecycler = getSessionRecycler(); synchronized (sessionRecycler) { session = sessionRecycler.recycle(localAddress, remoteAddress); if (session != null) { return session; } // If a new session needs to be created. T newSession = newSession(processor, handle, remoteAddress); getSessionRecycler().put(newSession); session = newSession; } initSession(session, null, null); try { this.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); getListeners().fireSessionCreated(session); } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } return session; } public final IoSessionRecycler getSessionRecycler() { return sessionRecycler; } public final void setSessionRecycler(IoSessionRecycler sessionRecycler) { synchronized (bindLock) { if (isActive()) { throw new IllegalStateException( "sessionRecycler can't be set while the acceptor is bound."); } if (sessionRecycler == null) { sessionRecycler = DEFAULT_RECYCLER; } this.sessionRecycler = sessionRecycler; } } private class ConnectionlessAcceptorProcessor implements IoProcessor<T> { public void add(T session) { } public void flush(T session) { if (scheduleFlush(session)) { wakeup(); } } public void remove(T session) { getSessionRecycler().remove(session); getListeners().fireSessionDestroyed(session); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -