bufferstreamtest.java

来自「java版ace,java程序员值得一看」· Java 代码 · 共 190 行

JAVA
190
字号
// ============================================================================
//
// = PACKAGE
//    tests.ASX
//
// = FILENAME
//    BufferStreamTest.java
//
// = AUTHOR
//    Prashant Jain
//
// ============================================================================
package JACE.tests.ASX;

import java.io.*;
import JACE.OS.*;
import JACE.ASX.*;

/**
 * This short program copies stdin to stdout via the use of an ASX
 * STREAM.  It illustrates an implementation of the classic "bounded
 * buffer" program using an ASX STREAM containing two Modules.  Each
 * Module contains two Tasks.
 */
public class BufferStreamTest
{
  /**
   * Behaviour shared by the Producer and the Consumer: activation on
   * open, a debug trace on close, and do-nothing defaults for the
   * remaining Task hooks.
   */
  static class CommonTask extends Task
  {
    // ACE_Task hooks

    /**
     * Activates this task.
     *
     * @param obj unused
     * @return 0 on success, -1 if activation failed
     */
    public int open (Object obj)
    {
      // NOTE(review): activate() presumably spawns the thread that runs
      // svc() -- confirm against JACE.ASX.Task.
      if (this.activate (0, 1, false) == -1)
        {
          ACE.ERROR ("spawn");
          // Propagate the failure using the same -1 convention the rest
          // of this file checks for, instead of reporting success.
          return -1;
        }
      return 0;
    }

    /**
     * Logs which thread is leaving which module, and with what status.
     *
     * @param exitStatus status value included in the debug message
     * @return always 0
     */
    public int close (long exitStatus)
    {
      ACE.DEBUG (Thread.currentThread () + " thread is exiting with status "
                 + exitStatus + " in module " + this.name () + "\n");
      return 0;
    }

    /**
     * Default put hook: ignores the message.
     *
     * @return always 0
     */
    public int put (MessageBlock mb, TimeValue tv)
    {
      return 0;
    }

    /**
     * Default timeout hook: does nothing.
     *
     * @return always 0
     */
    public int handleTimeout (TimeValue tv, Object obj)
    {
      return 0;
    }
  }

  /**
   * The Producer reads lines from stdin and passes each one downstream
   * as a message.  A 0-sized message is sent when there is no more data
   * to read; the Consumer uses this as a flag to know when to exit.
   */
  static class Producer extends CommonTask
  {
    /**
     * Reads stdin until EOF, forwarding every line with putNext().
     *
     * @return always 0
     */
    public int svc ()
    {
      BufferedReader in = new BufferedReader (new InputStreamReader (System.in));

      try
        {
          // Keep reading stdin, until we reach EOF.
          while (true)
            {
              System.out.print ("Enter input: ");
              System.out.flush ();

              String msg = in.readLine ();

              if (msg == null)
                {
                  // Send a shutdown message to the other thread and exit.
                  if (this.putNext (new MessageBlock (0), null) == -1)
                    ACE.ERROR ("putNext");
                  break;
                }

              // Send the message to the other thread.
              if (this.putNext (new MessageBlock (msg), null) == -1)
                ACE.ERROR ("putNext");
            }
        }
      catch (IOException e)
        {
          // Report the read failure rather than dropping it silently.  No
          // shutdown message is sent on this path, so the Consumer ends
          // through its own timeout handling.
          ACE.ERROR ("readLine: " + e);
        }

      return 0;
    }
  }

  /**
   * The Consumer queues the messages handed to it and prints them from
   * its own svc() loop.
   */
  static class Consumer extends CommonTask
  {
    /**
     * Enqueues the message on the MessageQueue for subsequent handling
     * in the svc() method.
     *
     * @param mb message to enqueue
     * @param tv passed through to putq()
     * @return the result of putq(), or -1 if the calling thread was
     *         interrupted before the message could be enqueued
     */
    public int put (MessageBlock mb, TimeValue tv)
    {
      try
        {
          return this.putq (mb, tv);
        }
      catch (InterruptedException e)
        {
          // The message was not enqueued: keep the interrupt visible to
          // the caller's thread and report failure instead of success.
          Thread.currentThread ().interrupt ();
          return -1;
        }
    }

    /**
     * Dequeues messages and writes each non-empty one to stdout.  The
     * Producer sends a 0-sized message to tell the Consumer to stop
     * reading and exit.
     *
     * @return 0 after a shutdown message, -1 if interrupted; a timeout
     *         terminates the program with exit status 1
     */
    public int svc ()
    {
      MessageBlock mb = null;

      // Keep looping, reading a message out of the queue, until we
      // timeout or get a message with a length == 0, which signals us to
      // quit.
      try
        {
          while (true)
            {
              // Wait for up to 4 seconds.
              mb = this.getq (TimeValue.relativeTimeOfDay (4, 0));

              if (mb == null)
                break;

              int length = mb.length ();

              if (length == 0)
                break;

              if (length > 0)
                System.out.println ("\n" + mb.base ());
            }
        }
      catch (InterruptedException e)
        {
          // An interrupt is not a timeout: report it as what it is and
          // leave, keeping the interrupt flag set for whoever runs us.
          Thread.currentThread ().interrupt ();
          ACE.ERROR ("interrupted while waiting for message");
          return -1;
        }

      if (mb == null)
        {
          ACE.ERROR ("timed out waiting for message");
          System.exit (1);
        }

      return 0;
    }
  }

  /**
   * Builds the STREAM: the Consumer module is pushed first and the
   * Producer module second.  All processing is performed in the STREAM.
   */
  public static void main (String args[])
  {
    ACE.enableDebugging ();

    // Control hierarchically-related active objects.
    Stream stream = new Stream ();
    Module consumerModule = new Module ("Consumer", new Consumer (), null, null);
    Module producerModule = new Module ("Producer", new Producer (), null, null);

    // Create Producer and Consumer Modules and push them onto the
    // STREAM.  The Consumer goes first, then the Producer.
    if (stream.push (consumerModule) == -1)
      {
        ACE.ERROR ("push");
        return;
      }

    if (stream.push (producerModule) == -1)
      {
        ACE.ERROR ("push");
        return;
      }
  }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?