📄 gprsclientthread.java
字号:
/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 * GPRS connection thread: handles receiving and sending the data related to this connection.
 */
package documenteditor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Date;
import java.util.Properties;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.table.DefaultTableModel;
import javax.swing.tree.DefaultMutableTreeNode;

/**
 * Per-connection worker for one GPRS terminal: reads what the terminal sends on its socket,
 * queues it for processing and optionally relays it to the provincial server.
 *
 * @author wangzhi
 */
public class GPRSClientThread implements Runnable{
    private Socket socket = null;
    private Socket OutputSocket = null; // client connection used to send data to the provincial station
    private InputStream socketIn = null; // input stream received from the GPRS terminal
    private OutputStream socketOut = null; // output stream towards the GPRS terminal
    private InputStream OutputsocketIn = null; // input stream from the provincial station server
    private OutputStream OutputsocketOut = null; // output stream towards the provincial station server
    private boolean IsUploadToSheng = false; // whether data is uploaded to the provincial bureau
    private MessageQueue ToShengMessage = null; // data waiting to be sent to the provincial bureau
    private InputThread inputThread = null; // client input thread
    private MessageQueue ReceMessage = null; // received-message queue
    private MessageQueue SendMessage = null; // outgoing-message queue
    private Date LoginTime = new Date(); // login time
    private Date LastTime = null; // time data was last received
    private Date LastRtdTime = null; // time real-time data was last received
    private Date LastTimingTime = null; // time timed data was last received
    private boolean Shutdown = false; // whether the thread should end
    private StringBuffer ID = new StringBuffer("00000000000000"); // unique terminal identifier
    private String Name = ""; // terminal name, for easy recognition
    private String InetAddress = ""; // IP address
    private String Port = ""; // communication port number
    private String[] Items = null; // the sixteen pollutants involved
    private String ItemsSet = ""; // whether each channel is uploaded
    private GCSendThread gcst = null; // data-sending thread
    private DBConn dbconn = null;
    private DefaultMutableTreeNode tsn = null; // tree node pointer
    private DefaultTableModel NowDataTableModel = new ReadOnlyTableModel(); // real-time data
    private DefaultTableModel TimingDataTableModel = new ReadOnlyTableModel(); // timed data
    private DefaultTableModel FixDataTableModel = new ReadOnlyTableModel(); // historical data
private Properties ItmeProp = new Properties();//检测项目对应表 private boolean getfixing = false;//是否在补取历史数据 private String site_id = ""; private MessageQueue devMessage = null;//视图信息显示队列 private ChangIcoThread changIcoThread = null;//图标动画线程 private LogClass logClass = null;//写日志线程 private int split = 7;//烟道分割配置,即前多少个项目属于第一烟道 private Vector UseSetChannV = new Vector();//用于设置的通道集合 private Vector UseIsChannV = new Vector();//用于设置的通道是否上传集合 private String MN_ = "00000000000000";//用于上传到省局的MN号 private String ST = "32";//系统编号:水污染源32,气污染源31 public GPRSClientThread(Socket socket,DBConn dbconn,MessageQueue devMessage,ChangIcoThread changIcoThread,LogClass logClass){ try { this.socket = socket; this.devMessage = devMessage; this.changIcoThread = changIcoThread; this.logClass = logClass; InetAddress = socket.getInetAddress().toString(); Port = Integer.toString(socket.getPort()); socketIn = socket.getInputStream(); socketOut = socket.getOutputStream(); ReceMessage = new MessageQueue(500); SendMessage = new MessageQueue(500); ToShengMessage = new MessageQueue(500); LastTime = new Date(); LastRtdTime = new Date(); LastTimingTime = new Date(); Items = new String[16]; gcst = new GCSendThread(this); gcst.start(); this.dbconn = dbconn; InputStream in = getClass().getResourceAsStream("ItmeConfig.properties"); ItmeProp.load(in); in.close(); setTsn(new DefaultMutableTreeNode(this)); } catch (IOException ex) { Logger.getLogger(GPRSClientThread.class.getName()).log(Level.SEVERE, null, ex); } } public void InitOutputSocket(){ try { IsUploadToSheng = true; //每次都断掉重连 if(OutputSocket!=null){ OutputSocket.close(); } OutputSocket = new Socket("125.32.96.146", 8888); logClass.SendWriteLog("省局服务端连接成功:125.32.96.146:8888", "Output"+ID.toString()); //OutputSocket = new Socket("127.0.0.1", 8888); OutputsocketIn = OutputSocket.getInputStream(); OutputsocketOut = OutputSocket.getOutputStream(); inputThread = new InputThread(OutputsocketIn, SendMessage, logClass); inputThread.start(); } catch (IOException ex) 
{ //Logger.getLogger(GPRSClientThread.class.getName()).log(Level.SEVERE, null, ex); logClass.SendWriteLog("省局服务端连接失败:125.32.96.146:8888", "Output"+ID.toString()); OutputSocket = null; } } public GPRSClientThread(){ } //每隔一秒钟从终端接受一次数据 public void run(){ while(!Shutdown){ try { byte[] buff = new byte[10240]; ByteArrayOutputStream buffer= new ByteArrayOutputStream(); int Len = -1; if((Len=socketIn.read(buff))!=-1){ buffer.write(buff, 0, Len); String ReStr = new String(buffer.toByteArray()); setLastTime(new Date()); String[] Strs = ReStr.split("\r\n"); for(int i=0;i<Strs.length;i++){ ReceMessage.Enqueue(Strs[i]); if(!ID.toString().equals("00000000000000")){ getLogClass().SendWriteLog(Strs[i], ID.toString()); if((OutputSocket==null)&&IsUploadToSheng){ InitOutputSocket(); } if(OutputSocket!=null){ String temp = Strs[i]; try{ while(ToShengMessage.size()>0){ String old = ToShengMessage.Dequeue(); OutputsocketOut.write(old.getBytes()); logClass.SendWriteLog(old, "Output"+ID.toString()); } temp = temp.replaceAll(ID.toString(), MN_); if(temp.indexOf("Avg")>0){ temp = temp.replaceAll("CN=2011", "CN=2051"); temp = temp.replaceAll("231-", "B01-"); temp = Protocol_WR_WDC07A.AddMaxMin(temp); temp = Protocol_WR_WDC07A.ExchangeCRC(temp); InitOutputSocket(); OutputsocketOut.write(temp.getBytes()); logClass.SendWriteLog(temp, "Output"+ID.toString()); } } catch (IOException ex) { logClass.SendWriteLog("省局服务端异常:125.32.96.146:8888", "Output"+ID.toString()); OutputSocket = null; ToShengMessage.Enqueue(temp); } } } } changIcoThread.setReceing(true); } data_processing(); Thread.sleep(1000); } catch (IOException ex) { try { //Logger.getLogger(GPRSClientThread.class.getName()).log(Level.SEVERE, null, ex); Shutdown = true; socket.close(); } catch (IOException ex1) { Logger.getLogger(GPRSClientThread.class.getName()).log(Level.SEVERE, null, ex1); } } catch (InterruptedException ex) { Logger.getLogger(GPRSClientThread.class.getName()).log(Level.SEVERE, null, ex); } } 
devMessage.Enqueue("#站点:<"+site_id+">"+Name+"离线("+InetAddress+":"+Port+") 时间:<"+Protocol_WR_WDC07A.df3.format(new Date())+">"); } //处理接受到的数据 private void data_processing(){ while(ReceMessage.size()>0){ byte[] bytes = ReceMessage.DequeueByte(); String CN = Protocol_WR_WDC07A.GetCN(bytes); if(CN.equals("5061")){ String MN = Protocol_WR_WDC07A.GetValue("MN",Protocol_WR_WDC07A.GetCP(bytes)); if(!MN.equals("")){ ID.delete(0, ID.length()); getID().append(MN); dbconn.SendToDBMessage(new ToDBConnMessage(this, new String(bytes))); } }else if(CN.equals("5021")){ String Channel = Protocol_WR_WDC07A.GetChannel(bytes); String[] Strs = Channel.split(","); UseSetChannV.clear(); for(int i=0;i<Strs.length;i++){ SetItem(i, Strs[i]); UseSetChannV.add(Strs[i]+"--"+ItmeProp.getProperty(Strs[i])); } }else if(CN.equals("5011")){ String CP = Protocol_WR_WDC07A.GetCP(bytes); String TDSet = Protocol_WR_WDC07A.GetValue("TDSet", CP); ItemsSet = TDSet; SetItems_(); }else if(CN.equals("2011")){ if(ID.toString().equals("00000000000000")){ //String MN = Protocol_WR_WDC07A.GetValue("MN",Protocol_WR_WDC07A.GetCP(bytes)); String MN = Protocol_WR_WDC07A.GetID(bytes); if(!MN.equals("")){ ID.delete(0, ID.length()); getID().append(MN); String S5061 = new String(bytes); S5061 = S5061.replaceAll("CN=2011", "CN=5061"); S5061 = S5061.replaceAll("CP=&&", "CP=&&MN="+MN+";"); logClass.SendWriteLog(S5061, "BieJing"); dbconn.SendToDBMessage(new ToDBConnMessage(this, S5061)); } } if(ItemsSet.equals("")){ Vector ReiVector = Protocol_WR_WDC07A.GetItms(bytes); for(int i=0;i<ReiVector.size();i++){ SetItem(i, (String)ReiVector.get(i)); ItemsSet = ItemsSet +"1"; } SetItems_(); } Vector ReVector = Protocol_WR_WDC07A.GetValues(bytes); String Str = new String(bytes);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -