📄 queueserverentrytest.java
字号:
package org.xmlBlaster.test.classtest.queue;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.engine.MsgUnitWrapper;import org.xmlBlaster.engine.qos.PublishQosServer;import org.xmlBlaster.client.key.PublishKey;import java.util.ArrayList;import junit.framework.Test;import junit.framework.TestCase;import junit.framework.TestSuite;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.engine.queuemsg.MsgQueueHistoryEntry;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.qos.MsgQosSaxFactory;import org.xmlBlaster.util.queue.QueuePluginManager;import org.xmlBlaster.util.plugin.PluginInfo;/** * Test persistence of Queue entries. * <p> * Invoke: java -Djava.compiler= org.xmlBlaster.test.classtest.queue.QueueServerEntryTest * </p> * @see org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry * @see org.xmlBlaster.engine.queuemsg.MsgQueueHistoryEntry * @see org.xmlBlaster.util.queue.I_Queue * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin * @see org.xmlBlaster.util.queue.jdbc.JdbcQueueCommonTablePlugin */public class QueueServerEntryTest extends TestCase { private String ME = "QueueServerEntryTest"; protected ServerScope glob; private static Logger log = Logger.getLogger(QueueServerEntryTest.class.getName()); private I_Queue queue = null; public ArrayList queueList = null; public static String[] PLUGIN_TYPES = { new String("RAM"), new String("JDBC"), new String("CACHE") }; public int count = 0; /** Constructor for junit public QueueServerEntryTest(String name) { this(new Global(), name); } */ public QueueServerEntryTest(ServerScope glob, String name, int currImpl) { super(name); this.glob = glob; ME = "QueueServerEntryTest with class: " + PLUGIN_TYPES[this.count]; this.count = currImpl; try { glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST"); QueuePluginManager pluginManager = this.glob.getQueuePluginManager(); PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0"); java.util.Properties pluginProp = (java.util.Properties)pluginInfo.getParameters(); pluginProp.put("tableNamePrefix", "TEST"); pluginProp.put("entriesTableName", "_entries"); this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters()); QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "updateEntry"); this.queue = pluginManager.getPlugin(PLUGIN_TYPES[currImpl], "1.0", queueId, cbProp); this.queue.shutdown(); // to allow to initialize again } catch (Exception ex) { log.severe("could not propertly set up the database: " + ex.getMessage()); } } protected void setUp() { try { glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST"); ME = "QueueServerEntryTest with class: " + PLUGIN_TYPES[this.count]; } catch (Exception ex) { log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'" + ex.getMessage()); } if (this.queue != null) { this.queue.shutdown(); } } public void tearDown() { if (this.queue != null) { try { this.queue.clear(); this.queue.shutdown(); // this.queue = null; } catch (Exception ex) { log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp " + ex.getMessage()); } } } // -------------------------------------------------------------------------- public void testUpdateEntry() { String queueType = "unknown"; try { updateEntry(); } catch (XmlBlasterException ex) { fail("Exception when testing UpdateEntry probably due to failed initialization of the queue of type " + queueType + " " + ex.getMessage()); ex.printStackTrace(); } } public void updateEntry() throws XmlBlasterException { // set up the queues .... QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"); log.info("************ Starting updateEntry Test"); StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "updateEntry"); this.queue.initialize(queueId, prop); this.queue.clear(); try { byte[] content = "this is the content".getBytes(); PublishKey key = new PublishKey(glob, "someKey"); PublishQosServer publishQosServer = new PublishQosServer(glob, "<qos><persistent/></qos>"); MsgQosData msgQosData = publishQosServer.getData(); ((MsgQosSaxFactory)glob.getMsgQosFactory()).sendRemainingLife(false); // so we can compare the toXml() directly // populate it String state = Constants.STATE_EXPIRED; msgQosData.setState(state); msgQosData.setSubscriptionId("someId"); msgQosData.setPersistent(true); msgQosData.setForceUpdate(false); msgQosData.setReadonly(true); msgQosData.setSender(new SessionName(glob, "somebody")); msgQosData.setRedeliver(4); msgQosData.setQueueSize(1000L); msgQosData.setQueueIndex(500L); //msgQosData.addRouteInfo(null); //msgQosData.dirtyRead(null);//NodeId msgQosData.setPriority(PriorityEnum.LOW4_PRIORITY); msgQosData.setFromPersistenceStore(true); msgQosData.setLifeTime(4000L); msgQosData.setRemainingLifeStatic(6000L); MsgUnit msgUnit = new MsgUnit(key.toXml(), content, msgQosData.toXml()); log.fine("Testing" + msgQosData.toXml()); SessionName receiver = new SessionName(glob, "receiver1"); String subscriptionId = "subid"; int redeliverCounter = 2; boolean updateOneway = true; org.xmlBlaster.engine.ServerScope global = new org.xmlBlaster.engine.ServerScope(); MsgUnitWrapper msgWrapper = new MsgUnitWrapper(glob, msgUnit, queue.getStorageId()); MsgQueueUpdateEntry entry = new MsgQueueUpdateEntry(global, msgWrapper, queue.getStorageId(), receiver, subscriptionId, updateOneway); entry.incrRedeliverCounter(); entry.incrRedeliverCounter(); queue.put(entry, false); I_QueueEntry returnEntry = queue.peek(); boolean isUpdate = (returnEntry instanceof MsgQueueUpdateEntry); assertTrue("updateEntry: the return value is not an update ", isUpdate); MsgQueueUpdateEntry updateEntry = (MsgQueueUpdateEntry)returnEntry; assertEquals("The subscriptionId of the entry is different ", subscriptionId, updateEntry.getSubscriptionId()); assertEquals("The state of the entry is different ", state, updateEntry.getState()); assertEquals("The redeliverCounter of the entry is different ", redeliverCounter, updateEntry.getRedeliverCounter()); assertEquals("The priority of the entry is different ", entry.getPriority(), updateEntry.getPriority()); assertEquals("The oneway of the entry is different ", updateOneway, updateEntry.updateOneway()); assertEquals("The persistent of the entry is different ", entry.isPersistent(), updateEntry.isPersistent()); assertEquals("The receiver of the entry is different ", entry.getReceiver().toString(), updateEntry.getReceiver().toString()); assertEquals("The uniqueId of the entry is different ", entry.getUniqueId(), updateEntry.getUniqueId()); assertEquals("The msgUnitWrapperUniqueId of the entry is different ", entry.getMsgUnitWrapperUniqueId(), updateEntry.getMsgUnitWrapperUniqueId()); assertEquals("The topic oid of the entry is different ", entry.getKeyOid(), updateEntry.getKeyOid()); assertEquals("The topic oid of the entry is different ", entry.getStorageId().getId(), updateEntry.getStorageId().getId()); log.info("SUCCESS: MsgQueueUpdateEntry: Persistent fields are read as expected"); MsgUnit retMsgUnit = null;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -