⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 safchannel.java

📁 java pos,你可以直接编译运行,
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            evt.addMessage (iter.next());        Logger.log (evt);    }    public void exportQueue (String fileName) throws IOException {        synchronized (queue) {            FileOutputStream fos = new FileOutputStream (fileName, true);            PrintStream p = new PrintStream (fos);                            Iterator iter = queue.getQueue().iterator();            while (iter.hasNext())                 ((ISOMsg)iter.next()).dump (p, "");            fos.flush ();        }    }    private void logUpdate (LogEntry entry) throws IOException {        if (debug)             Logger.log (new LogEvent (this, "saf-update", entry));        log.update (entry);    }    /**     * @param log an already recovered - ready to use ReliableLog     */    public void setReliableLog (ReliableLog log) {        this.log = log;    }    public void snapshot(OutputStream out) throws Exception {        ObjectOutputStream stream = new ObjectOutputStream(out);        stream.writeUTF(this.getClass().getName());        stream.writeObject(queue.getQueue());        stream.writeObject(null);        stream.flush();    }    public void recover(InputStream in) throws Exception    {        ObjectInputStream stream = new ObjectInputStream(in);        if (!this.getClass().getName().equals(stream.readUTF()))            throw new IOException("log from wrong implementation");        queue.setQueue ((LinkedList) stream.readObject());    }    public void applyUpdate(Object update) throws Exception {        if (!(update instanceof LogEntry))            throw new Exception ("not a LogEntry");        LinkedList list = queue.getQueue();        LogEntry entry = (LogEntry) update;        switch (entry.op) {            case LogEntry.QUEUE:                list.addLast (entry.value);                if (debug)                    ((ISOMsg)entry.value).dump (System.out, "  QUEUE>");                break;            case LogEntry.REQUEUE:                list.removeFirst();                list.addFirst (entry.value);                if (debug)                    ((ISOMsg)entry.value).dump (System.out, "REQUEUE>");                break;            case LogEntry.DEQUEUE:                if (debug)                    System.out.println ("DEQUEUE>");                list.removeFirst ();                break;        }    }    /**     * @param filter incoming filter     */    public void addIncomingFilter (ISOFilter filter) {        incomingFilters.add (filter);    }    /**     * @param filter outgoing filter to add     */    public void addOutgoingFilter (ISOFilter filter) {        outgoingFilters.add (filter);    }    public void addFilter (ISOFilter filter) {        incomingFilters.add (filter);        outgoingFilters.add (filter);    }    public void removeFilter (ISOFilter filter) {        incomingFilters.remove (filter);        outgoingFilters.remove (filter);    }    public void removeIncomingFilter (ISOFilter filter) {        incomingFilters.remove (filter);    }    public void removeOutgoingFilter (ISOFilter filter) {        outgoingFilters.remove (filter);    }    public Collection getIncomingFilters() {        return incomingFilters;    }    public Collection getOutgoingFilters() {        return outgoingFilters;    }    public void setIncomingFilters (Collection filters) {        incomingFilters = new Vector (filters);    }    public void setOutgoingFilters (Collection filters) {        outgoingFilters = new Vector (filters);    }    protected ISOMsg applyFilters (Collection filters, ISOMsg m, LogEvent evt)         throws VetoException    {        Iterator iter  = outgoingFilters.iterator();        while (iter.hasNext()) {            m.setDirection(ISOMsg.OUTGOING);            m = ((ISOFilter) iter.next()).filter (this, m, evt);        }        m.setDirection(ISOMsg.OUTGOING);        return m;    }    public void run () {        for (;;) {            try {                if (!usable) {                    Thread.sleep (1000);                    continue;                }                mux = ISOMUX.getMUX (cfg.get ("destination-mux"));                if (mux != null && mux.isConnected ()) {                    ISOMsg msg = (ISOMsg) queue.dequeue();                    if (isExpired (msg)) {                        logUpdate (new LogEntry (LogEntry.DEQUEUE, null));                        Thread.yield(); // easy baby ...                        continue;                    }                    ISOMsg m   = (ISOMsg) msg.clone();                    m.setDirection(ISOMsg.OUTGOING);                    m = applyFilters (outgoingFilters, m, null);                    m.setDirection(ISOMsg.OUTGOING);                    if (cfg.getBoolean ("flag-retransmissions", false) &&                         !msg.isRetransmission())                    {                                                msg.setRetransmissionMTI();                        logUpdate (new LogEntry (LogEntry.REQUEUE, msg));                    }                    int timeout = cfg.getInt ("timeout", 60000);                    if (timeout > 0) {                        ISORequest req = new ISORequest (m);                        mux.queue (req);                        ISOMsg resp = req.getResponse (timeout);                        if (isValidResponse (resp, m))                            logUpdate (new LogEntry (LogEntry.DEQUEUE, null));                        else                            queue.requeue (msg);                    } else {                        mux.send (m);                        logUpdate (new LogEntry (LogEntry.DEQUEUE, null));                    }                    long delay = cfg.getLong ("delay");                    if (delay > 0)                        Thread.sleep (delay);                }                else                    relax ();            } catch (NameRegistrar.NotFoundException e) {                relax ();            } catch (Exception e) {                 Logger.log (new LogEvent (this, "run", e));                relax ();            }        }    }    private void relax () {        try {            Thread.sleep (1000);        } catch (InterruptedException ie) { }    }    /**     * @param resp response     * @param formerReq former request message     * @return true if response is valid and message can be dequeued     */    protected boolean isValidResponse (ISOMsg resp, ISOMsg formerReq) {        return resp != null;    }    /**     * @param request     * @return true if message is expired and have to be discarded     */    protected boolean isExpired (ISOMsg m) {        return false;    }    public static class LogEntry implements Serializable, Loggeable {        public static final int QUEUE   = 0;        public static final int REQUEUE = 1;        public static final int DEQUEUE = 2;        private static final long serialVersionUID=1;        public int op;        public Object value;        public LogEntry (int op, Object value) {            super();            this.op = op;            this.value = value;        }        public void dump (PrintStream p, String indent) {            String inner = indent + "  ";            String tag = null;            switch (op) {                case LogEntry.QUEUE:                    tag = "queue";                    break;                case LogEntry.REQUEUE:                    tag = "requeue";                    break;                case LogEntry.DEQUEUE:                    tag = "dequeue";                    break;            }            if (tag != null) {                if (value instanceof ISOMsg) {                    p.println (indent + "<" + tag + ">");                    ((ISOMsg)value).dump (p, inner);                    p.println (indent + "</" + tag + ">");                } else {                    p.println (indent + "<" + tag + "/>");                }            }        }    }    public int getPendingCount() {        return queue.pending ();    }    public int getMessageCount() {        return cnt[TX];    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -