📄 requestlistener.java
字号:
/*
* JBoss, Home of Professional Open Source
* Copyright 2005, JBoss Inc., and individual contributors as indicated
* by the @authors tag.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the JBPM BPEL PUBLIC LICENSE AGREEMENT as
* published by JBoss Inc.; either version 1.0 of the License, or
* (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*/
package org.jbpm.bpel.integration.jms;
import java.util.Enumeration;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.wsdl.OperationType;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.bpel.BpelException;
import org.jbpm.bpel.db.IntegrationSession;
import org.jbpm.bpel.integration.def.Receiver;
import org.jbpm.graph.exe.ExecutionContext;
import org.jbpm.graph.exe.Token;
/**
* @author Alejandro Gu韟ar
* @version $Revision: 1.6 $ $Date: 2007/01/18 13:49:50 $
*/
public class RequestListener implements MessageListener {
private final long receiverId;
private final long tokenId;
private MessageConsumer consumer;
private static final Log log = LogFactory.getLog(RequestListener.class);
RequestListener(long receiverId, long tokenId, MessageConsumer consumer) {
this.receiverId = receiverId;
this.tokenId = tokenId;
this.consumer = consumer;
}
long getReceiverId() {
return receiverId;
}
long getTokenId() {
return tokenId;
}
public void open() throws JMSException {
/*
* jms could deliver a message immediately after setting this listener, so
* make sure this listener is fully initialized at this point
*/
consumer.setMessageListener(this);
log.debug("opened request listener: receiver="
+ receiverId
+ ", token="
+ tokenId);
}
public void onMessage(Message request) {
if (!(request instanceof ObjectMessage)) {
log.error("received non-object jms message: " + request);
return;
}
boolean acknowledgeMessage = true;
JbpmContext jbpmContext = JbpmConfiguration.getInstance()
.createJbpmContext();
try {
log.debug("received request: " + RequestListener.messageToString(request));
// load the token and have it saved automatically
Token token = jbpmContext.loadTokenForUpdate(tokenId);
// load receiver via Hibernate session
Receiver receiver = IntegrationSession.getInstance(jbpmContext)
.loadReceiver(receiverId);
// take the input parts from jms message
ObjectMessage objectRequest = (ObjectMessage) request;
Map parts = (Map) objectRequest.getObject();
try {
// assign the input parts
receiver.readMessage(parts, token);
// file outstanding request, in case operation has output
if (OperationType.REQUEST_RESPONSE.equals(receiver.getOperation()
.getStyle())) {
// encapsulate the fields needed to reply
OutstandingRequest outRequest = new OutstandingRequest(
(Queue) request.getJMSReplyTo(), request.getJMSMessageID());
// register the request in the integration control
IntegrationControl integrationControl = JmsIntegrationService.get(
jbpmContext).getIntegrationControl(token);
integrationControl.addOutstandingRequest(receiver, token, outRequest);
}
// pass control to activity
receiver.getInboundMessageActivity().messageReceived(receiver, token);
}
catch (BpelException e) {
log.debug("caught exception while processing request", e);
token.getNode().raiseException(e, new ExecutionContext(token));
}
}
catch (Exception e) {
log.error("could not resume process execution", e);
acknowledgeMessage = false;
jbpmContext.setRollbackOnly();
}
finally {
try {
// end transaction, close all services
jbpmContext.close();
}
catch (RuntimeException e) {
log.error("could not close jbpm context", e);
acknowledgeMessage = false;
}
}
// acknowledge request after everything succeeds
if (acknowledgeMessage) {
try {
request.acknowledge();
}
catch (JMSException e) {
log.error("could not acknowledge message", e);
}
}
}
public void close() throws JMSException {
consumer.close();
consumer = null;
log.debug("closed request listener: receiver="
+ receiverId
+ ", token="
+ tokenId);
}
public String toString() {
ToStringBuilder builder = new ToStringBuilder(this);
try {
builder.append("queue", ((QueueReceiver) consumer).getQueue()).append(
"selector", consumer.getMessageSelector());
}
catch (JMSException e) {
log.debug("could not fill request listener fields", e);
}
return builder.toString();
}
public static String messageToString(Message message) throws JMSException {
StringBuffer result = new StringBuffer();
// ID & destination
result.append("id=").append(message.getJMSMessageID()).append(
", destination=").append(message.getJMSDestination());
// replyTo & correlationID
Destination replyTo = message.getJMSReplyTo();
if (replyTo != null) {
result.append(", replyTo=")
.append(replyTo)
.append(", correlationId=")
.append(message.getJMSCorrelationID());
}
// properties
Enumeration propertyNames = message.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String propertyName = (String) propertyNames.nextElement();
result.append(", ").append(propertyName).append('=').append(
message.getObjectProperty(propertyName));
}
return result.toString();
}
public static class Key {
private final long receiverId;
private final long tokenId;
Key(long receiverId, long tokenId) {
this.receiverId = receiverId;
this.tokenId = tokenId;
}
public long getReceiverId() {
return receiverId;
}
public long getTokenId() {
return tokenId;
}
public boolean equals(Object obj) {
if (!(obj instanceof Key))
return false;
Key that = (Key) obj;
return receiverId == that.receiverId && tokenId == that.tokenId;
}
public int hashCode() {
return new HashCodeBuilder(863, 5).append(receiverId)
.append(tokenId)
.toHashCode();
}
public String toString() {
return new ToStringBuilder(this).append("receiverId", receiverId).append(
"tokenId", tokenId).toString();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -