⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 receiveintxbean.java

📁 weblogic应用全实例
💻 JAVA
字号:
//声明这个类属于包examples.jta.jmsjdbc
package examples.jta.jmsjdbc;
//声明这个引入的其他类
//jdbc类
import java.sql.Connection;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.SQLException;
//Ejb类
import javax.ejb.CreateException;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;
//java消息服务类
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
//java名称服务类
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
//事务类
import javax.transaction.UserTransaction;
import javax.sql.DataSource;

/**
 * ReceiveInTxBean 是一个无状态的会话EJBean,使用bean管理的事务
 * 这个EJBean演示分布事务,事务资源由:JMS队列和数据库
 * 开始分布式事务,从JMS队列接收消息,更新数据库和发送事务。
 */
//无状态会话EJB必须实现接口SessionBean
public class ReceiveInTxBean implements SessionBean {
  //声明是否控制台输出变量
  private static final boolean VERBOSE = true;
  //声明会话上下文变量
  private SessionContext ctx;

  // 打印
  private void log(String s) {
    if (VERBOSE) System.out.println(s);
  } 

  /**
   * 这是EJB规范定义的方法,在这个实例中未使用
   */
  public void ejbActivate() {
    log("ejbActivate called");
  }

  /**
   * 这是EJB规范定义的方法,在这个实例中未使用
   */
  public void ejbPassivate() {
    log("ejbPassivate called");
  }

  /**
   * 这是EJB规范定义的方法,在这个实例中未使用
   */
  public void ejbCreate() throws CreateException {
    log("ejbCreate called");
  }

  /**
   * 这是EJB规范定义的方法,在这个实例中未使用
   */
  public void ejbRemove() {
    log("ejbRemove called");
  }

  /**
   * 设置会话上下文
   *
   * @参数 ctx               SessionContext 会话上下文
   */
  public void setSessionContext(SessionContext ctx) {
    log("setSessionContext called");
    this.ctx = ctx;
  }

  /**
   * 这个方法实现了在远程接口ReceiveInTx中定义的receiveMessages方法
   *
   * 这个方法开始一个分布式事务, 从JMS消息队列接收消息,更新数据库,并触发分布式事务
   */
  public void receiveMessages() {
  	//声明消息队列连接变量
    QueueConnection qcon = null;
    //声明消息队列会话变量
    QueueSession qsession = null;
    //声明消息接收器变量
    QueueReceiver qreceiver = null;
    //声明数据库连接
    Connection jcon = null;

    try {
      //声明并实例名称服务上下文
      Context ictx = new InitialContext();
     //从EJB配置文件中提取上下文对象,EJB的初始化参数是放在配置文件中的
      Context env = (Context) ictx.lookup("java:comp/env");
      //获取参数
      //消息队列构造器名
      String queueConnFactoryName = (String)env.lookup("queueConnFactoryName");
      //消息队列名
      String queueName = (String)env.lookup("queueName");
      //数据源名
      String xaDataSrcName = (String)env.lookup("xaDataSrcName");
      //表明
      String tableName = (String)env.lookup("tableName");
      //查找消息队列构造器对象
      QueueConnectionFactory qconFactory = 
        (QueueConnectionFactory) ictx.lookup(queueConnFactoryName);
      //创建消息队列连接对象
      qcon = qconFactory.createQueueConnection();
      //创建消息队列会话对象
      qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      //查找消息队列对象
      Queue queue = (Queue) ictx.lookup(queueName);
      //创建消息接收器
      qreceiver = qsession.createReceiver(queue);
      //启动消息队列连接
      qcon.start();
      
      //查找数据源
      DataSource xads = (DataSource) ictx.lookup(xaDataSrcName);
      //获取事务对象
      UserTransaction utx = ctx.getUserTransaction();
      //下面定义事务
      //开始
      utx.begin();
      log("TRANSACTION BEGUN");
      String msgText = null;
      do {
      	//方法queueReceive用来接收消息,(参看queueReceive方法)
        msgText = queueReceive(qreceiver);
        if (msgText.equalsIgnoreCase("quit")) {
          //如果消息是“quit”
          //触发事务
          utx.commit();
          log("TRANSACTION COMMITTED");
        } else {
          //用消息中的数据更新数据库
          updateDatabase(xads, tableName, msgText);
        }
      } while (msgText != null && !msgText.equals("quit"));

	} catch (javax.naming.NamingException nex) {
	//名称服务器抛出的异常
      log("Naming exception: " + nex);      
    } catch (javax.jms.JMSException jex) {
    //消息服务抛出的异常
      log("JMS exception: " + jex);      
    } catch (javax.transaction.NotSupportedException nse) {
    //事务服务抛出的异常
      log("TRANSACTION COULD NOT BEGIN DUE TO: " + nse);      
    } catch (javax.transaction.RollbackException rbe) {
    //事务服务抛出的异常
      log("TRANSACTION ROLLED BACK DUE TO: " + rbe);
    } catch (javax.transaction.HeuristicRollbackException hre) {
    //事务服务抛出的异常
      log("TRANSACTION ROLLED BACK DUE TO: " + hre);
    } catch (javax.transaction.HeuristicMixedException hme) {
    //事务服务抛出的异常
      log("TRANSACTION ROLLED BACK DUE TO: " + hme);
    } catch (javax.transaction.SystemException se) {
    //事务服务抛出的异常
      log("TRANSACTION EXCEPTION: " + se);
    } finally {
    //最后做一些清理工作
      if (qreceiver != null) {
      	//关闭消息接收器
        try { qreceiver.close(); } catch (JMSException ex) {}
      }
      if (qsession != null) {
      	//关闭消息队列会话
        try { qsession.close(); } catch (JMSException ex) {}
      }
      if (qcon != null) {
      	//关闭消息队列连接
        try { qcon.close(); } catch (JMSException ex) {}
      }
    }
  }
  
  //本类使用的方法
  //从指定的消息接收其中接收消息,并转化成文本
  private String queueReceive(QueueReceiver qr) {
  	//文本变量
    String msgText = null;
    try {
    //获取消息对象
      Message msg = qr.receive();
      if (msg != null) {
      	//判断消息类型
        if (msg instanceof TextMessage) {
        //是文本消息
          msgText = ((TextMessage)msg).getText();
        } else {
          msgText = msg.toString();
        }
        log("Message Received: " + msgText);
      }        
    } catch (JMSException jmse) {
      log("Error receiving JMS message: " + jmse);
    }
    //返回消息文本
    return msgText;
  }
  //本类使用的方法
  //更新指定数据源和数据表
  private void updateDatabase(DataSource xads, String tableName, String data) 
  {
  	//声明数据库连接变量
    Connection jcon = null;
    //声明SQL语句对象变量
    PreparedStatement stmt = null;
    try {
    //通过数据源获取数据库连接
      jcon = xads.getConnection();
      //sql语句命令,插入一条纪录
      String sql = "insert into " + tableName + " (data) values (?)";
      //执行sql语句
      stmt = jcon.prepareStatement(sql);
      stmt.setString(1, data);
      stmt.executeUpdate();
    } catch (SQLException ex) {
    //数据库异常
      log("Cannot update database:" + data);
    } finally {
    //清除工作
      if (stmt != null) {
      //关闭语句对象
        try { stmt.close(); } catch (SQLException ex) {}
      }
      if (jcon != null) {
      //关闭数据库连接
        try { jcon.close(); } catch (SQLException ex) {}
      }
    }
  }
}




⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -