⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcphelper.java

📁 短信发送
💻 JAVA
字号:
/**
 * Created at Nov 22, 2008
 */
package com.jdev.net.connector;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import com.jdev.net.data.SocketDataHelper;
import com.jdev.net.event.Notifier;
import com.jdev.net.queue.QueueFactory;
import com.jdev.net.queue.Request;
import com.jdev.net.queue.Response;
import com.jdev.util.Debug;

/**
 * <p>Title: TcpHelper</p>
 * <p>Description: </p>
 * @author Lawrence
 * @version 
 */
public class TcpHelper implements Runnable {

	private final static String module = TcpHelper.class.getName();
	private final SelectionKey sk;
	private final SocketChannel sc;
	private SocketDataHelper socketDataHandler;
	
	private static final int READING = 0, SENDING = 1, WAITING = 2;
	private static int state = READING;
	private static Notifier notifier = Notifier.getNotifier();
	private final static int BUFFER_SIZE = 1024;
	
	private Request request;
	private Response response;
	
	/* (non-Javadoc)
	 * @see java.lang.Runnable#run()
	 */
	public void run() {
		try{
			if (state == READING){
				read(sk);
			}else if(state == SENDING){
				send(sk);
			}
		}catch(Exception e){
			
		}

	}

	public void close(){
		if (sc != null) {
			try{
				sk.cancel();
				sc.close();
			}catch(Exception e){
				Debug.logError(e, module);
			}
		}
	}
	
    /**
     * 处理向客户发送数据
     * @param key SelectionKey
     */
	private void send(SelectionKey key) throws Exception {
    	SocketChannel channel = (SocketChannel) key.channel();
        try {
        	response = new Response(channel);
        	if(request == null)
        		request = new Request(channel);
        	        	       	
            // 包装数据
//        	if(fType == 1 && socketDataHandler.getRequestCount()>0){
//        		ByteBuffer buffer = ByteBuffer.wrap(socketDataHandler.sendRequest());
//        		request.setDataInput(buffer.array());
//            	response.send(request.getDataInput());
//        	}else if(fType == 2 && socketDataHandler.getResponseCount()>0){
//        		ByteBuffer buffer = ByteBuffer.wrap(socketDataHandler.sendResponse());
//        		request.setDataInput(buffer.array());
//            	response.send(request.getDataInput());
//        	}
        	
            // 触发onWrite事件
            notifier.fireOnWrite(request, response);
            
            // 发送数据
            if(response.getDataInput()!=null && response.getDataInput().length>0) {
            	response.send(response.getDataInput());            	
            } else if(request.getDataInput()!=null && request.getDataInput().length>0) {
            	// 从发送队列取数据
            	response.send(request.getDataInput());
            } else
            	response.send("OK".getBytes());

            state = READING;
            key.interestOps(SelectionKey.OP_READ);
            
        }
        catch (Exception e) {
        	notifier.fireOnError("Error occured in Writer: " + e.getMessage());
            
			try {
				request = new Request(channel);
				channel.finishConnect();
				channel.close();
				channel.socket().close();
				notifier.fireOnClosed(request);
			} catch (Exception e1) {
				Debug.logError(e1, module);
			}
            
        }
	}

    /**
     * 处理连接数据读取
     * @param key SelectionKey
     */
	private void read(SelectionKey key) throws Exception {
        try {
            // 读取客户端数据
            SocketChannel channel = (SocketChannel) key.channel();
            byte[] clientData =  readRequest(channel);

            if(clientData.length<=0){
//            	notifier.fireOnError("No Data received in Reader");
	            // 提交主控线程进行写处理
	            state = SENDING;
	            key.interestOps(SelectionKey.OP_WRITE);
//    			try {
//    				channel.finishConnect();
//    				channel.close();
//    				channel.socket().close();
//    				request = new Request(channel);
//    				notifier.fireOnClosed(request);
//    			} catch (Exception e1) {
//    				Debug.logError(e1, module);
//    			}
            }
            else{
	            
	            request = new Request(channel);
	            // 包装数据
//	            if(fType == 1){
//	            	socketDataHandler.receiveResponse(clientData);
//	            }else if(fType == 2){
//	            	socketDataHandler.receiveRequest(clientData);
//	            }
	            
	            request.setDataInput(clientData);
	            // 触发onRead
	            notifier.fireOnRead(request);
	
	            // 提交主控线程进行写处理
	            state = SENDING;
	            key.interestOps(SelectionKey.OP_WRITE);
	            
            }
        }
        catch (Exception e) {
        	notifier.fireOnError("Error occured in Reader: " + e.getMessage());
			try {
	            SocketChannel channel = (SocketChannel) key.channel();
				channel.finishConnect();
				channel.close();
				channel.socket().close();
				request = new Request(channel);
				notifier.fireOnClosed(request);
			} catch (Exception e1) {
				Debug.logError(e1, module);
			}
		}
	}

    /**
     * 读取客户端发出请求数据
     * @param sc 套接通道
     */
	private byte[] readRequest(SocketChannel channel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int off = 0;
        int r = 0;
        byte[] data = new byte[BUFFER_SIZE * 10];

        while ( true ) {
            buffer.clear();
            r = channel.read(buffer);
            if (r == -1|| r == 0) break;
            if ( (off + r) > data.length) {
                data = grow(data, BUFFER_SIZE * 10);
            }
            byte[] buf = buffer.array();
            System.arraycopy(buf, 0, data, off, r);
            off += r;
        }
        byte[] req = new byte[off];
        System.arraycopy(data, 0, req, 0, off);
        return req;
	}

    /**
     * 数组扩容
     * @param src byte[] 源数组数据
     * @param size int 扩容的增加量
     * @return byte[] 扩容后的数组
     */
	private byte[] grow(byte[] src, int size) {
        byte[] tmp = new byte[src.length + size];
        System.arraycopy(src, 0, tmp, 0, src.length);
        return tmp;
	}

	/**
	 * @param sk
	 * @param sc
	 */
	public TcpHelper(SelectionKey sk, SocketChannel sc) {
		this.sk = sk;
		this.sc = sc;
		socketDataHandler = new SocketDataHelper(QueueFactory.TCP_QUEUE);

	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -