📄 isomux.java
字号:
} catch (Throwable e) { if (!terminate) { channel.setUsable(false); if (!(e instanceof EOFException)) Logger.log (new LogEvent (this, "muxreceiver", e)); synchronized(parent) { parent.notify(); } } } } else { try { synchronized(rx) { rx.wait(); } } catch (InterruptedException e) { Logger.log (new LogEvent (this, "muxreceiver", e)); } } } Logger.log (new LogEvent (this, "muxreceiver", "terminate")); } public void setLogger (Logger logger, String realm) { } public String getRealm () { return realm; } public Logger getLogger() { return logger; } } private void doTransmit() throws ISOException, IOException { while (txQueue.size() > 0) { Object o = txQueue.firstElement(); ISOMsg m = null; if (o instanceof ISORequest) { ISORequest r = (ISORequest) o; if (r.isExpired()) cnt[TX_EXPIRED]++; else { m = r.getRequest(); rxQueue.put (getKey(m), r); r.setTransmitted (); synchronized(rx) { rx.notify(); // required by ChannelPool } } } else if (o instanceof ISOMsg) { m = (ISOMsg) o; } if (m != null) { try { channel.send(m); cnt[TX]++; } catch (ISOException e) { Logger.log (new LogEvent (this, "error", e)); } } txQueue.removeElement(o); txQueue.trimToSize(); } } public void run () { tx = Thread.currentThread(); int rxPriority = rx.getPriority(); // Bug#995787 if (rxPriority < Thread.MAX_PRIORITY) { // OS/400 V4R4 JVM rx.setPriority (rxPriority+1); // Thread problem // (Vincent.Greene@amo.com) } rx.start(); boolean firstTime = true; while (!terminate || !txQueue.isEmpty()) { try { if (channel.isConnected()) { doTransmit(); } else if (doConnect) { if (firstTime) { firstTime = !firstTime; channel.connect(); } else { Thread.sleep(5000); channel.reconnect(); } cnt[CONNECT]++; synchronized(rx) { rx.notify(); } } else { // nothing to do ... try { Thread.sleep (5000); } catch (InterruptedException ex) { } } synchronized(this) { if (!terminate && channel.isConnected() && txQueue.size() == 0) { this.wait(); } } } catch (ConnectException e) { if (channel instanceof ClientChannel) { ClientChannel cc = (ClientChannel) channel; Logger.log (new LogEvent (this, "connection-refused", cc.getHost()+":"+cc.getPort()) ); } try { Thread.sleep (1000); } catch (InterruptedException ex) { } } catch (Exception e) { Logger.log (new LogEvent (this, "mux", e)); try { Thread.sleep (1000); } catch (InterruptedException ex) { } } } // Wait for the receive queue to empty out before shutting down while (!rxQueue.isEmpty()) { try { Thread.sleep(5000); // Wait for the receive queue to clear. purgeRxQueue(); // get rid of expired stuff } catch (InterruptedException e) { break; } } // By closing the channel, we force the receive thread to terminate try { channel.disconnect(); } catch (IOException e) { } synchronized(rx) { rx.notify(); } try { rx.join (); } catch (InterruptedException e) { } Logger.log (new LogEvent (this, "mux", "terminate")); } /** * queue an ISORequest */ synchronized public void queue(ISORequest r) { txQueue.addElement(r); this.notify(); } /** * send a message over channel, usually a * response from an ISORequestListener */ synchronized public void send(ISOMsg m) { txQueue.addElement(m); this.notify(); } private void terminate(boolean hard) { LogEvent evt = new LogEvent (this, "mux", "<terminate type=\"" + (hard ? "hard" : "soft") +"\"/>"); evt.addMessage (this); Logger.log (evt); terminate = true; synchronized(this) { if (hard) { txQueue.removeAllElements(); rxQueue.clear(); } this.notify(); } } /** * terminate MUX * @param wait Time to wait before forcing shutdown */ public void terminate (int wait) { terminate(false); tx.interrupt(); rx.interrupt(); try { tx.join(wait); if (tx.isAlive()) { terminate(true); tx.join(); } } catch (InterruptedException e) { } } /** * terminate MUX (soft terminate, wait forever if necessary) */ public void terminate() { terminate(0); } public boolean isConnected() { return channel.isConnected(); } public void setLogger (Logger logger, String realm) { this.logger = logger; this.realm = realm; } public String getRealm () { return realm; } public Logger getLogger() { return logger; } public boolean isTerminating() { return terminate; } /** * associates this ISOMUX with a name using NameRegistrar * @param name name to register * @see NameRegistrar */ public void setName (String name) { this.name = name; NameRegistrar.register ("mux."+name, this); } /** * @return ISOMUX instance with given name. * @throws NameRegistrar.NotFoundException; * @see NameRegistrar */ public static ISOMUX getMUX (String name) throws NameRegistrar.NotFoundException { return (ISOMUX) NameRegistrar.get ("mux."+name); } /** * @return this ISOMUX's name ("" if no name was set) */ public String getName() { return this.name; } /** * ISOMUXs usually calls connect() on the underlying ISOChannel<br> * You can prevent this behaveour by passing a false value. * @param connect false to prevent connection (default true) */ public void setConnect (boolean connect) { this.doConnect = connect; if (!connect && isConnected()) { channel.setUsable(false); try { channel.disconnect(); } catch (IOException e) { Logger.log (new LogEvent(this, "set-connect", e)); } synchronized(this) { this.notify(); } } } /** * @return connect flag value */ public boolean getConnect() { return doConnect; } public void dump (PrintStream p, String indent) { p.println (indent + "<mux-stats connected=\"" + channel.isConnected() + "\">"); showCounters (p); p.println (indent + "</mux-stats>"); } public ISOMsg request (ISOMsg m, long timeout) throws ISOException { ISORequest req = new ISORequest (m); queue (req); return req.getResponse ((int) timeout); } public void request (ISOMsg m, long timeout, ISOResponseListener r, Object handBack) throws ISOException { throw new ISOException ("Not implemented"); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -