⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requestlistener.java

📁 jbpm-bpel-1.1.Beta3 JBoss jBPM Starters Kit  是一个综合包
💻 JAVA
字号:
/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the JBPM BPEL PUBLIC LICENSE AGREEMENT as
 * published by JBoss Inc.; either version 1.0 of the License, or
 * (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 */
package org.jbpm.bpel.integration.jms;

import java.util.Enumeration;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.wsdl.OperationType;

import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jbpm.bpel.BpelException;
import org.jbpm.bpel.db.IntegrationSession;
import org.jbpm.bpel.integration.def.Receiver;
import org.jbpm.graph.exe.ExecutionContext;
import org.jbpm.graph.exe.Token;

/**
 * @author Alejandro Gu韟ar
 * @version $Revision: 1.6 $ $Date: 2007/01/18 13:49:50 $
 */
public class RequestListener implements MessageListener {

  private final long receiverId;
  private final long tokenId;

  private MessageConsumer consumer;

  private static final Log log = LogFactory.getLog(RequestListener.class);

  RequestListener(long receiverId, long tokenId, MessageConsumer consumer) {
    this.receiverId = receiverId;
    this.tokenId = tokenId;
    this.consumer = consumer;
  }

  long getReceiverId() {
    return receiverId;
  }

  long getTokenId() {
    return tokenId;
  }

  public void open() throws JMSException {
    /*
     * jms could deliver a message immediately after setting this listener, so
     * make sure this listener is fully initialized at this point
     */
    consumer.setMessageListener(this);
    log.debug("opened request listener: receiver="
        + receiverId
        + ", token="
        + tokenId);
  }

  public void onMessage(Message request) {
    if (!(request instanceof ObjectMessage)) {
      log.error("received non-object jms message: " + request);
      return;
    }

    boolean acknowledgeMessage = true;

    JbpmContext jbpmContext = JbpmConfiguration.getInstance()
        .createJbpmContext();
    try {
      log.debug("received request: " + RequestListener.messageToString(request));

      // load the token and have it saved automatically
      Token token = jbpmContext.loadTokenForUpdate(tokenId);
      // load receiver via Hibernate session
      Receiver receiver = IntegrationSession.getInstance(jbpmContext)
          .loadReceiver(receiverId);

      // take the input parts from jms message
      ObjectMessage objectRequest = (ObjectMessage) request;
      Map parts = (Map) objectRequest.getObject();

      try {
        // assign the input parts
        receiver.readMessage(parts, token);

        // file outstanding request, in case operation has output
        if (OperationType.REQUEST_RESPONSE.equals(receiver.getOperation()
            .getStyle())) {
          // encapsulate the fields needed to reply
          OutstandingRequest outRequest = new OutstandingRequest(
              (Queue) request.getJMSReplyTo(), request.getJMSMessageID());
          // register the request in the integration control
          IntegrationControl integrationControl = JmsIntegrationService.get(
              jbpmContext).getIntegrationControl(token);
          integrationControl.addOutstandingRequest(receiver, token, outRequest);
        }

        // pass control to activity
        receiver.getInboundMessageActivity().messageReceived(receiver, token);
      }
      catch (BpelException e) {
        log.debug("caught exception while processing request", e);
        token.getNode().raiseException(e, new ExecutionContext(token));
      }
    }
    catch (Exception e) {
      log.error("could not resume process execution", e);
      acknowledgeMessage = false;
      jbpmContext.setRollbackOnly();
    }
    finally {
      try {
        // end transaction, close all services
        jbpmContext.close();
      }
      catch (RuntimeException e) {
        log.error("could not close jbpm context", e);
        acknowledgeMessage = false;
      }
    }

    // acknowledge request after everything succeeds
    if (acknowledgeMessage) {
      try {
        request.acknowledge();
      }
      catch (JMSException e) {
        log.error("could not acknowledge message", e);
      }
    }
  }

  public void close() throws JMSException {
    consumer.close();
    consumer = null;
    log.debug("closed request listener: receiver="
        + receiverId
        + ", token="
        + tokenId);
  }

  public String toString() {
    ToStringBuilder builder = new ToStringBuilder(this);
    try {
      builder.append("queue", ((QueueReceiver) consumer).getQueue()).append(
          "selector", consumer.getMessageSelector());
    }
    catch (JMSException e) {
      log.debug("could not fill request listener fields", e);
    }
    return builder.toString();
  }

  public static String messageToString(Message message) throws JMSException {
    StringBuffer result = new StringBuffer();
    // ID & destination
    result.append("id=").append(message.getJMSMessageID()).append(
        ", destination=").append(message.getJMSDestination());
    // replyTo & correlationID
    Destination replyTo = message.getJMSReplyTo();
    if (replyTo != null) {
      result.append(", replyTo=")
          .append(replyTo)
          .append(", correlationId=")
          .append(message.getJMSCorrelationID());
    }
    // properties
    Enumeration propertyNames = message.getPropertyNames();
    while (propertyNames.hasMoreElements()) {
      String propertyName = (String) propertyNames.nextElement();
      result.append(", ").append(propertyName).append('=').append(
          message.getObjectProperty(propertyName));
    }
    return result.toString();
  }

  public static class Key {

    private final long receiverId;
    private final long tokenId;

    Key(long receiverId, long tokenId) {
      this.receiverId = receiverId;
      this.tokenId = tokenId;
    }

    public long getReceiverId() {
      return receiverId;
    }

    public long getTokenId() {
      return tokenId;
    }

    public boolean equals(Object obj) {
      if (!(obj instanceof Key))
        return false;

      Key that = (Key) obj;
      return receiverId == that.receiverId && tokenId == that.tokenId;
    }

    public int hashCode() {
      return new HashCodeBuilder(863, 5).append(receiverId)
          .append(tokenId)
          .toHashCode();
    }

    public String toString() {
      return new ToStringBuilder(this).append("receiverId", receiverId).append(
          "tokenId", tokenId).toString();
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -