📄 parallel.java
字号:
if (t instanceof BuildException && firstLocation == Location.UNKNOWN_LOCATION) { firstLocation = ((BuildException) t).getLocation(); } exceptionMessage.append(StringUtils.LINE_SEP); exceptionMessage.append(t.getMessage()); } } } /** * Spin up required threads with a maximum number active at any given time. * * @exception BuildException if any of the threads failed. */ private void spinThreads() throws BuildException { final int numTasks = nestedTasks.size(); TaskRunnable[] runnables = new TaskRunnable[numTasks]; stillRunning = true; timedOut = false; int threadNumber = 0; for (Enumeration e = nestedTasks.elements(); e.hasMoreElements(); threadNumber++) { Task nestedTask = (Task) e.nextElement(); runnables[threadNumber] = new TaskRunnable(nestedTask); } final int maxRunning = numTasks < numThreads ? numTasks : numThreads; TaskRunnable[] running = new TaskRunnable[maxRunning]; threadNumber = 0; ThreadGroup group = new ThreadGroup("parallel"); TaskRunnable[] daemons = null; if (daemonTasks != null && daemonTasks.tasks.size() != 0) { daemons = new TaskRunnable[daemonTasks.tasks.size()]; } synchronized (semaphore) { // When we leave this block we can be sure all data is really // stored in main memory before the new threads start, the new // threads will for sure load the data from main memory. // // This probably is slightly paranoid. } synchronized (semaphore) { // start any daemon threads if (daemons != null) { for (int i = 0; i < daemons.length; ++i) { daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i)); Thread daemonThread = new Thread(group, daemons[i]); daemonThread.setDaemon(true); daemonThread.start(); } } // now run main threads in limited numbers... // start initial batch of threads for (int i = 0; i < maxRunning; ++i) { running[i] = runnables[threadNumber++]; Thread thread = new Thread(group, running[i]); thread.start(); } if (timeout != 0) { // start the timeout thread Thread timeoutThread = new Thread() { public synchronized void run() { try { wait(timeout); synchronized (semaphore) { stillRunning = false; timedOut = true; semaphore.notifyAll(); } } catch (InterruptedException e) { // ignore } } }; timeoutThread.start(); } // now find available running slots for the remaining threads outer: while (threadNumber < numTasks && stillRunning) { for (int i = 0; i < maxRunning; i++) { if (running[i] == null || running[i].isFinished()) { running[i] = runnables[threadNumber++]; Thread thread = new Thread(group, running[i]); thread.start(); // continue on outer while loop to get another // available slot continue outer; } } // if we got here all slots in use, so sleep until // something happens try { semaphore.wait(); } catch (InterruptedException ie) { // doesn't java know interruptions are rude? // just pretend it didn't happen and go about out business. // sheesh! } } // are all threads finished outer2: while (stillRunning) { for (int i = 0; i < maxRunning; ++i) { if (running[i] != null && !running[i].isFinished()) { //System.out.println("Thread " + i + " is still alive "); // still running - wait for it try { semaphore.wait(); } catch (InterruptedException ie) { // who would interrupt me at a time like this? } continue outer2; } } stillRunning = false; } } if (timedOut) { throw new BuildException("Parallel execution timed out"); } // now did any of the threads throw an exception exceptionMessage = new StringBuffer(); numExceptions = 0; firstException = null; firstLocation = Location.UNKNOWN_LOCATION; processExceptions(daemons); processExceptions(runnables); if (numExceptions == 1) { if (firstException instanceof BuildException) { throw (BuildException) firstException; } else { throw new BuildException(firstException); } } else if (numExceptions > 1) { throw new BuildException(exceptionMessage.toString(), firstLocation); } } /** * Determine the number of processors. Only effective on later VMs * * @return the number of processors available or 0 if not determinable. */ private int getNumProcessors() { try { Class[] paramTypes = {}; Method availableProcessors = Runtime.class.getMethod("availableProcessors", paramTypes); Object[] args = {}; Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args); return ret.intValue(); } catch (Exception e) { // return a bogus number return 0; } } /** * thread that execs a task */ private class TaskRunnable implements Runnable { private Throwable exception; private Task task; private boolean finished; /** * Construct a new TaskRunnable.<p> * * @param task the Task to be executed in a separate thread */ TaskRunnable(Task task) { this.task = task; } /** * Executes the task within a thread and takes care about * Exceptions raised within the task. */ public void run() { try { task.perform(); } catch (Throwable t) { exception = t; if (failOnAny) { stillRunning = false; } } finally { synchronized (semaphore) { finished = true; semaphore.notifyAll(); } } } /** * get any exception that got thrown during execution; * @return an exception or null for no exception/not yet finished */ public Throwable getException() { return exception; } /** * Provides the indicator that the task has been finished. * @return Returns true when the task is finished. */ boolean isFinished() { return finished; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -