📄 taskserver.java
字号:
// $Id: TASKServer.java,v 1.1.2.3 2003/09/25 01:18:37 whong Exp $/* tab:4 * "Copyright (c) 2000-2003 The Regents of the University of California. * All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement is * hereby granted, provided that the above copyright notice, the following * two paragraphs and the author appear in all copies of this software. * * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS." * * Copyright (c) 2002-2003 Intel Corporation * All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE * file. If you do not find these files, copies can be found by writing to * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, Berkeley, CA, * 94704. Attention: Intel License Inquiry. */package net.tinyos.task.tasksvr;import java.util.*;import java.net.*;import java.io.*;import java.sql.*;import net.tinyos.tinydb.*;import net.tinyos.tinydb.parser.*;import net.tinyos.task.taskapi.*;import net.tinyos.message.*;/** TASKServer handles commands on port SERVER_PORT, Commands have the form: commandid\n commandparam1\n commandparam2\n ... Results are returned as a character followed by a newline on this socket; typically, a result code of '1' indicates success, anything else indicates failure. 
After the result code, the connection is usually closed, although some commands create persistent connections (e.g. ADD_LISTENER) See the list of available commands below @author madden@cs.berkeley.edu*/public class TASKServer implements ResultListener { public static final int DEFAULT_SERVER_PORT = 5431; //the default server port static final int MAX_HANG_TIME=512; //in ms /** RUN_QUERY_COMMAND initiates a query on the server parameters are as follows: 1: query id (byte in range 0-127) 2: query string (SQL string) */ public static final short RUN_QUERY=1; /** ADD_LISTENER adds a listener for a specified query id (which need not be running yet.) Parameters are as follows: 1: query id Note that after the result code is returned, the connection remains open and query results are streamed across it */ public static final short ADD_LISTENER=2; public static final short PREFETCH_METADATA=3; public static final short GET_QUERY=4; // moved to the client side for performance reasons // public static final short ESTIMATE_LIFETIME=5; // public static final short ESTIMATE_SAMPLEPERIODS=6; public static final short GET_CLIENTINFOS=7; public static final short GET_CLIENTINFO=8; public static final short ADD_CLIENTINFO=9; public static final short DELETE_CLIENTINFO=10; public static final short GET_MOTECLIENTINFO=11; public static final short ADD_MOTE=12; public static final short DELETE_MOTE=13; public static final short GET_ALLMOTECLIENTINFO=14; public static final short GET_SERVERCONFIGINFO=15; public static final short STOP_QUERY=16; public static final short RUN_COMMAND=17; public static final short DELETE_MOTES=18; public static final short RUN_CALIBRATION=19; public static final short SENSOR_QUERY=1; public static final short HEALTH_QUERY=2; // reserved TinyDB query id for the calibration query public static final byte CALIBRATION_TINYDB_QID=0; public static final byte CALIBRATION_QUERY_ID=0; //maximum number of outstanding connections static final int 
MAX_CLIENT_CONNECTIONS=5; /** Do the main loop of the server, listening on port <port> */ private void doServer(int port) throws IOException { Socket sock = null; ObjectInputStream r; ObjectOutputStream w; InetAddress from; ServerSocket ss = null; SocketLoop sl; boolean closeSock; short commandId; try { ss = new ServerSocket(port); ss.setSoTimeout(MAX_HANG_TIME); sl = new SocketLoop(ss); //start the socket listener } catch (IOException e) { System.out.println("doServer failed, couldn't open new ServerSocket: " + e); throw e; } while (true) { try { sock = sl.getConnection(); w = new ObjectOutputStream(sock.getOutputStream()); w.flush(); r = new ObjectInputStream(sock.getInputStream()); commandId = r.readShort(); System.out.println("Read command: " + commandId); closeSock = processCommand(commandId, r, w, sock); if (closeSock) sock.close(); } catch (SocketException e) { //probably means client disappeared System.out.println("Socket exception, client is dead? \n" +e); if (sock != null) sock.close(); } catch (IOException e) { System.out.println("Error in getting command -- " + e); throw e; } } } /** Process a command received over the server socket @param command The first line of the command from the socket @param inStream An ObjectInputStream containing the rest of the command (if any) @param sock The socket over which the command was received and acknowledgements can be sent @throws IOException If an IO error occurs @throws NumberFormatException If the command can't be parsed or is invalid @return true if the socket should be close, false if it should be left open. 
*/ private boolean processCommand(short commandId, ObjectInputStream inStream, ObjectOutputStream outStream, Socket sock) throws IOException, NumberFormatException { switch (commandId) { case RUN_QUERY: { TASKQuery query = null; try { query = (TASKQuery)inStream.readObject(); } catch (Exception e) { e.printStackTrace(); } short whichQuery = inStream.readShort(); String queryStr; int error = TASKError.SUCCESS; int queryId = query.getQueryId(); boolean isNewQuery = (queryId == TASKQuery.INVALID_QUERYID); byte tinyDBQid; TinyDBQuery tinyDBQuery = null; PreparedStatement ps = null; String tableName = null; if (isNewQuery) { QueryIds qids; // XXX should try to unify TASKQuery and TinyDBQuery queryStr = query.toSQL(); System.out.println("TASKServer got new query: " + queryStr); qids = nextQueryId(); queryId = qids.queryId; tinyDBQid = qids.tinyDBQid; System.out.println("queryId = " + qids.queryId + " tinyDBQid = " + qids.tinyDBQid); try { tinyDBQuery = SensorQueryer.translateQuery(queryStr, tinyDBQid); TinyDBMain.injectQuery(tinyDBQuery,this); } catch(Exception e) { e.printStackTrace(); error = TASKError.INVALID_QUERY; outStream.writeInt(error); outStream.flush(); return true; } query.setQueryId(queryId); query.setTinyDBQid(tinyDBQid); if (whichQuery == SENSOR_QUERY) { sensorQuery = query; sensorTinyDBQuery = tinyDBQuery; } else { healthQuery = query; healthTinyDBQuery = tinyDBQuery; } if (query.getTableName() == null) { tableName = "query" + queryId + "_results"; query.setTableName(tableName); } else tableName = query.getTableName(); try { dbConn.setAutoCommit(false); ps = dbConn.prepareStatement("INSERT INTO task_query_log VALUES (?, ?, ?, ?, ?)"); ps.setInt(1, queryId); ps.setShort(2, tinyDBQid); ps.setString(3, queryStr); ps.setString(4, whichQuery == SENSOR_QUERY ? 
"sensor" : "health"); ps.setString(5, tableName); ps.executeUpdate(); ps.close(); ps = dbConn.prepareStatement("INSERT INTO task_query_time_log (query_id, start_time) VALUES (?, now())"); ps.setInt(1, queryId); ps.executeUpdate(); ps.close(); String createTableStmt = DBLogger.createTableStmt(tinyDBQuery, tableName); dbStmt.executeUpdate(createTableStmt); dbConn.commit(); dbConn.setAutoCommit(true); outStream.writeInt(TASKError.SUCCESS); } catch (SQLException e) { try { dbConn.setAutoCommit(true); ps.close(); } catch (Exception e1) { } outStream.writeInt(TASKError.FAIL); e.printStackTrace(); } } else { if (whichQuery == SENSOR_QUERY && sensorQuery.getQueryId() != queryId || whichQuery == HEALTH_QUERY && healthQuery.getQueryId() != queryId) { error = TASKError.STALE_QUERY; outStream.writeInt(error); outStream.flush(); return true; } if (whichQuery == SENSOR_QUERY) tinyDBQuery = sensorTinyDBQuery; else tinyDBQuery = healthTinyDBQuery; TinyDBMain.network.sendQuery(tinyDBQuery); try { ps = dbConn.prepareStatement("UPDATE task_query_time_log SET stop_time = now() WHERE query_id = ? 
AND stop_time IS NULL"); ps.setInt(1, queryId); ps.executeUpdate(); ps.close(); ps = dbConn.prepareStatement("INSERT INTO task_query_time_log (query_id, start_time) VALUES (?, now())"); ps.setInt(1, queryId); ps.executeUpdate(); ps.close(); outStream.writeInt(TASKError.SUCCESS); } catch (SQLException e) { try { ps.close(); } catch (Exception e1) { } outStream.writeInt(TASKError.FAIL); e.printStackTrace(); } } outStream.flush(); return true; } case ADD_LISTENER: { short whichQuery = inStream.readShort(); TASKQuery query; Integer queryId; Vector ls; if (whichQuery == SENSOR_QUERY) query = sensorQuery; else query = healthQuery; queryId = new Integer(query.getQueryId()); System.out.println("add listener for query id " + queryId); ls = (Vector)listeners.get(queryId); if (ls == null) { ls = new Vector(); listeners.put(queryId, ls); } ls.addElement(outStream); System.out.println("add listener done."); outStream.writeInt(TASKError.SUCCESS); outStream.flush(); System.out.println("add listener success message sent."); return false; } case PREFETCH_METADATA: outStream.writeObject(attributeInfos); System.out.println("attributeinfos send done."); outStream.writeObject(commandInfos); System.out.println("commandinfos send done."); outStream.writeObject(aggregateInfos); System.out.println("aggregateinfos send done."); outStream.writeInt(TASKError.SUCCESS); outStream.flush(); return true; case GET_QUERY: { short whichQuery = inStream.readShort(); if (whichQuery == SENSOR_QUERY) outStream.writeObject(sensorQuery); else outStream.writeObject(healthQuery); outStream.flush(); return true; } /* * moved to client side case ESTIMATE_LIFETIME: { TASKQuery sensorQuery = null, healthQuery = null; int lifeTime; try { sensorQuery = (TASKQuery)inStream.readObject(); healthQuery = (TASKQuery)inStream.readObject(); } catch (Exception e) { e.printStackTrace(); } lifeTime = estimateLifeTime(sensorQuery, healthQuery); outStream.writeInt(lifeTime); outStream.flush(); return true; } case 
ESTIMATE_SAMPLEPERIODS: { TASKQuery sensorQuery = null, healthQuery = null; int lifeTime; lifeTime = inStream.readInt(); try { sensorQuery = (TASKQuery)inStream.readObject(); healthQuery = (TASKQuery)inStream.readObject(); } catch (Exception e) { e.printStackTrace(); } estimateSamplePeriods(lifeTime, sensorQuery, healthQuery); outStream.writeInt(sensorQuery.getSamplePeriod()); outStream.writeInt(healthQuery.getSamplePeriod()); outStream.flush(); return true; } */ case GET_CLIENTINFOS: { Vector clientInfos = new Vector(); try { ResultSet rs = dbStmt.executeQuery("SELECT name FROM task_client_info"); while (rs.next()) { clientInfos.add(rs.getString(1)); } rs.close(); } catch (Exception e) { e.printStackTrace(); } outStream.writeObject(clientInfos); System.out.println("Vector of ClientInfo names sent."); outStream.flush(); return true; } case GET_CLIENTINFO: { try { String name = (String)inStream.readObject(); ResultSet rs = dbStmt.executeQuery("SELECT name, type, clientinfo FROM task_client_info where name = '" + name + "'"); TASKClientInfo clientInfo; if (!rs.next()) clientInfo = null; clientInfo = new TASKClientInfo(rs.getString(1), rs.getString(2), rs.getBytes(3)); rs.close(); outStream.writeObject(clientInfo); } catch (Exception e) { outStream.writeObject(null); e.printStackTrace(); } outStream.flush(); return true; } case ADD_CLIENTINFO: { TASKClientInfo clientInfo = null; try { clientInfo = (TASKClientInfo)inStream.readObject(); } catch (Exception e) { e.printStackTrace(); } PreparedStatement ps = null; try { ps = dbConn.prepareStatement("INSERT INTO task_client_info values (?, ?, ?)"); ps.setString(1, clientInfo.name); ps.setString(2, clientInfo.type); ps.setBytes(3, clientInfo.data); ps.executeUpdate(); ps.close(); outStream.writeInt(TASKError.SUCCESS); } catch (SQLException e) { if (ps != null) { try { ps.close(); } catch (Exception e1) { e1.printStackTrace(); } } outStream.writeInt(TASKError.FAIL); e.printStackTrace(); } outStream.flush(); return true; } 
case DELETE_CLIENTINFO: { String name = null; try { name = (String)inStream.readObject(); } catch (Exception e) { e.printStackTrace(); } PreparedStatement ps = null; try { ps = dbConn.prepareStatement("DELETE FROM task_client_info WHERE name = ?"); ps.setString(1, name); ps.executeUpdate(); ps.close(); outStream.writeInt(TASKError.SUCCESS); } catch (SQLException e) { if (ps != null) { try { ps.close(); } catch (Exception e1) { e1.printStackTrace(); } } outStream.writeInt(TASKError.FAIL); e.printStackTrace(); } outStream.flush(); return true; } case GET_MOTECLIENTINFO: { try { int moteId = inStream.readInt(); ResultSet rs = dbStmt.executeQuery("SELECT mote_id, clientinfo_name, x_coord, y_coord, z_coord, moteinfo, clientinfo_name FROM task_mote_info where mote_id = " + moteId); TASKMoteClientInfo moteClientInfo; if (!rs.next()) moteClientInfo = null; moteClientInfo = new TASKMoteClientInfo(rs.getInt(1), rs.getDouble(2), rs.getDouble(3), rs.getDouble(4), rs.getBytes(5), rs.getString(6)); rs.close();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -