⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 opcconnector.java

📁 用于串口通讯测试的工具软件。完全使用java语言编写。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package com.zcsoft.opc;

/**
 * <p>Title: 现场总线通讯 </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2005-2006</p>
 * <p>Company: Zhicheng Software&Service Co. Ltd.</p>
 * @author 蒋智湘
 * @version 1.0
 */
import java.util.*;
import java.net.*;
import java.io.*;

/**
 * 同OPC控制中心应用程序(OpcCtrl.exe)进行控制通讯的连接类
 * 使用方法示例:
 * <pre>
 *    OpcConnector oc = new OpcConnector();
 *    oc.setProtocol("S7");
 *    oc.setTopic("demo");
 *    Item item1 = new Item();
 *    item1.setID("DB101,DBB1,1");
 *    Item item2 = new Item();
 *    item2.setID("DB101,DBB2,1");
 *    ...
 *    ItemGroup ig1 = new ItemGroup();
 *    ig1.setName("grp1");
 *    ig1.addItem(item1);
 *    ig1.addItem(item2);
 *    ...
 *    oc.addGroup(ig1);
 *    ...
 *    oc.connect();
 * </pre>
 * 当前版本支持的最大加入的变量组的个数上限为32765。对于组上的变量个数的上限也是32765。
 *
 */
public class OpcConnector implements Runnable, CommandSent
{

	/** 通信协议,如S7。该属性值将作为实际变量名的一部分 */
	private String protocol;
	/** 通信主题,或应用程序名。该属性值将作为实际变量名的一部分 */
	private String topic;
	/** 是否已经同OPC控制中心建立好了连接 */
	private boolean connected = false;
	/** 所有数据项组的集合 */
	private List groups = new ArrayList(6);
	/** OPC控制中心程序所在机器的IP地址 */
	private InetAddress hostRequest;
	/** OPC控制中心程序接收控制请求的TCP端口 */
	private int portRequest = 8089;
	/** 接收UDP报文的线程 */
	private Thread threadRcv;
	/** 接收OPC控制中心程序发送过来的UDP报文的套接字 */
	private DatagramSocket socketRcvUDP;

	/**
	 * 构造一个在8087端口上监听UDP报文的实例
	 */
	public OpcConnector() throws SocketException
	{
		this(8087);
	}

	/**
	 *
	 * @param portReceive 该实例使用哪个本机端口接收控制中心发送过来的UDP通知报文
	 */
	public OpcConnector(int portReceive) throws SocketException
	{
		socketRcvUDP = new DatagramSocket(portReceive);
		threadRcv = new Thread(this);
		threadRcv.start();
	}

	public void run()
	{
		byte[] buffer;
		try
		{
			buffer = new byte[Math.min(4096, socketRcvUDP.getReceiveBufferSize())];
		}
		catch (SocketException ex)
		{
			ex.printStackTrace();
			return;
		}
		while (true)
		{
			try
			{
				DatagramPacket dp = new DatagramPacket(buffer, buffer.length);
				socketRcvUDP.receive(dp);
				int readed = dp.getLength();
				int offset = dp.getOffset();
				byte[] data = dp.getData();
				byte firstByte = data[offset++];
				if (firstByte == 'C')//最频繁出现的数值变化通知
				{
					dataChanged(data, offset, readed - 1);
				}
				else if (firstByte == 'W')//异步写完成
				{
					writeCompleted(data, offset, readed - 1);
				}
				else if (firstByte == 'R')//异步读完成
				{
					dataChanged(data, offset, readed - 1);
				}
				else if(firstByte == 'E'//OPC控制中心退出
					&&	bytesABeginWithB(data, offset, new byte[]{'x', 'i', 't'}))
				{
					connected = false;
				}
				else if(firstByte == 'S'//OPC控制中心重新启动了,就再连上它
					&&	bytesABeginWithB(data, offset, new byte[]{'t', 'a', 'r', 't', 'u', 'p'}))
				{
					try
					{
						connectGroups(groups.size(), true);
					}
					catch (IOException ex)
					{
						ex.printStackTrace();
					}
				}
				else if (firstByte == 'N')//异步操作取消执行完成通知
				{
				}
				else
				{
					System.err.println("Unknown packet");
				}
			}
			catch (IOException ex)
			{
				System.err.println(ex);
				break;
			}
			catch (Exception ex)
			{
				ex.printStackTrace();
			}
		}
		threadRcv = null;
	}

	/**
	 * 判定字节数组A的前B.length各字节同字节数组B的对应字节相同
	 */
	static boolean bytesABeginWithB(byte[] A, int offset, byte[] B)
	{
		if (A.length - offset < B.length) return false;
		for (int i = 0; i < B.length; i++)
		{
			if (A[offset + i] != B[i]) return false;
		}
	  return true;
	}

	public void setProtocol(String protocol)
	{
		checkStatus();
		this.protocol = protocol;
	}

	public void setTopic(String topic)
	{
		checkStatus();
		this.topic = topic;
	}

	/**
	 * 在连接前添加一个变量值实例。
	 *
	 * @param aGroup 不为null的ItemGroup实例
	 */
	public final void addGroup(ItemGroup aGroup)
	{
		checkStatus();
		aGroup.cmdSentHook = this;
		this.groups.add(aGroup);
	}

	private void checkStatus() throws IllegalStateException
	{
		if (connected)
		{
			throw new IllegalStateException("connected");
		}
	}

	/**
	 * 建立本实例同OPC控制中心之间的连接。
	 * @throws IOException 连接过程可能出现的套接字IO异常
	 */
	public void connect() throws IOException
	{
		if (this.topic == null)
		{
			throw new IllegalStateException("Not specify topic");
		}
		if (this.protocol == null)
		{
			throw new IllegalStateException("Not specify protocol");
		}
		int cntGroups = groups.size();
		if (cntGroups == 0)
		{
			throw new OpcException("none group");
		}

		ItemGroup ig;
		int groupIndex;
		//首先查询组是否被加入
		TcpConnection conn = getConnection();
		conn.writeHeader("Q?" + DELIMETER + cntGroups);
		for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
		{
			ig = (ItemGroup)groups.get(groupIndex);
			conn.writeHeader(ig.getName());
		}
		//conn.flushHeanders();
		String hdr = conn.readHeader();
		int index = getResultCode(hdr);
		int cntGroupsNoFounded = 0;
		if (index == -1)
		{
			cntGroupsNoFounded = cntGroups;
		}
		else if (index == 0)
		{
			for (groupIndex = 0; (hdr = conn.readHeader()) != null && groupIndex < cntGroups; groupIndex++)
			{
				ig = (ItemGroup)groups.get(groupIndex);
				ig.index = index = Integer.parseInt(hdr);
//				debug(ig.getName() + "->index = " + index);
				if (index < 0)	++cntGroupsNoFounded;//累加未加入的组
				//else 记录已经加入的组,准备调用刷新操作
			}
		}
		else
		{
			conn.close();
			throw new OpcException(hdr);
		}
		conn.close();

		//如果存在已经加入的组,则刷新变量值
		if (cntGroupsNoFounded != cntGroups)
		{
			refreshGroups();
		}

		//如果还有没加入组,则尝试加入
		if (cntGroupsNoFounded > 0)
		{
			connectGroups(cntGroupsNoFounded, false);
		}
		connected = true;
	}

	public TcpConnection getConnection() throws IOException
	{
		if (hostRequest == null) hostRequest = InetAddress.getByName(null);
		return new TcpConnection(hostRequest, portRequest);
	}

	/**
	 * 将变量组加到OPC控制中心
	 * @param cntGroupsToConnect 待连接的组的个数
	 * @param refresh 是否对那些已经确定索引值的组的索引值进行再确定
	 */
	void connectGroups(int cntGroupsToConnect, boolean refresh) throws IOException
	{
		TcpConnection conn = getConnection();
		String header;
		header = "AD" + DELIMETER + cntGroupsToConnect +  DELIMETER + this.socketRcvUDP.getLocalPort();
		conn.writeHeader(header);
		//System.out.println(header);
		String idPrefix = this.protocol + ":[" + this.topic + "]";
		Item item;
		int itemIndex, itemCount;
		int groupIndex, cntGroups = groups.size();
		ItemGroup ig;
		for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
		{
			ig = (ItemGroup)groups.get(groupIndex);
			if (!refresh && ig.index != -1) continue;
			itemCount = ig.itemCount();
			header = ig.getName() + DELIMETER + 'D'//(ig.isActive()?'A':'D')
			//只能在加入后单独设定激活状态才有效。不然在这个方法还没有执行完成前,就收到了dataChanged报文
			//此时getGroupAt(int)很可能返回null
								+ DELIMETER + itemCount
								+ DELIMETER + ig.getUpdateRate();
			conn.writeHeader(header);
			//System.out.println("\t" + header);
			for (itemIndex = 0; itemIndex < itemCount; itemIndex++)
			{
				item = ig.getItem(itemIndex);
				header = idPrefix.concat(item.ID)
									  + DELIMETER + (item.active?'A':'D')
									  + DELIMETER + item.value.vt;
				conn.writeHeader(header);
				//System.out.println("\t\t" + header);
			}
		}
		//conn.flushHeanders();
		String hdr = conn.readHeader();
		if (getResultCode(hdr) != 0)
		{
			conn.close();
			throw new OpcException(hdr);
		}
		for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
		{
			ig = (ItemGroup)groups.get(groupIndex);
			if (!refresh && ig.index != -1) continue;
			hdr = conn.readHeader();
			if (hdr != null)
			{
				ig.index = Integer.parseInt(hdr);
//				debug(ig.getName() + ".index = " + ig.index);
			}
		}
		conn.close();
		//激活实际需要激活的变量组
		activateGroups();
	}


	void activateGroups() throws IOException
	{
		int groupIndex, cntGroups = groups.size();
		ItemGroup ig;
		TcpConnection conn = getConnection();
		conn.writeHeader("AC A" + DELIMETER + cntGroups);
		for (groupIndex = 0; groupIndex < cntGroups; groupIndex++)
		{
			ig = (ItemGroup)groups.get(groupIndex);
			if (ig.index >= 0 && ig.isActive())

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -