threadpooltest.java

来自「java版ace,java程序员值得一看」· Java 代码 · 共 188 行

JAVA
188
字号
// ============================================================================//// = PACKAGE//    tests.ASX// // = FILENAME//    ThreadPoolTest.java//// = AUTHOR//    Prashant Jain// // ============================================================================package JACE.tests.ASX;import java.io.*;import JACE.OS.*;import JACE.ASX.*;import JACE.Reactor.*;public class ThreadPoolTest extends Task{  int nThreads_;  int nIterations_;    public static int MAX_MB_SIZE = 1024;  public ThreadPoolTest (int nThreads, int nIterations)    {      this.nIterations_ = nIterations;      this.nThreads_ = nThreads;      if (this.activate (0, nThreads, true) == -1)	ACE.ERROR ("activate failed");    }  public int handleTimeout (TimeValue tv, Object obj)  {    return 0;  }  public int open (Object obj)    {      return 0;    }  public int close (long flags)    {      return 0;    }  public int put (MessageBlock mb, TimeValue tv)    {      try	{	  return this.putq (mb, tv);	}      catch (InterruptedException e)	{	}      return 0;    }    public int svc ()    {      int count = 1;      // Keep looping, reading a message out of the queue, until we get a      // message with a length == 0, which signals us to quit.      try	{	  for (;; count++)	    {       	      MessageBlock mb = this.getq (null);	      if (mb == null)		{		  ACE.ERROR (Thread.currentThread ().toString () + " in iteration " + count + ", got result -1, exiting");		  break;		}	      int length = mb.length ();	      	      if (length > 0)		ACE.DEBUG (Thread.currentThread ().toString () +			   " in iteration " + count + ", length = " + 			   length + ", text = \"" + mb.base () + "\"");	      	      if (length == 0)		{		  ACE.DEBUG (Thread.currentThread ().toString () +			     " in iteration " + count + 			     ", got NULL message, exiting");		  break;		}	      Thread.yield ();	    }	}      catch (InterruptedException e)	{	}      return 0;    }  public static void produce (ThreadPoolTest threadPool, int nIterations)    {      int count = 0;      for (int n = 0;;)	{	  // Allocate a new message.	  MessageBlock mb = new MessageBlock (new Integer (count).toString ());	  	  if (count == nIterations)	    n = 1; // Indicate that we need to shut down.	  else	    count++;	  if (count == 0 || (count % 20 == 0))	    {	      try		{		  Thread.sleep (1);		}	      catch (InterruptedException e)		{		}	    }	  if (n != 1)	    {	      ACE.DEBUG ("Producing...");	      // Pass the message to the Thread_Pool.	      if (threadPool.put (mb, null) == -1)		ACE.ERROR ("put");	    }	  else	    {	      // Send a shutdown message to the waiting threads and exit.	      ACE.DEBUG ("start loop, dump of task");	      for (int i = threadPool.thrCount (); i > 0; i--)		{		  ACE.DEBUG (Thread.currentThread ().toString () + 			     "EOF, enqueueing NULL block for thread " + i);		  // Enqueue a NULL message to flag each consumer to		  // shutdown.		  if (threadPool.put (new MessageBlock (0), null) == -1)		    ACE.ERROR ("put");		}	      break;	    }	}    }  public static void main (String args[])    {      int nThreads = 1;      int nIterations = 100;      ACE.enableDebugging ();      try	{	  if (args.length == 2)	    {	      nThreads = Integer.parseInt (args[0]);	      nIterations = Integer.parseInt (args[1]);	    }	  else if (args.length == 1)	    {	      nThreads = Integer.parseInt (args[0]);	    }	}      catch (NumberFormatException e)	{	  ACE.ERROR ("Illegal argument.");	}      ACE.DEBUG ("Threads = " + nThreads + " Iterations = " + nIterations);      // Create the worker tasks.      ThreadPoolTest threadPool = new ThreadPoolTest (nThreads,						      nIterations);      // Create work for the worker tasks to process in their own threads.      produce (threadPool, nIterations);      ACE.DEBUG ("exiting...");    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?