📄 useragent.java
字号:
heartBeatTask.touch(); } AbstractJmsRequest request = ctx.getRequest(not.getMessage()); proxyImpl.reactToClientRequest(key.intValue(), request); if (ctx.isClosed()) { //CnxCloseRequest request = (CnxCloseRequest) not.getMessage(); connections.remove(key); HeartBeatTask hbt = (HeartBeatTask) heartBeatTasks.remove(key); if (hbt != null) { hbt.cancel(); } } } } // else should not happen because: // - RequestNot is transient // - RequestNot always follows an OpenConnection or // a GetConnection } private void doReact(ProxyRequestGroupNot not) { RequestNot[] requests = not.getRequests(); RequestBuffer rm = new RequestBuffer(this); for (int i = 0; i < requests.length; i++) { RequestNot req = requests[i]; Integer key = new Integer(req.getConnectionKey()); HeartBeatTask heartBeatTask = (HeartBeatTask) heartBeatTasks.get(key); if (heartBeatTask != null) { heartBeatTask.touch(); } ConnectionContext ctx = (ConnectionContext) connections.get(key); if (ctx != null) { AbstractJmsRequest request = ctx.getRequest(req.getMessage()); if (request instanceof ProducerMessages) { ProducerMessages pm = (ProducerMessages) request; rm.put(req.getConnectionKey(), pm); } else if (request instanceof JmsRequestGroup) { JmsRequestGroup jrg = (JmsRequestGroup)request; AbstractJmsRequest[] groupedRequests = jrg.getRequests(); for (int j = 0; j < groupedRequests.length; j++) { if (groupedRequests[i] instanceof ProducerMessages) { ProducerMessages pm = (ProducerMessages) groupedRequests[i]; rm.put(req.getConnectionKey(), pm); } else { proxyImpl.reactToClientRequest(key.intValue(), groupedRequests[i]); } } } else { proxyImpl.reactToClientRequest(key.intValue(), request); } } } rm.flush(); } private void doReact(CloseConnectionNot not) { if (connections != null) { Integer key = new Integer(not.getKey()); // The connection may have already been // explicitely closed by a CnxCloseRequest. if (connections.remove(key) != null) { proxyImpl.reactToClientRequest(not.getKey(), new CnxCloseRequest()); heartBeatTasks.remove(key); } } // else should not happen: // 1- CloseConnectionNot is transient // 2- CloseConnectionNot follows an OpenConnectionNot // or a GetConnectionNot } private void doReact(ResetCollocatedConnectionsNot not) { if (connections != null) { Collection values = connections.values(); Iterator iterator = values.iterator(); while (iterator.hasNext()) { Object obj = iterator.next(); // Standard connections must be dropped. // Only reliable connections can be recovered. if (obj instanceof StandardConnectionContext) { ConnectionContext cc = (ConnectionContext) obj; proxyImpl.reactToClientRequest( cc.getKey(), new CnxCloseRequest()); iterator.remove(); } } } } private void doReact(SendRepliesNot not) { Enumeration en = not.getReplies(); while (en.hasMoreElements()) { SendReplyNot sr = (SendReplyNot) en.nextElement(); doReact(sr); } } /** * Notification sent by local agents (destinations) * indicating that the proxy can reply to a client. * @param not */ private void doReact(SendReplyNot not) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.doReact(" + not + ')'); ClientContext cc = proxyImpl.getClientContext(not.getKey()); if (cc != null) { if (cc.setReply(not.getRequestId()) == 0) { sendToClient(not.getKey(), new ServerReply(not.getRequestId())); } } else if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) { // Can happen if the connection is closed before the SendReplyNot // arrives. MomTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent: client context not found for " + not); } } /** * Sends a notification to the specified agent. * * @param to the identifier of the recipient agent * @param not the notification to send */ public void sendNot(AgentId to, Notification not) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.sendNot(" + to + ',' + not + ')'); sendTo(to, not); } /** * Sends a reply to the client connected through * the specified connection. * * @param key the key of the connection the client * is connected through. * @param reply the reply to send to the client. */ public void sendToClient(int key, AbstractJmsReply reply) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.sendToClient(" + key + ',' + reply + ')'); Integer objKey = new Integer(key); if (connections != null) { ConnectionContext ctx = (ConnectionContext)connections.get(objKey); if (ctx != null) { ctx.pushReply(reply); } } // else may happen. Drop the reply. } /** * Timer task responsible for closing the connection if * it has not sent any requests for the duration 'timeout'. */ class HeartBeatTask extends fr.dyade.aaa.util.TimerTask implements java.io.Serializable { private int timeout; private Integer key; private long lastRequestDate; HeartBeatTask(int timeout, Integer key) { this.timeout = timeout; this.key = key; } public void run() { long date = System.currentTimeMillis(); if ((date - lastRequestDate) > timeout) { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, "HeartBeatTask: close connection"); ConnectionContext ctx = (ConnectionContext)connections.remove(key); heartBeatTasks.remove(key); proxyImpl.reactToClientRequest(key.intValue(), new CnxCloseRequest()); Exception exc = new Exception("Connection " + getId() + ':' + key + " closed"); ctx.pushError(exc); } else { start(); } } public void start() { try { ConnectionManager.getTimer().schedule(this, timeout); } catch (Exception exc) { throw new Error(exc.toString()); } } public void touch() { lastRequestDate = System.currentTimeMillis(); } } public void setNoSave() { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, "setNoSave()"); super.setNoSave(); } public void setSave() { if (MomTracing.dbgProxy.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgProxy.log(BasicLevel.DEBUG, "UserAgent.setSave()"); super.setSave(); } public void readBag(ObjectInputStream in) throws IOException, ClassNotFoundException { connections = (Hashtable) in.readObject(); heartBeatTasks = (Hashtable) in.readObject(); if (heartBeatTasks != null) { // Start the tasks Enumeration tasks = heartBeatTasks.elements(); while (tasks.hasMoreElements()) { HeartBeatTask task = (HeartBeatTask) tasks.nextElement(); task.start(); } } proxyImpl.readBag(in); } public void writeBag(ObjectOutputStream out) throws IOException { out.writeObject(connections); out.writeObject(heartBeatTasks); proxyImpl.writeBag(out); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -