📄 receiveintxbean.java
字号:
//声明这个类属于包examples.jta.jmsjdbc
package examples.jta.jmsjdbc;
//声明这个引入的其他类
//jdbc类
import java.sql.Connection;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.SQLException;
//Ejb类
import javax.ejb.CreateException;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;
//java消息服务类
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
//java名称服务类
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
//事务类
import javax.transaction.UserTransaction;
import javax.sql.DataSource;
/**
* ReceiveInTxBean 是一个无状态的会话EJBean,使用bean管理的事务
* 这个EJBean演示分布事务,事务资源由:JMS队列和数据库
* 开始分布式事务,从JMS队列接收消息,更新数据库和发送事务。
*/
//无状态会话EJB必须实现接口SessionBean
public class ReceiveInTxBean implements SessionBean {
//声明是否控制台输出变量
private static final boolean VERBOSE = true;
//声明会话上下文变量
private SessionContext ctx;
// 打印
private void log(String s) {
if (VERBOSE) System.out.println(s);
}
/**
* 这是EJB规范定义的方法,在这个实例中未使用
*/
public void ejbActivate() {
log("ejbActivate called");
}
/**
* 这是EJB规范定义的方法,在这个实例中未使用
*/
public void ejbPassivate() {
log("ejbPassivate called");
}
/**
* 这是EJB规范定义的方法,在这个实例中未使用
*/
public void ejbCreate() throws CreateException {
log("ejbCreate called");
}
/**
* 这是EJB规范定义的方法,在这个实例中未使用
*/
public void ejbRemove() {
log("ejbRemove called");
}
/**
* 设置会话上下文
*
* @参数 ctx SessionContext 会话上下文
*/
public void setSessionContext(SessionContext ctx) {
log("setSessionContext called");
this.ctx = ctx;
}
/**
* 这个方法实现了在远程接口ReceiveInTx中定义的receiveMessages方法
*
* 这个方法开始一个分布式事务, 从JMS消息队列接收消息,更新数据库,并触发分布式事务
*/
public void receiveMessages() {
//声明消息队列连接变量
QueueConnection qcon = null;
//声明消息队列会话变量
QueueSession qsession = null;
//声明消息接收器变量
QueueReceiver qreceiver = null;
//声明数据库连接
Connection jcon = null;
try {
//声明并实例名称服务上下文
Context ictx = new InitialContext();
//从EJB配置文件中提取上下文对象,EJB的初始化参数是放在配置文件中的
Context env = (Context) ictx.lookup("java:comp/env");
//获取参数
//消息队列构造器名
String queueConnFactoryName = (String)env.lookup("queueConnFactoryName");
//消息队列名
String queueName = (String)env.lookup("queueName");
//数据源名
String xaDataSrcName = (String)env.lookup("xaDataSrcName");
//表明
String tableName = (String)env.lookup("tableName");
//查找消息队列构造器对象
QueueConnectionFactory qconFactory =
(QueueConnectionFactory) ictx.lookup(queueConnFactoryName);
//创建消息队列连接对象
qcon = qconFactory.createQueueConnection();
//创建消息队列会话对象
qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
//查找消息队列对象
Queue queue = (Queue) ictx.lookup(queueName);
//创建消息接收器
qreceiver = qsession.createReceiver(queue);
//启动消息队列连接
qcon.start();
//查找数据源
DataSource xads = (DataSource) ictx.lookup(xaDataSrcName);
//获取事务对象
UserTransaction utx = ctx.getUserTransaction();
//下面定义事务
//开始
utx.begin();
log("TRANSACTION BEGUN");
String msgText = null;
do {
//方法queueReceive用来接收消息,(参看queueReceive方法)
msgText = queueReceive(qreceiver);
if (msgText.equalsIgnoreCase("quit")) {
//如果消息是“quit”
//触发事务
utx.commit();
log("TRANSACTION COMMITTED");
} else {
//用消息中的数据更新数据库
updateDatabase(xads, tableName, msgText);
}
} while (msgText != null && !msgText.equals("quit"));
} catch (javax.naming.NamingException nex) {
//名称服务器抛出的异常
log("Naming exception: " + nex);
} catch (javax.jms.JMSException jex) {
//消息服务抛出的异常
log("JMS exception: " + jex);
} catch (javax.transaction.NotSupportedException nse) {
//事务服务抛出的异常
log("TRANSACTION COULD NOT BEGIN DUE TO: " + nse);
} catch (javax.transaction.RollbackException rbe) {
//事务服务抛出的异常
log("TRANSACTION ROLLED BACK DUE TO: " + rbe);
} catch (javax.transaction.HeuristicRollbackException hre) {
//事务服务抛出的异常
log("TRANSACTION ROLLED BACK DUE TO: " + hre);
} catch (javax.transaction.HeuristicMixedException hme) {
//事务服务抛出的异常
log("TRANSACTION ROLLED BACK DUE TO: " + hme);
} catch (javax.transaction.SystemException se) {
//事务服务抛出的异常
log("TRANSACTION EXCEPTION: " + se);
} finally {
//最后做一些清理工作
if (qreceiver != null) {
//关闭消息接收器
try { qreceiver.close(); } catch (JMSException ex) {}
}
if (qsession != null) {
//关闭消息队列会话
try { qsession.close(); } catch (JMSException ex) {}
}
if (qcon != null) {
//关闭消息队列连接
try { qcon.close(); } catch (JMSException ex) {}
}
}
}
//本类使用的方法
//从指定的消息接收其中接收消息,并转化成文本
private String queueReceive(QueueReceiver qr) {
//文本变量
String msgText = null;
try {
//获取消息对象
Message msg = qr.receive();
if (msg != null) {
//判断消息类型
if (msg instanceof TextMessage) {
//是文本消息
msgText = ((TextMessage)msg).getText();
} else {
msgText = msg.toString();
}
log("Message Received: " + msgText);
}
} catch (JMSException jmse) {
log("Error receiving JMS message: " + jmse);
}
//返回消息文本
return msgText;
}
//本类使用的方法
//更新指定数据源和数据表
private void updateDatabase(DataSource xads, String tableName, String data)
{
//声明数据库连接变量
Connection jcon = null;
//声明SQL语句对象变量
PreparedStatement stmt = null;
try {
//通过数据源获取数据库连接
jcon = xads.getConnection();
//sql语句命令,插入一条纪录
String sql = "insert into " + tableName + " (data) values (?)";
//执行sql语句
stmt = jcon.prepareStatement(sql);
stmt.setString(1, data);
stmt.executeUpdate();
} catch (SQLException ex) {
//数据库异常
log("Cannot update database:" + data);
} finally {
//清除工作
if (stmt != null) {
//关闭语句对象
try { stmt.close(); } catch (SQLException ex) {}
}
if (jcon != null) {
//关闭数据库连接
try { jcon.close(); } catch (SQLException ex) {}
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -