⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datagramconnectordelegate.java

📁 apache 的一个socket框架
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* *  Licensed to the Apache Software Foundation (ASF) under one *  or more contributor license agreements.  See the NOTICE file *  distributed with this work for additional information *  regarding copyright ownership.  The ASF licenses this file *  to you under the Apache License, Version 2.0 (the *  "License"); you may not use this file except in compliance *  with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * *  Unless required by applicable law or agreed to in writing, *  software distributed under the License is distributed on an *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *  KIND, either express or implied.  See the License for the *  specific language governing permissions and limitations *  under the License. * */package org.apache.mina.transport.socket.nio.support;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.channels.DatagramChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.Iterator;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.ConnectFuture;import org.apache.mina.common.ExceptionMonitor;import org.apache.mina.common.IoConnector;import org.apache.mina.common.IoHandler;import org.apache.mina.common.IoServiceConfig;import org.apache.mina.common.IoSession;import org.apache.mina.common.IoSessionRecycler;import org.apache.mina.common.IoFilter.WriteRequest;import org.apache.mina.common.support.AbstractIoFilterChain;import org.apache.mina.common.support.BaseIoConnector;import org.apache.mina.common.support.DefaultConnectFuture;import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;import org.apache.mina.transport.socket.nio.DatagramServiceConfig;import org.apache.mina.transport.socket.nio.DatagramSessionConfig;import org.apache.mina.util.NamePreservingRunnable;/** * {@link IoConnector} for datagram transport (UDP/IP). * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13  7월 2007) $ */public class DatagramConnectorDelegate extends BaseIoConnector implements        DatagramService {    private static final AtomicInteger nextId = new AtomicInteger();    private final Object lock = new Object();    private final IoConnector wrapper;    private final Executor executor;    private final int id = nextId.getAndIncrement();    private Selector selector;    private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();    private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();    private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();    private Worker worker;    /**     * Creates a new instance.     */    public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {        this.wrapper = wrapper;        this.executor = executor;    }    public ConnectFuture connect(SocketAddress address, IoHandler handler,            IoServiceConfig config) {        return connect(address, null, handler, config);    }    public ConnectFuture connect(SocketAddress address,            SocketAddress localAddress, IoHandler handler,            IoServiceConfig config) {        if (address == null)            throw new NullPointerException("address");        if (handler == null)            throw new NullPointerException("handler");        if (!(address instanceof InetSocketAddress))            throw new IllegalArgumentException("Unexpected address type: "                    + address.getClass());        if (localAddress != null                && !(localAddress instanceof InetSocketAddress)) {            throw new IllegalArgumentException(                    "Unexpected local address type: " + localAddress.getClass());        }        if (config == null) {            config = getDefaultConfig();        }        DatagramChannel ch = null;        boolean initialized = false;        try {            ch = DatagramChannel.open();            DatagramSessionConfig cfg;            if (config.getSessionConfig() instanceof DatagramSessionConfig) {                cfg = (DatagramSessionConfig) config.getSessionConfig();            } else {                cfg = getDefaultConfig().getSessionConfig();            }            ch.socket().setReuseAddress(cfg.isReuseAddress());            ch.socket().setBroadcast(cfg.isBroadcast());            ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());            ch.socket().setSendBufferSize(cfg.getSendBufferSize());            if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {                ch.socket().setTrafficClass(cfg.getTrafficClass());            }            if (localAddress != null) {                ch.socket().bind(localAddress);            }            ch.connect(address);            ch.configureBlocking(false);            initialized = true;        } catch (IOException e) {            return DefaultConnectFuture.newFailedFuture(e);        } finally {            if (!initialized && ch != null) {                try {                    ch.disconnect();                    ch.close();                } catch (IOException e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                }            }        }        RegistrationRequest request = new RegistrationRequest(ch, handler,                config);        try {            startupWorker();        } catch (IOException e) {            try {                ch.disconnect();                ch.close();            } catch (IOException e2) {                ExceptionMonitor.getInstance().exceptionCaught(e2);            }            return DefaultConnectFuture.newFailedFuture(e);        }        registerQueue.add(request);        selector.wakeup();        return request;    }    public DatagramConnectorConfig getDefaultConfig() {        return defaultConfig;    }    /**     * Sets the config this connector will use by default.     *     * @param defaultConfig the default config.     * @throws NullPointerException if the specified value is <code>null</code>.     */    public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {        if (defaultConfig == null) {            throw new NullPointerException("defaultConfig");        }        this.defaultConfig = defaultConfig;    }    private void startupWorker() throws IOException {        synchronized (lock) {            if (worker == null) {                selector = Selector.open();                worker = new Worker();                executor.execute(new NamePreservingRunnable(worker));            }        }    }    public void closeSession(DatagramSessionImpl session) {        try {            startupWorker();        } catch (IOException e) {            // IOException is thrown only when Worker thread is not            // running and failed to open a selector.  We simply return            // silently here because it we can simply conclude that            // this session is not managed by this connector or            // already closed.            return;        }        cancelQueue.add(session);        selector.wakeup();    }    public void flushSession(DatagramSessionImpl session) {        scheduleFlush(session);        Selector selector = this.selector;        if (selector != null) {            selector.wakeup();        }    }    private void scheduleFlush(DatagramSessionImpl session) {        flushingSessions.add(session);    }    public void updateTrafficMask(DatagramSessionImpl session) {        scheduleTrafficControl(session);        Selector selector = this.selector;        if (selector != null) {            selector.wakeup();        }    }    private void scheduleTrafficControl(DatagramSessionImpl session) {        trafficControllingSessions.add(session);    }    private void doUpdateTrafficMask() {        if (trafficControllingSessions.isEmpty())            return;        for (;;) {            DatagramSessionImpl session = trafficControllingSessions.poll();            if (session == null)                break;            SelectionKey key = session.getSelectionKey();            // Retry later if session is not yet fully initialized.            // (In case that Session.suspend??() or session.resume??() is            // called before addSession() is processed)            if (key == null) {                scheduleTrafficControl(session);                break;            }            // skip if channel is already closed            if (!key.isValid()) {                continue;            }            // The normal is OP_READ and, if there are write requests in the            // session's write queue, set OP_WRITE to trigger flushing.            int ops = SelectionKey.OP_READ;            if (!session.getWriteRequestQueue().isEmpty()) {                ops |= SelectionKey.OP_WRITE;            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -