📄 simpleserver.java
字号:
/* * @(#)$Id: SimpleServer.java,v 1.12 2004/07/20 18:45:39 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package pier.helpers.handlers;import java.io.ByteArrayInputStream;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.util.Properties;import org.apache.log4j.Logger;import pier.components.PierFrontend;import pier.components.PierNetServer;import pier.data.Tuple;import pier.data.TupleCollection;import pier.helpers.formatters.Formatter;import pier.helpers.formatters.SimpleFormatter;import pier.query.QueryPlan;import pier.query.QueryTag;import services.LocalNode;import services.Output;import services.network.tcpraw.TCPRawConnection;import services.timer.TimerClient;import util.logging.StructuredLogMessage;/** * Class SimpleServer */public class SimpleServer implements NetHandler, TimerClient { private static Logger logger = Logger.getLogger(SimpleServer.class); protected static final int OPTIONS_END_CHAR = '#'; protected static final int QUERY_END_CHAR = 4; protected static final int CONNECTION_END_CHAR = 4; protected static final String QUERY_START = "#"; protected static final int STATE_OPTIONS_READ = 1; protected static final int STATE_QUERY_READ = 2; protected static final int STATE_REQUEST_PROCESS = 3; protected static final int STATE_QUERY_STARTED = 4; protected static final int STATE_QUERY_ENDED = 5; protected static final int STATE_CONNECTION_CLOSED = 6; protected static final String FORMATTER_KEY = "FORMATTER"; protected static final String DEFAULT_FORMATTER = null; protected static final String TIMEOUT_KEY = "TIMEOUT"; protected static final String DEFAULT_TIMEOUT = "0"; protected static final String MAXTUPLES_KEY = "MAXTUPLES"; protected static final String DEFAULT_MAXTUPLES = "0"; protected static final String MAXBYTES_KEY = "MAXBYTES"; protected static final String DEFAULT_MAXBYTES = "0"; protected static final String TIMESTAMP_KEY = "TIMESTAMP"; protected static final String DEFAULT_TIMESTAMP = "1"; protected static final Integer SIGNAL_TIMEOUT = new Integer(1); protected static final int READSIZE = 1500; protected TCPRawConnection connection; protected PierNetServer parent; protected String options; protected Properties optionsTable; protected String query; protected ByteBuffer readBuffer; protected int curState; protected Formatter formatter; protected int refnum; protected int maxTuples; protected long maxBytes; protected int timestampType; protected long queryStartTime; /** * Constructor SimpleServer */ public SimpleServer() { this.readBuffer = ByteBuffer.allocate(READSIZE); this.curState = STATE_OPTIONS_READ; this.options = new String(); optionsTable = new Properties(); this.query = QUERY_START; this.formatter = new SimpleFormatter(); this.refnum = 1; } /** * Method init * * @param connection * @param parent */ public void init(TCPRawConnection connection, PierNetServer parent) { this.connection = connection; this.parent = parent; } /** * Method closeConnection */ public void closeConnection() { curState = STATE_CONNECTION_CLOSED; connection.disconnect(); parent.removeConnection(connection); } /** * Method readConnection */ public void readConnection() { // Read new data readBuffer.clear(); int bytesRead = connection.read(readBuffer); readBuffer.position(0); if (bytesRead > 0) { // Append new data to partialReadString byte[] theBytes = new byte[bytesRead]; String theString = null; readBuffer.get(theBytes); theString = new String(theBytes); String[] result; switch (curState) { case STATE_OPTIONS_READ: result = findChar(theString, options, OPTIONS_END_CHAR); options = result[0]; if (result[1] != null) { processOptions(); curState = STATE_QUERY_READ; theString = result[1]; } else { break; } case STATE_QUERY_READ: result = findChar(theString, query, QUERY_END_CHAR); query = result[0]; if (result[1] != null) { curState = STATE_REQUEST_PROCESS; theString = result[1]; } else { break; } case STATE_REQUEST_PROCESS: startQuery(); curState = STATE_QUERY_STARTED; break; default: if (theString.length() > 0) { int findChar = theString.indexOf(CONNECTION_END_CHAR); if (findChar >= 0) { closeConnection(); break; } } } } } /** * Method findChar * * @param src * @param dst * @param searchChar * @return */ public String[] findChar(String src, String dst, int searchChar) { int findChar = src.indexOf(searchChar); if (findChar == -1) { dst = dst + src; return new String[]{dst, null}; } else { if (findChar > 0) { dst = dst + src.substring(0, findChar); } String leftOver = src.substring(findChar + 1); return new String[]{dst, leftOver}; } } /** * Method processOptions */ public void processOptions() { // Replace (non slash), with \n options = options.replaceAll("[^\\\\]\\u002C", "\n"); // Replace \, with , options = options.replaceAll("\\\\,", ","); // Replace \h with # options = options.replaceAll("\\\\h", "#"); try { optionsTable.load(new ByteArrayInputStream(options.getBytes())); } catch (Exception exception) { error(PierFrontend.createErrorFromThrowable(exception), refnum, null, null); } if (optionsTable.getProperty(FORMATTER_KEY, DEFAULT_FORMATTER) != null) { try { Class formatterClass = Class.forName(optionsTable.getProperty(FORMATTER_KEY, DEFAULT_FORMATTER)); formatter = (Formatter) formatterClass.newInstance(); } catch (Exception exception) { error(PierFrontend.createErrorFromThrowable(exception), refnum, null, null); } } } /** * Method currentTimestamp * @return */ public long currentTimestamp() { switch (timestampType) { case 0: return -1; case 1: long currentTime = (long) (LocalNode.myTimer.getCurrentTime() * 1000); return (currentTime - queryStartTime); case 2: return (long) (LocalNode.myTimer.getCurrentTime() * 1000); default: return -1; } } /** * Method startQuery */ public void startQuery() { queryStartTime = (long) (LocalNode.myTimer.getCurrentTime() * 1000); double timeoutValue = Double.parseDouble(optionsTable.getProperty(TIMEOUT_KEY, DEFAULT_TIMEOUT)); if (timeoutValue > 0) { LocalNode.myTimer.schedule(timeoutValue, SIGNAL_TIMEOUT, this); } maxTuples = Integer.parseInt(optionsTable.getProperty(MAXTUPLES_KEY, DEFAULT_MAXTUPLES)); maxBytes = Integer.parseInt(optionsTable.getProperty(MAXBYTES_KEY, DEFAULT_MAXBYTES)); timestampType = Integer.parseInt(optionsTable.getProperty(TIMESTAMP_KEY, DEFAULT_TIMESTAMP)); connection.write(formatter.messagePreamble(Formatter.MESSAGE_TYPE_ALL, currentTimestamp())); connection.write(formatter.formatQuery(query, refnum, null, currentTimestamp())); connection.write(formatter.formatMessage(options, refnum, null, null, currentTimestamp())); QueryPlan thePlan = parent.startQuery(query, refnum, this); if (thePlan != null) { connection.write(formatter.formatQuery(thePlan, refnum, null, currentTimestamp())); } } /** * Method result * * @param tuples * @param refnum * @param queryTag * @param source */ public void result(TupleCollection tuples, int refnum, QueryTag queryTag, InetSocketAddress source) { if ((curState == STATE_QUERY_STARTED) && (curState != STATE_CONNECTION_CLOSED)) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( tuples, "Sending Results to Client", null, null)); } int numTuples = tuples.size(); for (int i = 0; i < numTuples; i++) { Tuple theTuple = tuples.getTuple(i); ByteBuffer tupleBuffer = formatter.formatTuple(theTuple, refnum, queryTag, source, currentTimestamp()); int tupleBytes = tupleBuffer.limit(); connection.write(tupleBuffer); if (maxTuples > 0) { maxTuples--; if (maxTuples == 0) { curState = STATE_QUERY_ENDED; connection.write( formatter.formatMessage( "Connection Tuple Maximum Reached", refnum, null, null, currentTimestamp())); closeConnection(); break; } } if (maxBytes > 0) { maxBytes -= tupleBytes; if (maxBytes <= 0) { curState = STATE_QUERY_ENDED; connection.write( formatter.formatMessage( "Connection Byte Maximum Reached", refnum, null, null, currentTimestamp())); closeConnection(); break; } } } } } /** * Method error * * @param errorMessage * @param refnum * @param queryTag * @param source */ public void error(String errorMessage, int refnum, QueryTag queryTag, InetSocketAddress source) { if (curState != STATE_CONNECTION_CLOSED) { connection.write(formatter.formatError(errorMessage, refnum, queryTag, source, currentTimestamp())); } } /** * Method handleClock * * @param clockData */ public void handleClock(Object clockData) { if ((clockData.equals(SIGNAL_TIMEOUT)) && (curState != STATE_CONNECTION_CLOSED)) { curState = STATE_QUERY_ENDED; connection.write( formatter.formatMessage( "Connection Timeout Reached", refnum, null, null, currentTimestamp())); closeConnection(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -