⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagingdbtest.java

📁 jbpm demo 是一款非常不错的开源工作流,简单易用,适合扩展开发!
💻 JAVA
字号:
/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag. See the copyright.txt in the distribution for a
 * full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jbpm.msg.db;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jbpm.JbpmContext;
import org.jbpm.db.AbstractDbTestCase;

/**
 * Database tests for {@link DbMessageService}.
 *
 * <p>Covers a plain send/receive round trip, delivery to several waiting
 * {@link Receiver} threads once the sending transaction has been committed,
 * bulk delivery from several concurrent {@link Sender} threads, and storing a
 * message with an exception so that it shows up as an error message.</p>
 *
 * <p>The sender and receiver threads report through the static lists
 * {@link #sendedMsgs} and {@link #receivedMsgs}; both are synchronized lists
 * and both are replaced by fresh instances in {@link #setUp()}.</p>
 */
public class MessagingDbTest extends AbstractDbTestCase {
  
  static final String DESTINATION_TEST = "TEST";

  /** upper bound (in milliseconds) on any wait for a background thread, so a lost message fails the test instead of hanging it. */
  private static final long MAX_WAIT_MILLIS = 60000L;

  /** pause (in milliseconds) between two checks of the received messages while waiting. */
  private static final long POLL_INTERVAL_MILLIS = 50L;

  /** texts of the messages obtained by the {@link Receiver} threads; recreated in {@link #setUp()}. */
  static List receivedMsgs = null;

  /** texts of the messages handed to the message service by the {@link Sender} threads; recreated in {@link #setUp()}. */
  static List sendedMsgs = Collections.synchronizedList(new ArrayList());

  DbMessageService dbMessageService = null;

  /**
   * Runs the inherited set up and then gives every test fresh, empty,
   * synchronized lists for the sent and received message texts.
   */
  public void setUp() throws Exception {
    super.setUp();
    receivedMsgs = Collections.synchronizedList(new ArrayList());
    // several Sender threads append concurrently, so this list has to be synchronized as well
    sendedMsgs = Collections.synchronizedList(new ArrayList());
  }

  /**
   * Refreshes {@link #dbMessageService} from the message service of the
   * current jbpmContext, after the inherited members have been initialized.
   */
  protected void initializeMembers() {
    super.initializeMembers();
    dbMessageService = (DbMessageService) jbpmContext.getServices().getMessageService();
  }

  /**
   * Sends one text message and checks that, in a new transaction, the
   * destination reports messages and the same text is received.
   */
  public void testSendAndReceive() {
    TextMessage textMessage = new TextMessage("hello queue");
    textMessage.setDestination(DESTINATION_TEST);
    dbMessageService.send(textMessage);
    newTransaction();
    log.trace("checking for messages...");
    assertTrue(dbMessageService.hasMessages(DESTINATION_TEST));
    log.trace("fetching the message...");
    TextMessage msg = (TextMessage) dbMessageService.receiveNoWait(DESTINATION_TEST);
    // report a missing message as an assertion failure rather than a NullPointerException
    assertNotNull(msg);
    assertEquals("hello queue", msg.text);
  }
  
  /**
   * Background thread that keeps taking text messages from
   * {@link MessagingDbTest#DESTINATION_TEST} and appends their text to
   * {@link MessagingDbTest#receivedMsgs}. Each attempt uses its own
   * JbpmContext. The thread stops once it is interrupted.
   */
  public static class Receiver extends Thread {
    public Receiver(String name) {
      super(name);
    }

    /**
     * Loops until interrupted: waits for a notification when the destination
     * has no messages, then tries to receive one message and records its text.
     */
    public void run() {
      try {
        // checking the flag here also ends the loop when the interrupt arrived
        // while this thread was busy instead of blocked in waitForNotification
        while (!isInterrupted()) {
          JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
          try {
            DbMessageService dbMessageService = (DbMessageService) jbpmContext.getServices().getMessageService();

            log.trace("receiver '"+getName()+"' is going to wait for a message");
            if (!dbMessageService.hasMessages(DESTINATION_TEST)) {
              StaticNotifier.waitForNotification(DESTINATION_TEST);
            }

            log.trace("receiver '"+getName()+"' is going to get a message");
            TextMessage textMessage = (TextMessage) dbMessageService.receiveNoWait(DESTINATION_TEST);
            // another receiver may have taken the message in the meantime
            if (textMessage!=null) {
              String text = textMessage.getText();
              log.trace("receiver '"+getName()+"' received message '"+text+"'");
              receivedMsgs.add(text);
            }

          } finally {
            jbpmContext.close();
          }
        }
      } catch (InterruptedException e) {
        // the thread ends right here, so the interrupt status does not need to be restored
        log.trace("thread '"+getName()+"' got interrupted");
      } finally {
        log.warn("thread '"+getName()+"' takes a hike");
      }
    }
  }

  /**
   * Starts three receivers and checks that each of two messages is received
   * exactly once, and that the first one is not received before the sending
   * transaction has been committed.
   */
  public void testMultipleWaiters() throws InterruptedException {
    Thread receiverOne = new Receiver("one");
    Thread receiverTwo = new Receiver("two");
    Thread receiverThree = new Receiver("three");

    try {
      receiverOne.start();
      receiverTwo.start();
      receiverThree.start();
      Thread.sleep(500);
  
      // going to send a message
      TextMessage textMessage = new TextMessage("hello");
      textMessage.setDestination(DESTINATION_TEST);
      dbMessageService.send(textMessage);
      // wait a bit
      Thread.sleep(200);
      // and verify that the message did not yet arrive (it's supposed to be transactional)
      assertEquals(0, receivedMsgs.size());
      // commit the transaction
      newTransaction();
      // wait a bit more till one of the other threads puts the message in the received msgs
      Thread.sleep(200);
      // tolerate slow machines: keep waiting (bounded) if the message is not there yet
      waitForReceivedMsgs(1, MAX_WAIT_MILLIS);
      // now the message should have arrived
      assertEquals(1, receivedMsgs.size());
      assertEquals("hello", receivedMsgs.get(0));
  
      // going to send a message
      TextMessage textMessage2 = new TextMessage("world");
      textMessage2.setDestination(DESTINATION_TEST);
      dbMessageService.send(textMessage2);
      // wait a bit
      Thread.sleep(200);
      // the check that the second message did not yet arrive (size still 1) is
      // left out on purpose: hsqldb's READ_UNCOMMITTED prevents that assertion
      
      // commit the transaction
      newTransaction();
      // wait a bit more till one of the other threads puts the message in the received msgs
      Thread.sleep(200);
      waitForReceivedMsgs(2, MAX_WAIT_MILLIS);
      // now the message should have arrived
      assertEquals(2, receivedMsgs.size());
      assertEquals("world", receivedMsgs.get(1));
  
    } finally {
      receiverOne.interrupt();
      receiverTwo.interrupt();
      receiverThree.interrupt();
      // join inside the finally block so that a failed assertion does not leave
      // receivers alive that could take the messages of the next test
      receiverOne.join(MAX_WAIT_MILLIS); 
      receiverTwo.join(MAX_WAIT_MILLIS); 
      receiverThree.join(MAX_WAIT_MILLIS); 
    }
  }

  /**
   * Background thread that sends <code>nbrOfMessages</code> text messages to
   * {@link MessagingDbTest#DESTINATION_TEST}. The message texts are the
   * decimal numbers <code>startIndex</code> up to
   * <code>startIndex+nbrOfMessages-1</code>; every message is sent in its own
   * JbpmContext and its text is appended to {@link MessagingDbTest#sendedMsgs}.
   */
  public static class Sender extends Thread {
    int nbrOfMessages;
    int startIndex;
    public Sender(String name, int nbrOfMessages, int startIndex) {
      super(name);
      this.nbrOfMessages = nbrOfMessages;
      this.startIndex = startIndex;
    }

    /** Sends the configured range of messages, one JbpmContext per message. */
    public void run() {
      for (int i=0; i<nbrOfMessages; i++) {
        JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
        try {
          DbMessageService dbMessageService = (DbMessageService) jbpmContext.getServices().getMessageService();

          String msg = Integer.toString(startIndex+i);
          TextMessage textMessage = new TextMessage(msg);
          textMessage.setDestination(DESTINATION_TEST);
          dbMessageService.send(textMessage);
          sendedMsgs.add( msg );
        } finally {
          jbpmContext.close();
        }
      }
    }
  }
  
  /**
   * Lets four senders send 20 messages each while a single receiver collects
   * them, and checks that exactly the texts "0" up to "79" were sent and
   * received, each of them once.
   */
  public void testBulkMessages() throws InterruptedException {
    int nbrOfMsgsPerSender = 20;

    Thread[] senders = new Thread[] {
      new Sender("sender-one", nbrOfMsgsPerSender, 0),
      new Sender("sender-two", nbrOfMsgsPerSender, nbrOfMsgsPerSender),
      new Sender("sender-three", nbrOfMsgsPerSender, nbrOfMsgsPerSender*2),
      new Sender("sender-four", nbrOfMsgsPerSender, nbrOfMsgsPerSender*3)
    };
    int totalNbrOfMsgs = nbrOfMsgsPerSender*senders.length;

    Thread receiverOne = new Receiver("receiver");
    receiverOne.start();
    try {
      for (int i=0; i<senders.length; i++) senders[i].start();
      for (int i=0; i<senders.length; i++) senders[i].join(MAX_WAIT_MILLIS);

      // wait till the receiver has received all the messages of the senders,
      // but give up after MAX_WAIT_MILLIS so that a lost message cannot hang the test
      waitForReceivedMsgs(totalNbrOfMsgs, MAX_WAIT_MILLIS);
    } finally {
      // always stop the receiver, also when one of the waits was interrupted
      receiverOne.interrupt();
      receiverOne.join(MAX_WAIT_MILLIS);
    }

    Collection expectedMsgs = new HashSet();
    for (int i=0; i<totalNbrOfMsgs; i++) expectedMsgs.add(Integer.toString(i)); 

    // comparing the sizes first exposes duplicates, which the set comparison would hide
    assertEquals(totalNbrOfMsgs, sendedMsgs.size());
    assertEquals(totalNbrOfMsgs, receivedMsgs.size());
    assertEquals(expectedMsgs, toSet(sendedMsgs));
    assertEquals(expectedMsgs, toSet(receivedMsgs));
  }

  /**
   * Receives a message and rolls that transaction back, receives it again,
   * saves it with an exception text and then checks that the destination has
   * no regular messages left and exactly one error message carrying the
   * original text and the exception text.
   */
  public void testReceiveFailure() {
    TextMessage textMessage = new TextMessage("hello queue");
    textMessage.setDestination(DESTINATION_TEST);
    dbMessageService.send(textMessage);

    newTransaction();
    
    assertNotNull(dbMessageService.receiveNoWait(DESTINATION_TEST));
    jbpmContext.setRollbackOnly();
    
    newTransaction();
    
    // the first receive was marked rollback-only, so the message is expected to be available again
    TextMessage msg = (TextMessage) dbMessageService.receiveNoWait(DESTINATION_TEST);
    assertNotNull(msg);
    assertEquals("hello queue", msg.text);
    msg.setDestination(DESTINATION_TEST);
    msg.setException("something went wrong");
    messagingSession.save(msg);
    
    newTransaction();
    
    assertEquals(0, messagingSession.findMessages(DESTINATION_TEST).size());
    List errorMsgs = messagingSession.findErrorMessages(DESTINATION_TEST);
    assertNotNull(errorMsgs);
    assertEquals(1, errorMsgs.size());

    msg = (TextMessage) errorMsgs.get(0);
    assertEquals("hello queue", msg.text);
    assertEquals("something went wrong", msg.getException());
  }

  /**
   * Blocks until {@link #receivedMsgs} holds at least
   * <code>expectedCount</code> entries or <code>timeoutMillis</code>
   * milliseconds have passed, whichever comes first. Does not fail by itself;
   * callers assert on the list afterwards.
   */
  private static void waitForReceivedMsgs(int expectedCount, long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (receivedMsgs.size()<expectedCount && System.currentTimeMillis()<deadline) {
      Thread.sleep(POLL_INTERVAL_MILLIS);
    }
  }

  /**
   * Copies the given synchronized list into a set. The copy is made while
   * holding the list's lock because iterating a synchronized list is not
   * protected by the wrapper itself.
   */
  private static Collection toSet(List msgs) {
    synchronized (msgs) {
      return new HashSet(msgs);
    }
  }

  private static final Log log = LogFactory.getLog(MessagingDbTest.class);
  
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -