📄 workqueueimpl.java
字号:
/* * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */package com.sun.corba.se.impl.orbutil.threadpool;import java.util.LinkedList;import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;import com.sun.corba.se.spi.orbutil.threadpool.Work;import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;import com.sun.corba.se.impl.orbutil.ORBConstants;import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl;import com.sun.corba.se.spi.monitoring.MonitoringConstants;import com.sun.corba.se.spi.monitoring.MonitoringFactories;import com.sun.corba.se.spi.monitoring.MonitoredObject;import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;public class WorkQueueImpl implements WorkQueue{ private ThreadPool workerThreadPool; private LinkedList theWorkQueue = new LinkedList(); private long workItemsAdded = 0; // Initialized to 1 to avoid divide by zero in averageTimeInQueue() private long workItemsDequeued = 1; private long totalTimeInQueue = 0; // Name of the work queue private String name; // MonitoredObject for work queue private MonitoredObject workqueueMonitoredObject; public WorkQueueImpl() { name=ORBConstants.WORKQUEUE_DEFAULT_NAME; initializeMonitoring(); } public WorkQueueImpl(ThreadPool workerThreadPool) { this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME); } public WorkQueueImpl(ThreadPool workerThreadPool, String name) { this.workerThreadPool = workerThreadPool; this.name = name; initializeMonitoring(); } // Setup monitoring for this workqueue private void initializeMonitoring() { workqueueMonitoredObject = MonitoringFactories. getMonitoredObjectFactory(). createMonitoredObject(name, MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION); LongMonitoredAttributeBase b1 = new LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED, MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) { public Object getValue() { return new Long(WorkQueueImpl.this.totalWorkItemsAdded()); } }; workqueueMonitoredObject.addAttribute(b1); LongMonitoredAttributeBase b2 = new LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE, MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) { public Object getValue() { return new Long(WorkQueueImpl.this.workItemsInQueue()); } }; workqueueMonitoredObject.addAttribute(b2); LongMonitoredAttributeBase b3 = new LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE, MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) { public Object getValue() { return new Long(WorkQueueImpl.this.averageTimeInQueue()); } }; workqueueMonitoredObject.addAttribute(b3); } // Package private method to get the monitored object for this // class MonitoredObject getMonitoredObject() { return workqueueMonitoredObject; } public void addWork(Work work) { synchronized (this) { workItemsAdded++; work.setEnqueueTime(System.currentTimeMillis()); theWorkQueue.addLast(work); ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this); } } Work requestWork(long waitTime) throws TimeoutException, InterruptedException { Work workItem; synchronized (this) { if (theWorkQueue.size() != 0) { workItem = (Work)theWorkQueue.removeFirst(); totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime(); workItemsDequeued++; return workItem; } try { long remainingWaitTime = waitTime; long finishTime = System.currentTimeMillis() + waitTime; do { this.wait(remainingWaitTime); if (theWorkQueue.size() != 0) { workItem = (Work)theWorkQueue.removeFirst(); totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime(); workItemsDequeued++; return workItem; } remainingWaitTime = finishTime - System.currentTimeMillis(); } while (remainingWaitTime > 0); throw new TimeoutException(); } catch (InterruptedException ie) { throw ie; } } } public void setThreadPool(ThreadPool workerThreadPool) { this.workerThreadPool = workerThreadPool; } public ThreadPool getThreadPool() { return workerThreadPool; } /** * Returns the total number of Work items added to the Queue. * This method is unsynchronized and only gives a snapshot of the * state when it is called */ public long totalWorkItemsAdded() { return workItemsAdded; } /** * Returns the total number of Work items in the Queue to be processed * This method is unsynchronized and only gives a snapshot of the * state when it is called */ public int workItemsInQueue() { return theWorkQueue.size(); } public synchronized long averageTimeInQueue() { return (totalTimeInQueue/workItemsDequeued); } public String getName() { return name; }}// End of file.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -