⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmsintegrationservice.java

📁 jbpm-bpel-1.1.Beta3 JBoss jBPM Starters Kit  是一个综合包
💻 JAVA
字号:
/*
 * JBoss, Home of Professional Open Source
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 * by the @authors tag.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the JBPM BPEL PUBLIC LICENSE AGREEMENT as
 * published by JBoss Inc.; either version 1.0 of the License, or
 * (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 */
package org.jbpm.bpel.integration.jms;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.wsdl.Operation;
import javax.wsdl.OperationType;
import javax.xml.namespace.QName;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.jbpm.JbpmContext;
import org.jbpm.bpel.BpelException;
import org.jbpm.bpel.endpointref.EndpointReference;
import org.jbpm.bpel.graph.def.BpelDefinition;
import org.jbpm.bpel.graph.exe.BpelFaultException;
import org.jbpm.bpel.integration.IntegrationService;
import org.jbpm.bpel.integration.client.SoapClient;
import org.jbpm.bpel.integration.def.Correlations;
import org.jbpm.bpel.integration.def.Invoker;
import org.jbpm.bpel.integration.def.PartnerLinkDefinition;
import org.jbpm.bpel.integration.def.Receiver;
import org.jbpm.bpel.integration.def.Replier;
import org.jbpm.bpel.integration.exe.PartnerLinkInstance;
import org.jbpm.bpel.variable.def.MessageType;
import org.jbpm.bpel.variable.exe.MessageValue;
import org.jbpm.graph.exe.Token;
import org.jbpm.svc.Services;

/**
 * @author Alejandro Guízar
 * @version $Revision: 1.6 $ $Date: 2007/01/22 17:27:03 $
 */
public class JmsIntegrationService implements IntegrationService {

  /** Logger shared by every instance of this service. */
  private static final Log log = LogFactory.getLog(JmsIntegrationService.class);
  private static final long serialVersionUID = 1L;

  /** Factory consulted to obtain the integration control of a process definition. */
  private final JmsIntegrationServiceFactory integrationServiceFactory;

  /**
   * Request listeners created through this service that have not been closed
   * through it; {@link #openRequestListeners()} opens each of them.
   */
  private List requestListeners = new ArrayList();

  /**
   * Creates an integration service bound to the given factory.
   * 
   * @param integrationServiceFactory the factory used by
   * {@link #getIntegrationControl(Token)} to look up integration controls
   */
  JmsIntegrationService(JmsIntegrationServiceFactory integrationServiceFactory) {
    this.integrationServiceFactory = integrationServiceFactory;
  }

  /**
   * Prepares the reception of a request for a single receiver. A request
   * listener is created on a new JMS session and recorded in this service;
   * it is opened later, by {@link #openRequestListeners()}.
   * 
   * @param receiver the receiver to create a request listener for
   * @param token the token that waits for the request; it is assigned an
   * identifier first if its id is still zero
   * @throws BpelException if a JMS failure prevents creating the listener
   */
  public void receive(Receiver receiver, Token token) {
    assignIdIfTransient(token);
    final IntegrationControl control = getIntegrationControl(token);
    try {
      createRequestListener(receiver, token, control, createJmsSession(control));
    }
    catch (JMSException jmsFailure) {
      throw new BpelException("could not create request listener", jmsFailure);
    }
  }

  /**
   * Prepares the reception of a request for several receivers at once. One
   * JMS session is created and shared by the request listeners of all the
   * given receivers.
   * 
   * @param receivers a list of {@link Receiver} objects
   * @param token the token that waits for the requests; it is assigned an
   * identifier first if its id is still zero
   * @throws BpelException if a JMS failure prevents creating a listener
   */
  public void receive(List receivers, Token token) {
    assignIdIfTransient(token);
    final IntegrationControl control = getIntegrationControl(token);
    try {
      final Session sharedSession = createJmsSession(control);
      for (Iterator it = receivers.iterator(); it.hasNext();) {
        createRequestListener((Receiver) it.next(), token, control, sharedSession);
      }
    }
    catch (JMSException jmsFailure) {
      throw new BpelException("could not create request listeners", jmsFailure);
    }
  }

  /**
   * Gives the token an identifier when it does not have one yet. A token
   * whose id is zero is treated as transient.
   * 
   * @param token the token to check
   */
  private static void assignIdIfTransient(Token token) {
    final boolean isTransient = token.getId() == 0L;
    if (isTransient) {
      Services.assignId(token);
    }
  }

  /**
   * Opens a new non-transacted, client-acknowledged JMS session on the
   * connection held by the given integration control.
   * <p>
   * NOTE(review): sessions created for request listeners are not closed by any
   * code in this class; presumably they are released together with the
   * connection -- confirm.
   * </p>
   * 
   * @param integrationControl the source of the JMS connection
   * @return the new session
   * @throws JMSException if the connection fails to create the session
   */
  private static Session createJmsSession(IntegrationControl integrationControl)
      throws JMSException {
    final boolean transacted = false;
    return integrationControl.getJmsConnection().createSession(transacted,
        Session.CLIENT_ACKNOWLEDGE);
  }

  /**
   * Creates a request listener for the given receiver and token, and records
   * it both in the integration control and in this service.
   * 
   * @param receiver the receiver whose partner link, operation and
   * correlations determine the message selector
   * @param token the token the listener is created for
   * @param integrationControl supplies the destination of the partner link and
   * keeps track of the new listener
   * @param jmsSession the session used to create the message consumer
   * @throws JMSException if the consumer or the listener cannot be created
   */
  private void createRequestListener(Receiver receiver, Token token,
      IntegrationControl integrationControl, Session jmsSession)
      throws JMSException {
    // only messages matching the selector reach the consumer
    final String messageSelector = formatSelector(receiver, token);
    // the partner link entry knows the destination requests arrive at
    final Destination source = integrationControl.getPartnerLinkEntry(
        receiver.getPartnerLink()).getDestination();
    final MessageConsumer messageConsumer = jmsSession.createConsumer(source,
        messageSelector);

    final RequestListener listener = new RequestListener(receiver.getId(),
        token.getId(), messageConsumer);
    integrationControl.addRequestListener(listener);
    requestListeners.add(listener);

    log.debug("created request listener: receiver=" + receiver + ", token=" + token);
  }

  /**
   * Formats a message selector including partner link, operation and
   * correlation properties.
   * <p>
   * The selector has the form
   * <code>partnerLinkProp=id AND operationProp='name' [AND property='value']*</code>.
   * The operation name and the property values are written as selector string
   * literals; any single quote they contain is doubled so that the value
   * cannot terminate the literal early.
   * </p>
   * 
   * @param receiver supplies the partner link, the operation and, optionally,
   * the correlations
   * @param token the token whose reception properties are read from the
   * correlations
   * @return the selector text
   */
  private static String formatSelector(Receiver receiver, Token token) {
    StringBuffer selector = new StringBuffer();
    // partner link id
    selector.append(IntegrationConstants.PARTNER_LINK_ID_PROP)
        .append('=')
        .append(receiver.getPartnerLink().getId());
    // operation name
    selector.append(" AND ")
        .append(IntegrationConstants.OPERATION_NAME_PROP)
        .append('=');
    appendStringLiteral(selector, receiver.getOperation().getName());
    // reception properties
    Correlations correlations = receiver.getCorrelations();
    // BPEL-90: avoid NPE when the receiver was defined with no correlations
    if (correlations != null) {
      // iterate over the property name-value pairs
      Iterator propertyEntryIt = correlations.getReceptionProperties(token)
          .entrySet()
          .iterator();
      while (propertyEntryIt.hasNext()) {
        Map.Entry propertyEntry = (Map.Entry) propertyEntryIt.next();
        QName propertyName = (QName) propertyEntry.getKey();
        // property value
        selector.append(" AND ").append(propertyName.getLocalPart()).append('=');
        appendStringLiteral(selector, propertyEntry.getValue());
      }
    }
    return selector.toString();
  }

  /**
   * Appends the string form of the value to the selector as a quoted string
   * literal, doubling every embedded single quote. A <code>null</code> value
   * is rendered as the text <code>null</code>.
   */
  private static void appendStringLiteral(StringBuffer selector, Object value) {
    String text = String.valueOf(value);
    selector.append('\'');
    for (int i = 0, n = text.length(); i < n; i++) {
      char ch = text.charAt(i);
      // inside a selector literal, a quote is written as two quotes
      if (ch == '\'')
        selector.append('\'');
      selector.append(ch);
    }
    selector.append('\'');
  }

  /**
   * Ends the reception started for a single receiver by closing its request
   * listener, if that listener is still registered.
   * 
   * @param receiver the receiver whose listener is closed
   * @param token the token the listener was created for
   */
  public void endReception(Receiver receiver, Token token) {
    final IntegrationControl control = getIntegrationControl(token);
    closeRequestListener(receiver, token, control);
  }

  /**
   * Ends the reception started for several receivers by closing the request
   * listener of each one.
   * 
   * @param receivers a list of {@link Receiver} objects
   * @param token the token the listeners were created for
   */
  public void endReception(List receivers, Token token) {
    final IntegrationControl control = getIntegrationControl(token);
    // walk the receivers, closing the listener of each
    for (Iterator it = receivers.iterator(); it.hasNext();) {
      closeRequestListener((Receiver) it.next(), token, control);
    }
  }

  /**
   * Removes the request listener registered for the given receiver and token
   * from the integration control, closes it and forgets it in this service.
   * Closing is best effort: a JMS failure is logged, together with its cause,
   * and does not propagate.
   * 
   * @param receiver the receiver the listener was created for
   * @param token the token the listener was created for
   * @param integrationControl the control the listener is registered in
   */
  private void closeRequestListener(Receiver receiver, Token token,
      IntegrationControl integrationControl) {
    RequestListener requestListener = integrationControl.removeRequestListener(
        receiver, token);
    // some competing thread might have removed the request listener already
    if (requestListener == null)
      return;

    try {
      requestListener.close();
    }
    catch (JMSException e) {
      // keep going, but record the cause so the failure can be diagnosed
      log.warn("could not close request listener: " + requestListener, e);
    }
    requestListeners.remove(requestListener);
  }

  /**
   * Sends the reply produced by the given replier to the outstanding request
   * it answers.
   * 
   * @param replier the replier that writes the reply message
   * @param token the token in whose context the reply is written
   * @throws BpelException if a JMS failure prevents sending the reply
   */
  public void reply(Replier replier, Token token) {
    try {
      replyOutstandingRequest(replier, token);
    }
    catch (JMSException jmsFailure) {
      throw new BpelException("could not send reply", jmsFailure);
    }
  }

  /**
   * Writes the reply message, takes the matching outstanding request out of
   * the integration control and sends the reply over a short-lived JMS
   * session, which is closed whether or not sending succeeds.
   * 
   * @param replier the replier that writes the message parts and names the
   * fault, if any
   * @param token the token in whose context the reply is written
   * @throws JMSException if the session cannot be created or closed, or the
   * reply cannot be sent
   */
  private void replyOutstandingRequest(Replier replier, Token token)
      throws JMSException {
    // message parts to send back
    final Map outputParts = replier.writeMessage(token);

    // the request being answered is identified by the replier and the token
    final IntegrationControl control = getIntegrationControl(token);
    final OutstandingRequest pendingRequest = control.removeOutstandingRequest(
        replier, token);

    final Session replySession = createJmsSession(control);
    try {
      pendingRequest.sendReply(outputParts, replier.getFaultName(), replySession);
    }
    finally {
      replySession.close();
    }
  }

  /**
   * Invokes an operation on the partner bound to the invoker's partner link.
   * A request/response operation blocks for the output and hands it to the
   * invoker; any other operation is sent one way.
   * 
   * @param invoker supplies the partner link, the operation and the input
   * message, and receives the output message
   * @param token the token in whose context the invocation takes place
   * @throws BpelFaultException if the partner call raises a fault; the message
   * type of the fault is replaced with the one from the process definition
   * before the exception is rethrown
   */
  public void invoke(Invoker invoker, Token token) {
    // message parts to send
    final Map requestParts = invoker.writeMessage(token);

    final PartnerLinkDefinition partnerLink = invoker.getPartnerLink();
    final PartnerLinkInstance partnerLinkInstance = partnerLink.getInstance(token);

    // the integration control hands out the client for the partner link
    final SoapClient client = getIntegrationControl(token).getPartnerClient(
        partnerLinkInstance);

    final Operation operation = invoker.getOperation();
    final String operationName = operation.getName();

    if (!OperationType.REQUEST_RESPONSE.equals(operation.getStyle())) {
      // no response expected: send and return
      client.callOneWay(operationName, requestParts);
      log.debug("invoked: partnerLink=" + partnerLink.getName() + ", operation="
          + operationName);
      return;
    }

    // request/response: send the input and wait for the output
    final Map responseParts;
    try {
      responseParts = client.call(operationName, requestParts);
    }
    catch (BpelFaultException fault) {
      replaceMessageType(fault, token);
      throw fault;
    }

    log.debug("invoked: partnerLink=" + partnerLink.getName() + ", operation="
        + operationName + ", output=" + responseParts);

    // store the output in the process
    invoker.readMessage(responseParts, token);
  }

  /**
   * Swaps the message type carried by the fault's message value for the
   * message type of the same name found in the imports of the token's process
   * definition. {@link SoapClient} produces faults with transient message
   * types, whereas the process definition holds the persistent ones.
   * 
   * @param faultException the fault whose message value is updated in place
   * @param token the token whose process definition supplies the message type
   */
  private static void replaceMessageType(BpelFaultException faultException,
      Token token) {
    // message value carried by the fault
    final MessageValue faultMessage = faultException.getFaultInstance()
        .getMessageValue();

    // look up the type of the same name in the process definition
    final BpelDefinition definition = (BpelDefinition) token.getProcessInstance()
        .getProcessDefinition();
    final MessageType definedType = definition.getImports().getMessageType(
        faultMessage.getType().getName());

    // the fault now refers to the type from the process definition
    faultMessage.setType(definedType);
  }

  /**
   * Not implemented yet: no endpoint reference is available for the process
   * side of a partner link.
   * 
   * @param partnerLink the partner link definition; currently ignored
   * @return always <code>null</code>
   */
  public EndpointReference getMyReference(PartnerLinkDefinition partnerLink) {
    // TODO where to get my reference from?
    return null;
  }

  /**
   * Closes this service. Closing does not release the request listeners;
   * instead it opens every listener recorded so far, through
   * {@link #openRequestListeners()}.
   * 
   * @throws BpelException if a JMS failure prevents opening a listener
   */
  public void close() {
    try {
      openRequestListeners();
    }
    catch (JMSException jmsFailure) {
      throw new BpelException("could not open subsequent request listeners",
          jmsFailure);
    }
  }

  /**
   * Opens, in creation order, every request listener recorded in this
   * service. The number of listeners is read once, before the first one is
   * opened.
   * 
   * @throws JMSException if a listener fails to open; listeners after it are
   * not opened
   */
  public void openRequestListeners() throws JMSException {
    final int listenerCount = requestListeners.size();
    int index = 0;
    while (index < listenerCount) {
      final RequestListener listener = (RequestListener) requestListeners.get(index);
      listener.open();
      index++;
    }
  }

  /**
   * Looks up the integration control of the process definition the given
   * token belongs to.
   * 
   * @param token a token of a BPEL process instance
   * @return the integration control supplied by the factory of this service
   */
  public IntegrationControl getIntegrationControl(Token token) {
    final BpelDefinition processDefinition = (BpelDefinition) token.getProcessInstance()
        .getProcessDefinition();
    return integrationServiceFactory.getIntegrationControl(processDefinition);
  }

  /**
   * Retrieves the JMS integration service from the given jBPM context, where
   * it is looked up under {@link IntegrationService#SERVICE_NAME}.
   * 
   * @param jbpmContext the context whose services are consulted
   * @return the service, cast to {@link JmsIntegrationService}
   */
  public static JmsIntegrationService get(JbpmContext jbpmContext) {
    final Object service = jbpmContext.getServices().getService(
        IntegrationService.SERVICE_NAME);
    return (JmsIntegrationService) service;
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -