📄 connectionbean.java
字号:
mTimer = null;
addPacketListener(new ConnectionBean.PacketStreamMonitor());
}
/**
* 使用默认端口于指定的服务器建立连接。
*
* @param host <code>InetAddress</code>主机地址
* @exception java.io.IOException socket error in backend
*/
public void connect(InetAddress host) throws IOException
{
connect(host, host.getHostName(), DEFAULT_PORT);
}
/**
* 与指定的服务器端口建立连接。
*
* @param host <code>InetAddress</code> 主机地址
* @param port an <code>int</code> 端口号
* @exception java.io.IOException socket error in backend
*/
public synchronized void connect(InetAddress host, int port)
throws IOException
{
//setConnecting();
connect(host, host.getHostName(), port);
}
/**
* 与指定的服务器端口建立连接。
*
* @param host <code>InetAddress</code> 主机地址
* @param name <code>String</code> 主机名
* @param port an <code>int</code> 端口号
* @exception java.io.IOException socket error in backend
*/
public synchronized void connect(InetAddress host, String name, int port)
throws IOException
{
setConnecting();
Socket s = new Socket(host, port);
connect(s);
}
/**
* 开始和服务器通信,发送认证包。
*
* @param s <code>Socket</code> 已经建立连接socket。
*/
public synchronized void connect(Socket s) throws IOException
{
if (socket != null)
disconnect();
setConnecting();
setSocket(s);
connect(s.getInputStream(), s.getOutputStream());
}
/**
*开始和服务器通信,发送认证包。
*
* @param is <code>InputStream</code> 从服务器发来的数据输入流。
* @param os <code>OutputStream</code> 发送到服务器的数据输出流。
*/
private synchronized void connect(InputStream is, OutputStream os)
throws IOException
{
output = new OutputStreamHandler(osi);
output.setOutputStream(os);
output.start();
input = new InputStreamHandler(isi);
input.setInputStream(is);
input.start();
//TODO 加入开始的包
SimpleDateFormat formatter = new SimpleDateFormat("MMddHHmmss");
Date currentTime = new Date();
String dateString = formatter.format(currentTime);
// dateString="0913185441";
CmppConnectBuilder ccb = new CmppConnectBuilder();
ccb.setSourceAddr(sourceAddr);
ccb.setSpPassword(password);
ccb.setTimestamp(dateString);
send(ccb.builder());
}
/**
* 设置当前的连接状态为STATE_CONNECTING。
*/
private void setConnecting()
{
sendPacketList.clear();
if (status == ConnectionEvent.STATE_DISCONNECTED)
{
setConnectionState(
ConnectionEvent.STATE_CONNECTING,
ConnectionEvent.REASON_CLIENT_INITIATED);
idBuilder.resetSequenceId();
}
}
/**
* 设置socket.
* @param s <code>Socket</code> socket
*/
private synchronized void setSocket(Socket s)
{
socket = s;
}
/**
* 断开于服务器的连接,并且折纸断开状态,发送中断连接包。
*/
public synchronized void disconnect()
{
if (status.getValue() == ConnectionEvent.EState.CONNECTED)
{
CmppTerminateBuilder ctb = new CmppTerminateBuilder();
//ct.setSequenceId(idBuilder.getSequenceId());
// ctb.CreateNextSequenceId();
send(ctb.builder());
} else
{
disconnected(ConnectionEvent.REASON_CLIENT_INITIATED);
}
}
/**
* 断开于服务器的连接,设置断开状态。
*
* @param reason <code>ConnectionEvent.EReason</code> 断开的原因
*/
private synchronized void disconnected(ConnectionEvent.EReason reason)
{
if (mTimer != null)
mTimer.interrupt();
mTimer = null;
sendPacketList.clear();
if (socket != null)
{
try
{
if (input != null)
input.shutdown();
if (output != null)
output.shutdown();
socket.close();
try
{
input.join();
} catch (InterruptedException e)
{
}
} catch (IOException e)
{
/* do nothing */
}
socket = null;
}
input = null;
output = null;
setConnectionState(ConnectionEvent.STATE_DISCONNECTED, reason);
}
/**
* <code>addConnectionListener</code> 为连接状态改变的监听器列表增加一个
* <code>ConnectionListener</code> 。
*
* @param l <code>ConnectionListener</code>。相同的ConnectionListener不能增加两次。
*/
public final synchronized void addConnectionListener(ConnectionListener l)
{
if (!connectionListeners.contains(l))
connectionListeners.addElement(l);
}
/**
* <code>delConnectionListener</code>连接状态改变的监听器列表删除一个
* <code>ConnectionListener</code> 。
*
* @param l <code>ConnectionListener</code> 值
*/
public final synchronized void delConnectionListener(ConnectionListener l)
{
if (packetListeners.contains(l))
connectionListeners.removeElement(l);
}
/**
* <code>addPacketListener</code> 为连接包监听器列表增加一个
* <code>PacketListener</code>,用于监听已经发送或者受到的包 。
*
* @param l <code>PacketListener</code> 值
*/
public final synchronized void addPacketListener(PacketListener l)
{
if (!packetListeners.contains(l))
packetListeners.addElement(l);
}
/**
* <code>addPacketListener</code> 为连接包监听器列表增加一个
* <code>PacketListener</code>。
*
* @param l a <code>PacketListener</code> 值
*/
public final synchronized void delPacketListener(PacketListener l)
{
if (packetListeners.contains(l))
packetListeners.removeElement(l);
}
/**
* <code>send</code> 所有发送包需要调用的方法。
* <code>PacketListener</code>监听到数据包是否发送成功。
* @param packet <code>Packet</code> 发送到服务器的包。
*/
public void send(Packet packet)
{
if (!((CmppPacket) packet).isResponse())
{
sendPacketList.add(packet);
}
_log.log(
LogCommon.DEBUG_LEVEL,
"sendPacketList=" + ConnectionBean.this.sendPacketList.size());
/**TODO 改变 */
// if (status.getValue() == status.CONNECTED)
// return;
if (output != null)
output.send(packet);
}
/**
* <code>received</code> 为接收到包发送通知的方法. 这个方法仅仅只被<code>InputStreamHandler</code>调用,不要在其它的地方使用这个方法。
*
* @param packet a <code>Packet</code> 接收包.
*/
private void received(Packet packet)
{
fireReceivedPacket(packet);
}
/**
* <code>received</code> 为发送包发送通知的方法. 这个方法仅仅只被<code>OutputStreamHandler</code>调用,不要在其它的地方使用这个方法。
*
* @param packet a <code>Packet</code> 发送包.
*
*/
private void sent(Packet packet)
{
fireSendPacket(packet);
}
/**
* <code>sendFailed</code> 发送发送失败的通知, 这个方法仅仅只被<code>OutputStreamHandler</code>调用,不要在其它的地方使用这个方法。
*
* @param packet a <code>Packet</code> 发送包.
*/
private void sendFailed(Packet packet)
{
fireUnsendPacket(packet);
}
/**
* 发送接收到数据包的事件,这个方法在<code>InputStreamHandler</code>调用。
*
* @param <code>Packet</code> 发送到所有监听的包。
*/
protected final void fireReceivedPacket(Packet packet)
{
try
{
// 建立一个广播事件。
PacketEvent pe = new PacketEvent(this, packet);
Vector broadcast = (Vector) packetListeners.clone();
// 广播到所有的监听。
for (Enumeration e = broadcast.elements(); e.hasMoreElements();)
{
try
{
((PacketListener) e.nextElement()).receivedPacket(pe);
} catch (Throwable e1)
{
}
}
} catch (Throwable e)
{
}
}
/**
* 广播已经发送数据包的事件,这个方法在<code>OutputStreamHandler</code>调用。
*
* @param <code>Packet</code> 发送到所有监听的包。
*/
protected final void fireSendPacket(Packet packet)
{
try
{
// 建立一个广播事件。
PacketEvent pe = new PacketEvent(this, packet);
Vector broadcast = (Vector) packetListeners.clone();
// 广播到所有的监听。
for (Enumeration e = broadcast.elements(); e.hasMoreElements();)
{
try
{
((PacketListener) e.nextElement()).sentPacket(pe);
} catch (Throwable e1)
{
}
}
} catch (Throwable e)
{
}
}
/**
* 广播已经发送数据包失败的事件,这个方法在<code>OutputStreamHandler</code>调用。
*
* @param <code>Packet</code> 发送到所有监听的包。
*/
protected final void fireUnsendPacket(Packet packet)
{
try
{
// 建立一个广播事件。
PacketEvent pe = new PacketEvent(this, packet);
Vector broadcast = (Vector) packetListeners.clone();
// 广播到所有的监听。
for (Enumeration e = broadcast.elements(); e.hasMoreElements();)
{
try
{
((PacketListener) e.nextElement()).sendFailed(pe);
} catch (Throwable e1)
{
}
}
} catch (Throwable e)
{
}
}
/**
* 响应OutputStreamHandler挂了.
*/
public void onOutputDeath(Exception e)
{
disconnected(ConnectionEvent.REASON_IO_ERROR);
// throw new RuntimeException("Death of output thread: " + e.toString());
}
/**
*响应InputStreamHandler挂了.
*/
public void onInputDeath(Exception e)
{
disconnected(ConnectionEvent.REASON_IO_ERROR);
// throw new RuntimeException("Death of input thread: " + e.toString());
}
/**
* 返回当前当前连接状态。
*/
public final ConnectionEvent.EState getConnectionState()
{
return status;
}
/**
*设置连接状态,广播状态改变的事件。
*
* @param state 新的状态。
* @param reason 改变状态的原因。
*
* @see ConnectionEvent.EState
*/
protected final void setConnectionState(
ConnectionEvent.EState state,
ConnectionEvent.EReason reason)
{
ConnectionEvent ce =
new ConnectionEvent(this, state, getConnectionState(), reason);
this.status = state;
Vector broadcast = (Vector) connectionListeners.clone();
Enumeration e = broadcast.elements();
//广播事件
while (e.hasMoreElements())
{
try
{
((ConnectionListener) e.nextElement()).connectionChanged(ce);
} catch (Throwable e1)
{
}
}
}
/**
* 响应定时器
*
*/
private void onTimer()
{
_log.log(LogCommon.DEBUG_LEVEL ,"onTimer!!!");
mTimer.setEnableTimer(false);
for (Iterator iter = sendPacketList.values().iterator();
iter.hasNext();
)
{
try
{
SendPacketState sp = (SendPacketState) iter.next();
if (sp.isOverTime())
{
sp.addSendNum();
_log.fine("overTime packet");
if (sp.getSendNum() > RESENDCOUNT)
{
sendFailed(sp.getSendPacket());
} else
{
// output.send(sp.getPacket());
sp.addSendNum();
}
}
} catch (Throwable e1)
{
}
}
mTimer.setEnableTimer(true);
}
/**
* 响应测试链路
*
*/
private void onActiveConnect()
{
_log.fine("onActiveConnect!!!");
mTimer.setEnableActiveTimer(false);
CmppActiveTestBuilder catb = new CmppActiveTestBuilder();
send(catb.builder());
mTimer.setEnableActiveTimer(true);
}
/**
* 得到当前没有接收到回复包的发送包的数量。
* @return 发送包的数量
*/
public int getSendPackedNumber()
{
return sendPacketList.size();
}
static {
_log = Logger.getLogger("com.ll.smsbeans.ConnectBean");
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -