⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nioreceiver.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.catalina.tribes.transport.nio;

import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.transport.Constants;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.transport.RxTaskPool;
import org.apache.catalina.tribes.transport.AbstractRxTask;
import org.apache.catalina.tribes.util.StringManager;
import java.util.LinkedList;
import java.util.Set;
import java.nio.channels.CancelledKeyException;

/**
 * @author Filip Hanik
 * @version $Revision: 487423 $ $Date: 2006-12-15 02:58:56 +0100 (ven., 15 déc. 2006) $
 */
public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {

    // Logger shared by every instance of this class.
    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(NioReceiver.class);

    /**
     * The string manager for this package.
     */
    protected StringManager sm = StringManager.getManager(Constants.Package);

    /**
     * The descriptive information about this implementation,
     * returned as-is by {@link #getInfo()}.
     */
    private static final String info = "NioReceiver/1.0";

    // Initially null; bind() opens the selector and registers the listening
    // channel on it for OP_ACCEPT.
    private Selector selector = null;
    // Initially null; bind() opens the listening server channel.
    private ServerSocketChannel serverChannel = null;

    // Pending Runnable events: appended by addEvent() and drained/run by
    // events(). The visible add and remove operations synchronize on this list.
    protected LinkedList events = new LinkedList();
//    private Object interestOpsMutex = new Object();

    /**
     * Creates a receiver. No sockets or selectors are opened here; the
     * selector and server channel fields stay null until {@link #bind()} runs.
     */
    public NioReceiver() {
        super();
    }

    /**
     * Returns the descriptive string for this implementation, formatted as
     * <code>&lt;description&gt;/&lt;version&gt;</code>.
     *
     * @return the constant implementation info string
     */
    public String getInfo() {
        return info;
    }

//    public Object getInterestOpsMutex() {
//        return interestOpsMutex;
//    }

    /**
     * Stops this receiver: listening is stopped first, then the superclass
     * stop logic runs.
     */
    public void stop() {
        stopListening();
        super.stop();
    }

    /**
     * Starts the cluster receiver: runs the superclass start logic, creates
     * the receive task pool, binds the listening socket and launches a daemon
     * thread named "NioReceiver" that runs this object.
     *
     * @throws IOException if the task pool cannot be created or the receiver
     *         cannot be bound or started. A failure that is not itself an
     *         IOException is wrapped in one, with the original exception
     *         attached as its cause.
     */
    public void start() throws IOException {
        super.start();
        try {
            setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
        } catch (Exception x) {
            log.fatal("ThreadPool could not be initialized. Listener not started", x);
            if ( x instanceof IOException ) throw (IOException)x;
            // Keep the original exception as the cause so the caller does not
            // lose its type and stack trace. initCause is used instead of the
            // (String, Throwable) constructor to stay compatible with older JDKs.
            IOException wrapped = new IOException(x.getMessage());
            wrapped.initCause(x);
            throw wrapped;
        }
        try {
            // Return value intentionally unused; presumably called so the bind
            // address is resolved before binding -- confirm against its definition.
            getBind();
            bind();
            Thread t = new Thread(this, "NioReceiver");
            t.setDaemon(true);
            t.start();
        } catch (Exception x) {
            log.fatal("Unable to start cluster receiver", x);
            if ( x instanceof IOException ) throw (IOException)x;
            IOException wrapped = new IOException(x.getMessage());
            wrapped.initCause(x);
            throw wrapped;
        }
    }
    
    /**
     * Builds a new replication task configured from this receiver's current
     * settings (buffer-pool usage, receive buffer size and worker options).
     *
     * @return the configured task
     */
    public AbstractRxTask createRxTask() {
        // This receiver is passed for both constructor arguments.
        final NioReplicationTask task = new NioReplicationTask(this, this);
        task.setUseBufferPool(getUseBufferPool());
        task.setRxBufSize(this.getRxBufSize());
        task.setOptions(this.getWorkerThreadOptions());
        return task;
    }
    
    
    
    /**
     * Opens the listening channel and the selector, binds the channel's
     * socket, and registers the channel with the selector for accept events.
     *
     * @throws IOException if opening, binding, configuring or registering fails
     */
    protected void bind() throws IOException {
        // Open the (still unbound) listening channel, then the selector.
        serverChannel = ServerSocketChannel.open();
        selector = Selector.open();

        // Bind the channel's underlying ServerSocket through the three-argument
        // bind overload, using the configured listen port and auto-bind value.
        bind(serverChannel.socket(), getTcpListenPort(), getAutoBind());

        // The channel must be non-blocking before it can be registered.
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
    
    /**
     * Queues an event for later execution by {@link #events()} and, if the
     * receiver is listening, wakes the selector.
     * The event is dropped when no selector exists.
     *
     * @param event the task to queue
     */
    public void addEvent(Runnable event) {
        // Read the field once: another thread may reset it to null between a
        // null check and the wakeup() call, which would otherwise cause a
        // NullPointerException. socketTimeouts() snapshots it the same way.
        Selector tmpsel = selector;
        if ( tmpsel == null ) return;
        synchronized (events) {
            events.add(event);
        }
        if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
        // wakeup() is documented as safe to invoke even on a closed selector.
        if ( isListening() ) tmpsel.wakeup();
    }

    /**
     * Runs every queued event in insertion order while holding the queue's
     * monitor. An exception thrown by one event is logged and does not stop
     * the remaining events. A null entry ends processing early, and whatever
     * is still queued at that point is discarded.
     */
    public void events() {
        if ( events.isEmpty() ) return;
        synchronized (events) {
            while ( !events.isEmpty() ) {
                Runnable task = (Runnable)events.removeFirst();
                if ( task == null ) break;
                try {
                    if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+task);
                    task.run();
                } catch ( Exception x ) {
                    log.error("",x);
                }
            }
            // Only has an effect when the loop stopped early on a null entry.
            events.clear();
        }
    }
    
    /**
     * Cancels a selection key and releases what is attached to it: the
     * attached reader (if any) is marked cancelled and finished, the key is
     * cancelled and detached, and the key's channel is closed.
     * I/O errors while closing are logged at debug level and otherwise ignored.
     *
     * @param key the key to cancel
     */
    public static void cancelledKey(SelectionKey key) {
        ObjectReader reader = (ObjectReader)key.attachment();
        if ( reader != null ) {
            reader.setCancelled(true);
            reader.finish();
        }
        key.cancel();
        key.attach(null);
        SelectableChannel channel = key.channel();
        // Only a SocketChannel has a socket to close here. The selector also
        // holds the ServerSocketChannel key registered in bind(); an
        // unconditional cast would throw ClassCastException for that key and
        // skip the channel close below.
        if ( channel instanceof SocketChannel ) {
            try { ((SocketChannel)channel).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
        }
        try { channel.close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }

    }
    
    protected void socketTimeouts() {
        //timeout
        Selector tmpsel = selector;
        Set keys =  (isListening()&&tmpsel!=null)?tmpsel.keys():null;
        if ( keys == null ) return;
        long now = System.currentTimeMillis();
        for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
            SelectionKey key = (SelectionKey) iter.next();
            try {
//                if (key.interestOps() == SelectionKey.OP_READ) {
//                    //only timeout sockets that we are waiting for a read from
//                    ObjectReader ka = (ObjectReader) key.attachment();
//                    long delta = now - ka.getLastAccess();
//                    if (delta > (long) getTimeout()) {
//                        cancelledKey(key);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -