📄 safchannel.java
字号:
public void dumpList () { LogEvent evt = new LogEvent (this, "saf-dump-list"); Iterator iter = queue.getQueue().iterator(); while (iter.hasNext()) evt.addMessage (iter.next()); Logger.log (evt); } public void exportQueue (String fileName) throws IOException { synchronized (queue) { FileOutputStream fos = new FileOutputStream (fileName, true); PrintStream p = new PrintStream (fos); Iterator iter = queue.getQueue().iterator(); while (iter.hasNext()) ((ISOMsg)iter.next()).dump (p, ""); fos.flush (); } } private void logUpdate (LogEntry entry) throws IOException { if (debug) Logger.log (new LogEvent (this, "saf-update", entry)); log.update (entry); } /** * @param log an already recovered - ready to use ReliableLog */ public void setReliableLog (ReliableLog log) { this.log = log; } public void snapshot(OutputStream out) throws Exception { ObjectOutputStream stream = new ObjectOutputStream(out); stream.writeUTF(this.getClass().getName()); stream.writeObject(queue.getQueue()); stream.writeObject(null); stream.flush(); } public void recover(InputStream in) throws Exception { ObjectInputStream stream = new ObjectInputStream(in); if (!this.getClass().getName().equals(stream.readUTF())) throw new IOException("log from wrong implementation"); queue.setQueue ((LinkedList) stream.readObject()); } public void applyUpdate(Object update) throws Exception { if (!(update instanceof LogEntry)) throw new Exception ("not a LogEntry"); LinkedList list = queue.getQueue(); LogEntry entry = (LogEntry) update; switch (entry.op) { case LogEntry.QUEUE: list.addLast (entry.value); if (debug) ((ISOMsg)entry.value).dump (System.out, " QUEUE>"); break; case LogEntry.REQUEUE: list.removeFirst(); list.addFirst (entry.value); if (debug) ((ISOMsg)entry.value).dump (System.out, "REQUEUE>"); break; case LogEntry.DEQUEUE: if (debug) System.out.println ("DEQUEUE>"); list.removeFirst (); break; } } /** * @param filter incoming filter */ public void addIncomingFilter (ISOFilter filter) { incomingFilters.add (filter); } /** * @param filter outgoing filter to add */ public void addOutgoingFilter (ISOFilter filter) { outgoingFilters.add (filter); } public void addFilter (ISOFilter filter) { incomingFilters.add (filter); outgoingFilters.add (filter); } public void removeFilter (ISOFilter filter) { incomingFilters.remove (filter); outgoingFilters.remove (filter); } public void removeIncomingFilter (ISOFilter filter) { incomingFilters.remove (filter); } public void removeOutgoingFilter (ISOFilter filter) { outgoingFilters.remove (filter); } public Collection getIncomingFilters() { return incomingFilters; } public Collection getOutgoingFilters() { return outgoingFilters; } public void setIncomingFilters (Collection filters) { incomingFilters = new Vector (filters); } public void setOutgoingFilters (Collection filters) { outgoingFilters = new Vector (filters); } protected ISOMsg applyFilters (Collection filters, ISOMsg m, LogEvent evt) throws VetoException { Iterator iter = outgoingFilters.iterator(); while (iter.hasNext()) { m.setDirection(ISOMsg.OUTGOING); m = ((ISOFilter) iter.next()).filter (this, m, evt); } m.setDirection(ISOMsg.OUTGOING); return m; } public void run () { for (;;) { try { if (!usable) { Thread.sleep (1000); continue; } mux = ISOMUX.getMUX (cfg.get ("destination-mux")); if (mux != null && mux.isConnected ()) { ISOMsg msg = (ISOMsg) queue.dequeue(); if (isExpired (msg)) { logUpdate (new LogEntry (LogEntry.DEQUEUE, null)); Thread.yield(); // easy baby ... continue; } ISOMsg m = (ISOMsg) msg.clone(); m.setDirection(ISOMsg.OUTGOING); m = applyFilters (outgoingFilters, m, null); m.setDirection(ISOMsg.OUTGOING); if (cfg.getBoolean ("flag-retransmissions", false) && !msg.isRetransmission()) { msg.setRetransmissionMTI(); logUpdate (new LogEntry (LogEntry.REQUEUE, msg)); } int timeout = cfg.getInt ("timeout", 60000); if (timeout > 0) { ISORequest req = new ISORequest (m); mux.queue (req); ISOMsg resp = req.getResponse (timeout); if (isValidResponse (resp, m)) logUpdate (new LogEntry (LogEntry.DEQUEUE, null)); else queue.requeue (msg); } else { mux.send (m); logUpdate (new LogEntry (LogEntry.DEQUEUE, null)); } long delay = cfg.getLong ("delay"); if (delay > 0) Thread.sleep (delay); } } catch (NameRegistrar.NotFoundException e) { relax (); } catch (Exception e) { Logger.log (new LogEvent (this, "run", e)); relax (); } } } private void relax () { try { Thread.sleep (1000); } catch (InterruptedException ie) { } } /** * @param resp response * @param formerReq former request message * @return true if response is valid and message can be dequeued */ protected boolean isValidResponse (ISOMsg resp, ISOMsg formerReq) { return resp != null; } /** * @param request * @return true if message is expired and have to be discarded */ protected boolean isExpired (ISOMsg m) { return false; } public static class LogEntry implements Serializable, Loggeable { public static final int QUEUE = 0; public static final int REQUEUE = 1; public static final int DEQUEUE = 2; private static final long serialVersionUID=1; public int op; public Object value; public LogEntry (int op, Object value) { super(); this.op = op; this.value = value; } public void dump (PrintStream p, String indent) { String inner = indent + " "; String tag = null; switch (op) { case LogEntry.QUEUE: tag = "queue"; break; case LogEntry.REQUEUE: tag = "requeue"; break; case LogEntry.DEQUEUE: tag = "dequeue"; break; } if (tag != null) { p.println (indent + "<" + tag + ">"); ((ISOMsg)value).dump (p, inner); p.println (indent + "</" + tag + ">"); } } } public int getPendingCount() { return queue.pending (); } public int getMessageCount() { return cnt[TX]; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -