📄 integrationcontrol.java
字号:
+ instance
+ ", reference="
+ partnerRef);
}
// select a port from the service catalog with the criteria known at
// this point
Port port = partnerRef.selectPort(getServiceCatalog());
log.debug("selected partner port: instance="
+ instance
+ ", port="
+ port.getName());
// create a client for that port
partnerClient = new SoapClient(port);
partnerClients.put(instanceId, partnerClient);
}
}
return partnerClient;
}
private static long getOrAssignId(PartnerLinkInstance instance) {
long instanceId = instance.getId();
// in case instance is transient, assign an identifier to it
if (instanceId == 0L) {
Services.assignId(instance);
instanceId = instance.getId();
}
return instanceId;
}
EndpointReference createPartnerReference(PartnerLinkDefinition definition) {
PartnerLinkEntry entry = getPartnerLinkEntry(definition);
InitiateMode initiateMode = entry.getInitiateMode();
EndpointReference partnerReference;
if (InitiateMode.STATIC.equals(initiateMode)) {
partnerReference = entry.getPartnerReference();
}
else if (InitiateMode.PULL.equals(initiateMode)) {
EndpointReferenceFactory refFactory = EndpointReferenceFactory.getInstance(
DEFAULT_REFERENCE_NAME, null);
partnerReference = refFactory.createEndpointReference();
}
else {
throw new BpelFaultException(
BpelConstants.FAULT_UNINITIALIZED_PARTNER_ROLE);
}
return partnerReference;
}
public ServiceCatalog getServiceCatalog() {
return getAppDescriptor().getServiceCatalog();
}
public List getStartListeners() {
return Collections.unmodifiableList(startListeners);
}
/**
* Prepares inbound message activities annotated to create a process instance
* for receiving requests.
*/
public void enableInboundMessageActivities(JbpmContext jbpmContext)
throws NamingException, JMSException {
InitialContext initialContext = new InitialContext();
try {
// publish partner link information to JNDI
BpelDefinition process = getAppDescriptor().findProcessDefinition(
jbpmContext);
createPartnerLinkEntries(initialContext, process);
// open a jms connection
createJmsConnection(initialContext);
// enable start IMAs
StartListenersBuilder builder = new StartListenersBuilder(this);
builder.visit(process);
startListeners = builder.getStartListeners();
// enable outstanding IMAs
IntegrationSession integrationSession = IntegrationSession.getInstance(jbpmContext);
IntegrationService integrationService = Receiver.getIntegrationService(jbpmContext);
// receive
Iterator receiveTokenIt = integrationSession.findReceiveTokens(process)
.iterator();
while (receiveTokenIt.hasNext()) {
Token token = (Token) receiveTokenIt.next();
Receive receive = (Receive) token.getNode();
integrationService.receive(receive.getReceiver(), token);
}
// pick
Iterator pickTokenIt = integrationSession.findPickTokens(process)
.iterator();
while (pickTokenIt.hasNext()) {
Token token = (Token) pickTokenIt.next();
// pick points activity token to begin mark
Begin begin = (Begin) token.getNode();
Pick pick = (Pick) begin.getCompositeActivity();
integrationService.receive(pick.getOnMessages(), token);
}
// event
Iterator eventTokenIt = integrationSession.findEventTokens(process)
.iterator();
while (eventTokenIt.hasNext()) {
Token token = (Token) eventTokenIt.next();
// scope points events token to itself
Scope scope = (Scope) token.getNode();
Iterator onEventsIt = scope.getOnEvents().iterator();
while (onEventsIt.hasNext()) {
OnEvent onEvent = (OnEvent) onEventsIt.next();
integrationService.receive(onEvent.getReceiver(), token);
}
}
// start message delivery
jmsConnection.start();
}
finally {
initialContext.close();
}
}
/**
* Prevents inbound message activities annotated to create a process instance
* from further receiving requests.
*/
public void disableInboundMessageActivities() throws JMSException {
// disable start IMAs
Iterator startListenerIt = startListeners.iterator();
while (startListenerIt.hasNext()) {
StartListener startListener = (StartListener) startListenerIt.next();
startListener.close();
}
// disable outstanding IMAs
Iterator requestListenerIt = requestListeners.values().iterator();
while (requestListenerIt.hasNext()) {
RequestListener requestListener = (RequestListener) requestListenerIt.next();
requestListener.close();
}
// release jms connection
closeJmsConnection();
}
void createPartnerLinkEntries(InitialContext initialContext,
BpelDefinition process) throws NamingException {
// match scopes with their descriptors
Map scopeDescriptors = new ScopeMatcher().match(process, getAppDescriptor());
// lookup destinations & bind port entries
PartnerLinkEntriesBuilder builder = new PartnerLinkEntriesBuilder(
scopeDescriptors, getJmsContext(initialContext),
integrationServiceFactory.getDefaultDestination());
builder.visit(process);
partnerLinkEntriesById = builder.getPartnerLinkEntriesById();
partnerLinkEntriesByHandle = builder.getPartnerLinkEntriesByHandle();
}
void createJmsConnection(InitialContext initialContext)
throws NamingException, JMSException {
// retrieve connection factory
Context jmsContext = getJmsContext(initialContext);
ConnectionFactory jmsConnectionFactory;
try {
jmsConnectionFactory = (ConnectionFactory) jmsContext.lookup(CONNECTION_FACTORY_NAME);
log.debug("retrieved jms connection factory: " + CONNECTION_FACTORY_NAME);
}
catch (NameNotFoundException e) {
log.debug("jms connection factory not found: " + CONNECTION_FACTORY_NAME);
log.debug("falling back to default from integration service factory");
jmsConnectionFactory = integrationServiceFactory.getDefaultConnectionFactory();
if (jmsConnectionFactory == null)
throw e;
}
// create a connection
jmsConnection = jmsConnectionFactory.createConnection();
}
void closeJmsConnection() throws JMSException {
if (jmsConnection != null) {
jmsConnection.close();
jmsConnection = null;
}
}
void reset() {
appDescriptor = null;
partnerLinkEntriesById = null;
startListeners = Collections.EMPTY_LIST;
requestListeners.clear();
outstandingRequests.clear();
partnerClients.clear();
}
static Context getJmsContext(InitialContext initialContext) {
String jmsContextName = DEFAULT_JMS_CONTEXT;
if (JbpmConfiguration.Configs.hasObject(NAME_JMS_CONTEXT)) {
jmsContextName = JbpmConfiguration.Configs.getString(NAME_JMS_CONTEXT);
}
Context jmsContext;
try {
jmsContext = (Context) initialContext.lookup(jmsContextName);
}
catch (NamingException e) {
log.debug("could not retrieve jms context, falling back to initial context");
jmsContext = initialContext;
}
return jmsContext;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -