⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 realtimestockmonitor.java

📁 JStock是一个免费股市软件
💻 JAVA
字号:
/*
 * RealTimeStockMonitor.java
 *
 * Created on April 24, 2007, 9:56 PM
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Copyright (C) 2007 Cheok YanCheng <yccheok@yahoo.com>
 */

package org.yccheok.jstock.engine;

import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 *
 * @author yccheok
 */
/**
 * Periodically fetches {@link Stock} information for a list of stock codes on
 * background threads and hands every fetched batch to
 * {@code notify(this, stocks)}, which is inherited from {@link Subject}.
 *
 * <p>The stock code list is split into batches of {@code numOfStockPerIteration}
 * codes. At most {@code maxThread} monitoring threads are created; the i-th
 * thread starts at offset {@code i * numOfStockPerIteration} and then advances
 * by {@code numOfStockPerIteration * maxThread} codes, so that the threads
 * together cover the whole list. After each pass a thread sleeps for the
 * configured delay.
 *
 * <p>For every batch the registered {@link StockServerFactory} instances are
 * tried in registration order; the first server that does not throw
 * {@link StockNotFoundException} supplies the result.
 *
 * <p>All public methods are {@code synchronized} on this instance.
 *
 * @author yccheok
 */
public class RealTimeStockMonitor extends Subject<RealTimeStockMonitor, java.util.List<Stock>> {

    /**
     * Creates a new instance of RealTimeStockMonitor. No thread is started
     * until a stock code is added or {@link #start()} is called.
     *
     * @param maxThread upper bound on the number of monitoring threads
     * @param numOfStockPerIteration number of stock codes sent to a server per request
     * @param delay pause between two passes of a monitoring thread, in ms
     * @throws IllegalArgumentException if any argument is zero or negative
     */
    public RealTimeStockMonitor(int maxThread, int numOfStockPerIteration, long delay) {
        if (maxThread <= 0 || numOfStockPerIteration <= 0 || delay <= 0) {
            throw new IllegalArgumentException("maxThread=" + maxThread + ",numOfStockPerIteration=" + numOfStockPerIteration + ",delay=" + delay);
        }

        this.maxThread = maxThread;
        this.numOfStockPerIteration = numOfStockPerIteration;
        this.delay = delay;

        this.stockServerFactories = new java.util.concurrent.CopyOnWriteArrayList<StockServerFactory>();
        this.stockCodes = new java.util.concurrent.CopyOnWriteArrayList<Code>();
        this.stockMonitors = new java.util.ArrayList<StockMonitor>();

        stockCodesReadWriteLock = new java.util.concurrent.locks.ReentrantReadWriteLock();
        stockCodesReaderLock = stockCodesReadWriteLock.readLock();
        stockCodesWriterLock = stockCodesReadWriteLock.writeLock();
    }

    /**
     * Appends a stock server factory. Factories are consulted in the order in
     * which they were added.
     *
     * @param factory the factory to append
     * @return the result of {@link List#add(Object)}
     */
    public synchronized boolean addStockServerFactory(StockServerFactory factory) {
        return stockServerFactories.add(factory);
    }

    /**
     * @param index position of the wanted factory
     * @return the factory stored at {@code index}
     * @throws IndexOutOfBoundsException if {@code index} is out of range
     */
    public synchronized StockServerFactory getStockServerFactory(int index) {
        return stockServerFactories.get(index);
    }

    /**
     * @return the number of registered stock server factories
     */
    public synchronized int getNumOfStockServerFactory() {
        return stockServerFactories.size();
    }

    /**
     * Removes a previously added stock server factory.
     *
     * @param factory the factory to remove
     * @return {@code true} if the factory was registered
     */
    public synchronized boolean removeStockServerFactory(StockServerFactory factory) {
        return stockServerFactories.remove(factory);
    }

    /**
     * Adds a stock code to the monitored list and starts any additional
     * monitoring thread that is now required.
     *
     * <p>synchronized, to avoid addStockCode and removeStockCode at the same
     * time.
     *
     * @param code the stock code to monitor
     * @return {@code false} if the code is already monitored, otherwise the
     *         result of adding it to the list
     */
    public synchronized boolean addStockCode(Code code) {
        // The stock code lock isn't required here: growing the list can never
        // invalidate an index a monitoring thread has already checked.
        if (stockCodes.contains(code)) {
            return false;
        }

        final boolean status = stockCodes.add(code);

        start();

        return status;
    }

    /**
     * @return the number of monitored stock codes
     */
    public synchronized int getNumOfStockCode() {
        return stockCodes.size();
    }

    /**
     * @param index position of the wanted stock code
     * @return the stock code stored at {@code index}
     * @throws IndexOutOfBoundsException if {@code index} is out of range
     */
    public synchronized Code getStockCode(int index) {
        return stockCodes.get(index);
    }

    /**
     * Removes every stock code and signals every monitoring thread to stop.
     * Unlike {@link #stop()}, this method does not wait for the threads to
     * terminate.
     *
     * @return always {@code true}
     */
    public synchronized boolean clearStockCodes() {
        // The writer lock keeps monitoring threads from reading the size and
        // obtaining an iterator while the list shrinks underneath them.
        stockCodesWriterLock.lock();
        try {
            stockCodes.clear();
        } finally {
            stockCodesWriterLock.unlock();
        }

        // Signal only; the threads are left to finish on their own.
        while (!stockMonitors.isEmpty()) {
            final StockMonitor stockMonitor = stockMonitors.remove(stockMonitors.size() - 1);
            stockMonitor._stop();
        }

        return true;
    }

    /**
     * Removes a stock code from the monitored list. If fewer monitoring
     * threads are needed afterwards, the most recently created one is
     * signalled to stop (without waiting for it to terminate).
     *
     * <p>synchronized, to avoid addStockCode and removeStockCode at the same
     * time.
     *
     * @param code the stock code to stop monitoring
     * @return {@code true} if the code was being monitored
     */
    public synchronized boolean removeStockCode(Code code) {
        // The writer lock keeps monitoring threads from reading the size and
        // obtaining an iterator while the list shrinks underneath them.
        final boolean status;
        stockCodesWriterLock.lock();
        try {
            status = stockCodes.remove(code);
        } finally {
            stockCodesWriterLock.unlock();
        }

        // Do we need to remove any old thread?
        final int numOfMonitorRequired = this.getNumOfRequiredThread();

        if (this.stockMonitors.size() > numOfMonitorRequired) {
            log.info("Before removing : current thread size=" + this.stockMonitors.size() + ",numOfMonitorRequired=" + numOfMonitorRequired);

            // Signal only; the thread is left to finish on its own.
            final StockMonitor stockMonitor = stockMonitors.remove(stockMonitors.size() - 1);
            stockMonitor._stop();

            log.info("After removing : current thread size=" + this.stockMonitors.size() + ",numOfMonitorRequired=" + numOfMonitorRequired);
        }

        return status;
    }

    /**
     * Resumes every monitoring thread that was paused by {@link #softStop()}.
     */
    public synchronized void softStart() {
        for (StockMonitor stockMonitor : stockMonitors) {
            stockMonitor.softStart();
        }
    }

    /**
     * Asks every monitoring thread to pause at the beginning of its next pass.
     * The threads stay alive and can be resumed with {@link #softStart()}.
     */
    public synchronized void softStop() {
        for (StockMonitor stockMonitor : stockMonitors) {
            stockMonitor.softStop();
        }
    }

    /**
     * Creates and starts monitoring threads until the number of running
     * threads matches the number required for the current stock code list.
     * Does nothing if enough threads exist already.
     */
    public synchronized void start() {
        final int numOfMonitorRequired = this.getNumOfRequiredThread();

        assert(numOfMonitorRequired <= this.maxThread);

        for (int i = this.stockMonitors.size(); i < numOfMonitorRequired; i++) {
            log.info("Before adding : current thread size=" + this.stockMonitors.size() + ",numOfMonitorRequired=" + numOfMonitorRequired);

            final StockMonitor stockMonitor = new StockMonitor(i * numOfStockPerIteration);
            stockMonitors.add(stockMonitor);
            stockMonitor.start();

            log.info("After adding : current thread size=" + this.stockMonitors.size() + ",numOfMonitorRequired=" + numOfMonitorRequired);
        }
    }

    /**
     * Stops all the monitoring threads and waits for each of them to
     * terminate. Once this had been done, all the previous monitoring threads
     * are removed.
     *
     * <p>If the calling thread is interrupted while waiting, the wait for the
     * remaining threads continues and the interrupt status of the caller is
     * restored before returning.
     *
     * <p>NOTE(review): this method joins the monitoring threads while holding
     * this object's lock. If an observer invoked by a monitoring thread calls
     * back into a synchronized method of this object, the join may never
     * return - verify against the observers in use.
     */
    public synchronized void stop() {
        // Signal every thread first so that they all wind down concurrently.
        for (StockMonitor stockMonitor : stockMonitors) {
            stockMonitor._stop();
        }

        boolean interrupted = false;

        for (StockMonitor stockMonitor : stockMonitors) {
            try {
                stockMonitor.join();
            }
            catch (java.lang.InterruptedException exp) {
                log.error("", exp);
                // Remember the interrupt instead of restoring it right away,
                // otherwise every following join() would fail immediately.
                interrupted = true;
            }
        }

        stockMonitors.clear();

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the pause between two passes of a monitoring thread, in ms
     */
    public synchronized long getDelay() {
        return this.delay;
    }

    /**
     * Changes the pause between two passes of a monitoring thread. A sleeping
     * thread picks the new value up on its next sleep.
     *
     * @param delay the new pause in ms
     * @throws IllegalArgumentException if {@code delay} is zero or negative
     *         (the same rule the constructor applies)
     */
    public synchronized void setDelay(int delay) {
        if (delay <= 0) {
            throw new IllegalArgumentException("delay=" + delay);
        }

        this.delay = delay;
    }

    /**
     * Computes how many monitoring threads the current stock code list needs:
     * one per started batch of {@code numOfStockPerIteration} codes, capped at
     * {@code maxThread}.
     */
    private int getNumOfRequiredThread() {
        final int size = stockCodes.size();
        final int fullBatches = size / numOfStockPerIteration;
        final int partialBatch = ((size % numOfStockPerIteration) == 0) ? 0 : 1;

        return Math.min(fullBatches + partialBatch, maxThread);
    }

    /**
     * A single monitoring thread. It repeatedly walks its share of the stock
     * code list, fetches each batch from a stock server and notifies the
     * observers of the enclosing monitor.
     */
    private class StockMonitor extends Thread {
        private volatile Status status = Status.Normal;

        /**
         * @param index offset into the stock code list of the first batch
         *        handled by this thread
         */
        public StockMonitor(int index) {
            this.index = index;
            thread = this;
        }

        /**
         * Blocks while this thread is paused. Returns immediately unless the
         * status is {@code Pause}; otherwise waits until {@link #softStart()}
         * switches the status to {@code Resume}, then resets it to
         * {@code Normal}.
         */
        private synchronized void softWait() throws InterruptedException {
            if (status == Status.Pause) {
                while (status != Status.Resume) {
                    wait();
                }

                status = Status.Normal;
            }
        }

        /** Resumes this thread if it has been paused. */
        public synchronized void softStart() {
            if (status == Status.Pause) {
                status = Status.Resume;
                notify();
            }
        }

        /** Requests this thread to pause at the beginning of its next pass. */
        public synchronized void softStop() {
            status = Status.Pause;
            notify();
        }

        /**
         * Main loop: wait while paused, process every batch assigned to this
         * thread, sleep for the configured delay, repeat. The loop ends when
         * {@link #_stop()} has been called or the thread is interrupted while
         * waiting or sleeping.
         */
        @Override
        public void run() {
            final Thread thisThread = Thread.currentThread();

            // Will advance by numOfStockPerIteration * maxThread. Computed as
            // long so that a very large maxThread cannot overflow into a
            // negative step.
            final long step = (long) numOfStockPerIteration * maxThread;

            while (thisThread == thread) {
                try {
                    softWait();
                }
                catch (InterruptedException exp) {
                    log.error("", exp);
                    break;
                }

                for (long currIndex = index; thisThread == thread; currIndex += step) {
                    final ListIterator<Code> listIterator = acquireIterator(currIndex);

                    if (listIterator == null) {
                        // Past the end of the list: this pass is finished.
                        break;
                    }

                    final List<Code> codes = new ArrayList<Code>();

                    for (int i = 0; listIterator.hasNext() && i < numOfStockPerIteration && thisThread == thread; i++) {
                        codes.add(listIterator.next());
                    }

                    if (thisThread != thread) {
                        // Stopped while collecting; the result would be
                        // discarded anyway, so skip the server request.
                        break;
                    }

                    fetchAndNotify(codes, thisThread);
                }

                try {
                    Thread.sleep(delay);
                }
                catch (java.lang.InterruptedException exp) {
                    log.error("index=" + index, exp);
                    break;
                }
            }
        }

        /**
         * Obtains, under the stock code read lock, an iterator positioned at
         * {@code startIndex}.
         *
         * @return the iterator, or {@code null} if {@code startIndex} lies at
         *         or beyond the end of the stock code list
         */
        private ListIterator<Code> acquireIterator(long startIndex) {
            stockCodesReaderLock.lock();
            try {
                if (startIndex < stockCodes.size()) {
                    // Safe narrowing: startIndex is below an int-sized bound.
                    return stockCodes.listIterator((int) startIndex);
                }

                return null;
            }
            finally {
                stockCodesReaderLock.unlock();
            }
        }

        /**
         * Fetches the stocks for {@code codes} from the first stock server
         * that does not throw {@link StockNotFoundException} and, unless this
         * thread has been stopped in the meantime, notifies the observers of
         * the enclosing monitor. Nothing is notified if every server fails.
         */
        private void fetchAndNotify(List<Code> codes, Thread thisThread) {
            for (StockServerFactory factory : stockServerFactories) {
                final StockServer stockServer = factory.getStockServer();

                final List<Stock> stocks;
                try {
                    stocks = stockServer.getStocksByCodes(codes);
                }
                catch (StockNotFoundException exp) {
                    log.error(codes, exp);
                    // Try with another server.
                    continue;
                }

                if (thisThread == thread) {
                    // Notify all the interested parties.
                    RealTimeStockMonitor.this.notify(RealTimeStockMonitor.this, stocks);
                }

                return;
            }
        }

        /**
         * Signals this thread to terminate. Does not wait for termination.
         */
        public void _stop() {
            thread = null;
            // Wake up from sleep.
            interrupt();
        }

        private final int index;
        // Set to this thread on construction and to null by _stop(); run()
        // keeps looping only while it still refers to the running thread.
        private volatile Thread thread;
    }

    /** Pause state of a monitoring thread. */
    private enum Status {
        Pause,
        Resume,
        Normal
    }

    // Delay in ms
    private volatile long delay;
    private final int maxThread;
    // Number of stock to be monitored per iteration.
    private final int numOfStockPerIteration;
    private final java.util.List<StockServerFactory> stockServerFactories;
    private final java.util.List<Code> stockCodes;
    private final java.util.List<StockMonitor> stockMonitors;
    private final java.util.concurrent.locks.ReadWriteLock stockCodesReadWriteLock;
    private final java.util.concurrent.locks.Lock stockCodesReaderLock;
    private final java.util.concurrent.locks.Lock stockCodesWriterLock;

    private static final Log log = LogFactory.getLog(RealTimeStockMonitor.class);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -