⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queue.java

📁 Java写的ERP系统
💻 JAVA
字号:
/******************************************************************************
 * The contents of this file are subject to the   Compiere License  Version 1.1
 * ("License"); You may not use this file except in compliance with the License
 * You may obtain a copy of the License at http://www.compiere.org/license.html
 * Software distributed under the License is distributed on an  "AS IS"  basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
 * the specific language governing rights and limitations under the License.
 * The Original Code is                  Compiere  ERP & CRM  Business Solution
 * The Initial Developer of the Original Code is Jorg Janke  and ComPiere, Inc.
 * Portions created by Jorg Janke are Copyright (C) 1999-2001 Jorg Janke, parts
 * created by ComPiere are Copyright (C) ComPiere, Inc.;   All Rights Reserved.
 * Contributor(s): ______________________________________.
 *****************************************************************************/
package org.compiere.util;

import java.io.*;
import java.sql.*;
import java.util.*;
//import oracle.AQ.*;

import org.compiere.db.CConnection;

/**
 *  Message Queuing.
 *
 *  <p>Written to pass serialized objects between client and server through
 *  two Oracle AQ queues (<code>oracle.AQ</code> package), one per direction.
 *  The complete implementation is currently disabled: everything between the
 *  opening and the closing brace of this class sits inside one block comment,
 *  and the <code>oracle.AQ</code> import is commented out as well. As
 *  compiled, the class therefore declares no fields and no methods.
 *
 *  <p>The per-method headers inside the disabled region start like Javadoc
 *  but are intentionally left without a comment terminator; terminating any
 *  of them would end the enclosing block comment early and re-activate code
 *  that references the AQ types whose import is commented out.
 *
 *  @author 	Jorg Janke
 *  @version 	$Id: Queue.java,v 1.7 2002/05/16 21:51:17 jjanke Exp $
 */
public final class Queue implements Serializable
{
/*
	DISABLED IMPLEMENTATION - this block comment runs to the terminator placed
	directly above the closing brace of the class. Do not add a comment
	terminator anywhere inside this region; the method headers below are left
	open on purpose.

	//	Dedicated database connection; created lazily in createSession(),
	//	closed and cleared by close(true)
	private static Connection	s_connection;
	//	Cached AQ session; created lazily in getSession(), cleared by close()
	private static AQSession	s_session;
	//	Cached handle of the to-server queue (s_queueName + s_toServer)
	private static AQQueue		s_queueS;
	//	Cached handle of the to-client queue (s_queueName + s_toClient)
	private static AQQueue		s_queueC;
	//
	//	Common prefix of both queue names
	private static String		s_queueName = "AQ_";
	//	Name suffix of the queue carrying messages to the server
	private static String		s_toServer = "S";
	//	Name suffix of the queue carrying messages to the client
	private static String		s_toClient = "C";
	//	Appended to a queue name to form the name of its queue table
	private static String		s_table_extension = "_Tab";


	/**
	 *	Create AQ Session.
	 *	Creates the dedicated connection on first use (DB.createConnection
	 *	with TRANSACTION_SERIALIZABLE), loads the driver class
	 *	oracle.AQ.AQOracleDriver and opens an AQ session on the connection.
	 *	Both failure cases are logged instead of thrown; session creation is
	 *	still attempted when loading the driver class failed.
	 *	@return the new session, or null when it could not be created
	 *
	private static AQSession createSession()
	{
		Log.trace(Log.l3_Util, "Queue.createSession");
		if (s_connection == null)	//	autoCommit
			//	get dedicated
			s_connection = DB.createConnection(true, Connection.TRANSACTION_SERIALIZABLE);

		AQSession aq_sess = null;
		try
		{
			Class.forName("oracle.AQ.AQOracleDriver");
		}
		catch (ClassNotFoundException e)
		{
			Log.error("Queue.createSession (Driver)", e);
		}
		try
		{
			aq_sess = AQDriverManager.createAQSession(s_connection);
		}
		catch (Exception e)
		{
			Log.error("Queue.createSession", e);
		}

		return aq_sess;
	}	//	createSession

	/**
	 *	Get AQ Session.
	 *	Returns the cached session, creating it on first use. Because a failed
	 *	createSession() leaves the cache null, the next call tries again.
	 *	@return the session, or null when createSession() failed
	 *
	private static AQSession getSession()
	{
		if (s_session == null)
			s_session = createSession();
		return s_session;
	}	//	getSession

	/**
	 *	Create Queue.
	 *	Creates the queue table (name + s_table_extension) in the schema
	 *	returned by CConnection.get().getDbUid() with payload type "RAW",
	 *	sort order "ENQ_TIME" and multi-consumer switched off, then creates
	 *	the queue itself with infinite retention time. The queue is not
	 *	started here; getQueue() does that after calling this method.
	 *	@param name queue name
	 *	@return the created queue, or null when there is no session or when
	 *		creation failed (the exception is logged)
	 *
	private static AQQueue createQueue(String name)
	{
		Log.trace(Log.l3_Util, "Queue.createQueue - " + name);
		//
		AQSession session = getSession();
		if (session == null)
			return null;

		AQQueue queue = null;
		try
		{
			//	Payload type: raw	- Age Ordered
			AQQueueTableProperty t_property = new AQQueueTableProperty("RAW");
			t_property.setComment("Compiere Client/Server Communication Table");
			t_property.setSortOrder("ENQ_TIME");
			t_property.setMultiConsumer(false);
		//	t_property.setCompatible("8.1");
			//	Create table in the Compiere scheme
			String tabName = name + s_table_extension;
			AQQueueTable table = session.createQueueTable(CConnection.get().getDbUid(),
				tabName, t_property);
			Log.trace(Log.l5_DData, "Queue Table created - " + tabName);

			//	Create Queue property
			AQQueueProperty q_property = new AQQueueProperty();
			q_property.setComment("Compiere Client/Server Communication Queue");
			q_property.setRetentionTime(AQQueueProperty.INFINITE);
			queue = session.createQueue(table, name, q_property);
			Log.trace(Log.l5_DData, "Queue created - " + name);
		}
		catch (Exception e)
		{
			Log.error("Queue.createQueue", e);
		}
		//
		return queue;
	}	//	createQueue

	/**
	 *	Get Queue "Test"
	 *	Returns the cached queue handle for the requested direction. When the
	 *	handle is not cached it is looked up in the session; if the lookup
	 *	throws AQException the queue is created and started instead.
	 *	NOTE(review): getSession() can return null; session.getQueue(...)
	 *	would then throw a NullPointerException that the AQException handler
	 *	does not catch - confirm before re-enabling.
	 *	NOTE(review): createQueue() can return null; the following start()
	 *	call then fails inside the inner try and is only logged, so the
	 *	method returns null in that case.
	 *	@param toServer true for the to-server queue, false for the to-client queue
	 *	@param reset if true, close(false) is called first, which discards the
	 *		cached queue handles and session but keeps the connection
	 *	@return the queue handle, possibly null
	 *
	protected static AQQueue getQueue(boolean toServer, boolean reset)
	{
		if (reset)
			close(false);
		//	Server Queue
		if (toServer && s_queueS == null)
		{
			AQSession session = getSession();
			try
			{
				s_queueS = session.getQueue(CConnection.get().getDbUid(),
					s_queueName+s_toServer);
			}
			catch (AQException e)
			{
				try
				{
					//	No Queue - So create it
					s_queueS = createQueue(s_queueName+s_toServer);
					s_queueS.start();
				}
				catch (Exception e2)
				{
					Log.error("Queue.getQueue (create1)", e2);
				}
			}
		}
		//	Client Queue
		if (!toServer && s_queueC == null)
		{
			AQSession session = getSession();
			try
			{
				s_queueC = session.getQueue(CConnection.get().getDbUid(),
					s_queueName+s_toClient);
			}
			catch (AQException e)
			{
				try
				{
					//	No Queue - So create it
					s_queueC = createQueue(s_queueName+s_toClient);
					s_queueC.start();
				}
				catch (Exception e2)
				{
					Log.error("Queue.getQueue (create2)", e2);
				}
			}
		}
		if (toServer)
			return s_queueS;
		return s_queueC;
	}	//	getQueue

	/**
	 *	Drop Queue
	 *	Looks up the queue tables of both directions and drops them with
	 *	drop(true) (presumably a forced drop - confirm against the AQ API).
	 *	Both tables are looked up before either is dropped, so a failing
	 *	lookup leaves both in place. Only the exception message is logged.
	 *	The only reference in this file is a commented-out call in main().
	 *
	private static void dropQueues()
	{
		Log.trace(Log.l3_Util, "Queue.dropQueues");
		//
		String tabNameS = s_queueName + s_toServer + s_table_extension;
		String tabNameC = s_queueName + s_toClient + s_table_extension;
		try
		{
			AQQueueTable tableS = getSession().getQueueTable(CConnection.get().getDbUid(), tabNameS);
			AQQueueTable tableC = getSession().getQueueTable(CConnection.get().getDbUid(), tabNameC);
			tableS.drop(true);
			tableC.drop(true);
		}
		catch (Exception e)
		{
			Log.error("Queue.dropQueues - " + e.getMessage());
		}
	}	//	dropQueues

	/**
	 *	Close Queues & Session
	 *	Discards the cached queue handles and the cached session by setting
	 *	them to null; no close method is invoked on them. Optionally closes
	 *	the dedicated connection; a failure while closing is logged and the
	 *	connection reference is cleared regardless.
	 *	@param closeConnection if true, the connection is closed and cleared too
	 *
	public static void close (boolean closeConnection)
	{
		s_queueC = null;
		s_queueS = null;
		s_session = null;
		if (closeConnection && s_connection != null)
		{
			try
			{
				s_connection.close();
			}
			catch (SQLException e)
			{
				Log.error("Queue.close", e);
			}
			s_connection = null;
		}
	}	//	close


	/**
	 *	Send Message
	 *	Serializes info with an ObjectOutputStream and enqueues the bytes as
	 *	raw payload using a default AQEnqueueOption.
	 *	@param info object to send
	 *	@param toServer true to use the to-server queue, false for the to-client queue
	 *	@return true when the message was enqueued; false when info is null,
	 *		serialization failed, or enqueueing threw (a null queue from
	 *		getQueue() ends up in the same catch-all handler)
	 *
	protected static boolean send (Serializable info, boolean toServer)
	{
		if (info == null)
			return false;

		//	Serialize info
		byte[] data = null;
		try
		{
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			ObjectOutputStream oos = new ObjectOutputStream (baos);
			oos.writeObject(info);
			oos.flush();
			oos.close();
			data = baos.toByteArray();
			baos.close();
		}
		catch (IOException ioe)
		{
			Log.error("Queue.send IO - " + ioe.getMessage());
			return false;
		}

		try
		{
			//	create Message
			AQQueue queue = getQueue(toServer, false);
			AQMessage message = queue.createMessage();
			//	populate payload
			AQRawPayload rawPayload = message.getRawPayload();
			rawPayload.setStream(data, data.length);

			//	Standard enqueue Options
			AQEnqueueOption option = new AQEnqueueOption();

			//	Enqueue
			queue.enqueue(option, message);
		}
		catch (Exception e)
		{
			Log.error("Queue.send", e);
			return false;
		}
		Log.trace(Log.l4_Data, "Queue.send " + info.getClass()
			+ (toServer ? " ToServer" : " ToClient")
			+ ", Size=" + data.length);
		return true;
	}	//	send

	/**
	 *	Receive message
	 *	Dequeues one message with a wait time of 1 and deserializes it.
	 *	Messages coming from the server are read from the to-client queue and
	 *	vice versa, hence the negated flag passed to getQueue().
	 *	Error code 25228 (timeout) returns null without logging; every other
	 *	exception is logged and also returns null.
	 *	NOTE(review): deserialize() returns null on failure; the trace call
	 *	below would then throw a NullPointerException on info.getClass().
	 *	@param fromServer true to receive a message sent by the server
	 *	@return the received object, or null when nothing was received
	 *
	protected static Serializable receive (boolean fromServer)
	{
		//	Get Queue
		AQQueue queue = getQueue(!fromServer, false);

		//	Get Message
		AQMessage message = null;
		try
		{
			//	Set Dequeue Option
			AQDequeueOption option = new AQDequeueOption();
			option.setWaitTime(1);	//	one second wait
		//	option.setDequeueMode(AQDequeueOption.DEQUEUE_REMOVE);
			//
			message = queue.dequeue(option);
		}
		catch (AQOracleSQLException e)
		{
			if (e.getErrorCode() == 25228)	//	timeout
				return null;
			Log.error("Queue.receive", e);
			return null;
		}
		catch (Exception e)
		{
			Log.error("Queue.receive", e);
			return null;
		}

		Serializable info = deserialize (message);

		Log.trace(Log.l4_Data, "Queue.receive " + info.getClass()
			+ (fromServer ? " FromServer" : " FromClient"));
		return info;
	}	//	receive

	/**
	 *	De-Serialize
	 *	Reads the raw payload bytes of the message and reconstructs the object
	 *	with ObjectInputStream.readObject().
	 *	NOTE(review): readObject() is applied to whatever bytes the queue
	 *	delivers - confirm the queue content is trusted before re-enabling.
	 *	@param message dequeued message
	 *	@return the deserialized object, or null when any exception occurred
	 *		(the exception is logged)
	 *
	private static Serializable deserialize (AQMessage message)
	{
		//	Deserialize
		Serializable info = null;
		try
		{
			//	get Payload
			AQRawPayload raw_payload = message.getRawPayload();
			byte[] data = raw_payload.getBytes();
			//
			ByteArrayInputStream bais = new ByteArrayInputStream(data);
			ObjectInputStream ois = new ObjectInputStream (bais);
			info = (Serializable)ois.readObject();
			ois.close();
			bais.close();
		}
		catch (Exception e)
		{
			Log.error("Queue.deserialize", e);
			return null;
		}
		return info;
	}	//	deserialize

	/**
	 *	List message
	 *	Dequeues in DEQUEUE_BROWSE mode with wait time 0 and collects the
	 *	deserialized objects until the loop variable message is null, which
	 *	the AQOracleSQLException handler forces (25228 = timeout is not
	 *	logged, other codes are). The cached queues and session are reset
	 *	first because getQueue() is called with reset = true.
	 *	NOTE(review): the generic Exception handler does not reset message;
	 *	if it fires after an earlier successful dequeue the loop continues,
	 *	and may not terminate if the failure repeats - confirm.
	 *	NOTE(review): a null result of deserialize() is added to the list
	 *	before info.toString() throws inside the try.
	 *	NOTE(review): the log labels "Queue.lsiMessages" and "Queue.receive"
	 *	used in this method do not match its name.
	 *	@param fromServer true to list messages sent by the server
	 *	@return list of the deserialized messages (may contain null entries)
	 *
	protected static ArrayList listMessages (boolean fromServer)
	{
		//	Get Queue
		AQQueue queue = getQueue(!fromServer, true);
		AQDequeueOption option = new AQDequeueOption();
		try
		{
			Log.trace(Log.l3_Util, "Queue.listMessages - " + queue.getName());

			//	Set Dequeue Option
			option.setWaitTime(0);	//	no wait
			option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE);
		}
		catch (Exception e)
		{
			Log.error("Queue.lsiMessages", e);
		}

		ArrayList list = new ArrayList();

		//	Get Messages
		AQMessage message = null;
		do
		{
			try
			{
				message = queue.dequeue(option);
				Serializable info = deserialize (message);
				list.add(info);
				Log.trace(Log.l4_Data, "> " + info.toString());
			}
			catch (AQOracleSQLException e)
			{
				if (e.getErrorCode() != 25228)	//	timeout
					Log.error("Queue.receive", e);
				message = null;
			}
			catch (Exception e)
			{
				Log.error("Queue.receive", e);
			}
		} while (message != null);
		return list;
	}	//	listMessages




	//	Public convenience wrappers: each one fixes the direction flag of
	//	send() or receive().

	//	Enqueue info on the to-server queue; see send() for the result
	public static boolean sendToServer(Serializable info)
	{
		return send (info, true);
	}

	//	Enqueue info on the to-client queue; see send() for the result
	public static boolean sendToClient(Serializable info)
	{
		return send (info, false);
	}

	//	Dequeue one message sent by the server; null when none was received
	public static Serializable receiveFromServer()
	{
		return receive (true);
	}

	//	Dequeue one message sent by the client; null when none was received
	public static Serializable receiveFromClient()
	{
		return receive (false);
	}

	/**************************************************************************
	 *	Main Test
	 *	Manual test driver: sends two messages in each direction, lists both
	 *	queues, receives one message per direction, lists both queues again
	 *	and exits the JVM.
	 *	@param args ignored
	 *
	public static void main (String[] args)
	{
		Env.initTest(9, true);      //  run as Client

	//	dropQueues();
		Timestamp t = new Timestamp(System.currentTimeMillis());

		sendToServer ("This is a new test " + t.toString());
		sendToClient (new KeyNamePair (21, "Twenty-one " + t.toString()));
		sendToServer ("This is a second test " + t.toString());
		sendToClient (new KeyNamePair (22, "Twenty-two " + t.toString()));
		listMessages(true);
		listMessages(false);
		System.out.println(receiveFromClient());
		System.out.println(receiveFromServer());
		listMessages(true);
		listMessages(false);

		System.out.println("Fini");
		System.exit(0);
	//	AEnv.exit(0);
	}	//	Main
*/
}	//	Queue

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -