📄 taskserver.java
字号:
System.out.println("queryId = " + qids.queryId + " tinyDBQid = " + qids.tinyDBQid); try { tinyDBQuery = SensorQueryer.translateQuery(queryStr, tinyDBQid); } catch(Exception e) { e.printStackTrace(); error = TASKError.INVALID_QUERY; return error; } query.setQueryId(queryId); query.setTinyDBQid(tinyDBQid); if (whichQuery == SENSOR_QUERY) { sensorQuery = query; sensorTinyDBQuery = tinyDBQuery; } else { healthQuery = query; healthTinyDBQuery = tinyDBQuery; } if (query.getTableName() == null) { tableName = "query" + queryId + "_results"; query.setTableName(tableName); } else { tableName = query.getTableName(); } try { dbConn.setAutoCommit(false); ps = dbConn.prepareStatement("INSERT INTO task_query_log VALUES (?, ?, ?, ?, ?)"); ps.setInt(1, queryId); ps.setShort(2, tinyDBQid); ps.setString(3, queryStr); ps.setString(4, whichQuery == SENSOR_QUERY ? "sensor" : "health"); ps.setString(5, tableName); ps.executeUpdate(); ps.close(); ps = dbConn.prepareStatement("INSERT INTO task_query_time_log (query_id, start_time) VALUES (?, now())"); ps.setInt(1, queryId); ps.executeUpdate(); ps.close(); String createTableStmt = DBLogger.createTableStmt(tinyDBQuery, tableName); dbStmt.executeUpdate(createTableStmt); dbConn.commit(); dbConn.setAutoCommit(true); dbStmt.executeUpdate("DROP VIEW task_current_results"); dbStmt.executeUpdate("CREATE OR REPLACE VIEW task_current_results AS SELECT * FROM " + tableName); error = TASKError.SUCCESS; } catch (SQLException e) { try { dbConn.setAutoCommit(true); ps.close(); } catch (Exception e1) { } error = TASKError.FAIL; e.printStackTrace(); } // must inject query after result table is created try { TinyDBMain.injectQuery(tinyDBQuery,this); } catch(Exception e) { e.printStackTrace(); error = TASKError.TINYOS_MESSAGE_SEND_FAILED; } } else { System.out.println("TASKServer got existing query, id = " + queryId); if (whichQuery == SENSOR_QUERY && sensorQuery.getQueryId() != queryId || whichQuery == HEALTH_QUERY && healthQuery.getQueryId() != queryId) { error = TASKError.STALE_QUERY; return error; } if (whichQuery == SENSOR_QUERY) tinyDBQuery = sensorTinyDBQuery; else tinyDBQuery = healthTinyDBQuery; TinyDBMain.network.sendQuery(tinyDBQuery); try { ps = dbConn.prepareStatement("UPDATE task_query_time_log SET stop_time = now() " + "WHERE query_id = ? AND stop_time IS NULL"); ps.setInt(1, queryId); ps.executeUpdate(); ps.close(); ps = dbConn.prepareStatement("INSERT INTO task_query_time_log (query_id, start_time) VALUES (?, now())"); ps.setInt(1, queryId); ps.executeUpdate(); ps.close(); error = TASKError.SUCCESS; } catch (SQLException e) { try { ps.close(); } catch (Exception e1) { } error = TASKError.FAIL; e.printStackTrace(); } } return error; } public int stopTASKQuery(short whichQuery) { int queryId; TinyDBQuery q; if (whichQuery == SENSOR_QUERY) { if (sensorTinyDBQuery == null || sensorQuery == null) { return TASKError.SUCCESS; } q = sensorTinyDBQuery; queryId = sensorQuery.getQueryId(); } else { if (healthTinyDBQuery == null || healthQuery == null) { return TASKError.SUCCESS; } q = healthTinyDBQuery; queryId = healthQuery.getQueryId(); } boolean ok = stopRunningQuery(q,queryId); if (ok) { return TASKError.SUCCESS; } else { return TASKError.FAIL; } } public boolean isTASKQueryActive(short whichQuery) { int queryId; TinyDBQuery q; boolean result; switch (whichQuery) { case SENSOR_QUERY: { if (sensorTinyDBQuery == null || sensorQuery == null) { return false; } q = sensorTinyDBQuery; queryId = sensorQuery.getQueryId(); break; } case HEALTH_QUERY: { if (healthTinyDBQuery == null || healthQuery == null) { return false; } q = healthTinyDBQuery; queryId = healthQuery.getQueryId(); break; } default: return false; } result = q.active(); return result; } public int runTASKCommand(TASKCommand command) { TASKCommandInfo commandInfo = null; PreparedStatement ps = null; boolean found = false; int error = TASKError.SUCCESS; for (Iterator it = commandInfos.iterator(); it.hasNext(); ) { commandInfo = (TASKCommandInfo)it.next(); if (commandInfo.getCommandName().equalsIgnoreCase(command.getCommandName())) { found = true; break; } } if (!found) { error = TASKError.INVALID_COMMAND; return error; } Message cmdMessage = command.getTinyOSMessage(commandInfo); if (cmdMessage == null) { System.out.println("Invalid command."); error = TASKError.INVALID_COMMAND; return error; } error = sendTinyOSMessage(cmdMessage); if (error != TASKError.SUCCESS) { return error; } try { String cmdStr = command.toString(commandInfo); ps = dbConn.prepareStatement("INSERT INTO task_command_log VALUES (?, now(), ?)"); ps.setInt(1, nextCommandId()); ps.setString(2, cmdStr); ps.executeUpdate(); ps.close(); error = TASKError.SUCCESS; } catch (SQLException e) { if (ps != null) { try { ps.close(); } catch (Exception e1) { } } error = TASKError.FAIL; e.printStackTrace(); } return error; } public Vector getTASKAttributes() { return attributeInfos; } public Vector getTASKCommands() { return commandInfos; } public void scheduleTASKShutdown(int seconds) { java.util.Date sdDate = new java.util.Date(System.currentTimeMillis() + seconds*1000); autoShutdownTimer = new Timer(); autoShutdownTimer.schedule(new ShutdownServerTask(), sdDate); return; } /** Create the server Currently, only one parameter, "-sim", is understood, indicating that the server should run in simulation mode. */ public static void main(String[] argv) { TASKServer server = new TASKServer(); String cfgstr; boolean sim; System.out.println("Starting TASK Server..."); if (argv.length == 1 && argv[0].equals("-sim")) sim = true; else sim = false; try { TinyDBMain.simulate = sim; TinyDBMain.debug = true; TinyDBMain.initMain(); TinyDBStatus status = new TinyDBStatus(TinyDBMain.network, TinyDBMain.mif, false); status.requestStatus(1000, 1); cfgstr = Config.getParam("task-server-port"); if (cfgstr == null) serverPort = DEFAULT_SERVER_PORT; else serverPort = Integer.valueOf(cfgstr).intValue(); cfgstr = Config.getParam("task-server-autoshutdown"); if (cfgstr !=null) { server.scheduleTASKShutdown(Integer.valueOf(cfgstr).intValue()); } server.initDBConn(); server.prefetchMetaData(); server.fetchRunningQueries(status.getQueryIds()); server.startHttpServer(); server.doServer(serverPort); // Never returns } catch (Exception e) { System.out.println("SERVER ERROR: " + e); e.printStackTrace(); } } class ShutdownServerTask extends TimerTask { public void run() { try { httpServer.stop(); } catch (Exception e) { } try { dbConn.close(); } catch (Exception e) { } System.exit(0); } } private Hashtable listeners = new Hashtable(); static private String urlPSQL; static private String dbUser; static private String dbPwd; static private Connection dbConn = null; static private Statement dbStmt = null; static private int serverPort; private TASKQuery sensorQuery; private TinyDBQuery sensorTinyDBQuery; private TASKQuery healthQuery; private TinyDBQuery healthTinyDBQuery; private TinyDBQuery calibTinyDBQuery = null; private boolean calibQueryInProgress = false; private Vector calibNodes; static private Vector attributeInfos; static private Vector commandInfos; static private Vector aggregateInfos; private Server httpServer = null; private Timer autoShutdownTimer;}/** Internal class that's responsible for enqueing incoming connections */class SocketLoop implements Runnable { Vector connections; //note that vector is synchronized, which allows //us to get away with all sorts of things that look non-thread safe ServerSocket ss; public SocketLoop(ServerSocket ss) { Thread t = new Thread(this); connections = new Vector(); this.ss = ss; t.start(); } public void run() { Socket sock; int numConnections = 0; long lastTime = System.currentTimeMillis(); int lastNumConnections = 0; while (true) { try { lastTime = System.currentTimeMillis(); lastNumConnections = connections.size(); sock = ss.accept(); if (connections.size() > TASKServer.MAX_CLIENT_CONNECTIONS) { System.out.println("Accepting And Closing Connetction"); sock.close(); } else { System.out.println("Adding new connection"); connections.addElement(sock); } } catch (java.io.InterruptedIOException e) { //just finished listening } catch (Exception e) { System.out.println("Error in accepting connection: " + e); } } } public Socket getConnection() { Socket sock; //loop forever int i=0; while (connections.size() == 0) { i++; try { Thread.currentThread().sleep(10); } catch (Exception e) { //who cares? } if (i%1000==0) System.err.print("."); } sock = (Socket)connections.elementAt(0); connections.removeElementAt(0); return sock; }}class QueryIds{ public QueryIds(int queryId, byte tinyDBQid) { this.queryId = queryId; this.tinyDBQid = tinyDBQid; } public int queryId; public byte tinyDBQid;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -