📄 cbdispatchconnection.java
字号:
/*------------------------------------------------------------------------------Name: CbDispatchConnection.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine.dispatch;import java.util.ArrayList;import java.util.Map;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.checkpoint.I_Checkpoint;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.protocol.I_CallbackDriver;import org.xmlBlaster.util.qos.address.AddressBase;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.MsgUnitRaw;import org.xmlBlaster.engine.MsgUnitWrapper;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.SubscriptionInfo;import org.xmlBlaster.engine.admin.I_AdminSession;import org.xmlBlaster.engine.admin.I_AdminSubject;import org.xmlBlaster.engine.qos.UpdateReturnQosServer;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.xbformat.I_ProgressListener;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.util.dispatch.DispatchConnection;import org.xmlBlaster.util.dispatch.I_PostSendListener;import org.xmlBlaster.authentication.plugins.CryptDataHolder;import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;/** * Holding all necessary infos to establish callback * connections and invoke their update(). * @see DispatchConnection * @author xmlBlaster@marcelruff.info * @author michele@laghi.eu */public final class CbDispatchConnection extends DispatchConnection{ private static Logger log = Logger.getLogger(CbDispatchConnection.class.getName()); public final String ME; private I_CallbackDriver cbDriver; private String cbKey; private I_AdminSession session; private SessionName sessionName; /** * @param connectionsHandler The DevliveryConnectionsHandler witch i belong to * @param address The address i shall connect to */ public CbDispatchConnection(Global glob, CbDispatchConnectionsHandler connectionsHandler, AddressBase address) throws XmlBlasterException { super(glob, connectionsHandler, address); this.ME = connectionsHandler.getDispatchManager().getQueue().getStorageId().toString(); sessionName = connectionsHandler.getDispatchManager().getSessionName(); ServerScope serverScope = (ServerScope)glob; I_AdminSubject subject = serverScope.getAuthenticate().getSubjectInfoByName(sessionName); if (subject != null) this.session = subject.getSessionByPubSessionId(sessionName.getPublicSessionId()); } /** * @return A nice name for logging */ public final String getName() { return ME; } public void setAddress(AddressBase address) throws XmlBlasterException { super.setAddress(address); if (this.cbDriver == null || !this.cbDriver.isAlive()) loadPlugin(); this.cbDriver.init(this.glob, (CallbackAddress)address); } /** * The name of the protocol driver */ public final String getDriverName() { return (this.cbDriver != null) ? this.cbDriver.getName() : "unknown"; } /** Load the appropriate protocol driver */ public final void loadPlugin() throws XmlBlasterException { // Check if a native callback driver is passed in the glob Hashtable (e.g. for "jdbc" or "native"), take this instance // SOCKET protocol this.cbDriver = (I_CallbackDriver)this.address.getCallbackDriver(); if (this.cbDriver == null) { // JDBC service this.cbKey = address.getType() + address.getRawAddress(); this.cbDriver = glob.getNativeCallbackDriver(this.cbKey); } if (this.cbDriver == null) { // instantiate the callback plugin (CORBA, XMLRPC) ... this.cbDriver = ((org.xmlBlaster.engine.ServerScope)glob).getCbProtocolManager().getNewCbProtocolDriverInstance(address.getType()); if (this.cbDriver == null) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, callback protocol type='" + address.getType() + "' is not supported"); // glob.addNativeCallbackDriver(this.cbKey, this.cbDriver); if (log.isLoggable(Level.FINE)) log.fine(ME+": Created callback plugin '" + this.address.getType() + "'"); } else { if (log.isLoggable(Level.FINE)) log.fine(ME+": Created native callback driver for protocol '" + address.getType() + "'"); } } public I_ProgressListener registerProgressListener(I_ProgressListener listener) { if (this.cbDriver == null) return null; return this.cbDriver.registerProgressListener(listener); } /** * @see DispatchConnection#connectLowlevel() */ public final void connectLowlevel() throws XmlBlasterException { // Initialize the driver (connect on lowlevel layer) ... this.cbDriver.init(glob, (CallbackAddress)address); // Check if it is available if (super.address.getPingInterval() > 0) { // Send clientProperty "__initialCallbackPing"=false to supress initial ping boolean initialCallbackPing = super.address.getEnv(Constants.CLIENTPROPERTY_INITIAL_CALLBACK_PING , true).getValue(); if (initialCallbackPing) doPing("<qos><state info='"+Constants.INFO_INITIAL+"'/></qos>"); } if (log.isLoggable(Level.FINE)) log.fine(ME+": Connected low level to callback '" + this.address.getType() + "'"); } class Holder { public MsgQueueUpdateEntry msgQueueUpdateEntry; public MsgUnitRaw msgUnitRaw; public String subscriptionId; public Holder(MsgQueueUpdateEntry msgQueueUpdateEntry, MsgUnitRaw msgUnitRaw, String subscriptionId) { this.msgQueueUpdateEntry = msgQueueUpdateEntry; this.msgUnitRaw = msgUnitRaw; this.subscriptionId = subscriptionId; } } /** * We export/encrypt the message (call the interceptor) * * @param holderList list of Holder instances * @param methodName UPDATE or UPDATE_ONEWAY * @throws XmlBlasterException */ private void exportCrypt(ArrayList holderList, MethodName methodName) throws XmlBlasterException { if (holderList == null || methodName == null) return; I_MsgSecurityInterceptor securityInterceptor = connectionsHandler.getDispatchManager().getMsgSecurityInterceptor(); if (securityInterceptor == null) { log.warning(ME+": No session security context, sending " + holderList.size() + " messages without encryption"); return; } ServerScope scope = (ServerScope)this.glob; for (int i=0; i<holderList.size(); i++) { Holder holder = (Holder)holderList.get(i); // Pass subscribeQos or connectQos - clientProperties to exportMessage() in case there are // some interesting settings provided, for example a desired XSL transformation SubscriptionInfo subscriptionInfo = null; Map map = null; if (holder.subscriptionId != null) { subscriptionInfo = scope.getRequestBroker().getClientSubscriptions().getSubscription(holder.subscriptionId); if (subscriptionInfo != null) map = subscriptionInfo.getQueryQosDataClientProperties(); //String xslFileName = subscriptionInfo.getQueryQosData().getClientProperty("__xslTransformerFileName", (String)null); } else { // todo: use map=ConnectQos.getClientProperties() as a map to pass to dataHolder } CryptDataHolder dataHolder = new CryptDataHolder(methodName, holder.msgUnitRaw, map); holder.msgUnitRaw = securityInterceptor.exportMessage(dataHolder); } if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted " + holderList.size() + " " + methodName + " messages."); } /** * Send the messages back to the client. * @param msgArr Should be a copy of the original, since we export it which changes/encrypts the content * <p> * The RETURN value is transferred in the msgArr[i].getReturnObj(), for oneway updates it is null * </p> */ public void doSend(MsgQueueEntry[] msgArr_) throws XmlBlasterException { ArrayList oneways = null; ArrayList responders = null; {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -