📄 psocketconnection.java
字号:
package com.grail.comm.core;
import com.grail.util.*;
import java.io.*;
import java.net.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
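/**
 * <p>
 * Title: SMS (short message) project
 * </p>
 *
 * <p>
 * Description: Abstract base class for a {@link PLayer} that exchanges
 * messages over a TCP socket. It opens the socket, runs a receive thread that
 * reads messages and hands them to {@code onReceive}, and re-connects after an
 * error. Subclasses supply the reader, the writer and the text resource.
 * </p>
 *
 * <p>
 * Copyright: Copyright (c) 2004 合力阳光
 * </p>
 *
 * <p>
 * Company: 合力阳光
 * </p>
 *
 * @author ray/刘有为
 * @version 1.0
 */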
public abstract class PSocketConnection extends PLayer {
/** ******************* Status and error texts ************************ */
// All of the static strings below are (re)assigned by initResource() from the
// instance's Resource; they are null until initResource() has run.
protected static String NOT_INIT;// not initialised; also the state set by close()
protected static String CONNECTING;// first connection attempt in progress
protected static String RECONNECTING;// re-connecting an existing connection
protected static String CONNECTED;// connected
protected static String HEARTBEATING;// heartbeat state text
protected static String RECEIVEING;// receiving state text
protected static String CLOSEING;// closing state text
protected static String CLOSED;// closed state text
protected static String UNKNOWN_HOST;// host name could not be resolved
protected static String PORT_ERROR;// port number out of range (prefix used by connect())
protected static String CONNECT_REFUSE;// connection refused
protected static String NO_ROUTE_TO_HOST;// no route to host
protected static String RECEIVE_TIMEOUT;// receive timed out
protected static String CLOSE_BY_PEER;// connection closed by the peer
protected static String RESET_BY_PEER;// connection reset by the peer
protected static String CONNECTION_CLOSED;// connection closed
protected static String COMMUNICATION_ERROR;// communication error
protected static String CONNECT_ERROR;// connect error (prefix used by connect())
protected static String SEND_ERROR;// send error (prefix used by send())
protected static String RECEIVE_ERROR;// receive error
protected static String CLOSE_ERROR;// close error
private String error;// current error/state text; null means the connection is available
protected Date errorTime;// time at which the error/state last changed
protected String name;// connection name; defaults to "host:port" in setAttributes()
protected String host;// remote host
protected int port;// remote port; -1 when not configured
protected String localHost;// local address to bind to; null lets the system choose
protected int localPort;// local port to bind to; -1 when not configured
protected int heartbeatInterval;// heartbeat interval, stored as 1000 * the configured value
protected PReader in;// reader over the socket input stream
protected PWriter out;// writer over the socket output stream
// NOTE(review): SimpleDateFormat is not thread-safe and this instance is shared statically.
protected static DateFormat df = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss.SSS");// timestamp formatter
protected int readTimeout;// socket read timeout passed to Socket.setSoTimeout (milliseconds)
protected int reconnectInterval;// pause before a reconnect attempt, stored as 1000 * the configured value
protected Socket socket;// the current socket; null when not connected
protected WatchThread heartbeatThread;// heartbeat thread; never assigned in the code visible in this file
protected WatchThread receiveThread;// thread that reads messages and re-connects; started by init()
protected int transactionTimeout;// transaction timeout, stored as 1000 * the configured value
protected Resource resource;// source of the localised texts above
/** ****************** Constructors ********************** */
/**
 * Creates a connection and initialises it straight away from {@code args}.
 * Initialisation starts the receive thread (see {@code init}).
 *
 * @param args connection settings passed on to {@code init}
 */
public PSocketConnection(Args args) {
    this();
    init(args);
}

/**
 * Creates a connection with default field values only; {@code init} is not
 * called, so no resource is loaded and no thread is started.
 */
protected PSocketConnection() {
    super(null);
    this.errorTime = new Date();
    this.port = -1;
    this.localPort = -1;
}
/** *********************************************** */
/**
 * <p>
 * Initialises the connection: loads the text resource, marks the connection
 * as not initialised, applies the settings in {@code args} and starts the
 * receive thread.
 * </p>
 * <p>
 * The receive thread repeatedly runs one step: while the connection is
 * available it reads one message and passes it to {@code onReceive};
 * otherwise it waits for the reconnect interval (skipped for the very first
 * attempt) and calls {@code connect()}.
 * </p>
 *
 * @param args connection settings, forwarded to {@code setAttributes}
 */
protected void init(Args args) {
    resource = getResource();
    initResource();
    error = NOT_INIT;
    setAttributes(args);

    // NOTE: no heartbeat thread is created here, so heartbeatThread stays
    // null and heartbeat() is never invoked by this class.

    /** Receive/reconnect worker; task() is one iteration of its loop. */
    class ReceiveThread extends WatchThread {
        public ReceiveThread() {
            super(String.valueOf(name).concat("-receive"));
        }

        public void task() {
            System.out.println("-----------启动接受线程---------");
            try {
                // Snapshot the state once so the checks below agree with each other.
                String state = error;
                if (state == null) {
                    System.out.println("----------- 读取信息 -------------");
                    PMessage m = in.read();
                    if (m != null) {
                        System.out.println("-------- 受到消息 -----");
                        onReceive(m);
                    }
                } else {
                    // Compare by value: NOT_INIT is re-assigned by initResource(),
                    // so reference equality is not reliable.
                    if (!state.equals(PSocketConnection.NOT_INIT)) {
                        System.out.println("--------- 休眠等待重新连接 -----");
                        // A non-configured interval is negative; Thread.sleep
                        // would throw IllegalArgumentException for it.
                        if (reconnectInterval > 0) {
                            try {
                                Thread.sleep(reconnectInterval);
                            } catch (InterruptedException wakeUp) {
                                // setAttributes() interrupts this thread to
                                // force an immediate reconnect; just go on.
                            }
                        }
                    }
                    connect();
                }
            } catch (SocketTimeoutException timeout) {
                // An idle link is not a failure: report it as before and
                // simply try the read again on the next iteration.
                timeout.printStackTrace();
            } catch (IOException ioexception) {
                // Record the failure so the next iteration re-connects
                // instead of reading from the broken socket forever.
                setError(String.valueOf(PSocketConnection.RECEIVE_ERROR)
                        + String.valueOf(explain(ioexception)));
            }
        }
    }

    receiveThread = new ReceiveThread();
    receiveThread.start();
}
/**
 * <p>
 * Applies connection settings. Reads {@code host}, {@code port},
 * {@code local-host}, {@code local-port}, {@code name},
 * {@code read-timeout}, {@code reconnect-interval},
 * {@code heartbeat-interval} and {@code transaction-timeout} from
 * {@code args}; the four time values are multiplied by 1000 before being
 * stored.
 * </p>
 * <p>
 * If the connection is currently available and the remote or local endpoint
 * changed, the connection is flagged with the "comm/need-reconnect" text and
 * the receive thread is interrupted so that it re-connects.
 * </p>
 *
 * @param args the settings to apply
 */
public void setAttributes(Args args) {
    // Remember the previous endpoint so a change can be detected below.
    String oldHost = host;
    int oldPort = port;
    String oldLocalHost = localHost;
    int oldLocalPort = localPort;

    host = args.get("host", null);
    port = args.get("port", -1);
    localHost = args.get("local-host", null);
    localPort = args.get("local-port", -1);

    // The name is always re-read from args; without one it defaults to "host:port".
    name = args.get("name", null);
    if (name == null) {
        name = String.valueOf(host) + ':' + port;
    }

    // Keep the current timeout when none is supplied.
    readTimeout = 1000 * args.get("read-timeout", readTimeout / 1000);
    if (socket != null) {
        try {
            socket.setSoTimeout(readTimeout);
        } catch (SocketException ignored) {
            // Best effort: the new timeout is applied on the next connect().
        }
    }
    reconnectInterval = 1000 * args.get("reconnect-interval", -1);
    heartbeatInterval = 1000 * args.get("heartbeat-interval", -1);
    transactionTimeout = 1000 * args.get("transaction-timeout", -1);

    if (error == null && host != null && port != -1) {
        boolean remoteChanged = !host.equals(oldHost) || port != oldPort;
        boolean localHostChanged = (localHost == null)
                ? oldLocalHost != null
                : !localHost.equals(oldLocalHost);
        boolean localChanged = localHostChanged || localPort != oldLocalPort;
        if (remoteChanged || localChanged) {
            setError(resource.get("comm/need-reconnect"));
            // The thread does not exist yet while init() is still running.
            if (receiveThread != null) {
                receiveThread.interrupt();
            }
        }
    }
}
/**
 * <p>
 * Sends a message over the connection.
 * </p>
 * <p>
 * An event of type 8 is fired after a successful write and an event of type
 * 16 after a failed one. NOTE(review): presumably "message sent" and "send
 * failed" - confirm against PEvent. A failed write also puts the connection
 * into the error state.
 * </p>
 *
 * @param message the message to write
 * @throws PException if the connection is not available or the write fails;
 *         a failure that is not itself a PException is reported as a new
 *         PException carrying the recorded error text
 */
public void send(PMessage message) throws PException {
    if (error != null) {
        throw new PException(String.valueOf(SEND_ERROR)
                + String.valueOf(getError()));
    }
    try {
        out.write(message);
        fireEvent(new PEvent(8, this, message));
    } catch (PException ex) {
        fireEvent(new PEvent(16, this, message));
        setError(String.valueOf(SEND_ERROR) + String.valueOf(explain(ex)));
        throw ex;
    } catch (Exception ex) {
        String failure = String.valueOf(SEND_ERROR)
                + String.valueOf(explain(ex));
        fireEvent(new PEvent(16, this, message));
        setError(failure);
        // Do not return normally: the caller must learn the message was not sent.
        throw new PException(failure);
    }
}
/**
 * Returns the connection name.
 *
 * @return the name set by {@code setAttributes}
 */
public String getName() {
    return this.name;
}
/**
 * Returns the remote host.
 *
 * @return the configured host, or null when none is set
 */
public String getHost() {
    return this.host;
}
/**
 * Returns the remote port.
 *
 * @return the configured port, or -1 when none is set
 */
public int getPort() {
    return this.port;
}
/**
 * Returns the reconnect interval in the unit it was configured in, i.e. the
 * stored value divided by 1000.
 *
 * @return the reconnect interval
 */
public int getReconnectInterval() {
    final int stored = this.reconnectInterval;
    return stored / 1000;
}
/**
 * Returns a description of the form {@code PSocketConnection:name(host:port)}.
 *
 * @return the textual description of this connection
 */
public String toString() {
    return "PSocketConnection:" + name + '(' + host + ':' + port + ')';
}
/**
 * Returns the read timeout in the unit it was configured in, i.e. the stored
 * value divided by 1000.
 *
 * @return the read timeout
 */
public int getReadTimeout() {
    final int stored = this.readTimeout;
    return stored / 1000;
}
/**
 * Tells whether the connection is usable, i.e. no error/state text is set.
 *
 * @return true when there is no current error
 */
public boolean available() {
    return this.error == null;
}
/**
 * Returns the current error/state text.
 *
 * @return the error text, or null when the connection is available
 */
public String getError() {
    return this.error;
}
/**
 * Returns the time at which the error/state last changed.
 *
 * @return a copy of the timestamp, so callers cannot modify this
 *         connection's internal state; null if no timestamp is set
 */
public Date getErrorTime() {
    Date current = errorTime;
    return current == null ? null : new Date(current.getTime());
}
/**
 * <p>
 * Closes the connection: closes the socket if there is one, drops the
 * reader, writer and socket references, stops the heartbeat and receive
 * threads and puts the connection back into the not-initialised state.
 * </p>
 * <p>
 * The clean-up always runs, whether or not a socket exists and whether or
 * not closing it fails. This method does not throw.
 * </p>
 */
public synchronized void close() {
    try {
        if (socket != null) {
            socket.close();
        }
    } catch (Exception ignored) {
        // Best effort: the socket is being discarded anyway.
    }
    in = null;
    out = null;
    socket = null;
    try {
        // Stop the workers even when no socket was ever opened; otherwise the
        // receive thread would keep trying to connect after close().
        if (heartbeatThread != null) {
            heartbeatThread.kill();
        }
        if (receiveThread != null) {
            receiveThread.kill();
        }
    } catch (Exception ignored) {
        // Keep the original contract: close() never throws.
    }
    setError(NOT_INIT);
}
/**
 * <p>
 * Opens (or re-opens) the socket to {@code host:port}.
 * </p>
 * <p>
 * Any existing socket is closed first. The remote port must be in 1..65535
 * and the local port in -1..65535, otherwise the connection is put into the
 * port-error state. When {@code localHost} is set the socket is bound to
 * that address, using {@code localPort} or, if that is -1, a free port
 * chosen by the system. On success the reader and writer are created and
 * the error is cleared; on an I/O failure the error is set to the
 * connect-error text followed by the explanation of the exception.
 * </p>
 */
protected synchronized void connect() {
    // Compare by value as well: NOT_INIT is re-assigned by initResource().
    if (error == NOT_INIT || (error != null && error.equals(NOT_INIT))) {
        error = CONNECTING;
    } else if (error == null) {
        error = RECONNECTING;
    }
    errorTime = new Date();

    if (socket != null) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // Best effort: the old socket is replaced below.
        }
    }

    try {
        if (port <= 0 || port > 65535) {
            setError(String.valueOf(PORT_ERROR) + "port:" + port);
            return;
        }
        if (localPort < -1 || localPort > 65535) {
            setError(String.valueOf(PORT_ERROR) + "local-port:" + localPort);
            return;
        }

        if (localHost != null) {
            InetAddress localAddr = InetAddress.getByName(localHost);
            // Local port 0 lets the system pick a free port. This replaces a
            // manual port scan that retried on every kind of I/O failure,
            // including a refused connection.
            int bindPort = (localPort == -1) ? 0 : localPort;
            socket = new Socket(host, port, localAddr, bindPort);
        } else {
            socket = new Socket(host, port);
        }
        socket.setSoTimeout(readTimeout);

        // Diagnostic output describing the new socket.
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket success : "
                + socket.isConnected());
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket isClosed: "
                + socket.isClosed());
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ service host: "
                + socket.getInetAddress().getHostName());
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ service prot: "
                + socket.getPort());
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ client host "
                + socket.getLocalAddress().getHostName());
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ client prot: "
                + socket.getLocalPort());
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket is out: "
                + socket.getOutputStream());
        System.out.println("^^^^^^^^^^^^^^^^^^^^^^ socket is is: "
                + socket.getInputStream());

        out = getWriter(socket.getOutputStream());
        in = getReader(socket.getInputStream());
        setError(null);
    } catch (IOException ex) {
        setError(String.valueOf(CONNECT_ERROR)
                + String.valueOf(explain(ex)));
    }
}
/**
 * <p>
 * Sets the current error/state text and records when it changed.
 * </p>
 * <p>
 * The new text is always printed to standard output. Nothing else happens
 * when it equals the current text (both null, or equal strings).
 * NOTE(review): no event is fired from here on a state change.
 * </p>
 *
 * @param desc the new error text, or null to mark the connection available
 */
protected void setError(String desc) {
    System.out.println(desc);
    boolean unchanged = (desc == null) ? (error == null) : desc.equals(error);
    if (unchanged) {
        return;
    }
    error = desc;
    errorTime = new Date();
}
/**
 * Creates the writer used to send messages. connect() calls this with the
 * new socket's output stream and stores the result in {@code out}.
 *
 * @param outputstream the socket output stream
 * @return the writer to use for this connection
 */
protected abstract PWriter getWriter(OutputStream outputstream);
/**
 * Creates the reader used to receive messages. connect() calls this with the
 * new socket's input stream and stores the result in {@code in}.
 *
 * @param inputstream the socket input stream
 * @return the reader to use for this connection
 */
protected abstract PReader getReader(InputStream inputstream);
/**
 * Supplies the resource from which initResource() loads the status and error
 * texts. Called by init().
 *
 * @return the text resource
 */
protected abstract Resource getResource();
/**
 * Heartbeat hook; this implementation does nothing.
 * NOTE(review): presumably meant to be overridden to send a keep-alive
 * message - nothing in this file calls it.
 *
 * @throws IOException declared for implementations that perform I/O
 */
protected void heartbeat() throws IOException {
}
/**
 * <p>
 * Loads every status and error text from {@code resource} into the static
 * text fields of this class.
 * </p>
 */
public void initResource() {
    final Resource texts = resource;

    // Connection life-cycle states.
    NOT_INIT = texts.get("comm/not-init");
    CONNECTING = texts.get("comm/connecting");
    RECONNECTING = texts.get("comm/reconnecting");
    CONNECTED = texts.get("comm/connected");
    HEARTBEATING = texts.get("comm/heartbeating");
    RECEIVEING = texts.get("comm/receiveing");
    CLOSEING = texts.get("comm/closeing");
    CLOSED = texts.get("comm/closed");

    // Explanations of specific failures.
    UNKNOWN_HOST = texts.get("comm/unknown-host");
    PORT_ERROR = texts.get("comm/port-error");
    CONNECT_REFUSE = texts.get("comm/connect-refused");
    NO_ROUTE_TO_HOST = texts.get("comm/no-route");
    RECEIVE_TIMEOUT = texts.get("comm/receive-timeout");
    CLOSE_BY_PEER = texts.get("comm/close-by-peer");
    RESET_BY_PEER = texts.get("comm/reset-by-peer");
    CONNECTION_CLOSED = texts.get("comm/connection-closed");

    // General error texts.
    COMMUNICATION_ERROR = texts.get("comm/communication-error");
    CONNECT_ERROR = texts.get("comm/connect-error");
    SEND_ERROR = texts.get("comm/send-error");
    RECEIVE_ERROR = texts.get("comm/receive-error");
    CLOSE_ERROR = texts.get("comm/close-error");
}
/**
 * <p>
 * Translates an exception into one of the error texts of this class.
 * </p>
 * <p>
 * A PException is explained by its own message (empty when it has none).
 * End of stream, connection reset, read timeout, no route, refused
 * connection and unknown host map to their dedicated texts. Anything else
 * has its stack trace printed and is explained by {@code ex.toString()}.
 * </p>
 *
 * @param ex the exception to explain
 * @return the explanation text
 */
protected String explain(Exception ex) {
    String msg = ex.getMessage();
    if (msg == null) {
        msg = "";
    }
    if (ex instanceof PException) {
        return msg;
    }
    if (ex instanceof EOFException) {
        return CLOSE_BY_PEER;
    }
    // Covers both "Connection reset" and "Connection reset by peer".
    if (msg.indexOf("Connection reset") != -1) {
        return RESET_BY_PEER;
    }
    // Test the type: a timeout's message ("Read timed out") does not contain
    // the class name, so the text search alone never matches a real timeout.
    if (ex instanceof SocketTimeoutException
            || msg.indexOf("SocketTimeoutException") != -1) {
        return RECEIVE_TIMEOUT;
    }
    if (ex instanceof NoRouteToHostException) {
        return NO_ROUTE_TO_HOST;
    }
    if (ex instanceof ConnectException) {
        return CONNECT_REFUSE;
    }
    if (ex instanceof UnknownHostException) {
        return UNKNOWN_HOST;
    }
    if (msg.indexOf("errno: 128") != -1) {
        return NO_ROUTE_TO_HOST;
    }
    ex.printStackTrace();
    return ex.toString();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -