⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 startlistener.java

📁 jbpm-bpel-1.1.Beta3 JBoss jBPM Starters Kit  是一个综合包
💻 JAVA
字号:
/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the JBPM BPEL PUBLIC LICENSE AGREEMENT as
 * published by JBoss Inc.; either version 1.0 of the License, or
 * (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 */
package org.jbpm.bpel.integration.jms;

import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Session;
import javax.wsdl.OperationType;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.bpel.db.IntegrationSession;
import org.jbpm.bpel.graph.def.BpelDefinition;
import org.jbpm.bpel.integration.def.PartnerLinkDefinition;
import org.jbpm.bpel.integration.def.Receiver;
import org.jbpm.graph.exe.ExecutionContext;
import org.jbpm.graph.exe.ProcessInstance;
import org.jbpm.graph.exe.Token;

/**
 * @author Alejandro Gu韟ar
 * @version $Revision: 1.6 $ $Date: 2007/01/18 13:49:50 $
 */
public class StartListener implements MessageListener {

  private final long processId;
  private final long receiverId;

  private Session jmsSession;
  private MessageConsumer consumer;

  private static final Log log = LogFactory.getLog(StartListener.class);

  public StartListener(BpelDefinition process, Receiver receiver,
      IntegrationControl integrationControl) throws JMSException {
    receiverId = receiver.getId();
    processId = process.getId();
    // get the destination associated to the partner link
    PartnerLinkDefinition partnerLink = receiver.getPartnerLink();
    PartnerLinkEntry entry = integrationControl.getPartnerLinkEntry(partnerLink);
    Destination destination = entry.getDestination();
    // build the message selector
    String selector = formatSelector(receiver);
    jmsSession = integrationControl.getJmsConnection().createSession(false,
        Session.CLIENT_ACKNOWLEDGE);
    consumer = jmsSession.createConsumer(destination, selector);
    consumer.setMessageListener(this);
    log.debug("opened start listener: process="
        + process
        + '#'
        + processId
        + ", receiver="
        + receiver);
  }

  public long getReceiverId() {
    return receiverId;
  }

  public void onMessage(Message request) {
    if (!(request instanceof ObjectMessage)) {
      log.error("received non-object jms message: " + request);
      return;
    }

    boolean acknowledgeMessage = true;

    JbpmConfiguration jbpmConfiguration = JbpmConfiguration.getInstance();
    JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
    try {
      log.debug("received request: " + RequestListener.messageToString(request));
      // load process definition
      BpelDefinition processDefinition = (BpelDefinition) jbpmContext.getGraphSession()
          .loadProcessDefinition(processId);
      // instantiate the process
      ProcessInstance instance = processDefinition.createProcessInstance();

      // load the message receiver
      Receiver trigger = IntegrationSession.getInstance(jbpmContext)
          .loadReceiver(receiverId);
      Token rootToken = instance.getRootToken();
      try {
        // file outstanding request, in case operation has output
        if (OperationType.REQUEST_RESPONSE.equals(trigger.getOperation()
            .getStyle())) {
          // encapsulate the fields needed to reply
          OutstandingRequest outRequest = new OutstandingRequest(
              (Queue) request.getJMSReplyTo(), request.getJMSMessageID());
          // register the request in the integration control
          IntegrationControl integrationControl = JmsIntegrationServiceFactory.getInstance(
              jbpmConfiguration)
              .getIntegrationControl(processDefinition);
          integrationControl.addOutstandingRequest(trigger, rootToken,
              outRequest);
        }
        // read message parts into the process variables
        Map inputParts = (Map) ((ObjectMessage) request).getObject();
        trigger.readMessage(inputParts, rootToken);
        // pass control to the process
        processDefinition.messageReceived(trigger, rootToken);
      }
      catch (RuntimeException e) {
        log.debug("caught exception while passing control to process, "
            + "searching for handler", e);
        processDefinition.getGlobalScope().raiseException(e,
            new ExecutionContext(rootToken));
      }
      // save changes to instance
      jbpmContext.save(instance);
    }
    catch (Exception e) {
      log.error("could not start process instance", e);
      acknowledgeMessage = false;
      jbpmContext.setRollbackOnly();
      return;
    }
    finally {
      try {
        // end transaction, close all services
        jbpmContext.close();
      }
      catch (RuntimeException e) {
        log.debug("could not close jbpm context", e);
        acknowledgeMessage = false;
      }
    }

    // acknowledge request after everything succeeds
    if (acknowledgeMessage) {
      try {
        // acknowledge request after everything succeeds
        request.acknowledge();
      }
      catch (JMSException e) {
        log.error("could not acknowledge message", e);
      }
    }
  }

  public void close() throws JMSException {
    // close consumer first
    consumer.close();
    consumer = null;
    log.debug("closed start listener: process="
        + processId
        + ", receiver="
        + receiverId);
    
    // now close session
    jmsSession.close();
    jmsSession = null;
  }
  
  boolean isClosed() {
    return consumer == null && jmsSession == null;
  }

  protected String formatSelector(Receiver receiver) {
    // partner link id
    return IntegrationConstants.PARTNER_LINK_ID_PROP
        + '='
        + receiver.getPartnerLink().getId()
        // operation name
        + " AND "
        + IntegrationConstants.OPERATION_NAME_PROP
        + "='"
        + receiver.getOperation().getName()
        + '\'';
  }

  public String toString() {
    ToStringBuilder builder = new ToStringBuilder(this);
    try {
      builder.append("queue", ((QueueReceiver) consumer).getQueue()).append(
          "selector", consumer.getMessageSelector());
    }
    catch (JMSException e) {
      log.debug("could not fill request listener fields", e);
    }
    return builder.toString();
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -