⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 porter.java

📁 一个功能强大的聊天程序.....基本实现所有功能....强烈推荐下载
💻 JAVA
字号:
/*
* LumaQQ - Java QQ Client
*
* Copyright (C) 2004 notXX
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package edu.tsinghua.lumaqq.qq.net;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import edu.tsinghua.lumaqq.qq.packets.PacketParseException;

/**
 * 发送接收线程
 * 
 * @author notxx
 */
public final class Porter extends Thread {
	/** Logger */
	private static final Log log = LogFactory.getLog(Porter.class);

	/** 线程是否结束的标志 */
	protected boolean shutdown = false;
	/** 端口选择器 */
	protected Selector selector;
	
	// port列表
	private List<IPort> ports;
	// proxy列表
	private List<IProxy> proxies;
	
	// 连接释放请求
	private Queue<Object> disposeQueue;
	
	// 新连接请求
	private List<Object> newConnections;
	
	/**
	 * 构造一个Porter.
	 */
	public Porter() {
	    ports = new ArrayList<IPort>();
	    proxies = new ArrayList<IProxy>();
	    newConnections = new Vector<Object>();
	    disposeQueue = new LinkedList<Object>();
		setName("Porter");
		setDaemon(true);
	    // 创建新的Selector
		try {
			selector = Selector.open();
		} catch (IOException e) {
			log.debug(e);
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 注册一个port到porter中
	 * 
	 * @param port
	 * 		IPort实现
	 * @throws ClosedChannelException
	 * 		如果注册失败
	 */
	public void register(IPort port) throws ClosedChannelException {
	    SelectableChannel channel = port.channel();
	    if(channel instanceof SocketChannel)
		    channel.register(selector, SelectionKey.OP_CONNECT, port.getNIOHandler());
	    else if(channel instanceof DatagramChannel)
		    channel.register(selector, SelectionKey.OP_READ, port.getNIOHandler());
	    if(!ports.contains(port))
	        ports.add(port);
	}
	
	/**
	 * 以指定的操作注册channel
	 * 
	 * @param port
	 * @param ops
	 * @throws ClosedChannelException
	 */
	public void register(IPort port, int ops) throws ClosedChannelException {
	    SelectableChannel channel = port.channel();
	    if(channel instanceof SocketChannel)
		    channel.register(selector, ops, port.getNIOHandler());
	    else if(channel instanceof DatagramChannel)
		    channel.register(selector, ops, port.getNIOHandler());
	    if(!ports.contains(port))
	        ports.add(port);
	}
	
	/**
	 * 注册一个代理,用在代理验证中
	 * 
	 * @param proxy
	 * 		IProxy实现类
	 * @throws ClosedChannelException
	 * 		如果注册失败
	 */
	public void register(IProxy proxy) throws ClosedChannelException {
	    SelectableChannel channel = proxy.channel();
	    if(channel instanceof SocketChannel)
		    channel.register(selector, SelectionKey.OP_CONNECT, proxy.getNIOHandler());
	    else if(channel instanceof DatagramChannel)
		    channel.register(selector, SelectionKey.OP_READ, proxy.getNIOHandler());
	    if(!proxies.contains(proxy))
	        proxies.add(proxy);
	}
	
	/**
	 * 删除一个port,这个port的channel将被关闭
	 * 
	 * @param port
	 * 		IPort实现
	 * @throws IOException
	 */
	private void deregister(IPort port) {
		if(port == null)
			return;
		
	    if(!ports.remove(port))
	    	return;
    	SelectionKey key = port.channel().keyFor(selector);
    	if(key != null)
    		key.cancel();
        port.dispose();
	}
	
	/**
	 * 删除一个proxy
	 * 
	 * @param proxy
	 */
	private void deregister(IProxy proxy) {
		if(proxy == null)
			return;
		
	    if(!proxies.remove(proxy))
	    	return;
    	SelectionKey key = proxy.channel().keyFor(selector);
    	if(key != null)
    		key.cancel();
        proxy.dispose();
	}
	
	/**
	 * 发送错误事件到所有port
	 * 
	 * @param e
	 * 		包含错误信息的Exception
	 */
	private void dispatchErrorToAll(Exception e) {
		for(IPort port : ports)
			port.getNIOHandler().processError(e);
	    for(IProxy proxy: proxies)
			proxy.getNIOHandler().processError(e);
	}
	
	/**
	 * 通知所有port发送包
	 * @throws IOException
	 */
	private void notifySend() {
	    int size = ports.size();
	    for(int i = 0; i < size; i++) {
	        INIOHandler handler = null;
	        try {
		        handler = ((IPort)ports.get(i)).getNIOHandler();
                handler.processWrite();
            } catch (IOException e) {
	            log.error(e.getMessage());
	            handler.processError(e);
            } catch (IndexOutOfBoundsException e) {                
            }
	    }
	    
	    size = proxies.size();
	    for(int i = 0; i < size; i++) {
	        INIOHandler handler = null;
	        try {
		        handler = ((IProxy)proxies.get(i)).getNIOHandler();
                handler.processWrite();
            } catch (IOException e) {
	            log.error(e.getMessage());
	            handler.processError(e);
            } catch (IndexOutOfBoundsException e) {                
            }
	    }
	}
	
	/**
	 * 不断运转维护所有注册的IPort对象.
	 * 通过调用它们的几个函数分别做到清空发送队列/填充接收队列/维护队列的功能.
	 * @see IPort#send(ByteBuffer)
	 * @see IPort#receive(ByteBuffer)
	 * @see IPort#maintain()
	 */
	public void run() {
		log.debug("Porter已经启动");		
		int n = 0;
	    while(!shutdown) {		    		    
	        // do select
            try {
                n = selector.select(3000);
                // 如果要shutdown,关闭selector退出
                if (shutdown) {
                    selector.close();
                	break;			        
                }
            } catch (IOException e) {
	            log.error(e.getMessage());
	            dispatchErrorToAll(e);
            } 
            
            // 处理连接释放请求
            processDisposeQueue();
            
		    // 如果select返回大于0,处理事件
		    if(n > 0) {
		        for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
					// 得到下一个Key
					SelectionKey sk = i.next();
					i.remove();
					// 检查其是否还有效
	                if(!sk.isValid())
	                    continue;

					// 处理
					INIOHandler handler = (INIOHandler)sk.attachment();
		            try {
                        if(sk.isConnectable())
                            handler.processConnect(sk);
                        else if (sk.isReadable())
                            handler.processRead(sk);
                    } catch (IOException e) {
        	            log.error(e.getMessage());
        	            handler.processError(e);
                    } catch (PacketParseException e) {
        	            log.debug("包解析错误: " + e.getMessage());
    	            } catch (RuntimeException e) {
    	                log.error(e.getMessage());
    	            }
		        }
		        
		        n = 0;
		    }
		    
		    checkNewConnection();		    
		    notifySend();		    
		}
	    
        selector = null;
        shutdown = false;
		log.debug("Porter已经退出");
	}
	
	/**
	 * 添加释放请求
	 * 
	 * @param p
	 */
	public void addDisposeRequest(IPort p) {
		synchronized(disposeQueue) {
			disposeQueue.offer(p);
		}
	}
	
	/**
	 * 添加释放请求
	 * 
	 * @param p
	 */
	public void addDisposeRequest(IProxy p) {
		synchronized(disposeQueue) {
			disposeQueue.offer(p);
		}
	}
	
    /**
     * 检查是否有新连接要加入
     */
    private void checkNewConnection() {	 
        while(!newConnections.isEmpty()) {
            Object handler = newConnections.remove(0);
	        if(handler instanceof IProxy) {
		        try {
	                register((IProxy)handler);
	            } catch (ClosedChannelException e1) {
	            }
	        } else if(handler instanceof IPort) {
		        try {
	                register((IPort)handler);
	            } catch (ClosedChannelException e1) {
	            }
	        }            
        }
    }
    
    /**
     * 处理连接释放请求
     */
    private void processDisposeQueue() {
    	synchronized(disposeQueue) {
    		while(!disposeQueue.isEmpty()) {
    			Object obj = disposeQueue.poll();
    			if(obj instanceof IPort)
    				deregister((IPort)obj);
    			else if(obj instanceof IProxy)
    				deregister((IProxy)obj);
    		}
    	}
    }

    /**
     * 关闭porter
     */
    public void shutdown() {
	    if(selector != null) {
		    shutdown = true;
	        selector.wakeup();	        
	    }
    }
    
    /**
     * 唤醒selector
     */
    public void wakeup() {
        selector.wakeup();
    }
    
    /**
     * 唤醒selector然后注册这个proxy
     * 
     * @param proxy
     */
    public void wakeup(Object handler) {
        newConnections.add(handler);
        selector.wakeup();
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -