⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 psocketconnection.java

📁 移动CMPP的网关,JAVA又一实现,非常完整的代码.
💻 JAVA
字号:
package com.grail.comm.core;

import com.grail.util.*;

import java.io.*;
import java.net.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * <p>
 * Title: 短信 项目
 * </p>
 * 
 * <p>
 * Description:
 * </p>
 * 
 * <p>
 * Copyright: Copyright (c) 2004 合力阳光
 * </p>
 * 
 * <p>
 * Company: 合力阳光
 * </p>
 * 
 * @author ray/刘有为
 * @version 1.0
 */
public abstract class PSocketConnection extends PLayer {

	/** *******************定义的常量标识************************ */
	protected static String NOT_INIT;//表示不能正确初始化

	protected static String CONNECTING;//表示正在连接

	protected static String RECONNECTING;//从新连接

	protected static String CONNECTED;//过去的连接

	protected static String HEARTBEATING;//心跳

	protected static String RECEIVEING;//接收

	protected static String CLOSEING;//正在关闭

	protected static String CLOSED;//以关闭

	protected static String UNKNOWN_HOST;//无法查找到主机

	protected static String PORT_ERROR;//端口错误

	protected static String CONNECT_REFUSE;//连接拒绝

	protected static String NO_ROUTE_TO_HOST;//无法传递到主机

	protected static String RECEIVE_TIMEOUT;//暂停接收

	protected static String CLOSE_BY_PEER;//按点关闭

	protected static String RESET_BY_PEER;//按点复位

	protected static String CONNECTION_CLOSED;//连接关闭

	protected static String COMMUNICATION_ERROR;//通讯错误

	protected static String CONNECT_ERROR;//连接错误

	protected static String SEND_ERROR;//发送错误

	protected static String RECEIVE_ERROR;//接收错误

	protected static String CLOSE_ERROR;//关闭错误

	private String error;//错误

	protected Date errorTime;//错误时间

	protected String name;//名称

	protected String host;//主机

	protected int port;//端口

	protected String localHost;//本地主机

	protected int localPort;//本地端口

	protected int heartbeatInterval;//心跳间隔

	protected PReader in;//Socket读取

	protected PWriter out;//Socket发送

	protected static DateFormat df = new SimpleDateFormat(
			"yyyy-MM-dd HH:mm:ss.SSS");//时间格式化工具

	protected int readTimeout;//读取超时

	protected int reconnectInterval;//重新连接间隔

	protected Socket socket;//Socket

	protected WatchThread heartbeatThread;//守护线程用于监视心跳

	protected WatchThread receiveThread;//守护线程用于接收消息

	protected int transactionTimeout;//事务超时

	protected Resource resource;//资源读取器

	/** ******************构造方法********************** */
	public PSocketConnection(Args args) {
		super(null);
		errorTime = new Date();
		port = -1;
		localPort = -1;
		init(args);
	}

	protected PSocketConnection() {
		super(null);
		errorTime = new Date();
		port = -1;
		localPort = -1;
	}

	/** *********************************************** */
	/**
	 * <p>
	 * 初始化方法
	 * </p>
	 * 
	 * @param args
	 */
	protected void init(Args args) {
		resource = getResource();
		initResource();
		error = NOT_INIT;
		setAttributes(args);
	/*	if (heartbeatInterval > 0) {
			*//** 内部类表示心跳线程* *//*
			class HeartbeatThread extends WatchThread {

				public void task() {
					System.out.println("--------- 启动心跳线程------");
					try {
						Thread.sleep(heartbeatInterval);
					} catch (InterruptedException interruptedexception) {
					}
					if (error == null && out != null)
						try {
							heartbeat();//心跳信息
						} catch (IOException ex) {
							setError(String
									.valueOf(PSocketConnection.SEND_ERROR)
									+ String.valueOf(explain(ex)));
						}
				}

				public HeartbeatThread() {
					super(String.valueOf(String.valueOf(name)).concat(
							"-heartbeat"));
					setState(PSocketConnection.HEARTBEATING);//设置状态
				}
			}

			heartbeatThread = new HeartbeatThread();
			heartbeatThread.start();//启动心跳线程
		}*/
		/** *****************内部类表示接收线程********************* */
		class ReceiveThread extends WatchThread {

			public void task() {

				System.out.println("-----------启动接受线程---------");
				try {
					if (error == null) {
						System.out.println("----------- 读取信息 -------------");
						PMessage m = in.read();//读取信息
						if (m != null && m != null) {
							System.out.println("-------- 受到消息 -----");
							onReceive(m);
						}

					} else {
						if (error != PSocketConnection.NOT_INIT)
							try {
								System.out.println("--------- 休眠等待重新连接 -----");
								Thread.sleep(reconnectInterval);//休眠等待重新连接
							} catch (InterruptedException interruptedexception) {
								//interruptedexception.printStackTrace();
							}
						connect();

					}

				} catch (IOException ioexception) {
					ioexception.printStackTrace();
				}

			}

			public ReceiveThread() {
				super(String.valueOf(String.valueOf(name)).concat("-receive"));
			}
		}

		receiveThread = new ReceiveThread();
		receiveThread.start();
	}

	/**
	 * <p>
	 * 设置属性
	 * </p>
	 * 
	 * @param args
	 */
	public void setAttributes(Args args) {
		if (name != null
				&& name.equals(String.valueOf(String.valueOf((new StringBuffer(
						String.valueOf(String.valueOf(String.valueOf(host)))))
						.append(':').append(port)))))
			name = null;
		String oldHost = host;
		int oldPort = port;
		String oldLocalHost = localHost;
		int oldLocalPort = localPort;
		host = args.get("host", null);
		port = args.get("port", -1);
		localHost = args.get("local-host", null);
		localPort = args.get("local-port", -1);
		name = args.get("name", null);
		if (name == null)
			name = String.valueOf(String.valueOf((new StringBuffer(String
					.valueOf(String.valueOf(host)))).append(':').append(port)));
		readTimeout = 1000 * args.get("read-timeout", readTimeout / 1000);
		if (socket != null)
			try {
				socket.setSoTimeout(readTimeout);
			} catch (SocketException socketexception) {
			}
		reconnectInterval = 1000 * args.get("reconnect-interval", -1);
		heartbeatInterval = 1000 * args.get("heartbeat-interval", -1);
		transactionTimeout = 1000 * args.get("transaction-timeout", -1);
		if (error == null
				&& host != null
				&& port != -1
				&& (!host.equals(oldHost) || port != port
						|| !host.equals(oldHost) || port != port)) {
			setError(resource.get("comm/need-reconnect"));
			receiveThread.interrupt();
		}
	}

	/*
	 * <p> 发送消息 </p> (non-Javadoc)
	 * 
	 * @see com.ray.insa2.comm.core.PLayer#send(com.ray.insa2.comm.core.PMessage)
	 */
	public void send(PMessage message) throws PException {
		if (error != null)
			throw new PException(String.valueOf(SEND_ERROR)
					+ String.valueOf(getError()));
		try {
			out.write(message);
			fireEvent(new PEvent(8, this, message));
		} catch (PException ex) {
			fireEvent(new PEvent(16, this, message));
			setError(String.valueOf(SEND_ERROR) + String.valueOf(explain(ex)));
			throw ex;
		} catch (Exception ex) {
			fireEvent(new PEvent(16, this, message));
			setError(String.valueOf(SEND_ERROR) + String.valueOf(explain(ex)));
		}
	}

	/**
	 * <p>
	 * 返回连接名称
	 * </p>
	 * 
	 * @return String
	 */
	public String getName() {
		return name;
	}

	/**
	 * <p>
	 * 返回主机名
	 * </p>
	 * 
	 * @return String
	 */
	public String getHost() {
		return host;
	}

	/**
	 * <p>
	 * 返回端口号
	 * </p>
	 * 
	 * @return int
	 */
	public int getPort() {
		return port;
	}

	/**
	 * <p>
	 * 返回重新连接间隔
	 * </p>
	 * 
	 * @return int
	 */
	public int getReconnectInterval() {
		return reconnectInterval / 1000;
	}

	public String toString() {
		return String.valueOf(String.valueOf((new StringBuffer(
				"PSocketConnection:")).append(name).append('(').append(host)
				.append(':').append(port).append(')')));
	}

	/**
	 * <p>
	 * 返回读取超时
	 * </p>
	 * 
	 * @return int
	 */
	public int getReadTimeout() {
		return readTimeout / 1000;
	}

	/**
	 * <p>
	 * 表示是否可用
	 * </p>
	 * 
	 * @return boolean
	 */
	public boolean available() {
		return error == null;
	}

	/**
	 * <p>
	 * 返回错误
	 * </p>
	 * @return String
	 */
	public String getError() {
		return error;
	}

	/**
	 * <p>
	 * 返回错误时间
	 * </p>
	 * 
	 * @return Date
	 */
	public Date getErrorTime() {
		return errorTime;
	}

	/*
	 * <p> 关闭Socket </p> (non-Javadoc)
	 * 
	 * @see com.ray.insa2.comm.core.PLayer#close()
	 */
	public synchronized void close() {
		try {
			if (socket != null) {
				socket.close();
				in = null;
				out = null;
				socket = null;
				if (heartbeatThread != null)
					heartbeatThread.kill();
				receiveThread.kill();
			}
		} catch (Exception exception) {
		}
		setError(NOT_INIT);
	}

	/**
	 * <p>
	 * 连接方法
	 * </p>
	 */
	protected synchronized void connect() {
		if (error == NOT_INIT)
			error = CONNECTING;
		else if (error == null)
			error = RECONNECTING;
		errorTime = new Date();

		if (socket != null)
			try {
				socket.close();
			} catch (IOException ioexception) {
			}

		try {
			if (port <= 0 || port > 65535) {//检验端口号
				setError(String.valueOf(String.valueOf((new StringBuffer(String
						.valueOf(String.valueOf(PORT_ERROR)))).append("port:")
						.append(port))));
				return;
			}
			if (localPort < -1 || localPort > 65535) {//检验本地端口号
				setError(String.valueOf(String.valueOf((new StringBuffer(String
						.valueOf(String.valueOf(PORT_ERROR)))).append(
						"local-port:").append(localPort))));
				return;
			}
			if (localHost != null) {//begin if code 1
				boolean isConnected = false;
				InetAddress localAddr = InetAddress.getByName(localHost);//得到本机地址
				if (localPort == -1) {//begin if code 2
					for (int p = (int) (Math.random() * (double) 64500); p < 0xdc758; p += 13)
						try {
							socket = new Socket(host, port, localAddr,
									1025 + p % 64500);
							isConnected = true;
							break;
						} catch (IOException ioexception1) {
						} catch (SecurityException securityexception) {
						}

					if (!isConnected)
						throw new SocketException(
								"Can not find an avaliable local port");
				} else {//end if code 2
					socket = new Socket(host, port, localAddr, localPort);
				}
			} else {//end if code 1
				socket = new Socket(host, port);
			}
			socket.setSoTimeout(readTimeout);
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket success : "
					+ socket.isConnected());
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket isClosed: "
					+ socket.isClosed());
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ service host: "
					+ socket.getInetAddress().getHostName());
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ service prot: "
					+ socket.getPort());
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ client  host  "
					+ socket.getLocalAddress().getHostName());
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ client  prot: "
					+ socket.getLocalPort());
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket is  out: "
					+ socket.getOutputStream());
			System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket is  is:  "
					+ socket.getInputStream());
			out = getWriter(socket.getOutputStream());
			in = getReader(socket.getInputStream());

			setError(null);
		} catch (IOException ex) {
			setError(String.valueOf(CONNECT_ERROR)
					+ String.valueOf(explain(ex)));
		}
	}

	/**
	 * <p>
	 * 设置错误
	 * </p>
	 * 
	 * @param desc
	 */
	protected void setError(String desc) {
	  System.out.println(desc);
		if (error == null && desc == null || desc != null && desc.equals(error))
			return;
		error = desc;
		errorTime = new Date();
		if (desc == null)
			desc = CONNECTED;
	}

	protected abstract PWriter getWriter(OutputStream outputstream);

	protected abstract PReader getReader(InputStream inputstream);

	protected abstract Resource getResource();

	protected void heartbeat() throws IOException {
	}

	/**
	 * <p>
	 * 初始化资源
	 * </p>
	 */
	public void initResource() {
		NOT_INIT = resource.get("comm/not-init");
		CONNECTING = resource.get("comm/connecting");
		RECONNECTING = resource.get("comm/reconnecting");
		CONNECTED = resource.get("comm/connected");
		HEARTBEATING = resource.get("comm/heartbeating");
		RECEIVEING = resource.get("comm/receiveing");
		CLOSEING = resource.get("comm/closeing");
		CLOSED = resource.get("comm/closed");
		UNKNOWN_HOST = resource.get("comm/unknown-host");
		PORT_ERROR = resource.get("comm/port-error");
		CONNECT_REFUSE = resource.get("comm/connect-refused");
		NO_ROUTE_TO_HOST = resource.get("comm/no-route");
		RECEIVE_TIMEOUT = resource.get("comm/receive-timeout");
		CLOSE_BY_PEER = resource.get("comm/close-by-peer");
		RESET_BY_PEER = resource.get("comm/reset-by-peer");
		CONNECTION_CLOSED = resource.get("comm/connection-closed");
		COMMUNICATION_ERROR = resource.get("comm/communication-error");
		CONNECT_ERROR = resource.get("comm/connect-error");
		SEND_ERROR = resource.get("comm/send-error");
		RECEIVE_ERROR = resource.get("comm/receive-error");
		CLOSE_ERROR = resource.get("comm/close-error");
	}

	/**
	 * <p>
	 * 异常解析方法
	 * </p>
	 * 
	 * @param ex
	 *            异常
	 * @return String
	 */
	protected String explain(Exception ex) {
		String msg = ex.getMessage();
		if (msg == null)
			msg = "";
		if (ex instanceof PException)
			return ex.getMessage();
		if (ex instanceof EOFException)
			return CLOSE_BY_PEER;
		if (msg.indexOf("Connection reset by peer") != -1)
			return RESET_BY_PEER;
		if (msg.indexOf("SocketTimeoutException") != -1)
			return RECEIVE_TIMEOUT;
		if (ex instanceof NoRouteToHostException)
			return NO_ROUTE_TO_HOST;
		if (ex instanceof ConnectException)
			return CONNECT_REFUSE;
		if (ex instanceof UnknownHostException)
			return UNKNOWN_HOST;
		if (msg.indexOf("errno: 128") != -1) {
			return NO_ROUTE_TO_HOST;
		} else {
			ex.printStackTrace();
			return ex.toString();
		}
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -