📄 connection.java
字号:
if(session==null){
log.error("发送消息失败,消息:"+ msg + " 原因:session=null "+this );
return false;
}
return session.write(new LbpMessageAdapter(msg));
}
/**
* 发送数据时候使用,无超时时间,如果发送失败即返回
* @param msg 要发送的消息包
* @return true 发送成功,false 发送失败
* @throws IOException
*/
public boolean sendMessage(Message msg) {
return session.write(msg);
}
/**
* 发送数据包的方法,
* @param msg 要发送的消息包
* @param timeOut 超时时间,
* @return true 发送成功,false 发送失败
* @throws IOException
*/
public boolean sendMessage(Message msg, long timeOut) {
return session.write(msg, timeOut);
}
/**
* 检查链路检测时间是否到
* @return true 时间到,false时间未到
*/
public boolean checkLinkTimeExpired() {
return ((System.currentTimeMillis() - getLastReceiveTime()) > config
.getEnquireLinkTime() && ((System.currentTimeMillis() - getLastLinkTime()) > config
.getEnquireLinkTime()));
}
/**
* 检查链路检测时间是否超时
* @return true 超时,false未超时
*/
public boolean checkLinkTimeOut() {
return ((System.currentTimeMillis() - getLastLinkTime()) >= config
.getLinkTimeOut())
&& (getLastReceiveTime() < getLastLinkTime());
}
/**
* 检查流量阀值是否超出
* @return true 超出,false未超出
*/
public boolean checkReceiveFluxFlow() {
return (config.getReceiveFlux() > config.getMaxReceiveFlux());
}
/**
* 对接收流量加1,为了性能不用同步方法,因为流量增加允许有一点误差
* public synchronized void addReceiveFlux()
*/
public void addReceiveFlux() {
config.setReceiveFlux((config.getReceiveFlux() + 1));
}
/**
* 检查链路检测时间是否到
* @param conn
* @return
*/
public boolean checkMaxLinkFailCount() {
return (linkFailCount >= config.getMaxLinkFailCount());
}
/**
* 删除等待的消息包
* @param seq 消息seq
* @return
*/
protected LbpMessage removeWaitPacket(Integer seq) {
return (LbpMessage) waitPackets.remove(seq);
}
public String getConnName(){
return getConfig().getEmseInfo().getEmseNo();
}
/**
* 发送消息事件的方法,为空
* @param session 接收消息的会话
* @param message 具体的消息
*/
public void messageSent(Session session, Message message) {
if(log.isDebugEnabled()){
log.info("消息发往:" +this + message);
}
try{
if(log.isInfoEnabled()){
if(message instanceof LbpMessageAdapter){
log.info("消息发往[" + getConnName()+ "," + getConnID()+ "] " + ((LbpMessageAdapter)message).getLbpMessage().getSimpleInfo());
}else{
log.info("消息发往["+ getConnName()+ "," + getConnID()+ "] " + message);
}
}
}catch(Exception e){
log.error(e);
}
}
/**
* 链路空闲事件的方法,为空
* @param session 接收消息的会话
* @param message 具体的消息
*/
public void sessionIdle(Session session) {
if(log.isDebugEnabled()){
log.debug("sessionIdle:" +this );
}
}
/**
* 会话连接事件的方法
* @param session 会话
*/
public void connectionEstablished(Session session) {
setConnStatus(CONNECT);
if(log.isInfoEnabled()){
log.info( "连接建立完成: " + this);
}
clearLinkFail();//连接失败次数重新置位
//通知 InteIoProcess 连接建立
getInteIoProcess().connectionEstablished(this);
}
/**
* 会话关闭事件的方法
* @param session 会话
*/
public void connectionClosed(Session session) {
setConnStatus(DISCONNECT);
log.warn("连接已关闭 "+this);
//通知 InteIoProcess 连接断开
getInteIoProcess().connectionClosed(this);
}
/**
* 异常处理方法,须进一步完善
* @param session 连接对象
* @param throwable 异常对象
*/
public void exceptionCaught(Session session, Throwable throwable) {
if(throwable instanceof ConnectException) {
this.close();
log.error(this+"网络连接错误",throwable);
}else if(throwable instanceof IOException ){
this.close();
log.error(this + "IO异常:",throwable);
}else if(throwable instanceof MessageParseException){
this.close();
log.error(this + "消息解析异常:",throwable);
}else{
this.close();
log.error(this,throwable);
}
}
public void connected() {
session.start();
}
public boolean isConnected() {
return (session!=null && session.isConnected() && this.getConnStatus() != DISCONNECT);
}
public void close() {
if (lockMap != null) {
lockMap.clear();
}
if (waitPackets != null) {
waitPackets.clear();
}
this.clearLinkFail();
setConnStatus(DISCONNECT);
if (session!=null && session.isStarted()) {
session.close();
}
}
public ConnConfigMBean getConfig() {
return config;
}
public EventDispatcher getEventDispatcher() {
return eventDispatcher;
}
public void setEventDispatcher(EventDispatcher eventDispatcher) {
this.eventDispatcher = eventDispatcher;
}
public IoProcessor getIoProcessor() {
return ioProcessor;
}
public void setIoProcessor(IoProcessor ioProcessor) {
this.ioProcessor = ioProcessor;
}
public void setConfig(ConnConfigMBean config) {
this.config = config;
}
public MessageRecognizer getMsgRecognizer() {
return msgRecognizer;
}
public void setMsgRecognizer(MessageRecognizer msgRecognizer) {
this.msgRecognizer = msgRecognizer;
}
public int getConnStatus() {
return getConfig().getConnStatus();
}
public boolean isLoggin() {
return getConnStatus() == Connection.LOGIN_ON;
}
/**
* Returns millis time that I/O occurred last.
*/
public long getLastIoTime() {
return session.getLastIoTime();
}
/**
* Returns <code>true</code> if and only if this session is idle.
*/
public boolean isIdle() {
return session.isIdle();
}
public long getLastLinkTime() {
return getConfig().getLastLinkTime();
}
protected void setLastLinkTime(long lastLinkTime) {
getConfig().setLastLinkTime(lastLinkTime);
}
public long getLastReceiveTime() {
return getConfig().getLastReceiveTime();
}
public void setLastReceiveTime(long time) {
getConfig().setLastReceiveTime(time);
}
public int getReceiveFlux() {
return getConfig().getReceiveFlux();
}
public void setReceiveFlux(int receiveFlux) {
getConfig().setReceiveFlux(receiveFlux);
}
public int getSendFlux() {
return getConfig().getSendFlux();
}
public void setSendFlux(int sendFlux) {
getConfig().setSendFlux(sendFlux);
}
protected void setConnStatus(int connStatus) {
this.getConfig().setConnStatus(connStatus);
}
public int getBindMode() {
return bindMode;
}
public void setBindMode(int bindMode) {
this.bindMode = bindMode;
}
/**
* 链路测试失败次数
* @return
*/
public int getLinkFailCount() {
return linkFailCount;
}
public void addLinkFailCount() {
linkFailCount++;
}
public void clearLinkFail() {
linkFailCount = 0;
}
/**
* 销毁对像
*/
public void destroy(){
close();
//session =null;
lockMap = null;
waitPackets = null;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("conn_id=" + this.getConfig().getConnID());
if(session==null){
if(config!=null){
sb.append(" SocketAddress host=" +config.getHost() +" port="+ config.getPort());
}else{
sb.append(" session=null");
}
}else{
sb.append(" SocketAddress=" + session.getSocketAddress());
sb.append(" connected="+session.isConnected());
}
return sb.toString();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -