📄 proxyimpl.java
字号:
/**
 * Reacts to a <code>ConsumerReceiveRequest</code>.
 * <p>
 * In queue mode the request is converted into a <code>ReceiveRequest</code>
 * notification that is sent directly to the target agent. Otherwise the
 * request is handed over to <code>doReact</code>.
 *
 * @param key  value forwarded as first argument of the
 *             <code>ReceiveRequest</code> and of <code>doReact</code>.
 * @param req  the client request.
 */
private void reactToClientRequest(int key, ConsumerReceiveRequest req) {
  // Not a queue consumer: let the generic dispatcher handle the request.
  if (!req.getQueueMode()) {
    doReact(key, req);
    return;
  }

  ReceiveRequest receive = new ReceiveRequest(
      key,
      req.getRequestId(),
      req.getSelector(),
      req.getTimeToLive(),
      req.getReceiveAck(),
      null,
      1);
  AgentId queueId = AgentId.fromString(req.getTarget());

  // When the target's getTo() value matches the proxy agent's one, the
  // notification is flagged as non persistent before being sent.
  boolean local = queueId.getTo() == proxyAgent.getId().getTo();
  if (local) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, " -> local receiving");
    receive.setPersistent(false);
  }
  proxyAgent.sendNot(queueId, receive);
}

/**
 * Reacts to a <code>ConsumerSetListRequest</code>.
 * <p>
 * In queue mode the request is converted into a <code>ReceiveRequest</code>
 * notification (time to live <code>0</code>, no receive acknowledgement,
 * carrying the message identifiers to acknowledge and the message count of
 * the request) that is sent directly to the target agent. Otherwise the
 * request is handed over to <code>doReact</code>.
 *
 * @param key  value forwarded as first argument of the
 *             <code>ReceiveRequest</code> and of <code>doReact</code>.
 * @param req  the client request.
 */
private void reactToClientRequest(int key, ConsumerSetListRequest req) {
  if (logger.isLoggable(BasicLevel.DEBUG))
    logger.log(BasicLevel.DEBUG,
               "ProxyImp.reactToClientRequest(" + key + ',' + req + ')');

  // Not a queue consumer: let the generic dispatcher handle the request.
  if (!req.getQueueMode()) {
    doReact(key, req);
    return;
  }

  ReceiveRequest receive = new ReceiveRequest(
      key,
      req.getRequestId(),
      req.getSelector(),
      0,
      false,
      req.getMessageIdsToAck(),
      req.getMessageCount());
  AgentId queueId = AgentId.fromString(req.getTarget());

  // Same treatment as for a plain receive: a matching getTo() value makes
  // the notification non persistent.
  boolean local = queueId.getTo() == proxyAgent.getId().getTo();
  if (local) {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, " -> local sending");
    receive.setPersistent(false);
  }
  proxyAgent.sendNot(queueId, receive);
}

/**
 * Forwards the client's <code>QBrowseRequest</code> request as
 * a <code>BrowseRequest</code> MOM request directly to a destination.
*/ private void reactToClientRequest(int key, QBrowseRequest req) { proxyAgent.sendNot(AgentId.fromString(req.getTarget()), new BrowseRequest(key, req.getRequestId(), req.getSelector())); } private void reactToClientRequest(int key, JmsRequestGroup request) { AbstractJmsRequest[] requests = request.getRequests(); RequestBuffer rm = new RequestBuffer(proxyAgent); for (int i = 0; i < requests.length; i++) { if (requests[i] instanceof ProducerMessages) { ProducerMessages pm =(ProducerMessages) requests[i]; rm.put(key, pm); } else { reactToClientRequest(key, requests[i]); } } rm.flush(); } /** * Distributes the received notifications to the appropriate reactions. * <p> * A JMS proxy reacts to: * <ul> * <li><code>SyncReply</code> proxy synchronizing notification,</li> * <li><code>SetDMQRequest</code> admin notification,</li> * <li><code>SetThreshRequest</code> admin notification,</li> * <li><code>SetNbMaxMsgRequest</code> admin notification,</li> * <li><code>Monit_GetNbMaxMsg</code> admin notification,</li> * <li><code>Monit_GetDMQSettings</code> monitoring notification,</li> * <li><code>AbstractReply</code> destination replies,</li> * <li><code>AdminReply</code> administration replies,</li> * <li><code>fr.dyade.aaa.agent.UnknownAgent</code>.</li> * </ul> * * @exception UnknownNotificationException * If the notification is not expected. 
*/ public void react(AgentId from, Notification not) throws UnknownNotificationException { // Administration and monitoring requests: if (not instanceof SetDMQRequest) doReact(from, (SetDMQRequest) not); else if (not instanceof SetThreshRequest) doReact(from, (SetThreshRequest) not); else if (not instanceof SetNbMaxMsgRequest) doReact(from, (SetNbMaxMsgRequest) not); else if (not instanceof Monit_GetNbMaxMsg) doReact(from, (Monit_GetNbMaxMsg) not); else if (not instanceof Monit_GetDMQSettings) doReact(from, (Monit_GetDMQSettings) not); // Synchronization notification: else if (not instanceof SyncReply) doReact((SyncReply) not); // Notifications sent by a destination: else if (not instanceof AbstractReply) doFwd(from, (AbstractReply) not); else if (not instanceof AdminReply) doReact((AdminReply) not); // Platform notifications: else if (not instanceof UnknownAgent) doReact((UnknownAgent) not); else if (not instanceof UserAdminRequestNot) doReact((UserAdminRequestNot) not); else throw new UnknownNotificationException("Unexpected notification: " + not.getClass().getName()); } /** * Distributes the client requests to the appropriate reactions. 
* <p> * The proxy accepts the following requests: * <ul> * <li><code>GetAdminTopicRequest</code></li> * <li><code>CnxConnectRequest</code></li> * <li><code>CnxStartRequest</code></li> * <li><code>CnxStopRequest</code></li> * <li><code>SessCreateTQRequest</code></li> * <li><code>SessCreateTTRequest</code></li> * <li><code>ConsumerSubRequest</code></li> * <li><code>ConsumerUnsubRequest</code></li> * <li><code>ConsumerCloseSubRequest</code></li> * <li><code>ConsumerSetListRequest</code></li> * <li><code>ConsumerUnsetListRequest</code></li> * <li><code>ConsumerReceiveRequest</code></li> * <li><code>ConsumerAckRequest</code></li> * <li><code>ConsumerDenyRequest</code></li> * <li><code>SessAckRequest</code></li> * <li><code>SessDenyRequest</code></li> * <li><code>TempDestDeleteRequest</code></li> * <li><code>XACnxPrepare</code></li> * <li><code>XACnxCommit</code></li> * <li><code>XACnxRollback</code></li> * <li><code>XACnxRecoverRequest</code></li> * </ul> * <p> * A <code>JmsExceptReply</code> is sent back to the client when an * exception is thrown by the reaction. */ private void doReact(int key, AbstractJmsRequest request) { try { // Updating the active context if the request is not a new context // request! if (! 
(request instanceof CnxConnectRequest)) setCtx(key); if (request instanceof GetAdminTopicRequest) doReact(key, (GetAdminTopicRequest) request); else if (request instanceof CnxConnectRequest) doReact(key, (CnxConnectRequest) request); else if (request instanceof CnxStartRequest) doReact((CnxStartRequest) request); else if (request instanceof CnxStopRequest) doReact((CnxStopRequest) request); else if (request instanceof SessCreateTQRequest) doReact((SessCreateTQRequest) request); else if (request instanceof SessCreateTTRequest) doReact((SessCreateTTRequest) request); else if (request instanceof ConsumerSubRequest) doReact((ConsumerSubRequest) request); else if (request instanceof ConsumerUnsubRequest) doReact((ConsumerUnsubRequest) request); else if (request instanceof ConsumerCloseSubRequest) doReact((ConsumerCloseSubRequest) request); else if (request instanceof ConsumerSetListRequest) doReact((ConsumerSetListRequest) request); else if (request instanceof ConsumerUnsetListRequest) doReact((ConsumerUnsetListRequest) request); else if (request instanceof ConsumerReceiveRequest) doReact((ConsumerReceiveRequest) request); else if (request instanceof ConsumerAckRequest) doReact((ConsumerAckRequest) request); else if (request instanceof ConsumerDenyRequest) doReact((ConsumerDenyRequest) request); else if (request instanceof SessAckRequest) doReact((SessAckRequest) request); else if (request instanceof SessDenyRequest) doReact((SessDenyRequest) request); else if (request instanceof TempDestDeleteRequest) doReact((TempDestDeleteRequest) request); else if (request instanceof XACnxPrepare) doReact((XACnxPrepare) request); else if (request instanceof XACnxCommit) doReact((XACnxCommit) request); else if (request instanceof XACnxRollback) doReact((XACnxRollback) request); else if (request instanceof XACnxRecoverRequest) doReact((XACnxRecoverRequest) request); else if (request instanceof CnxCloseRequest) doReact(key, (CnxCloseRequest) request); else if (request instanceof 
ActivateConsumerRequest) doReact(key, (ActivateConsumerRequest) request); else if (request instanceof CommitRequest) doReact(key, (CommitRequest)request); } catch (MomException mE) { if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, mE); // Sending the exception to the client: doReply(new MomExceptionReply(request.getRequestId(), mE)); } } /** * Method implementing the reaction to a <code>GetAdminTopicRequest</code> * requesting the identifier of the local admin topic. * <p> * It simply sends back a <code>GetAdminTopicReply</code> holding the * admin topic identifier. * * @exception AccessException If the requester is not an administrator. */ private void doReact(int key, GetAdminTopicRequest req) throws AccessException {// if (! admin)// throw new AccessException("Request forbidden to a non administrator."); doReply( key, new GetAdminTopicReply( req, AdminTopicImpl.getReference().getId().toString())); } /** * Method implementing the reaction to a <code>CnxConnectRequest</code> * requesting the key of the active context. * <p> * It simply sends back a <code>ConnectReply</code> holding the active * context's key. * * @exception DestinationException In case of a first administrator * context, if the local administration topic reference * is not available. */ private void doReact(int key, CnxConnectRequest req) throws DestinationException { // state change, so save. proxyAgent.setSave(); setActiveCtxId(key); activeCtx = new ClientContext(proxyAgent.getId(), key); activeCtx.setProxyAgent(proxyAgent); contexts.put(new Integer(key), activeCtx); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Connection " + key + " opened."); doReply(new CnxConnectReply(req, key, proxyAgent.getId().toString())); } /** * Method implementing the proxy reaction to a <code>CnxStartRequest</code> * requesting to start a context. * <p> * This method sends the pending <code>ConsumerMessages</code> replies, * if any. 
*/ private void doReact(CnxStartRequest req) { activeCtx.setActivated(true); // Delivering the pending deliveries, if any: for (Enumeration deliveries = activeCtx.getPendingDeliveries(); deliveries.hasMoreElements();) doReply((AbstractJmsReply) deliveries.nextElement()); // Clearing the pending deliveries. activeCtx.clearPendingDeliveries(); } /** * Method implementing the JMS proxy reaction to a * <code>CnxStopRequest</code> requesting to stop a context. * <p> * This method sends a <code>ServerReply</code> back. */ private void doReact(CnxStopRequest req) { activeCtx.setActivated(false); doReply(new ServerReply(req)); } /** * Method implementing the JMS proxy reaction to a * <code>SessCreateTQRequest</code> requesting the creation of a temporary * queue. * <p> * Creates the queue, sends it a <code>SetRightRequest</code> for granting * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a * <code>SyncReply</code> notification it sends to itself. This latest * action's purpose is to preserve causality. * * @exception RequestException If the queue could not be deployed. 
*/
private void doReact(SessCreateTQRequest req) throws RequestException {
  try {
    // Creating and deploying the temporary queue, owned by this proxy.
    Queue tmpQueue = new Queue();
    tmpQueue.init(proxyAgent.getId(), null);
    AgentId queueId = tmpQueue.getId();
    tmpQueue.deploy();

    // Setting free WRITE right on the queue:
    SetRightRequest grantWrite = new SetRightRequest(null, null, 2);
    proxyAgent.sendNot(queueId, grantWrite);

    activeCtx.addTemporaryDestination(queueId);

    // The reply goes through a SyncReply the proxy sends to itself.
    SessCreateTDReply reply = new SessCreateTDReply(req, queueId.toString());
    proxyAgent.sendNot(proxyAgent.getId(),
                       new SyncReply(activeCtxId, reply));

    // Registering the temporary destination with the default admin topic.
    proxyAgent.sendNot(AdminTopic.getDefault(),
                       new RegisterTmpDestNot(queueId, false, true));

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                 "Temporary queue " + queueId + " created.");
  } catch (java.io.IOException ioExc) {
    throw new RequestException("Could not create temporary queue: " + ioExc);
  }
}

/**
 * Method implementing the JMS proxy reaction to a
 * <code>SessCreateTTRequest</code> requesting the creation of a temporary
 * topic.
 * <p>
 * Creates the topic, sends it a <code>SetRightRequest</code> for granting
 * WRITE access to all, and wraps a <code>SessCreateTDReply</code> in a
 * <code>SyncReply</code> notification it sends to itself. This latest
 * action's purpose is to preserve causality.
 *
 * @exception RequestException If the topic could not be deployed.
 */
private void doReact(SessCreateTTRequest req) throws RequestException {
  Topic topic = new Topic();
  topic.init(proxyAgent.getId(), null);
  AgentId tId = topic.getId();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -