📄 session.java
字号:
+ " denying the message."); if (queueMode) cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, true)); else cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,false)); } // Else: else { // Preparing ack for manual sessions: if (! autoAck) prepareAck(targetName, msgId, queueMode); // Passing the current message: try { messageListener.onMessage(Message.wrapMomMessage(this, momMsg)); // Auto ack: acknowledging the message: if (autoAck) cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId, queueMode)); } // Catching a JMSException means that the building of the Joram // message went wrong: denying the message: catch (JMSException jE) { JoramTracing.log(JoramTracing.ERROR, this + ": error while processing the" + " received message: " + jE); if (queueMode) cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, queueMode)); else cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId, queueMode)); } // Catching a RuntimeException means that the client onMessage() code // is incorrect; denying the message if needed: catch (RuntimeException rE) { JoramTracing.log(JoramTracing.ERROR,this + ": RuntimeException thrown" + " by the listener: " + rE); if (autoAck && queueMode) cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, queueMode)); else if (autoAck && ! queueMode) cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId, queueMode)); } } } } catch (JMSException e) {} } /** * API method. * * @exception IllegalStateException If the session is closed, or not * transacted, or if the connection is broken. */ public void commit() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); if (! transacted) throw new IllegalStateException("Can't commit a non transacted" + " session."); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": committing..."); // If the transaction was scheduled: cancelling. if (scheduled) { closingTask.cancel(); scheduled = false; } // Sending client messages: try { Enumeration dests = sendings.keys(); String dest; ProducerMessages pM; while (dests.hasMoreElements()) { dest = (String) dests.nextElement(); pM = (ProducerMessages) sendings.remove(dest); cnx.syncRequest(pM); } // Acknowledging the received messages: acknowledge(); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": committed."); } // Catching an exception if the sendings or acknowledgement went wrong: catch (JMSException jE) { TransactionRolledBackException tE = new TransactionRolledBackException("A JMSException was thrown during" + " the commit."); tE.setLinkedException(jE); JoramTracing.log(JoramTracing.ERROR, "Exception: " + tE); rollback(); throw tE; } } /** * API method. * * @exception IllegalStateException If the session is closed, or not * transacted. */ public void rollback() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); if (! transacted) throw new IllegalStateException("Can't rollback a non transacted" + " session."); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": rolling back..."); // If the transaction was scheduled: cancelling. if (scheduled) { closingTask.cancel(); scheduled = false; } // Denying the received messages: deny(); // Deleting the produced messages: sendings.clear(); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": rolled back."); } /** * API method. * * @exception IllegalStateException If the session is closed, or transacted. */ public void recover() throws JMSException { if (transacted) throw new IllegalStateException("Can't recover a transacted session."); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + " recovering..."); // Stopping the session, denying the received messages: stop(); deny(); // Re-starting the session: start(); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": recovered."); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the request fails for any other reason. */ public void unsubscribe(String name) throws JMSException { MessageConsumer cons; for (int i = 0; i < consumers.size(); i++) { cons = (MessageConsumer) consumers.elementAt(i); if (! cons.queueMode && cons.targetName.equals(name)) throw new JMSException("Can't delete durable subscription " + name + " as long as an active subscriber exists."); } cnx.syncRequest(new ConsumerUnsubRequest(name)); } /** * API method. * * @exception JMSException Actually never thrown. */ public synchronized void close() throws JMSException { // Ignoring the call if the session is already closed: if (closed) return; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": closing..."); // Finishing the timer, if any: if (consumersTimer != null) consumersTimer.cancel(); // Emptying the current pending deliveries: try { repliesIn.stop(); } catch (InterruptedException iE) {} // Stopping the session: stop(); // Denying the non acknowledged messages: if (transacted) rollback(); else deny(); // Closing the session's resources: while (! browsers.isEmpty()) ((QueueBrowser) browsers.elementAt(0)).close(); while (! consumers.isEmpty()) ((MessageConsumer) consumers.elementAt(0)).close(); while (! producers.isEmpty()) ((MessageProducer) producers.elementAt(0)).close(); cnx.sessions.removeElement(this); closed = true; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": closed."); } /** Schedules a consumer task to the session's timer. */ synchronized void schedule(TimerTask task, long timer) { if (consumersTimer == null) consumersTimer = new com.scalagent.kjoram.util.Timer(); try { consumersTimer.schedule(task, timer); } catch (Exception exc) {} } /** * Starts the asynchronous deliveries in the session. * <p> * This method is called either by a consumer when setting the first * message listener of the session, if the connection is started, or * by the starting connection if at least one listener has previously * been set by a consumer. * <p> * It creates and starts a daemon dedicated to distributing the * asynchronous deliveries arriving on the connection to their consumers. * * @exception IllegalStateException If the session is closed. */ void start() throws IllegalStateException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": starting..."); repliesIn.start(); // Starting the daemon if needed: if (! started && msgListeners > 0) { daemon = new SessionDaemon(this); daemon.setDaemon(false); daemon.start(); } started = true; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": started."); } /** * Stops the asynchronous deliveries processing in the session. * <p> * This method must be carefully used. When the session is stopped, the * connection might very well going on pushing deliveries in the * session's queue. If the session is never re-started, these deliveries * will never be poped out, and this may lead to a situation of consumed * but never acknowledged messages. * <p> * This fatal situation never occurs as the <code>stop()</code> method is * either called by the <code>recover()</code> method, which then calls * the <code>start()</code> method, or by the <code>Session.close()</code> * and <code>Connection.stop()</code> methods, which first empty the * session's deliveries and forbid any further push. */ void stop() { // Ignoring the call if the session is already stopped: if (! started) return; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": stopping..."); // Stopping the daemon if needed: if (daemon != null) { daemon.stop(); daemon = null; } // Synchronizing the stop() with the consumers: if (consumers != null) { MessageConsumer consumer; for (int i = 0; i < consumers.size(); i++) { consumer = (MessageConsumer) consumers.elementAt(i); consumer.syncro(); } } started = false; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": stopped."); } /** * Method called by message producers when producing a message for * preparing the session to later commit it. * * @param dest The destination the message is destinated to. * @param msg The message. */ void prepareSend(Destination dest, com.scalagent.kjoram.messages.Message msg) { // If the transaction was scheduled, cancelling: if (scheduled) closingTask.cancel(); ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName()); if (pM == null) { pM = new ProducerMessages(dest.getName()); sendings.put(dest.getName(), pM); } pM.addMessage(msg); // If the transaction was scheduled, re-scheduling it: if (scheduled) cnx.schedule(closingTask); } /** * Method called by message consumers when receiving a message for * preparing the session to later acknowledge or deny it. * * @param name Name of the destination or of the proxy subscription * the message comes from. * @param id Identifier of the consumed message. * @param queueMode <code>true</code> if the message consumed comes from * a queue. */ void prepareAck(String name, String id, boolean queueMode) { // If the transaction was scheduled, cancelling: if (scheduled) closingTask.cancel(); MessageAcks acks = (MessageAcks) deliveries.get(name); if (acks == null) { acks = new MessageAcks(queueMode); deliveries.put(name, acks); } acks.addId(id); // If the transaction must be scheduled, scheduling it: if (closingTask != null) { scheduled = true; cnx.schedule(closingTask); } } /** * Method acknowledging the received messages. * * @exception IllegalStateException If the connection is broken. */ void acknowledge() throws IllegalStateException { String target; MessageAcks acks; Enumeration targets = deliveries.keys(); while (targets.hasMoreElements()) { target = (String) targets.nextElement(); acks = (MessageAcks) deliveries.remove(target); cnx.asyncRequest(new SessAckRequest(target, acks.getIds(), acks.getQueueMode())); } } /** Method denying the received messages. */ void deny() { try { String target; MessageAcks acks; SessDenyRequest deny; Enumeration targets = deliveries.keys(); while (targets.hasMoreElements()) { target = (String) targets.nextElement(); acks = (MessageAcks) deliveries.remove(target); deny = new SessDenyRequest(target, acks.getIds(), acks.getQueueMode()); if (acks.getQueueMode()) cnx.syncRequest(deny); else cnx.asyncRequest(deny); } } catch (JMSException jE) {} } /** * Method called by the session daemon for passing an * asynchronous message delivery to the appropriate consumer. */ void distribute(AbstractJmsReply asyncReply) { // Getting the message: ConsumerMessages reply = (ConsumerMessages) asyncReply; // Getting the consumer: MessageConsumer cons = null; if (reply.getQueueMode()) { cons = (MessageConsumer) cnx.requestsTable.remove(reply.getKey()); } else cons = (MessageConsumer) cnx.requestsTable.get(reply.getKey()); // Passing the message(s) to the consumer: if (cons != null) { Vector msgs = reply.getMessages(); for (int i = 0; i < msgs.size(); i++) cons.onMessage((com.scalagent.kjoram.messages.Message) msgs.elementAt(i)); } // The target consumer of the received message may be null if it has // been closed without having stopped the connection: denying the // deliveries. else { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.WARN, this + ": an asynchronous" + " delivery arrived for an improperly" + " closed consumer: denying the" + " messages."); Vector msgs = reply.getMessages(); com.scalagent.kjoram.messages.Message msg; Vector ids = new Vector(); for (int i = 0; i < msgs.size(); i++) { msg = (com.scalagent.kjoram.messages.Message) msgs.elementAt(i); ids.addElement(msg.getIdentifier()); } if (ids.isEmpty()) return; try { cnx.asyncRequest(new SessDenyRequest(reply.comesFrom(), ids, reply.getQueueMode(), true)); } catch (JMSException jE) {} } } /** * The <code>SessionCloseTask</code> class is used by non-XA transacted * sessions for taking care of closing them if they tend to be pending, * and if a transaction timer has been set. */ private class SessionCloseTask extends TimerTask { /** Method called when the timer expires, actually closing the session. */ public void run() { try { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.WARN, "Session closed " + "because of pending transaction"); close(); } catch (Exception e) {} } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -