startlistener.java
来自「ejb3 java session bean」· Java 代码 · 共 260 行
JAVA
260 行
/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the JBPM BPEL PUBLIC LICENSE AGREEMENT as
 * published by JBoss Inc.; either version 1.0 of the License, or
 * (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 */
package org.jbpm.bpel.integration.jms;

import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.QueueReceiver;
import javax.jms.Session;
import javax.jms.TopicSubscriber;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.jbpm.JbpmContext;
import org.jbpm.bpel.graph.def.BpelProcessDefinition;
import org.jbpm.bpel.graph.exe.BpelFaultException;
import org.jbpm.bpel.integration.def.ReceiveAction;
import org.jbpm.bpel.persistence.db.BpelGraphSession;
import org.jbpm.bpel.persistence.db.IntegrationSession;
import org.jbpm.graph.exe.ExecutionContext;
import org.jbpm.graph.exe.ProcessInstance;
import org.jbpm.graph.exe.Token;

/**
 * JMS message listener that creates a new process instance for each request it receives.
 * <p>
 * A listener is bound to one process definition and one receive action. It owns a dedicated JMS
 * session and a message consumer whose selector is built from the receive action's partner link
 * identifier and operation name. When a request arrives, the listener removes itself from the
 * integration control, opens a replacement listener copied from itself, delivers the request to a
 * freshly created process instance, acknowledges the message and closes itself.
 * </p>
 * @author Alejandro Guizar
 * @version $Revision: 1.22 $ $Date: 2008/06/12 08:18:54 $
 */
public class StartListener implements MessageListener {

  private final long processDefinitionId;
  private final long receiveActionId;

  private final IntegrationControl integrationControl;
  private final Session jmsSession;
  private final MessageConsumer messageConsumer;

  private static final Log log = LogFactory.getLog(StartListener.class);

  /**
   * Creates a start listener for the given receive action and registers it with the integration
   * control. The listener does not receive messages until {@link #open()} is called.
   * @param processDefinition the definition to instantiate; only its identifier is retained
   * @param receiveAction the start activity's receive action; only its identifier is retained
   * @param integrationControl supplies the JMS session and the partner link destination
   * @throws JMSException if the JMS session or the message consumer cannot be created
   */
  StartListener(BpelProcessDefinition processDefinition, ReceiveAction receiveAction,
      IntegrationControl integrationControl) throws JMSException {
    // initialize entity identifiers
    processDefinitionId = processDefinition.getId();
    receiveActionId = receiveAction.getId();

    // save integration control
    this.integrationControl = integrationControl;
    jmsSession = integrationControl.createJmsSession();

    // create message consumer
    Destination destination = integrationControl.getPartnerLinkEntry(receiveAction.getPartnerLink())
        .getDestination();
    String selector = formatSelector(receiveAction);
    messageConsumer = jmsSession.createConsumer(destination, selector);

    integrationControl.addStartListener(this);
    log.debug("created start listener: processDefinition="
        + processDefinition
        + ", receiveAction="
        + receiveAction);
  }

  /**
   * Builds the JMS message selector matching requests addressed to the partner link and operation
   * of the given receive action.
   */
  private static String formatSelector(ReceiveAction receiveAction) {
    return IntegrationConstants.PARTNER_LINK_ID_PROP
        + '='
        + receiveAction.getPartnerLink().getId()
        + " AND "
        + IntegrationConstants.OPERATION_NAME_PROP
        + "='"
        + receiveAction.getOperation().getName()
        + '\'';
  }

  /**
   * Creates a start listener with the same identifiers, destination and selector as the given
   * listener, backed by a new JMS session and consumer, and registers it with the integration
   * control.
   * @param other the listener to copy
   * @throws JMSException if the JMS session or consumer cannot be created, or if the type of the
   * other listener's consumer is not recognized
   */
  StartListener(StartListener other) throws JMSException {
    this.processDefinitionId = other.processDefinitionId;
    this.receiveActionId = other.receiveActionId;
    this.integrationControl = other.integrationControl;

    jmsSession = integrationControl.createJmsSession();
    messageConsumer = jmsSession.createConsumer(getDestination(other.messageConsumer),
        other.messageConsumer.getMessageSelector());

    integrationControl.addStartListener(this);
    log.debug("created start listener: processDefinition="
        + processDefinitionId
        + ", receiveAction="
        + receiveActionId);
  }

  /**
   * Retrieves the destination a consumer reads from.
   * @throws JMSException if the consumer is neither a queue receiver nor a topic subscriber, or if
   * the provider fails to return the destination
   */
  private static Destination getDestination(MessageConsumer messageConsumer) throws JMSException {
    if (messageConsumer instanceof QueueReceiver) {
      QueueReceiver queueReceiver = (QueueReceiver) messageConsumer;
      return queueReceiver.getQueue();
    }
    if (messageConsumer instanceof TopicSubscriber) {
      TopicSubscriber topicSubscriber = (TopicSubscriber) messageConsumer;
      return topicSubscriber.getTopic();
    }
    throw new JMSException("unknown message consumer type: " + messageConsumer.getClass());
  }

  /** Returns the identifier of the receive action this listener serves. */
  public long getReceiveActionId() {
    return receiveActionId;
  }

  /** Returns the JMS consumer this listener reads requests from. */
  public MessageConsumer getMessageConsumer() {
    return messageConsumer;
  }

  /**
   * Starts message delivery by installing this object as the consumer's message listener.
   * @throws JMSException if the listener cannot be set
   */
  public void open() throws JMSException {
    /*
     * jms may deliver a message immediately after setting the message listener; make sure this
     * listener is fully initialized at this point
     */
    messageConsumer.setMessageListener(this);
    log.debug("opened start listener: processDefinition="
        + processDefinitionId
        + ", receiveAction="
        + receiveActionId);
  }

  /**
   * Handles an incoming request: replaces this listener with a fresh copy, delivers the request
   * to a new process instance, acknowledges the message and closes this listener. Messages that
   * are not object messages are logged and dropped. Exceptions are logged rather than propagated;
   * for runtime exceptions deemed recoverable by {@link RequestListener}, the JMS session is
   * recovered.
   */
  public void onMessage(Message message) {
    if (!(message instanceof ObjectMessage)) {
      log.error("received non-object jms message: " + message);
      return;
    }

    try {
      ObjectMessage request = (ObjectMessage) message;
      log.debug("delivering request: " + RequestListener.messageToString(request));

      /*
       * CODE ORDER NOTE. Removing this listener early in the process prevents any other thread from
       * closing it. This effect is desirable because the orderly shutdown mechanism of JMS never
       * stops a running listener anyway. Furthermore, the mechanism is specified to *block* the
       * other thread until this listener returns.
       */
      integrationControl.removeStartListener(this);

      // BPEL-282 create new start listener to improve concurrency
      StartListener startListener = new StartListener(this);
      startListener.open();

      deliverRequest((Map) request.getObject(), request.getJMSReplyTo(), request.getJMSMessageID());
      request.acknowledge();

      close();
    } catch (JMSException e) {
      log.error("request delivery failed due to jms exception, giving up", e);
    } catch (RuntimeException e) {
      if (RequestListener.isRecoverable(e)) {
        log.warn("request delivery failed due to recoverable exception, attempting recovery");
        try {
          // recover the session manually
          jmsSession.recover();
        } catch (JMSException je) {
          log.error("request recovery failed, giving up", je);
        }
      } else {
        log.error("request delivery failed due to non-recoverable exception, giving up", e);
      }
    }
  }

  /**
   * Creates a process instance and hands the request parts to its start activity, all within a
   * single jBPM context. The context is marked rollback-only if a runtime exception escapes, and
   * is always closed.
   * @param parts the message parts carried by the request
   * @param replyTo the destination to reply to, recorded when the operation has an output
   * @param requestId the JMS message identifier of the request
   */
  private void deliverRequest(Map parts, Destination replyTo, String requestId) {
    JbpmContext jbpmContext = integrationControl.getIntegrationServiceFactory()
        .getJbpmConfiguration()
        .createJbpmContext();
    try {
      // load process definition
      BpelGraphSession graphSession = BpelGraphSession.getContextInstance(jbpmContext);
      BpelProcessDefinition processDefinition = graphSession.loadProcessDefinition(processDefinitionId);

      // load receive action
      ReceiveAction receiveAction = IntegrationSession.getContextInstance(jbpmContext)
          .loadReceiveAction(receiveActionId);

      // instantiate the process
      ProcessInstance processInstance = new ProcessInstance(processDefinition);
      // XXX root token is not assigned an identifier at creation time
      jbpmContext.getServices().getPersistenceService().assignId(processInstance.getRootToken());

      // build initial runtime structures
      Token receivingToken = receiveAction.initializeProcessInstance(processInstance);

      try {
        // file outstanding request, in case operation has output
        if (receiveAction.getOperation().getOutput() != null) {
          // encapsulate the fields needed to reply
          OutstandingRequest outRequest = new OutstandingRequest(replyTo, requestId);
          // register the request in the integration control
          integrationControl.addOutstandingRequest(receiveAction, receivingToken, outRequest);
        }

        // pass control to start activity
        receiveAction.deliverMessage(receivingToken, parts);
      } catch (BpelFaultException e) {
        // a BPEL fault is handed to the global scope instead of aborting the delivery
        log.debug("request delivery caused a fault", e);
        processDefinition.getGlobalScope().raiseException(e, new ExecutionContext(receivingToken));
      }

      // save changes to instance
      jbpmContext.save(processInstance);
    } catch (RuntimeException e) {
      jbpmContext.setRollbackOnly();
      throw e;
    } finally {
      // end transaction, close all services
      jbpmContext.close();
    }
  }

  /**
   * Closes the message consumer and then the JMS session. The session is closed even if closing
   * the consumer fails, so that a consumer failure does not leave the session open.
   * @throws JMSException if closing the consumer or the session fails; when both fail, the
   * session's exception is the one propagated
   */
  public void close() throws JMSException {
    try {
      messageConsumer.close();
    } finally {
      jmsSession.close();
    }
    log.debug("closed start listener: processDefinition="
        + processDefinitionId
        + ", receiveAction="
        + receiveActionId);
  }

  /**
   * Describes this listener by its destination and message selector. Queue receivers are reported
   * under "queue" and topic subscribers under "topic"; any other consumer type contributes the
   * selector only. JMS failures while reading these values are logged at debug level and leave
   * the remaining fields out.
   */
  public String toString() {
    ToStringBuilder builder = new ToStringBuilder(this);
    try {
      // dispatch on the consumer type rather than blindly casting to QueueReceiver
      if (messageConsumer instanceof QueueReceiver) {
        builder.append("queue", ((QueueReceiver) messageConsumer).getQueue());
      } else if (messageConsumer instanceof TopicSubscriber) {
        builder.append("topic", ((TopicSubscriber) messageConsumer).getTopic());
      }
      builder.append("selector", messageConsumer.getMessageSelector());
    } catch (JMSException e) {
      log.debug("could not fill start listener fields", e);
    }
    return builder.toString();
  }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?