⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 isomux.java

📁 jPOS is a Java® platform-based, mission-critical, ISO-8583 based financial transaction library/framework…
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * jPOS Project [http://jpos.org] * Copyright (C) 2000-2008 Alejandro P. Revilla * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program.  If not, see <http://www.gnu.org/licenses/>. */package org.jpos.iso;import java.io.EOFException;import java.io.IOException;import java.io.PrintStream;import java.net.ConnectException;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;import org.jpos.core.Configuration;import org.jpos.core.ReConfigurable;import org.jpos.util.LogEvent;import org.jpos.util.LogSource;import org.jpos.util.Loggeable;import org.jpos.util.Logger;import org.jpos.util.NameRegistrar;/** * Should run in its own thread. Starts another Receiver thread * * @author <a href="mailto:apr@cs.com.uy">Alejandro P. 
Revilla</a>
 * @version $Revision: 2624 $ $Date: 2008-05-19 12:19:14 -0300 (Mon, 19 May 2008) $
 * @see ISORequest
 * @see ISOChannel
 * @see ISOException
 * @see ISORequestListener
 */public class ISOMUX implements Runnable, ISOSource, LogSource, MUX,
                              ReConfigurable, Loggeable, ISOMUXMBean{
    /** Channel handed to the constructor; exposed through {@link #getISOChannel()}. */
    private ISOChannel channel;
    /**
     * {@code rx} is the thread built in {@code initMUX} around the inner Receiver.
     * {@code tx} is not assigned anywhere in the portion of the class shown here.
     */
    private Thread rx = null, tx = null;
    /** Transmit queue; its size is what is reported as the TX_PENDING counter. */
    private Vector txQueue;
    /**
     * Requests awaiting a response, looked up with the key produced by getKey();
     * values are cast to ISORequest by the code that reads this table.
     */
    private Hashtable rxQueue;
    /** Field number used as the trace-number part of the matching key (default 11). */
    private int traceNumberField = 11;
    /** Termination flag tested by the Receiver loop; volatile as it is read from that thread. */
    private volatile boolean terminate = false;
    /** Set to the empty string by initMUX. */
    private String name;
    /** Reference to this instance, passed to the request listener as the message source. */
    private ISOMUX muxInstance;
    /** Set to true by initMUX; its consumers are outside the portion shown here. */
    private boolean doConnect;
    protected Logger logger = null;
    protected String realm = null;
    /*
     * Indexes into the counters array (see getCounters()/showCounters()).
     * SIZEOF_CNT is the array length, not a counter.
     */
    public static final int CONNECT      = 0;
    public static final int TX           = 1;
    public static final int RX           = 2;
    public static final int TX_EXPIRED   = 3;
    public static final int RX_EXPIRED   = 4;
    public static final int TX_PENDING   = 5;
    public static final int RX_PENDING   = 6;
    public static final int RX_UNKNOWN   = 7;
    public static final int RX_FORWARDED = 8;
    public static final int SIZEOF_CNT   = 9;
    /** Counter values, indexed by the constants above; length SIZEOF_CNT. */
    private int[] cnt;
    /** Receives messages that do not complete a pending request; may be null. */
    private ISORequestListener requestListener;
    /**
     * Creates a MUX over the given channel and initializes its queues,
     * counters and receiver thread object via initMUX.
     *
     * @param c a connected or unconnected ISOChannel
     */
    public ISOMUX (ISOChannel c) {
        super();
        initMUX(c);
    }
    /**
     * Same as {@link #ISOMUX(ISOChannel)}, but installs the logger and realm
     * (through setLogger) before the MUX is initialized.
     *
     * @param c a connected or unconnected ISOChannel
     * @param logger a logger
     * @param realm  logger's realm
     */
    public ISOMUX (ISOChannel c, Logger logger, String realm) {
        super();
        setLogger (logger, realm);
        initMUX (c);
    }
    /**
     * Reads the integer property "tracenofield" and hands it to
     * setTraceNumberField, which only accepts values greater than zero.
     * NOTE(review): what cfg.getInt returns for a missing property is not
     * visible here; presumably a non-positive value, leaving the default in place.
     *
     * @param cfg configuration to read from
     */
    public void setConfiguration (Configuration cfg) {
        setTraceNumberField (cfg.getInt ("tracenofield"));
    }
    /**
     * Common constructor logic: stores the channel, creates empty transmit and
     * receive queues, zeroes the counters, clears the request listener and
     * builds (the visible code does not start) the receiver thread.
     *
     * @param c channel this MUX works on
     */
    private void initMUX (ISOChannel c) {
        doConnect = true;
        channel = c;
        rx = null;
        txQueue = new Vector();
        rxQueue = new Hashtable();
        cnt = new int[SIZEOF_CNT];
        requestListener = null;
        rx = new Thread (new
Receiver(this),"ISOMUX-Receiver");
        name = "";
        muxInstance = this;
    }
    /**
     * Overrides the field number used as the trace-number part of the
     * request/response key. The default is 11 (used in ANSI X9.2 messages).
     * Values that are zero or negative are ignored and the current setting
     * is kept.
     *
     * @param traceNumberField new traceNumberField
     */
    public void setTraceNumberField(int traceNumberField) {
        if (traceNumberField > 0)
            this.traceNumberField = traceNumberField;
    }
    /**
     * @return the underlying ISOChannel
     */
    public ISOChannel getISOChannel() {
        return channel;
    }
   /**
    * Sets the ISORequestListener that receives messages which do not
    * complete a pending request. Replaces any listener set previously.
    *
    * @param rl a request listener object
    * @see ISORequestListener
    */
    public void setISORequestListener(ISORequestListener rl) {
        requestListener = rl;
    }
   /**
    * Removes the ISORequestListener, if any, by resetting it to null.
    *
    * @see ISORequestListener
    */
    public void removeISORequestListener() {
        requestListener = null;
    }
    /**
     * Constructs the key used to match a request with its response.
     * The key is the concatenation of two parts:
     * field 41 passed through ISOUtil.zeropad with length 16 (or the empty
     * string when the message has no field 41), followed by the trace number
     * field passed through ISOUtil.zeropad with length 6 (or the current time
     * in milliseconds when the message lacks that field).
     *
     * @param   m   request/response
     * @return      key (default terminal(41) + tracenumber(11))
     * @throws ISOException declared by this method; presumably raised by the
     *         field accessors - confirm against ISOMsg
     */
    protected String getKey(ISOMsg m) throws ISOException {
        return (m.hasField(41)?ISOUtil.zeropad((String)m.getValue(41),16) : "")
           + (m.hasField (traceNumberField) ?
ISOUtil.zeropad((String) m.getValue(traceNumberField),6) :
               Long.toString (System.currentTimeMillis()));
    }
    /**
     * Gets rid of expired requests: walks every key of the receive queue,
     * removes each request that reports isExpired() and counts it in
     * RX_EXPIRED.
     * NOTE(review): entries are removed while the key enumeration is still in
     * progress; confirm this is safe for the Hashtable implementation in use.
     */
    private void purgeRxQueue() {
        Enumeration e = rxQueue.keys();
        while (e.hasMoreElements()) {
            Object key = e.nextElement();
            ISORequest r = (ISORequest) rxQueue.get(key);
            /* r may be null if the entry disappeared between keys() and get() */
            if (r != null && r.isExpired()) {
                rxQueue.remove(key);
                cnt[RX_EXPIRED]++;
            }
        }
    }
    /**
     * Prints all counters, one labelled line each, using the values
     * returned by getCounters().
     *
     * @param p - where to print
     */
    public void showCounters(PrintStream p) {
        int[] c = getCounters();
        p.println("           Connections: " + c[CONNECT]);
        p.println("           TX messages: " + c[TX]);
        p.println("            TX expired: " + c[TX_EXPIRED]);
        p.println("            TX pending: " + c[TX_PENDING]);
        p.println("           RX messages: " + c[RX]);
        p.println("            RX expired: " + c[RX_EXPIRED]);
        p.println("            RX pending: " + c[RX_PENDING]);
        p.println("          RX unmatched: " + c[RX_UNKNOWN]);
        p.println("          RX forwarded: " + c[RX_FORWARDED]);
    }
    /**
     * Gets the counters in order to pretty print them or for stats purposes.
     * TX_PENDING and RX_PENDING are refreshed from the current queue sizes
     * before returning.
     *
     * @return the internal counters array itself (not a copy), indexed by the
     *         CONNECT .. RX_FORWARDED constants
     */
    public int[] getCounters() {
        cnt[TX_PENDING] = txQueue.size();
        cnt[RX_PENDING] = rxQueue.size();
        return cnt;
    }
    /**
     * Resets all counters to zero by replacing the counters array with a
     * new one; an array obtained earlier from getCounters() is not modified.
     */
    public void resetCounters () {
        cnt = new int[SIZEOF_CNT];
    }
    /**
     * @return number of re-connections on the underlying channel
     */
    public int getConnectionCount () {
        return cnt[CONNECT];
    }
    /**
     * @return number of transmitted messages
     */
    public int getTransmitCount () {
        return cnt[TX];
    }
    /**
     * @return number of expired messages (the TX_EXPIRED counter)
     */
    public int getExpiredCount () {
        return cnt[TX_EXPIRED];
    }
    /**
     * @return number of messages waiting to be transmitted
     */
    public
int getTransmitPendingCount () {
        return txQueue.size();
    }
    /**
     * @return number of received messages
     */
    public int getReceiveCount () {
        return cnt[RX];
    }
    /**
     * @return number of unanswered messages (the RX_EXPIRED counter)
     */
    public int getReceiveExpiredCount () {
        return cnt[RX_EXPIRED];
    }
    /**
     * @return number of messages waiting for response (current size of the
     *         receive queue)
     */
    public int getReceivePendingCount () {
        return rxQueue.size();
    }
    /**
     * @return number of unknown messages received, i.e. messages that were
     *         not delivered to a pending request while no request listener
     *         was set
     */
    public int getUnknownCount () {
        return cnt[RX_UNKNOWN];
    }
    /**
     * @return number of forwarded messages received, i.e. messages handed to
     *         the request listener
     */
    public int getForwardedCount () {
        return cnt[RX_FORWARDED];
    }
    /**
     * Receive loop of the MUX: reads messages from the channel and either
     * completes the matching pending request or hands the message to the
     * request listener. Only the beginning of run() is visible in this
     * portion of the file.
     */
    private class Receiver implements Runnable, LogSource {
        /** Object passed to the constructor; used as the payload of the periodic "mux" log event. */
        Runnable parent;
        protected Receiver(Runnable p) {
            parent = p;
        }
        public void run() {
            int i = 0;
            /* keep going until termination was requested AND both queues are drained */
            while (!terminate || !rxQueue.isEmpty() || !txQueue.isEmpty()) {
                /* periodic log event: fires on iterations 1, 251, 501, ... */
                if (i++ % 250 == 1)
                    Logger.log (new LogEvent (this, "mux", parent));
                if (channel.isConnected()) {
                    try {
                        ISOMsg d = channel.receive();
                        cnt[RX]++;
                        String k = getKey(d);
                        ISORequest r = (ISORequest) rxQueue.get(k);
                        /* stays true unless the message is delivered to a live request */
                        boolean forward = true;
                        if (r != null) {
                            rxQueue.remove(k);
                            synchronized (r) {
                                if (r.isExpired()) {
                                    /* late response: count it and purge the queue on every 10th one */
                                    if ((++cnt[RX_EXPIRED]) % 10 == 0)
                                        purgeRxQueue();
                                }
                                else {
                                    r.setResponse(d);
                                    forward = false;
                                }
                            }
                        }
                        /* unmatched messages and responses to expired requests end up here */
                        if (forward) {
                            if (requestListener != null) {
                                requestListener.process(muxInstance, d);
                                cnt[RX_FORWARDED]++;
                            }
                            else
                                cnt[RX_UNKNOWN]++;
                        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -