jtthread.java

来自「Java Pattern Oriented Framework (Jt) 是为了」· Java 代码 · 共 652 行

JAVA
652
字号
package Jt;


/** 
 *  Jt objects that inherit from this class execute in a separate/independent thread: 
 *  Jt Messages are processed asynchronously using a separate thread. 
 *  A queue of messages is required to accomplish this asynchronous behavior.
 *  When a Jt message is sent to this object (sendMessage), 
 *  the message is automatically added to the message queue. 
 *  The run method (separate thread) is constantly extracting the next message in 
 *  the queue and calling processMessage() to process it.
 */


public class JtThread extends JtComposite implements Runnable {


  private static final long serialVersionUID = 1L;
  public static final String JtCLASS_NAME = JtThread.class.getName(); 
  // Object states

  public static final int JtINACTIVE_STATE = 0; // Before start
  public static final int JtACTIVE_STATE = 1;   // After start
  //public static final int JtSUSPENDED_STATE = 2;
  public static final int JtSTOPPED_STATE = 3;  // After stop event
  public static final int JtIDLE_STATE = 4;     // No messages in the queue

 
  // Messages
  
  //public static final String JtSTART = "JtSTART";
  //public static final String JtSTOP = "JtSTOP";
  
  // Events

  private static final int JtSTART_EVENT = 21; // Start the object (start processing messages)                                             
  private static final int JtSTOP_EVENT = 20;  // Stop the object
  private static final int JtEMPTY_QUEUE = 15; // Message queue is empty
  private static final int JtNEW_MESSAGE = 16; // New message in the queue

  //public static final int JtRESUME = 19; 
  //public static final int JtSUSPEND = 18;
  //public static final int JtNOEVENT = 0;

  private int state = JtINACTIVE_STATE;       // Current state
  private boolean daemon = true;              // Daemon thread (default)
  private int priority = Thread.NORM_PRIORITY; 

  private transient Thread thread = null;     // Thread
  JtQueue msgQueue = new JtQueue ();          // Message queue


  public JtThread() {
  }

/**
  * Returns the Thread instance.
  */

  public Thread getThread () {
    return (thread);
  }

/**
  * Changes the Thread instance.
  * @param thread thread
  */

  public void setThread (Thread thread) {
    this.thread = thread; // check
  }

/**
  * Returns this object's state.
  */

  public int getState () {
    return (state);
  }

/**
  * Changes this object's state.
  */

  public void setState (int state) {
    this.state = state;
  }

/**
  * Returns the thread priority.
  */

  public int getPriority () {
    if (thread != null)
      priority = thread.getPriority ();
    return (priority);
  }


/**
  * Changes the thread priority.
  */

  public void setPriority (int priority) {

    if (thread != null) {
      thread.setPriority (priority);  // check
      this.priority = thread.getPriority ();
    } else
      this.priority = priority;


  }




/**
  * Verifies if the object's thread is a daemon thread or a user thread.
  */

  public boolean getDaemon () {

   if (thread != null) {
      daemon = thread.isDaemon ();  // check
   } 
     
    return (daemon);
  }


/**
  * Marks the object's thread as either a daemon thread or a user thread.
  */


  public void setdaemon (boolean daemon) {
    
    if (thread != null) {
      try {
       thread.setDaemon (daemon);  // check
      } catch (Exception e) {
       handleException (e);
      }
      this.daemon = thread.isDaemon ();
    } else
      this.daemon = daemon;

  }

  // Activate object

  void activate () {

     thread = new Thread ((Runnable) this);

     if (thread == null) {
        handleError("JtThread.activate: unable to create new thread");
        return;
     }

     handleTrace ("JtThread.activate: ..." + thread.getName());

     try { // just in case


        if (daemon)
         thread.setDaemon (true);

        thread.start ();

        thread.setPriority (priority);  // check
     } catch (Exception e) {
        //updateState (JtINACTIVE_STATE); check
        handleException (e);
     }

  }



  // sleep_for_awhile: sleep for a period of time

  void sleep_for_awhile (long period) {

    try {
        Thread.sleep (period);
    } catch (Exception e) {
        handleException (e);
    }

  }
  

  // updateState


  synchronized private void updateState (int state)
  {
    this.state = state;
    handleTrace ("JtThread.updateState:" + stateName (state));
  }


  // State name

  private String stateName (int state) {

    switch (state) {
      case JtINACTIVE_STATE:
        return ("INACTIVE_STATE");
      case JtACTIVE_STATE:
        return ("ACTIVE_STATE");
      //case JtSUSPENDED_STATE:
        //return ("SUSPENDED_STATE");
      case JtSTOPPED_STATE:
        return ("STOPPED_STATE");
      case JtIDLE_STATE:
        return ("IDLE_STATE");
      default:
        return ("UNKNOWN");

    }

  }


  // Event name

  private String eventName (int event) {

    switch (event) {
      case JtSTOP_EVENT:
        return ("STOP");
      //case JtRESUME:
        //return ("RESUME");
      //case JtSUSPEND:
        //return ("SUSPEND");
      case JtSTART_EVENT:
        return ("START");
      case JtEMPTY_QUEUE:
        return ("EMPTY_QUEUE");
      case JtNEW_MESSAGE:
        return ("NEW_MESSAGE");
      case 0:
        return ("NOEVENT");
      default:
        return ("UNKNOWN");

    }

  }

  // Request state transition

  synchronized private void requestStateTransition (int event) {

    if (event < 0)
      return;
    
    switch (state) {
      case JtINACTIVE_STATE: 
        if (event == JtSTART_EVENT || event == JtNEW_MESSAGE) {
          updateState (JtACTIVE_STATE); // check         
          activate ();
          break;
        }
        if (event == JtSTOP_EVENT) {
          updateState (JtSTOPPED_STATE);                 
          break;        
        }
        invalidTransition (state, event);
        break;
      case JtACTIVE_STATE:
        if (event == JtEMPTY_QUEUE) {
          updateState (JtIDLE_STATE); 
          suspendThread ();
          break;                   
        }
        if (event == JtNEW_MESSAGE || event == JtSTART_EVENT) {
          break;                   
        }
        if (event == JtSTOP_EVENT) {
          updateState (JtSTOPPED_STATE);                 
          break;        
        }
        invalidTransition (state, event);
        break;
      case JtIDLE_STATE:
        if (event == JtNEW_MESSAGE) {
          updateState (JtACTIVE_STATE); 
          resumeThread ();                  
          break; 
        }
        if (event == JtSTOP_EVENT) {
          updateState (JtSTOPPED_STATE);  
          // resume the Thread 
          resumeThread ();                
          break;        
        }
        invalidTransition (state, event);
        break;
       default:
     }

  }

  // Invalid state transition

  private void invalidTransition (int state, int event) {

    handleError ("JtThread.invalidTransition: invalid state transition(state, event):"
            + stateName(state)
            + "," + eventName (event));

  }


  // Dequeue a message
  
  synchronized private Object dequeueMessage () {

    Object omsg;

    if (msgQueue == null)
      return (null); // this should never happen    

    if (msgQueue.getSize () == 0)
      return (null);

    omsg = msgQueue.processMessage (new JtMessage ("JtDEQUEUE"));

    return (omsg);
  }



  // Check empty queue

  synchronized private boolean checkEmptyQueue ()
  {

    if (msgQueue.getSize () == 0) {
      handleTrace 
       ("JtThread.checkEmptyQueue:empty queue");


      requestStateTransition (JtEMPTY_QUEUE);    
      return (true);
    } else
      return (false);    
   
  }



  // processQueue: process queue of messages

  private void processQueue ()
  {
    int i;
    Object omsg;
    int size;



    if (checkEmptyQueue ())
      return;

    //handleTrace 
    //   ("JtThread.processQueue:queue size:" + msgQueue.getSize ());

    size =  msgQueue.getSize (); 
    for (i = 0; i < size; i++)
    {
      try { // just in case


        if (state == JtSTOPPED_STATE) // Do I need to stop ?
          break;

        //processNextMessage ();
        omsg = dequeueMessage ();
        processMessage (omsg);

      } catch (Exception e) {
        handleException (e);
      }
    }
  }


  /**
   * Extracts Jt messages from the queue and processes them via
   * processMessage (). Returns when the state becomes JtSTOPPED_STATE (JtSTOP message).
   */

  public final void run () {

    while (true) {

      if (state == JtSTOPPED_STATE)
        break;
      
      //sleep_for_awhile (2000L);

      processQueue (); // Process queue of messages

    }
    handleTrace ("JtThread.run: object (thread) is stopping ..." + thread.getName());
  }

  // Thread management 

  private synchronized void suspendThread () {


    handleTrace ("JtThread.suspendThread:suspending thread " + thread.getName());
    try {
      wait ();
    } catch (Exception e) {
      handleException (e);
    }
    handleTrace ("JtThread.suspendThread:resuming ... " + thread.getName());
  }

  private synchronized void resumeThread () {

    handleTrace ("JtThread.resumeThread: ... " + thread.getName());
    notify ();
  }

  synchronized private void checkInactiveState () {

    if (state == JtINACTIVE_STATE)
     requestStateTransition (JtNEW_MESSAGE);     

  }


/** 
 *  Add a message to the queue for further processing.
 *  When a Jt message is sent to this object via sendMessage, 
 *  the message is automatically added to the message queue. 
 *  The run method (separate thread) is contantly extracting the next message in 
 *  the queue and calling processMessage() to process it.
 */


  synchronized public Object enqueueMessage (Object msg)
  {
    JtMessage imsg, tmp;
    String msgid = null;


    if (msg == null)
      return (null);
            
    imsg = (JtMessage) msg; 
    msgid = (String) imsg.getMsgId ();

    // If the current state is inactive, activate the thread since 
    // a new message has been received

    checkInactiveState ();

    // JtSTART and JtSTOP should be processed right away (don't add these to the queue)

    //if (msgid.equals (JtThread.JtSTART) || msgid.equals (JtThread.JtSTOP)) {
    if (msgid.equals (JtThread.JtSTART)) {
       processMessage (msg);
       return (msg);
    }

    // enqueue the message

    tmp = new JtMessage ("JtENQUEUE");
    tmp.setMsgContent (msg);
    msgQueue.processMessage (tmp);

    requestStateTransition (JtNEW_MESSAGE);
    return (msg); //check

  }
  /**
    * Process object messages. 
    * <ul>
    * <li> JtSTART - Starts the thread associated with this object. 
    * <li> JtSTOP - Stops the thread by resetting the state variable to JtSTOPPED_STATE.
    * This causes the run method to return.
    * <li> JtREMOVE - Performs any housekeeping needed before this object is removed.  
    * This includes stopping its execution thread. 
    * </ul>
    * @param event Jt Message    
    */


  public Object processMessage (Object event) {

   String msgid = null;
   JtMessage e = (JtMessage) event;

     if (e == null)
	return null;
            

     msgid = (String) e.getMsgId ();

     if (msgid == null)
       return null;

     handleTrace ("JtThread.processMessage:" + msgid);

     if (msgid.equals (JtThread.JtSTART)) {
       requestStateTransition (JtSTART_EVENT);
       return (null);
     }

     if (msgid.equals (JtThread.JtSTOP) || msgid.equals (JtThread.JtREMOVE)) {
       requestStateTransition (JtSTOP_EVENT);
       return (null);
     }

     if (msgid.equals (JtThread.JtTEST)) {
         System.out.println("JtThread: processing test message:" + e.getMsgContent());
         return (null);
       }
     
     return (super.processMessage(event));
     //return (null); 

/*
     handleError ("JtThread.processMessage: invalid message id:" + msgid);
     return (null);
*/

  }



  static private char waitForInputKey () {
    char c = ' ';

      try {

      c = (char) System.in.read ();
      while (System.in.available () > 0)
        System.in.read ();

      } catch (Exception e) {
        e.printStackTrace ();
      }

      return (c);
  }

  static private char readInputKey () {
    char c = ' ';

      try {

      if (System.in.available () <= 0) {
        return (' ');
      }

      c = (char) System.in.read ();
      while (System.in.available () > 0)
        System.in.read ();

      } catch (Exception e) {
        e.printStackTrace ();
      }

      return (c);
  }



  /**
    * Demonstrates the messages processed by JtThread and illustrates its use.
    */

  public static void main(String[] args) {

    JtThread thread;
    JtObject main;
    char c = 0;
    int flag = 0;
    int i;
    JtMessage msg;

    main = new JtFactory ();
    thread = (JtThread) main.createObject (JtThread.JtCLASS_NAME, "thread");
    main.setValue (thread, "daemon", "true");
    //main.setValue (thread, "priority", "" + Thread.MIN_PRIORITY);


    System.out.print ("Press any Key to Start the object (thread) ...");

    waitForInputKey ();

    main.sendMessage ("thread", new JtMessage (JtThread.JtSTART));

    System.out.println ("Press X to stop the object. ");
    System.out.println ("Press any other key to start/stop sending messages to the object ...");

    
    c = waitForInputKey ();
    if (c != 'X') {
    flag = 1;
    for (i = 1; ;) {

      if (flag == 1) {
 
        // Send a test message to the object
        msg = new JtMessage (JtObject.JtTEST);
        msg.setMsgContent(new Integer (i));
        main.sendMessage (thread, msg);
        System.out.println (i);
        i++;

      } else
        c = waitForInputKey ();

      if (flag == 1)
        c = readInputKey ( );
      if (c != ' ') {
        flag = 1 - flag; // Start/Stop sending messages
      }

      // Stop the object
      if (c == 'X') {
        break;

      }

    }
    }

     // Stop the object

     main.sendMessage (thread, new JtMessage (JtThread.JtSTOP));

     System.out.println ("Press any key to exit");
     waitForInputKey ();

  }

}


⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?