⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datagramacceptordelegate.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.transport.socket.nio.support;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.channels.DatagramChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.ExceptionMonitor;import org.apache.mina.common.IoAcceptor;import org.apache.mina.common.IoFilter.WriteRequest;import org.apache.mina.common.IoHandler;import org.apache.mina.common.IoServiceConfig;import org.apache.mina.common.IoSession;import org.apache.mina.common.IoSessionRecycler;import org.apache.mina.common.RuntimeIOException;import org.apache.mina.common.support.BaseIoAcceptor;import org.apache.mina.common.support.IoServiceListenerSupport;import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;import org.apache.mina.transport.socket.nio.DatagramServiceConfig;import org.apache.mina.transport.socket.nio.DatagramSessionConfig;import org.apache.mina.util.NamePreservingRunnable;/** * {@link IoAcceptor} for datagram transport (UDP/IP). * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13  7월 2007) $ */public class DatagramAcceptorDelegate extends BaseIoAcceptor implements        IoAcceptor, DatagramService {    private static final AtomicInteger nextId = new AtomicInteger();    private final Object lock = new Object();    private final IoAcceptor wrapper;    private final Executor executor;    private final int id = nextId.getAndIncrement();    private Selector selector;    private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();    private final Map<SocketAddress, DatagramChannel> channels = new ConcurrentHashMap<SocketAddress, DatagramChannel>();    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();    private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();    private Worker worker;    /**     * Creates a new instance.     */    public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor executor) {        this.wrapper = wrapper;        this.executor = executor;        // The default reuseAddress of an accepted socket should be 'true'.        defaultConfig.getSessionConfig().setReuseAddress(true);    }    public void bind(SocketAddress address, IoHandler handler,            IoServiceConfig config) throws IOException {        if (handler == null)            throw new NullPointerException("handler");        if (config == null) {            config = getDefaultConfig();        }        if (address != null && !(address instanceof InetSocketAddress))            throw new IllegalArgumentException("Unexpected address type: "                    + address.getClass());        RegistrationRequest request = new RegistrationRequest(address, handler,                config);        registerQueue.add(request);        startupWorker();        selector.wakeup();        synchronized (request) {            while (!request.done) {                try {                    request.wait();                } catch (InterruptedException e) {                    throw new RuntimeIOException(e);                }            }        }        if (request.exception != null) {            throw (IOException) new IOException("Failed to bind")                    .initCause(request.exception);        }    }    public void unbind(SocketAddress address) {        if (address == null)            throw new NullPointerException("address");        CancellationRequest request = new CancellationRequest(address);        try {            startupWorker();        } catch (IOException e) {            // IOException is thrown only when Worker thread is not            // running and failed to open a selector.  We simply throw            // IllegalArgumentException here because we can simply            // conclude that nothing is bound to the selector.            throw new IllegalArgumentException("Address not bound: " + address);        }        cancelQueue.add(request);        selector.wakeup();        synchronized (request) {            while (!request.done) {                try {                    request.wait();                } catch (InterruptedException e) {                    throw new RuntimeIOException(e);                }            }        }        if (request.exception != null) {            throw new RuntimeException("Failed to unbind", request.exception);        }    }    public void unbindAll() {        List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels                .keySet());        for (SocketAddress address : addresses) {            unbind(address);        }    }    @Override    public IoSession newSession(SocketAddress remoteAddress,            SocketAddress localAddress) {        if (remoteAddress == null) {            throw new NullPointerException("remoteAddress");        }        if (localAddress == null) {            throw new NullPointerException("localAddress");        }        Selector selector = this.selector;        DatagramChannel ch = channels.get(localAddress);        if (selector == null || ch == null) {            throw new IllegalArgumentException("Unknown localAddress: "                    + localAddress);        }        SelectionKey key = ch.keyFor(selector);        if (key == null) {            throw new IllegalArgumentException("Unknown localAddress: "                    + localAddress);        }        RegistrationRequest req = (RegistrationRequest) key.attachment();        IoSession session;        IoSessionRecycler sessionRecycler = getSessionRecycler(req);        synchronized (sessionRecycler) {            session = sessionRecycler.recycle(localAddress, remoteAddress);            if (session != null) {                return session;            }            // If a new session needs to be created.            // Note that the local address is the service address in the            // acceptor side, and I didn't call getLocalSocketAddress().            // This avoids strange cases where getLocalSocketAddress() on the            // underlying socket would return an IPv6 address while the            // specified service address is an IPv4 address.            DatagramSessionImpl datagramSession = new DatagramSessionImpl(                    wrapper, this, req.config, ch, req.handler, req.address,                    req.address);            datagramSession.setRemoteAddress(remoteAddress);            datagramSession.setSelectionKey(key);            getSessionRecycler(req).put(datagramSession);            session = datagramSession;        }        try {            buildFilterChain(req, session);            getListeners().fireSessionCreated(session);        } catch (Throwable t) {            ExceptionMonitor.getInstance().exceptionCaught(t);        }        return session;    }    private IoSessionRecycler getSessionRecycler(RegistrationRequest req) {        IoSessionRecycler sessionRecycler;        if (req.config instanceof DatagramServiceConfig) {            sessionRecycler = ((DatagramServiceConfig) req.config)                    .getSessionRecycler();        } else {            sessionRecycler = defaultConfig.getSessionRecycler();        }        return sessionRecycler;    }    @Override    public IoServiceListenerSupport getListeners() {        return super.getListeners();    }    private void buildFilterChain(RegistrationRequest req, IoSession session)            throws Exception {        this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());        req.config.getFilterChainBuilder().buildFilterChain(                session.getFilterChain());        req.config.getThreadModel().buildFilterChain(session.getFilterChain());    }    public DatagramAcceptorConfig getDefaultConfig() {        return defaultConfig;    }    /**     * Sets the config this acceptor will use by default.     *     * @param defaultConfig the default config.     * @throws NullPointerException if the specified value is <code>null</code>.     */    public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) {        if (defaultConfig == null) {            throw new NullPointerException("defaultConfig");        }        this.defaultConfig = defaultConfig;    }    private void startupWorker() throws IOException {        synchronized (lock) {            if (worker == null) {                selector = Selector.open();                worker = new Worker();                executor.execute(new NamePreservingRunnable(worker));            }        }    }    public void flushSession(DatagramSessionImpl session) {        scheduleFlush(session);        Selector selector = this.selector;        if (selector != null) {            selector.wakeup();        }    }    public void closeSession(DatagramSessionImpl session) {    }    private void scheduleFlush(DatagramSessionImpl session) {        flushingSessions.add(session);    }    private class Worker implements Runnable {        public void run() {            Thread.currentThread().setName("DatagramAcceptor-" + id);            for (;;) {                try {                    int nKeys = selector.select();                    registerNew();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -