📄 requestreader.java
字号:
/**
* Copyright (C) 2003 Manfred Andres
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package freecs.core;
import freecs.*;
import freecs.interfaces.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class RequestReader extends Thread {
public static final short WAITING = 0,
EVAL_GET_MESSAGES_APND2WRITE = 1,
EVAL_GET_MESSAGES_SND_MSGS=2,
EVAL_GET_MESSAGES=3,
EVAL_GET_STATE=4,
EVAL_GET=5,
EVAL_POST=6,
EVAL_POST_LOGIN=7,
EVAL_PREP4SEND=8,
EVAL_SEND=9,
EVAL_SENDFINAL=10,
EVALUATE_COMMAND=11,
EVALUATING=12,
PARSE_MSG=13,
READING=14,
EVAL_POST_LOGIN_RESULT=15,
TRYLOGIN=16,
TRYLOGIN_AUTHENTICATE=17,
TRYLOGIN_CHECK_FRIENDS=18,
TRYLOGIN_CHECK4PRESENCE=19,
TRYLOGIN_CORRECT_PERMISSION=20,
TRYLOGIN_SCHEDULE_FRIENDMSGS=21,
TRYLOGIN_SCHEDULE_VIPMSG=22,
TRYLOGIN_SEND_LOGINMSG=23,
TRYLOGIN_SET_GROUP=24,
TRYLOGIN_SET_PERMISSION=25;
private static Vector reqReaders = new Vector ();
private static short readerID = 0;
private static short failruns=0;
private static boolean flip = false;
private long shutdowntime;
private short ID;
private ByteBuffer buf;
private RequestEvaluator evaluator;
public RequestQueue reqQueue;
public boolean isFixed, working;
public long workstart;
public short currPosition;
private RequestReader(short id) {
this.ID = id;
reqQueue = new RequestQueue (this);
}
public static boolean processRequest (SelectionKey sk) {
// this is the work-to-thread-algorithm
// threads at the beginning have to get more requests, than threads at the
// end of the thread-list, to enable the threads at the end of the thread-list
// to quit working on lower usage. This is done by considering the queue usage and
// the list-index.
// factor = RequestQueue.size() + idx*(READER_MAX_QUEUE / MAX_READERS)
float min = Server.srv.READER_MAX_QUEUE;
float incr = ((float) Server.srv.READER_MAX_QUEUE) / Server.srv.MAX_READERS;
int rrSizeBorder = (int) (reqReaders.size()/1.5);
RequestReader minReader = null;
for (int i = 0; i < reqReaders.size(); i++) {
RequestReader r = (RequestReader) reqReaders.elementAt(i);
int rqSize = r.reqQueue.size();
if (i < rrSizeBorder && rqSize==0) {
minReader=r;
break;
}
float factor = ((float) rqSize) + i * incr;
if (factor < min && !r.isSuspending()) {
min = factor;
minReader=r;
}
}
if (minReader == null) {
minReader = RequestReader.startRequestReader(false);
}
if (minReader == null) {
// if no minReader may be started and every reader's factor is too high
// we will have to loop over all threads and get the lowest requestqueue-size
// to deliver this request
int lowestQueue = Server.srv.READER_MAX_QUEUE;
for (int i = 0; i< reqReaders.size(); i++) {
RequestReader r = (RequestReader) reqReaders.elementAt(i);
if (r.reqQueue.size() < lowestQueue)
minReader = r;
}
if (minReader==null)
return false;
}
minReader.reqQueue.addKey(sk);
return true;
}
private void restart() {
Server.log ("trying to restart thread that was dead", Server.MSG_STATE, Server.LVL_MAJOR);
this.start();
}
public static boolean[] getAliveState () {
boolean[] res = new boolean[reqReaders.size()];
for (int i = 0; i<res.length; i++) {
RequestReader r = (RequestReader) reqReaders.elementAt(i);
res[i] = r.isAlive();
if (!res[i])
r.restart();
}
return res;
}
public static long[][] getWorkingSince () {
long[][] res = new long[reqReaders.size()][2];
for (int i = 0; i<res.length; i++) {
RequestReader r = (RequestReader) reqReaders.elementAt(i);
if (r.working)
res[i][0] = r.workstart;
else
res[i][0] = 0;
res[i][1]=r.currPosition;
}
return res;
}
public static double[] getOveralUsage () {
double[] res = new double[reqReaders.size()];
for (int i = 0; i < res.length; i++) {
RequestReader r = (RequestReader) reqReaders.elementAt(i);
res[i] = r.reqQueue.getUsage();
}
return res;
}
/**
* starts a new requestreader-thread and possibly makes it as
* a fixed thread. A fixed thread will only suspend if the server
* shuts down. If the maximum number of configured RequestReader-threads
* is reached, null will be returned.
* @param fixed markes it as fixed thread if true
* @return the RequestReader
*/
public static RequestReader startRequestReader (boolean fixed) {
if (activeReaders () >= Server.srv.MAX_READERS)
return null;
short cid = readerID++;
RequestReader reqReader = new RequestReader (cid);
reqReader.isFixed = fixed;
if (readerID == Short.MAX_VALUE)
readerID = Short.MIN_VALUE;
reqReader.start ();
reqReaders.add (reqReader);
if (fixed) {
reqReader.setName ("FIXED-RequestReader " + cid);
reqReader.setPriority (Thread.MAX_PRIORITY-1);
StringBuffer tsb = new StringBuffer ("Thread START: (").append (reqReaders.size ()).append (" threads running FIXED THREAD)");
Server.log (tsb.toString (), Server.MSG_STATE, Server.LVL_MAJOR);
} else {
reqReader.setName ("RequestReader " + cid);
reqReader.setPriority (Thread.MAX_PRIORITY-1);
StringBuffer tsb = new StringBuffer ("Thread START: (").append (reqReaders.size ()).append (" threads running)");
Server.log (tsb.toString (), Server.MSG_STATE, Server.LVL_MINOR);
}
return reqReader;
}
/**
* removes a requestreader from the requestreader-list
* @param reqReader the requestreader to remove
*/
public static void removeRequestReader (RequestReader reqReader) {
reqReaders.remove (reqReader);
StringBuffer tsb= new StringBuffer ("Thread STOP: (").append (reqReaders.size ()).append (" threads running)");
Server.log (tsb.toString (), Server.MSG_STATE, Server.LVL_MINOR);
}
/**
* returns the number of requestreaders in the requestreader-list
* @return number of requestreaders
*/
public static int activeReaders () {
return reqReaders.size ();
}
/**
* the work of a requestreader is done her
* .) check if the request-queue has something to read
* .) if there is nothing and the time between the last read
* and now is higher than Server.READER_MAX_IDLETIME and the
* thread is not marked as fixed, the RequestReader will suspend
* .) if there is something to read it will be pricessed
* .) if the processed request is complete, it will be evaluated
*/
public void run() {
buf = ByteBuffer.allocate(Server.srv.READBUFFER_SIZE);
evaluator = new RequestEvaluator (this);
long lastReadTime = System.currentTimeMillis ();
shutdowntime = 0;
boolean suspend = false;
while (!suspend) try {
currPosition=WAITING;
if (!Server.srv.isRunning ()) {
if (shutdowntime == 0)
shutdowntime = System.currentTimeMillis () + 150000;
if (shutdowntime < System.currentTimeMillis ()) {
suspend = true;
break;
}
}
long diff = Server.srv.READER_MAX_IDLETIME;
if (!this.isFixed) {
// if this reader was idle too long, make this thread disapear
diff = System.currentTimeMillis () - lastReadTime;
if (diff > Server.srv.READER_MAX_IDLETIME && activeReaders () > 1)
break;
if (diff > Server.srv.READER_MAX_IDLETIME)
diff = Server.srv.READER_MAX_IDLETIME;
}
SelectionKey sk = reqQueue.getKey (diff);
if (sk == null) {
try {
Thread.sleep (10);
} catch (InterruptedException ie) { }
continue;
}
ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
synchronized (cb) {
if (!cb.isValid()) {
cb.notifyAll();
CentralSelector.dropKey(sk);
continue;
}
currPosition=READING;
working = true;
workstart = lastReadTime = System.currentTimeMillis ();
read(sk);
reqQueue.popKey();
working = false;
cb.notifyAll();
}
} catch (Exception e) {
Server.debug ("RequestReader (outer loop): ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
}
removeRequestReader (this);
Server.log ("RequestReader stopped", Server.MSG_STATE, Server.LVL_VERBOSE);
}
private void read (SelectionKey sk) {
ConnectionBuffer rb = (ConnectionBuffer) sk.attachment ();
if (rb == null || !rb.isValid() || (!sk.isValid () && !sk.channel ().isOpen ())) {
Server.log ("RequestReader.run: droped key", Server.MSG_STATE, Server.LVL_VERBOSE);
CentralSelector.dropKey (sk);
return;
}
long start = System.currentTimeMillis ();
IRequest req = rb.append ();
if (req != null) evaluator.evaluate (req);
if (Server.checkLogLvl (Server.MSG_STATE, Server.LVL_VERY_VERBOSE)) {
long proctime = System.currentTimeMillis () - start;
Server.log ("RequestReader.run: reading ended. took me " + proctime + " millis " + this.toString(), Server.MSG_STATE, Server.LVL_VERBOSE);
}
}
public boolean isSuspending () {
return (shutdowntime != 0 && !isFixed);
}
public int hashCode () { return (int) ID; }
public boolean equals (RequestReader r) { return r.getID () == ID; }
public short getID () { return ID; }
public String toString () { return ("[RequestReader " + ID + "]"); }
/* public void finalize () {
Server.log ("************ RequestReader finalized ************", Server.MSG_STATE, Server.LVL_MAJOR);
} */
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -