📄 broker.java
字号:
package com.ca.directory.jxplorer.broker;
import com.ca.directory.jxplorer.*;
import com.ca.commons.naming.DN;
import com.ca.commons.naming.DXEntry;
import com.ca.commons.naming.DXNamingEnumeration;
import com.ca.commons.jndi.SchemaOps;
import javax.naming.directory.DirContext;
import javax.naming.NamingException;
import java.util.Vector;
import java.util.ArrayList;
public abstract class Broker implements Runnable, DataSource
{
// --- class-wide state ---

// When true, the broker prints thread/queue tracing to System.out.
private static boolean debug = false;

// Running count of brokers constructed so far; used to hand out a
// unique sequential ID to each new broker (for debugging).
private static int noBrokers = 0;

// --- per-broker state ---

// The core list of outstanding queries.  This is also the object
// that everything synchronizes on (and waits on / notifies).
protected Vector requestQueue = new Vector(10);

// Data listeners; push() attaches each of these to every request it queues.
protected Vector listeners = new Vector();

// Sequential ID of this broker, taken from noBrokers at construction time.
public int id;

// The query currently being handled by processQueue(), or null.
protected DataQuery current = null;

// Optional monitor set through registerStopMonitor(); null until registered.
StopMonitor stopMonitor = null;

/**
 * Creates a broker and assigns it the next sequential debugging ID.
 */
public Broker()
{
    id = noBrokers++;
}
/**
 * Registers a stop monitor with this broker.  The monitor is used by
 * the gui to allow user cancellation of in-progress broker actions;
 * once registered, the broker asks it to update its watchers whenever
 * a request is queued or has been processed.
 *
 * @param monitor the stop monitor to use (replaces any previous one).
 */
public void registerStopMonitor(StopMonitor monitor)
{
    this.stopMonitor = monitor;
}
/**
 * Queues a DataQuery for processing.  Primarily used by the
 * DataSource methods to register requests.
 *
 * Every listener currently held in <code>listeners</code> is attached
 * to the request before it is appended to the request queue.  The
 * stop monitor (if one is registered) is then told to update its
 * watchers, and finally all threads waiting on the queue are notified.
 *
 * @param request the query to queue.
 * @return the same request object, for easy chaining.
 */
public DataQuery push(DataQuery request)
{
    // attach the broker-wide listeners to this particular request
    int index = 0;
    while (index < listeners.size())
    {
        DataListener listener = (DataListener) listeners.elementAt(index);
        request.addDataListener(listener);
        index++;
    }

    synchronized (requestQueue)
    {
        requestQueue.addElement(request);
    }

    // the monitor is deliberately called outside the queue lock
    if (stopMonitor != null)
    {
        stopMonitor.updateWatchers();
    }

    // wake up whoever is waiting on the queue
    synchronized (requestQueue)
    {
        requestQueue.notifyAll();
    }

    return request;
}
/**
 * Takes the oldest DataQuery off the head of the request queue,
 * flagging it as running (for use by the StopMonitor) while the
 * queue lock is still held.
 *
 * @return the dequeued query, or null if the queue was empty.
 */
public DataQuery pop()
{
    synchronized (requestQueue)
    {
        if (requestQueue.size() == 0)
        {
            return null;
        }

        // remove(0) hands back the element it removed from the head
        DataQuery head = (DataQuery) requestQueue.remove(0);
        head.setRunning();
        return head;
    }
}
/**
 * Drops a particular query from the list of pending queries.
 * Does nothing if the query is not in the queue.
 *
 * @param query the pending query to remove.
 */
public void removeQuery(DataQuery query)
{
    synchronized (requestQueue)
    {
        requestQueue.removeElement(query);
    }
}
/**
 * Reports whether any DataQuerys are still waiting in the request
 * queue.  The check is made while holding the queue lock.
 *
 * @return true if at least one query is pending, false otherwise.
 */
public boolean hasRequests()
{
    synchronized (requestQueue)
    {
        int pending = requestQueue.size();
        return pending > 0;
    }
}
/**
 * Main loop of the broker thread.  Processes every queued request,
 * then waits on the request queue until notified that something
 * (presumably an addition to the queue) has occurred.  When woken,
 * processes all queue requests again, and then goes back to waiting.
 *
 * The loop (and so the thread) ends when processQueue() returns
 * false.
 */
public void run()
{
    while (true)
    {
        if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " processing Queue of length: " + requestQueue.size() + " in broker " + id);

        if (!processQueue())
        {
            if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " ending." + requestQueue.size());
            return;
        }

        try
        {
            if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " waiting in run() loop");

            synchronized (requestQueue)
            {
                // Only sleep if the queue is still empty.  A request may have
                // been pushed (and its notifyAll() already delivered) between
                // processQueue() finding the queue empty and this thread
                // acquiring the lock; waiting unconditionally would miss that
                // notification and leave the request stranded until the next
                // push.  A wake-up with nothing queued is harmless: the outer
                // loop simply finds an empty queue and comes back here.
                if (requestQueue.isEmpty())
                {
                    requestQueue.wait();
                }
            }

            if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " notified in run() loop");
        }
        catch (InterruptedException e)
        {
            // An interrupt is treated like a wake-up: go round the loop
            // again and re-check the queue.
            if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " interrupted in run() loop \n " + e);
        }
    }
}
/**
 * Works through the request queue, popping and processing one query
 * at a time until no requests remain.  The query being worked on is
 * kept in <code>current</code> for reporting, and the stop monitor
 * (if any) is asked to update its watchers after each query.
 *
 * @return false as soon as a processed query turns out to have been
 *         cancelled (in which case <code>current</code> is left
 *         pointing at it); true if the queue was drained without
 *         any cancellations.
 */
protected boolean processQueue()
{
    while (hasRequests())
    {
        current = pop();

        // sanity check: the queue may have been emptied between the
        // hasRequests() test and the pop()
        if (current == null)
        {
            return true;
        }

        processRequest(current);

        if (stopMonitor != null)
        {
            stopMonitor.updateWatchers();
        }

        if (current == null || !current.isCancelled())
        {
            // normal completion - forget the query and carry on
            current = null;
            continue;
        }

        // The request was cancelled by the user: it had probably hung,
        // and another broker thread will have been started - so tell
        // the caller to kill this thread off.
        return false;
    }

    return true;
}
/**
 * Processes a specific request.
 *
 * A request that is already cancelled is simply finished.  Otherwise,
 * if the broker has no active data connection the request is given a
 * "No Data Connection Enabled" exception and is NOT dispatched; if the
 * connection is active the request is handed to the do...Query method
 * matching its type.  Any exception thrown while dispatching is
 * recorded on the request.  In every case the request is finished
 * before this method returns.
 *
 * @param request the query to process.
 */
protected void processRequest(DataQuery request)
{
    if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " process request " + request.id );

    if (request.isCancelled())
    {
        request.finish();
        return;
    }

    try
    {
        if (!isActive())
        {
            // Without a connection there is nothing to run the query
            // against.  Dispatching anyway would only replace this
            // message with whatever exception the handler trips over.
            request.setException(new Exception("No Data Connection Enabled"));
        }
        else
        {
            if (debug) System.out.println("Thread: " + Thread.currentThread().getName() + " process request " + request.id + " of type " + request.getTypeString());

            switch (request.getType())
            {
                case DataQuery.EXISTS: doExistsQuery(request); break;
                case DataQuery.READENTRY: doEntryQuery(request); break;
                case DataQuery.LIST: doListQuery(request); break;
                case DataQuery.SEARCH: doSearchQuery(request); break;
                case DataQuery.MODIFY: doModifyQuery(request); break;
                case DataQuery.COPY: doCopyQuery(request); break;
                // case DataQuery.GETALLOC: doGetAllOCsQuery(request); break;
                case DataQuery.GETRECOC: doGetRecOCsQuery(request); break;
                case DataQuery.EXTENDED: doExtendedQuery(request); break;
                case DataQuery.UNKNOWN:
                default: throw new NamingException("JX Internal Error: Unknown Data Broker Request type: " + request.getType());
            }
        }
    }
    catch (Exception e)
    {
        request.setException(e);
    }

    // request *should* already be finished by this stage, but just in case...
    request.finish();
}
//
// DATA SOURCE INTERFACE
//
// (Constructs appropriate DataQuerys and queues them)
/**
 * Queues a LIST query for the given node.
 *
 * @param nodeDN the DN the LIST query is built for.
 * @return the query as returned by push().
 */
public DataQuery getChildren(DN nodeDN)
{
    DataQuery listQuery = new DataQuery(DataQuery.LIST, nodeDN);
    return push(listQuery);
}
/**
 * Queues a READENTRY query for the given node.
 *
 * @param nodeDN the DN the READENTRY query is built for.
 * @return the query as returned by push().
 */
public DataQuery getEntry(DN nodeDN)
{
    DataQuery readQuery = new DataQuery(DataQuery.READENTRY, nodeDN);
    return push(readQuery);
}
/**
 * Queues an EXISTS query for the given node.
 *
 * @param nodeDN the DN the EXISTS query is built for.
 * @return the query as returned by push().
 */
public DataQuery exists(DN nodeDN)
{
    DataQuery existsQuery = new DataQuery(DataQuery.EXISTS, nodeDN);
    return push(existsQuery);
}
/*
public DataQuery getObjectClasses()
{
return push(new DataQuery(DataQuery.GETALLOC));
}
*/
/**
 * Queues a GETRECOC query for the given DN.
 *
 * @param dn the DN the GETRECOC query is built for.
 * @return the query as returned by push().
 */
public DataQuery getRecommendedObjectClasses(DN dn)
{
    DataQuery recommendedQuery = new DataQuery(DataQuery.GETRECOC, dn);
    return push(recommendedQuery);
}
/**
 * Queues a MODIFY query built from an old and a new entry.
 *
 * @param oldEntry the first entry handed to the MODIFY query.
 * @param newEntry the second entry handed to the MODIFY query.
 * @return the query as returned by push().
 */
public DataQuery modifyEntry(DXEntry oldEntry, DXEntry newEntry)
{
    DataQuery modifyQuery = new DataQuery(DataQuery.MODIFY, oldEntry, newEntry);
    return push(modifyQuery);
}
/**
 * Queues a COPY query built from an old and a new node DN.
 *
 * @param oldNodeDN the first DN handed to the COPY query.
 * @param newNodeDN the second DN handed to the COPY query.
 * @return the query as returned by push().
 */
public DataQuery copyTree(DN oldNodeDN, DN newNodeDN)
{
    DataQuery copyQuery = new DataQuery(DataQuery.COPY, oldNodeDN, newNodeDN);
    return push(copyQuery);
}
/**
 * Queues a SEARCH query.
 *
 * @param nodeDN the DN handed to the SEARCH query.
 * @param filter the filter string handed to the SEARCH query.
 * @param searchLevel the search level handed to the SEARCH query.
 * @param returnAttributes the attribute names handed to the SEARCH query.
 * @return the query as returned by push().
 */
public DataQuery search(DN nodeDN, String filter, int searchLevel, String[] returnAttributes)
{
    DataQuery searchQuery =
        new DataQuery(DataQuery.SEARCH, nodeDN, filter, searchLevel, returnAttributes);
    return push(searchQuery);
}
/**
 * Queues a caller-constructed query as it stands.
 *
 * @param query the ready-made query to queue.
 * @return the query as returned by push().
 */
public DataQuery extendedRequest(DataQuery query)
{
    DataQuery queued = push(query);
    return queued;
}
/**
* Adds a data listener to every DataQuery generated by this broker.
* For threading simplicity, no data listeners are added to DataQuerys
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -