⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpclientimpl.java

📁 同步接收web services请求
💻 JAVA
字号:
package com.aceway.vas.commons.tcp.impl;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.log4j.Logger;

import com.aceway.vas.commons.tcp.IClientHandler;
import com.aceway.vas.commons.tcp.TcpClient;
import com.aceway.vas.commons.tcp.common.TypeConvert;

public class TcpClientImpl implements TcpClient {

	static Logger log = Logger.getLogger(TcpClientImpl.class);

	private SocketChannel sc = null;

	// 包的最大尺寸
	private int maxPacketSize = 1024;

	// 接收线程的运行标志
	private boolean val = true;

	// 回调函数
	private IClientHandler handler = null;

	// 接收缓冲
	private ByteBuffer recvBuff = null;

	// 接收进程
	static RecvThread rt = null;
	
	/**
	 * TCP线路维护线程 
	 */
	//static 

	// 存储当前的实例
	static List objs = new ArrayList();

	private SocketChannel getSocketChannel() {
		return this.sc;
	}

	private IClientHandler getClientHandler() {
		return this.handler;
	}

	private int getMaxPackageSize() {
		return this.maxPacketSize;
	}

	public boolean connect(String ip, int port) {
		try {
			InetSocketAddress addr = new InetSocketAddress(ip, port);
			// 生成一个socketchannel
			sc = SocketChannel.open(addr);
			// 连接到server
			sc.configureBlocking(false);
			// 存储所有的连接
			this.objs.add(this);
			// 添加TcpClientImpl的实例到接收线程
			rt.add(this);
			// 响应连接事件
			handler.onConnect(ip, port);
			return true;
		}catch(Exception e){
			log.error("连接错误, 可能远程的服务器已经关闭",e);
			return false;
		}
	}

	public boolean disconnect() {
		try {
			this.sc.close();
		} catch (Exception ex) {
			ex.printStackTrace();
			return false;
		}
		if (objs.contains(this)) {
			objs.remove(this);
		}

		handler.onDisconnect();
		return true;

	}

	public synchronized int send(byte[] bytes) {
		try {
			ByteBuffer bb = ByteBuffer.wrap(bytes);
			while (bb.hasRemaining())
				sc.write(bb);
			// 如果是消息的话
			if (TypeConvert.byte2int(bytes, 0) != 1)
				handler.onSendedMsg(bytes);
			else {
				// 否则如果是OBJECT的话
				byte[] temp = new byte[bytes.length - 8];
				System.arraycopy(bytes, 8, temp, 0, temp.length);
				ByteArrayInputStream byteIn = new ByteArrayInputStream(temp);
				ObjectInputStream in = new ObjectInputStream(byteIn);
				Object o = in.readObject();
				handler.onSendedMsg((Serializable) o);
			}
			return 0;
		} catch (Exception ex) {
			ex.printStackTrace();
			try {
	            sc.close();
            } catch (IOException e) {
	            e.printStackTrace();
            }
			return 1;
		}
	}

	// 从服务器端获取信息
	private class RecvThread extends Thread {
		Selector selector = null;

		//存放所有的TcpClientImpl实例
		List list = new ArrayList();

		public RecvThread() {
			try {
				selector = Selector.open();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

		public void add(TcpClientImpl client) {
			synchronized (list) {
				list.add(client);
				list.notifyAll();
				selector.wakeup();
			}

		}
		
		private void sliceAndDealWithData(TcpClientImpl ci, IClientHandler dh) {
			ByteBuffer buff = ci.recvBuff;

			int dataLen = buff.position();
			
			int  iPos = 0;	// 每次切包开始位置
			
			while (dataLen - iPos > 4) {
				buff.position(iPos);
				
				if (buff.getInt(iPos) == 1) {		// object
					if (dataLen - iPos > 8){
						int packageSize = buff.getInt(iPos + 4);
						if (packageSize > maxPacketSize){
							buff.clear();
							return;
						}
						if (dataLen - iPos - 8 >= packageSize ) {		// 如果buffer里有一个完整的包
							
							buff.position(iPos + 8);
							byte[] byteObj = new byte[packageSize];
							buff.get(byteObj, 0, packageSize); // 将消息存放入byteObj里

							/* ================还原对象=========开始======== */
							ByteArrayInputStream byteIn = new ByteArrayInputStream(
							        byteObj);
							ObjectInputStream in = null;
							Object o = null;
							try {
								in = new ObjectInputStream(byteIn);
								o = in.readObject();
							} catch (IOException e) {
								log.error("接收对象的时候发生错误");
								e.printStackTrace();
								break;
							} catch (ClassNotFoundException e) {
								log.error("接收对象的时候发生错误, 对象没有找到");
								e.printStackTrace();
								break;
							}
							dh.onReceiveMsg(
							        (Serializable) o);
							/* ================还原对象=========结束======== */
							
							iPos  += 8 + packageSize;
							continue;
						}	// if (dataLen - iPos > 8)
					}

					// not enough for one package
					break;
				} else 	{		// byte stream
					byte[] msg = new byte[dataLen - iPos];
					buff.get(msg, 0, dataLen - iPos); // 从buffer里取得当前的消息体
					int sliceResult = dh.slice(msg);
					if (sliceResult > 0) { // 当可以切到一个完整的包的时候
						byte[] msgPkt = new byte[sliceResult];
						System.arraycopy(msg, 0, msgPkt, 0, sliceResult);	// 获得当前一个完整的包
						dh.onReceiveMsg(msgPkt);			//回调
						
						iPos += sliceResult;
					}
				}
			}
				// 处理切包剩余部分
				if (dataLen - iPos > 0)	{// remaining
					buff.position(iPos);
					buff.limit(dataLen);
					buff.compact();
				} else {
					buff.clear();
				}

		}

		public void run() {
			int n = 0, read;
			while (val) {
				// try {
				//判断list里有没有TcpClientImpl实例, 如果有的话, 注册到接收线程的selector里
				while (list.size() > 0) {
					TcpClientImpl client = (TcpClientImpl) list.remove(list
							.size() - 1);
					SocketChannel sc = client.getSocketChannel();
					try {
						sc.register(selector, SelectionKey.OP_READ, client);
					} catch (ClosedChannelException e) {
						e.printStackTrace();
						break;
					}
				}
				try {
					n = selector.select();
				} catch (IOException e) {
					n = 0;
					val = false;
					e.printStackTrace();
				} catch(ClosedSelectorException e){
					n = 0;
					val = false;
					e.printStackTrace();
				}
				if (n > 0) {
					Set set = selector.selectedKeys();
					java.util.Iterator it = set.iterator();
					while (it.hasNext()) {
						SelectionKey skey = (SelectionKey) it.next();
						it.remove();
						try {
							if (skey.isReadable()) {
								SocketChannel sc = (SocketChannel) skey
										.channel();
								TcpClientImpl temp = (TcpClientImpl) skey
										.attachment();
								IClientHandler ch = temp.getClientHandler();
								ByteBuffer buffer = temp.recvBuff;
								buffer.limit(buffer.capacity());

								while ((read = sc.read(buffer)) != -1) {
									if (read == 0)
										break;

									//buffer.flip();
									sliceAndDealWithData(temp, ch);
								}
							}
						} catch (java.nio.channels.CancelledKeyException ex) {
							try {
								skey.channel().close();
							} catch (IOException e) {
								e.printStackTrace();
							}
							objs.remove((TcpClientImpl) skey.attachment());
							((TcpClientImpl) skey.attachment()).disconnect();
						} catch (ClosedSelectorException e1) {
							objs.remove((TcpClientImpl) skey.attachment());
							((TcpClientImpl) skey.attachment()).disconnect();
							val = false;
							log.warn("通信链路被关闭");
							log.warn(e1.getMessage());
						} catch (IOException e2) {
							objs.remove((TcpClientImpl) skey.attachment());
							((TcpClientImpl) skey.attachment()).disconnect();
							try {
								skey.channel().close();
							} catch (Exception ex) {
								ex.printStackTrace();
							}
							log.error("接收消息时发生错误, 可能远程服务器已经被关闭, 也可能本地关闭连接");
							log.error(e2.getMessage());
						}
					}
				}
			}
		}
	}

	public void setDataHandler(IClientHandler handler, int maxPacketSize) {
		this.handler = handler;
		this.maxPacketSize = maxPacketSize;
		this.recvBuff = ByteBuffer.allocate(maxPacketSize);
		if (rt == null) {
			rt = new RecvThread();
			rt.start();
		}
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see com.aceway.communication.tcp.TcpClient#send(java.io.Serializable)
	 */
	public int send(Serializable obj) {
		// TODO Auto-generated method stub
		try {
			ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
			ObjectOutputStream out = new ObjectOutputStream(byteOut);
			out.writeObject(obj);

			int len = byteOut.toByteArray().length;
			byte[] bytes = new byte[len + 8];
			TypeConvert.int2byte(1, bytes, 0);
			TypeConvert.int2byte(len, bytes, 4);
			System.arraycopy(byteOut.toByteArray(), 0, bytes, 8, byteOut
					.toByteArray().length);
			return send(bytes);		// kongds modified on 2007-06-20

		} catch (Exception e) {
			log.error("发送失败",e);
			return 1;
		}

	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -