📄 connection.java
字号:
} // Catching an IllegalStateException if the connection is broken: catch (IllegalStateException caughtISE) { isE = caughtISE; } // At this point, the server won't deliver messages anymore, // the connection just waits for the sessions to have finished their // processings. Session session; for (int i = 0; i < sessions.size(); i++) { session = (Session) sessions.elementAt(i); try { session.repliesIn.stop(); } catch (InterruptedException iE) {} session.stop(); } started = false; if (isE != null) { JoramTracing.log(JoramTracing.ERROR, isE); throw isE; } if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": is stopped."); } /** * API method for closing the connection; even if the connection appears * to be broken, closes the sessions. * * @exception JMSException Actually never thrown. */ public void close() throws JMSException { // Ignoring the call if the connection is closed: if (closed) return; closing = true; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "--- " + this + ": closing..."); // Finishing the timer, if any: if (sessionsTimer != null) sessionsTimer.cancel(); // Stopping the connection: try { stop(); } // Catching a JMSException if the connection is broken: catch (JMSException jE) {} // Closing the sessions: Session session; while (! sessions.isEmpty()) { session = (Session) sessions.elementAt(0); try { session.close(); } // Catching a JMSException if the connection is broken: catch (JMSException jE) {} } // Closing the connection consumers: if (cconsumers != null) { ConnectionConsumer cc; while (! cconsumers.isEmpty()) { cc = (ConnectionConsumer) cconsumers.elementAt(0); cc.close(); } } // Closing the connection: connectionImpl.close(); // Shutting down the driver, if needed: if (! 
driver.stopping) driver.stop(); requestsTable.clear(); requestsTable = null; repliesTable.clear(); repliesTable = null; closed = true; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": closed."); } /** Returns a new request identifier. */ synchronized int nextRequestId() { if (requestsC == Integer.MAX_VALUE) requestsC = 0; return requestsC++; } /** Returns a new session identifier. */ synchronized String nextSessionId() { if (sessionsC == Integer.MAX_VALUE) sessionsC = 0; sessionsC++; return "c" + key + "s" + sessionsC; } /** Returns a new message identifier. */ synchronized String nextMessageId() { if (messagesC == Integer.MAX_VALUE) messagesC = 0; messagesC++; return "ID:" + proxyId + "c" + key + "m" + messagesC; } /** Returns a new subscription name. */ synchronized String nextSubName() { if (subsC == Integer.MAX_VALUE) subsC = 0; subsC++; return "c" + key + "sub" + subsC; } /** Schedules a session task to the connection's timer. */ synchronized void schedule(com.scalagent.kjoram.util.TimerTask task) { if (sessionsTimer == null) return; try { sessionsTimer.schedule(task, factoryParameters.txPendingTimer * 1000); } catch (Exception exc) {} } /** * Method sending a synchronous request to the server and waiting for an * answer. * * @exception IllegalStateException If the connection is closed or broken. * @exception JMSSecurityException When sending a request to a destination * not accessible because of security. * @exception InvalidDestinationException When sending a request to a * destination that no longer exists. * @exception JMSException If the request failed for any other reason. 
*/ AbstractJmsReply syncRequest(AbstractJmsRequest request) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); if (request.getRequestId() == -1) request.setRequestId(nextRequestId()); int requestId = request.getRequestId(); try { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: " + request.getClass().getName() + " with id: " + requestId); Lock lock = new Lock(); requestsTable.put(request.getKey(), lock); synchronized(lock) { connectionImpl.send(request); while (true) { try { lock.wait(); break; } catch (InterruptedException iE) { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.WARN,this + ": caught InterruptedException"); continue; } } requestsTable.remove(request.getKey()); } } // Catching an exception because of... catch (Exception e) { JMSException jE = null; if (e instanceof JMSException) throw (JMSException) e; else jE = new JMSException("Exception while getting a reply."); jE.setLinkedException(e); // Unregistering the request: if (requestsTable != null) requestsTable.remove(request.getKey()); JoramTracing.log(JoramTracing.ERROR, jE); throw jE; } // Finally, returning the reply: AbstractJmsReply reply = (AbstractJmsReply) repliesTable.remove(request.getKey()); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": got reply."); // If the reply is null, it means that the requester has been unlocked // by the driver because it detected a connection failure: if (reply == null) throw new IllegalStateException("Connection is broken."); // Else, if the reply notifies of an error: throwing the appropriate exc: else if (reply instanceof MomExceptionReply) { MomException mE = ((MomExceptionReply) reply).getException(); if (mE instanceof AccessException) throw new JMSSecurityException(mE.getMessage()); else if (mE instanceof DestinationException) throw new InvalidDestinationException(mE.getMessage()); else throw new 
JMSException(mE.getMessage()); } // Else: returning the reply: else return reply; } /** * Actually sends an asynchronous request to the server. * * @exception IllegalStateException If the connection is closed or broken. */ void asyncRequest(AbstractJmsRequest request) throws IllegalStateException { if (closed) throw new IllegalStateException("Forbidden call on a closed" + " connection."); if (request.getRequestId() == -1) request.setRequestId(nextRequestId()); try { if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": sends request: " + request.getClass().getName() + " with id: " + request.getRequestId()); connectionImpl.send(request); } // In the case of a broken connection: catch (IllegalStateException exc) { // Removes the potentially stored requester: requestsTable.remove(request.getKey()); JoramTracing.log(JoramTracing.ERROR, exc); throw exc; } } /** * Method called by the driver for distributing the server replies * it gets on the connection. * <p> * Server replies are either synchronous replies to client requests, * or asynchronous message deliveries, or asynchronous exceptions * notifications. 
*/ void distribute(AbstractJmsReply reply) { // Getting the correlation identifier: int correlationId = reply.getCorrelationId(); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, this + ": got reply: " + correlationId); Object obj = null; if (correlationId != -1) obj = requestsTable.get(reply.getKey()); // If the request is a synchronous request, putting the reply in the // replies table and unlocking the requester: if (obj instanceof Lock) { repliesTable.put(reply.getKey(), reply); synchronized(obj) { obj.notify(); } } // If the reply is an asynchronous exception, passing it: else if (reply instanceof MomExceptionReply) { // Removing the potential consumer object from the table: requestsTable.remove(reply.getKey()); MomException mE = ((MomExceptionReply) reply).getException(); JMSException jE = null; if (mE instanceof AccessException) jE = new JMSSecurityException(mE.getMessage()); else if (mE instanceof DestinationException) jE = new InvalidDestinationException(mE.getMessage()); else jE = new JMSException(mE.getMessage()); onException(jE); } // Else, if the reply is an asynchronous delivery: else if (obj != null) { try { // Passing the reply to its consumer: if (obj instanceof ConnectionConsumer) ((ConnectionConsumer) obj).repliesIn.push(reply); else if (obj instanceof MessageConsumer) ((MessageConsumer) obj).sess.repliesIn.push(reply); } catch (StoppedQueueException sqE) { denyDelivery((ConsumerMessages) reply); } } // Finally, if the requester disappeared, denying the delivery: else if (reply instanceof ConsumerMessages) denyDelivery((ConsumerMessages) reply); } /** Actually denies a non deliverable delivery. 
*/
  private void denyDelivery(ConsumerMessages delivery) {
    Vector pending = delivery.getMessages();
    Vector deniedIds = new Vector();

    // Collecting the identifier of every message of the delivery:
    int idx = 0;
    while (idx < pending.size()) {
      com.scalagent.kjoram.messages.Message current =
        (com.scalagent.kjoram.messages.Message) pending.elementAt(idx);
      deniedIds.addElement(current.getIdentifier());
      idx++;
    }

    // A request is only worth sending if there is something to deny:
    if (! deniedIds.isEmpty()) {
      try {
        // The denying goes through an asynchronous request, since no
        // synchronous behaviour is expected here:
        asyncRequest(new SessDenyRequest(delivery.comesFrom(),
                                         deniedIds,
                                         delivery.getQueueMode(),
                                         true));
      }
      // If the denying itself fails, nothing more can be done!
      catch (JMSException jE) {}
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -