📄 queue.java
字号:
* @param putStrategy Queue put strategy: if null Tail strategy is used.
* @param myReg the blocking region to which the owner node of this queue belongs
*/
public Queue(JobInfoList preLoad, QueueGetStrategy getStrategy,
QueuePutStrategy putStrategy[], BlockingRegion myReg) {
//uses constructor for generic queue
this(preLoad, getStrategy, putStrategy);
//sets blocking region properties
redirectionON = true;
myRegion = myReg;
regionInputStation = myRegion.getInputStation();
}
/** Creates a new instance of finite redirecting Queue.
* @param preLoad Queue preload: if null no preload id done.
* @param size Queue size.
* @param getStrategy Queue get strategy: if null FCFS strategy is used.
* @param putStrategy Queue put strategy: if null Tail strategy is used.
* @param drop True if the queue should rejects new jobs when it's full,
* false otherwise.
* @param myReg the blocking region to which the owner node of this queue belongs
*/
public Queue(int size, boolean drop, JobInfoList preLoad, QueueGetStrategy
getStrategy, QueuePutStrategy putStrategy[], BlockingRegion myReg) {
//uses constructor for generic queue
this(size, drop, preLoad, getStrategy, putStrategy);
//sets blocking region properties
redirectionON = true;
myRegion = myReg;
regionInputStation = myRegion.getInputStation();
}
/** Creates a new instance of finite redirecting Queue.
* @param preLoad Queue preload: if null no preload id done.
* @param size Queue size.
* @param getStrategy Queue get strategy: if null FCFS strategy is used.
* @param putStrategy Queue put strategy: if null Tail strategy is used.
* @param drop True if the queue should rejects new jobs when it's full,
* false otherwise.
* @param myReg the blocking region to which the owner node of this queue belongs
*/
public Queue(Integer size, Boolean drop, JobInfoList preLoad, QueueGetStrategy
getStrategy, QueuePutStrategy putStrategy[], BlockingRegion myReg) {
this(size.intValue(), drop.booleanValue(), preLoad, getStrategy, putStrategy, myReg);
}
/**
* Creates a new instance of finite Queue. This is the newwst constructor that supports
* differend drop strategies. Other constructors are left for compatibility.
* @param size Queue size (-1 = infinite queue).
* @param getStrategy Queue get strategy: if null FCFS strategy is used.
* @param putStrategy Queue put strategy: if null Tail strategy is used.
* @param dropStrategies
*/
public Queue(Integer size, String[] dropStrategies, QueueGetStrategy getStrategy,
QueuePutStrategy putStrategy[]) {
this(size.intValue(), false, getStrategy, putStrategy );
// Decodes drop strategies
for (int i=0; i<dropStrategies.length; i++) {
if (dropStrategies[i].equals(FINITE_DROP)) {
drop[i] = true;
block[i] = false;
}
else if (dropStrategies[i].equals(FINITE_BLOCK)) {
drop[i] = false;
block[i] = true;
}
else if (dropStrategies[i].equals(FINITE_WAITING)) {
drop[i] = false;
block[i] = false;
}
}
}
/**
* Turns on the "redirecting queue" behaviour.
* @param region the blocking region to which the owner node
* of this queue belongs
*/
public void redirectionTurnON(BlockingRegion region) {
//sets blocking region properties
redirectionON = true;
myRegion = region;
regionInputStation = myRegion.getInputStation();
}
/**
* Turns off the "redirecting queue" behaviour.
*/
public void redirectionTurnOFF() {
//sets blocking region properties
redirectionON = false;
myRegion = null;
regionInputStation = null;
}
/**
* Tells whether the "redirecting queue" behaviour has been turned on.
* @return true, if the "redirecting queue" behaviour is on; false otherwise.
*/
public boolean isRedirectionON() {
return redirectionON;
}
//end NEW
public boolean isEnabled(int id) throws jmt.common.exception.NetException {
switch (id) {
case PROPERTY_ID_INFINITE:
return infinite;
default:
return super.isEnabled(id);
}
}
public int getIntSectionProperty(int id) throws jmt.common.exception.NetException {
switch (id) {
case PROPERTY_ID_SIZE:
return size;
case PROPERTY_ID_WAITING_REQUESTS:
return waitingRequests.size();
//NEW
//@author Stefano Omini
case PROPERTY_ID_DROPPED_JOBS:
return droppedJobs;
//end NEW
default:
return super.getIntSectionProperty(id);
}
}
public int getIntSectionProperty(int id, JobClass jobClass) throws jmt.common.exception.NetException {
switch (id) {
case PROPERTY_ID_WAITING_REQUESTS:
return waitingRequests.size(jobClass);
//NEW
//@author Stefano Omini
case PROPERTY_ID_DROPPED_JOBS:
return droppedJobsPerClass[jobClass.getId()];
//end NEW
default:
return super.getIntSectionProperty(id, jobClass);
}
}
public Object getObject(int id) throws jmt.common.exception.NetException {
switch (id) {
case PROPERTY_ID_GET_STRATEGY:
return getStrategy;
case PROPERTY_ID_PUT_STRATEGY:
return putStrategy;
default:
return super.getObject(id);
}
}
public boolean hasInfiniteQueue() {
return infinite;
}
protected void nodeLinked(NetNode node) {
// Sets netnode dependent properties
waitingRequests = new JobInfoList(getJobClasses().size(), true);
if (putStrategy == null) {
putStrategy = new QueuePutStrategy[getJobClasses().size()];
for (int i = 0; i < getJobClasses().size(); i++)
putStrategy[i] = new TailStrategy();
}
if (jobsList == null)
jobsList = new JobInfoList(getJobClasses().size(), true);
if (!infinite) {
droppedJobs = 0;
droppedJobsPerClass = new int[getJobClasses().size()];
for (int i = 0; i < droppedJobsPerClass.length; i++) {
droppedJobsPerClass[i] = 0;
}
}
//retrieves the job info list of the owner node
nodeJobsList = getOwnerNode().getJobInfoList();
}
/** This method implements a generic finite/infinite queue
* @param message message to be processed.
* @throws jmt.common.exception.NetException
*/
protected int process(NetMessage message) throws jmt.common.exception.NetException {
Job job;
switch (message.getEvent()) {
case NetEvent.EVENT_START:
//EVENT_START
//If there are jobs in queue, the first (chosen using the specified
//get strategy) is forwarded and coolStart becomes false.
if (jobsList.size() > 0) {
//the first job is forwarded to service section
forward(getStrategy.get(jobsList));
coolStart = false;
}
break;
case NetEvent.EVENT_ACK:
//EVENT_ACK
//If there are waiting requests, the first is taken (if the source node of this request
//is the owner node of this section, an ack message is sent).
//The job contained is put into the queue using the specified put strategy.
//
//At this point, if there are jobs in queue, the first is taken (using the
//specified get strategy) and forwarded. Otherwise, if there are no jobs, coolStart
//is set true.
// if there is a waiting request send ack to the first node
//(note that with infinite queue there are no waitinq requests)
if (waitingRequests.size() != 0) {
WaitingRequest wr;
wr = (WaitingRequest) waitingRequests.removeFirst();
// If the source is not the owner node sends ack if blocking is enabled. Otherwise
// ack was already sent.
if (!isMyOwnerNode(wr.getNode()) && block[wr.getJob().getJobClass().getId()]) {
send(NetEvent.EVENT_ACK, wr.getJob(), 0.0, wr.getSection(), wr.getNode());
}
//the class ID of this job
int c = wr.getJob().getJobClass().getId();
//the job is put into the queue according to its own class put strategy
putStrategy[c].put(wr.getJob(), jobsList,
message.getSourceSection(),
message.getSource(),
this);
}
// if there is at least one job, sends it
if (jobsList.size() > 0) {
// Gets job using a specific strategy and sends job
Job jobSent = getStrategy.get(jobsList);
forward(jobSent);
} else {
// else set coolStart to true
coolStart = true;
}
break;
case NetEvent.EVENT_JOB:
//EVENT_JOB
//If the queue is a redirecting queue, jobs arriving from the outside of
//the blocking region must be redirected to the region input station
//
//Otherwise the job is processed as usual.
//
//If coolStart is true, the queue is empty, so the job is added to the job list
//and immediately forwarded to the next section. An ack is sent and coolStart is
//set to false.
//
//If the queue is not empty, it should be distinguished between finite/infinite queue.
//
//If the queue is finite, checks the size: if it's not full the job is put into the
//queue and an ack is sent. Else, if it's full, checks the owner node: if the
//source node is the owner node of this section, an ack is sent and a waiting
//request is created. If the source is another node the waiting request is created
//only if drop is false, otherwise an ack is sent but the job is rejected.
//
//If the queue is infinite, the job is put into the queue and an ack is sent
job = message.getJob();
//----REDIRECTION BEHAVIOUR----------//
if (isRedirectionON()) {
NetNode source = message.getSource();
boolean fromTheInside = myRegion.belongsToRegion(source);
//the first time input station isn't known yet
if (regionInputStation == null) {
regionInputStation = myRegion.getInputStation();
}
if (!fromTheInside) {
//this message has arrived from the outside of the blocking region
if ((source != regionInputStation)) {
//the external source is not the input station
//the message must be redirected to the input station,
//without processing it
//redirects the message to the inputStation
redirect(NetEvent.EVENT_JOB, job, 0.0, NodeSection.INPUT, regionInputStation);
//send a ack to the source
send(NetEvent.EVENT_ACK, job, 0.0, message.getSourceSection(),
message.getSource());
return MSG_PROCESSED;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -