📄 opcconnector.java
字号:
package com.zcsoft.opc;
/**
* <p>Title: 现场总线通讯 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005-2006</p>
* <p>Company: Zhicheng Software&Service Co. Ltd.</p>
* @author 蒋智湘
* @version 1.0
*/
import java.util.*;
import java.net.*;
import java.io.*;
/**
* 同OPC控制中心应用程序(OpcCtrl.exe)进行控制通讯的连接类
* 使用方法示例:
* <pre>
* OpcConnector oc = new OpcConnector();
* oc.setProtocol("S7");
* oc.setTopic("demo");
* Item item1 = new Item();
* item1.setID("DB101,DBB1,1");
* Item item2 = new Item();
* item2.setID("DB101,DBB2,1");
* ...
* ItemGroup ig1 = new ItemGroup();
* ig1.setName("grp1");
* ig1.addItem(item1);
* ig1.addItem(item2);
* ...
* oc.addGroup(ig1);
* ...
* oc.connect();
* </pre>
* 当前版本支持的最大加入的变量组的个数上限为32765。对于组上的变量个数的上限也是32765。
*
*/
public class OpcConnector implements Runnable, CommandSent
{
/** 通信协议,如S7。该属性值将作为实际变量名的一部分 */
private String protocol;
/** 通信主题,或应用程序名。该属性值将作为实际变量名的一部分 */
private String topic;
/** 是否已经同OPC控制中心建立好了连接 */
private boolean connected = false;
/** 所有数据项组的集合 */
private List groups = new ArrayList(6);
/** OPC控制中心程序所在机器的IP地址 */
private InetAddress hostRequest;
/** OPC控制中心程序接收控制请求的TCP端口 */
private int portRequest = 8089;
/** 接收UDP报文的线程 */
private Thread threadRcv;
/** 接收OPC控制中心程序发送过来的UDP报文的套接字 */
private DatagramSocket socketRcvUDP;
/**
* 构造一个在8087端口上监听UDP报文的实例
*/
public OpcConnector() throws SocketException
{
this(8087);
}
/**
*
* @param portReceive 该实例使用哪个本机端口接收控制中心发送过来的UDP通知报文
*/
public OpcConnector(int portReceive) throws SocketException
{
socketRcvUDP = new DatagramSocket(portReceive);
threadRcv = new Thread(this);
threadRcv.start();
}
public void run()
{
byte[] buffer;
try
{
buffer = new byte[Math.min(4096, socketRcvUDP.getReceiveBufferSize())];
}
catch (SocketException ex)
{
ex.printStackTrace();
return;
}
while (true)
{
try
{
DatagramPacket dp = new DatagramPacket(buffer, buffer.length);
socketRcvUDP.receive(dp);
int readed = dp.getLength();
int offset = dp.getOffset();
byte[] data = dp.getData();
byte firstByte = data[offset++];
if (firstByte == 'C')//最频繁出现的数值变化通知
{
dataChanged(data, offset, readed - 1);
}
else if (firstByte == 'W')//异步写完成
{
writeCompleted(data, offset, readed - 1);
}
else if (firstByte == 'R')//异步读完成
{
dataChanged(data, offset, readed - 1);
}
else if(firstByte == 'E'//OPC控制中心退出
&& bytesABeginWithB(data, offset, new byte[]{'x', 'i', 't'}))
{
connected = false;
}
else if(firstByte == 'S'//OPC控制中心重新启动了,就再连上它
&& bytesABeginWithB(data, offset, new byte[]{'t', 'a', 'r', 't', 'u', 'p'}))
{
try
{
connectGroups(groups.size(), true);
}
catch (IOException ex)
{
ex.printStackTrace();
}
}
else if (firstByte == 'N')//异步操作取消执行完成通知
{
}
else
{
System.err.println("Unknown packet");
}
}
catch (IOException ex)
{
System.err.println(ex);
break;
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
threadRcv = null;
}
/**
* 判定字节数组A的前B.length各字节同字节数组B的对应字节相同
*/
static boolean bytesABeginWithB(byte[] A, int offset, byte[] B)
{
if (A.length - offset < B.length) return false;
for (int i = 0; i < B.length; i++)
{
if (A[offset + i] != B[i]) return false;
}
return true;
}
public void setProtocol(String protocol)
{
checkStatus();
this.protocol = protocol;
}
public void setTopic(String topic)
{
checkStatus();
this.topic = topic;
}
/**
* 在连接前添加一个变量值实例。
*
* @param aGroup 不为null的ItemGroup实例
*/
public final void addGroup(ItemGroup aGroup)
{
checkStatus();
aGroup.cmdSentHook = this;
this.groups.add(aGroup);
}
private void checkStatus() throws IllegalStateException
{
if (connected)
{
throw new IllegalStateException("connected");
}
}
/**
* 建立本实例同OPC控制中心之间的连接。
* @throws IOException 连接过程可能出现的套接字IO异常
*/
public void connect() throws IOException
{
if (this.topic == null)
{
throw new IllegalStateException("Not specify topic");
}
if (this.protocol == null)
{
throw new IllegalStateException("Not specify protocol");
}
int cntGroups = groups.size();
if (cntGroups == 0)
{
throw new OpcException("none group");
}
ItemGroup ig;
int groupIndex;
//首先查询组是否被加入
TcpConnection conn = getConnection();
conn.writeHeader("Q?" + DELIMETER + cntGroups);
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
conn.writeHeader(ig.getName());
}
//conn.flushHeanders();
String hdr = conn.readHeader();
int index = getResultCode(hdr);
int cntGroupsNoFounded = 0;
if (index == -1)
{
cntGroupsNoFounded = cntGroups;
}
else if (index == 0)
{
for (groupIndex = 0; (hdr = conn.readHeader()) != null && groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
ig.index = index = Integer.parseInt(hdr);
// debug(ig.getName() + "->index = " + index);
if (index < 0) ++cntGroupsNoFounded;//累加未加入的组
//else 记录已经加入的组,准备调用刷新操作
}
}
else
{
conn.close();
throw new OpcException(hdr);
}
conn.close();
//如果存在已经加入的组,则刷新变量值
if (cntGroupsNoFounded != cntGroups)
{
refreshGroups();
}
//如果还有没加入组,则尝试加入
if (cntGroupsNoFounded > 0)
{
connectGroups(cntGroupsNoFounded, false);
}
connected = true;
}
public TcpConnection getConnection() throws IOException
{
if (hostRequest == null) hostRequest = InetAddress.getByName(null);
return new TcpConnection(hostRequest, portRequest);
}
/**
* 将变量组加到OPC控制中心
* @param cntGroupsToConnect 待连接的组的个数
* @param refresh 是否对那些已经确定索引值的组的索引值进行再确定
*/
void connectGroups(int cntGroupsToConnect, boolean refresh) throws IOException
{
TcpConnection conn = getConnection();
String header;
header = "AD" + DELIMETER + cntGroupsToConnect + DELIMETER + this.socketRcvUDP.getLocalPort();
conn.writeHeader(header);
//System.out.println(header);
String idPrefix = this.protocol + ":[" + this.topic + "]";
Item item;
int itemIndex, itemCount;
int groupIndex, cntGroups = groups.size();
ItemGroup ig;
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
if (!refresh && ig.index != -1) continue;
itemCount = ig.itemCount();
header = ig.getName() + DELIMETER + 'D'//(ig.isActive()?'A':'D')
//只能在加入后单独设定激活状态才有效。不然在这个方法还没有执行完成前,就收到了dataChanged报文
//此时getGroupAt(int)很可能返回null
+ DELIMETER + itemCount
+ DELIMETER + ig.getUpdateRate();
conn.writeHeader(header);
//System.out.println("\t" + header);
for (itemIndex = 0; itemIndex < itemCount; itemIndex++)
{
item = ig.getItem(itemIndex);
header = idPrefix.concat(item.ID)
+ DELIMETER + (item.active?'A':'D')
+ DELIMETER + item.value.vt;
conn.writeHeader(header);
//System.out.println("\t\t" + header);
}
}
//conn.flushHeanders();
String hdr = conn.readHeader();
if (getResultCode(hdr) != 0)
{
conn.close();
throw new OpcException(hdr);
}
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
if (!refresh && ig.index != -1) continue;
hdr = conn.readHeader();
if (hdr != null)
{
ig.index = Integer.parseInt(hdr);
// debug(ig.getName() + ".index = " + ig.index);
}
}
conn.close();
//激活实际需要激活的变量组
activateGroups();
}
void activateGroups() throws IOException
{
int groupIndex, cntGroups = groups.size();
ItemGroup ig;
TcpConnection conn = getConnection();
conn.writeHeader("AC A" + DELIMETER + cntGroups);
for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
{
ig = (ItemGroup)groups.get(groupIndex);
if (ig.index >= 0 && ig.isActive())
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -