📄 taskserver.java
字号:
} }

    /**
     * Handle one result tuple of the calibration query.
     *
     * The first field of the result is read as the node id (Integer) and the
     * second as the calibration bytes. Results with fewer than two fields,
     * results that arrive while no calibration query is in progress, and
     * results from nodes that are no longer in calibNodes are ignored.
     * When the last outstanding node reports, the calibration query is
     * stopped. The calibration bytes are then stored in task_mote_info.
     *
     * @param qr the query result delivered for the calibration query
     */
    private void handleCalibrationResult(QueryResult qr) {
        Vector fieldValues = qr.getFieldValueObjs();
        if (fieldValues.size() < 2)
            return;
        int nodeId = ((Integer) fieldValues.firstElement()).intValue();
        byte[] calib = (byte[]) (fieldValues.elementAt(1));
        if (!calibQueryInProgress) {
            System.out.println("got calibration result from node " + nodeId + ", but the calibration query is no longer in progress.");
            return;
        }

        // Locate the reporting node among the nodes we are still waiting for.
        int mote = -1;
        int i = 0;
        for (Iterator it = calibNodes.iterator(); it.hasNext(); i++) {
            if (((Integer) it.next()).intValue() == nodeId) {
                mote = i;
                break;
            }
        }
        if (mote < 0) {
            System.out.println("already got calibration for node " + nodeId);
            return;
        }

        System.out.println("got new calibration for node " + nodeId);
        calibNodes.remove(mote);
        if (calibNodes.isEmpty()) {
            // Every node has reported: abort the query and stamp its stop time.
            stopRunningQuery(calibTinyDBQuery, CALIBRATION_QUERY_ID);
            calibQueryInProgress = false;
        }

        PreparedStatement ps = null;
        try {
            ps = dbConn.prepareStatement("UPDATE task_mote_info SET calib = ? WHERE mote_id = ?");
            ps.setBytes(1, calib);
            ps.setInt(2, nodeId);
            ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            closeStatementQuietly(ps);
        }
    }

    /**
     * Stop a currently running query.
     *
     * Aborts the query in the network, then sets stop_time on the query's
     * open row(s) in task_query_time_log.
     *
     * @param q   the query to abort
     * @param qid the query id whose time-log entry is closed
     * @return true if the time-log update ran without an SQLException,
     *         false otherwise
     */
    private boolean stopRunningQuery(TinyDBQuery q, int qid) {
        TinyDBMain.network.abortQuery(q);
        PreparedStatement ps = null;
        try {
            ps = dbConn.prepareStatement("UPDATE task_query_time_log SET stop_time = now() WHERE query_id = ? AND stop_time IS NULL");
            ps.setInt(1, qid);
            ps.executeUpdate();
            return true;
        } catch (SQLException e) {
            e.printStackTrace();
            return false;
        } finally {
            closeStatementQuietly(ps);
        }
    }

    /** Close a statement, tolerating null and ignoring any failure. */
    private static void closeStatementQuietly(Statement stmt) {
        if (stmt == null)
            return;
        try {
            stmt.close();
        } catch (Exception ignored) {
            // best-effort cleanup; nothing useful can be done here
        }
    }

    /** Close a result set, tolerating null and ignoring any failure. */
    private static void closeResultSetQuietly(ResultSet rs) {
        if (rs == null)
            return;
        try {
            rs.close();
        } catch (Exception ignored) {
            // best-effort cleanup; nothing useful can be done here
        }
    }

    /** Roll back the current transaction on dbConn, ignoring any failure. */
    private static void rollbackQuietly() {
        try {
            dbConn.rollback();
        } catch (Exception ignored) {
            // the connection may be null or already back in auto-commit mode
        }
    }

    /** Switch dbConn back to auto-commit mode, ignoring any failure. */
    private static void restoreAutoCommitQuietly() {
        try {
            dbConn.setAutoCommit(true);
        } catch (Exception ignored) {
            // the connection may be null or broken; callers have already reported the error
        }
    }

    /**
     * Create the server.
     *
     * Currently, only one parameter, "-sim", is understood, indicating
     * that the server should run in simulation mode.
     */
    public static void main(String[] argv) {
        TASKServer server = new TASKServer();
        System.out.println("TASK Server online, waiting for connections.");
        boolean sim = argv.length == 1 && argv[0].equals("-sim");
        try {
            TinyDBMain.simulate = sim;
            TinyDBMain.debug = true;
            TinyDBMain.initMain();
            TinyDBStatus status = new TinyDBStatus(TinyDBMain.network, TinyDBMain.mif, false);
            status.requestStatus(1000, 1);
            String serverPortStr = Config.getParam("task-server-port");
            if (serverPortStr == null)
                serverPort = DEFAULT_SERVER_PORT;
            else
                serverPort = Integer.parseInt(serverPortStr);
            initDBConn();
            prefetchMetaData();
            server.fetchRunningQueries(status.getQueryIds());
            server.doServer(serverPort);
        } catch (Exception e) {
            System.out.println("SERVER ERROR: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Re-attach to queries that are already running in the sensor network.
     *
     * At most the first two ids in qids are considered. For each one, the
     * open entry (stop_time IS NULL) is looked up in the query log, and the
     * matching sensor and/or health query is rebuilt and this server is
     * registered as its result listener.
     *
     * @param qids TinyDB query ids (Integer elements) reported as running
     */
    private void fetchRunningQueries(Vector qids) {
        short qid1 = -1, qid2 = -1;
        if (qids.size() == 1)
            qid1 = ((Integer) qids.firstElement()).shortValue();
        else if (qids.size() >= 2) {
            qid1 = ((Integer) qids.firstElement()).shortValue();
            qid2 = ((Integer) qids.elementAt(1)).shortValue();
            if (qids.size() > 2)
                System.out.println("more than two queries running in sensor network.");
        }
        System.out.println("feching running queries " + qid1 + " and " + qid2);
        if (qid1 == -1)
            return;

        PreparedStatement ps = null;
        try {
            String sensorQueryStr = null, healthQueryStr = null, queryType;
            byte sensorTinyDBQid = 0, healthTinyDBQid = 0;
            int sensorQueryId = TASKQuery.INVALID_QUERYID, healthQueryId = TASKQuery.INVALID_QUERYID;
            String sensorTableName = null, healthTableName = null;

            ps = dbConn.prepareStatement("SELECT q.query_text, q.query_type, q.query_id, q.table_name FROM task_query_log q, task_query_time_log t WHERE q.tinydb_qid = ? AND q.query_id = t.query_id AND t.stop_time IS NULL;");

            ps.setInt(1, qid1);
            ResultSet rs = ps.executeQuery();
            if (rs.next()) {
                queryType = rs.getString(2);
                if (queryType.equalsIgnoreCase("health")) {
                    healthQueryStr = rs.getString(1);
                    healthTinyDBQid = (byte) qid1;
                    healthQueryId = rs.getInt(3);
                    healthTableName = rs.getString(4);
                } else if (queryType.equalsIgnoreCase("sensor")) {
                    sensorQueryStr = rs.getString(1);
                    sensorTinyDBQid = (byte) qid1;
                    sensorQueryId = rs.getInt(3);
                    sensorTableName = rs.getString(4);
                    System.out.println("fetched sensor query " + sensorQueryId + ": " + sensorQueryStr);
                }
            }
            rs.close();

            if (qid2 != -1) {
                ps.setInt(1, qid2);
                rs = ps.executeQuery();
                if (rs.next()) {
                    queryType = rs.getString(2);
                    if (queryType.equalsIgnoreCase("health")) {
                        healthQueryStr = rs.getString(1);
                        healthTinyDBQid = (byte) qid2;
                        healthQueryId = rs.getInt(3);
                        healthTableName = rs.getString(4);
                    } else {
                        // NOTE(review): unlike the first lookup, any non-"health" type is
                        // treated as a sensor query here -- confirm this is intended.
                        sensorQueryStr = rs.getString(1);
                        sensorTinyDBQid = (byte) qid2;
                        sensorQueryId = rs.getInt(3);
                        sensorTableName = rs.getString(4);
                        System.out.println("fetched sensor query " + sensorQueryId + ": " + sensorQueryStr);
                    }
                }
                rs.close();
            }
            ps.close();
            ps = null;

            if (sensorQueryStr != null) {
                sensorTinyDBQuery = SensorQueryer.translateQuery(sensorQueryStr, sensorTinyDBQid);
                sensorQuery = new TASKQuery(sensorTinyDBQuery, sensorQueryId, sensorTableName);
                TinyDBMain.notifyAddedQuery(sensorTinyDBQuery);
                TinyDBMain.network.addResultListener(this, true, sensorTinyDBQid);
            }
            if (healthQueryStr != null) {
                healthTinyDBQuery = SensorQueryer.translateQuery(healthQueryStr, healthTinyDBQid);
                healthQuery = new TASKQuery(healthTinyDBQuery, healthQueryId, healthTableName);
                TinyDBMain.notifyAddedQuery(healthTinyDBQuery);
                TinyDBMain.network.addResultListener(this, true, healthTinyDBQid);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // No-op on the success path (ps is already null); closes the statement on failure.
            closeStatementQuietly(ps);
        }
    }

    /** Create a server with no calibration query in progress. */
    public TASKServer() {
        calibQueryInProgress = false;
    }

    /**
     * Open the shared PostgreSQL connection and statement using the
     * postgres-host, postgres-db, postgres-user and postgres-passwd
     * configuration parameters. Failures are reported on stdout and leave
     * dbConn / dbStmt as they were.
     */
    private static void initDBConn() {
        try {
            urlPSQL = "jdbc:postgresql://" + Config.getParam("postgres-host") + "/" + Config.getParam("postgres-db");
            dbUser = Config.getParam("postgres-user");
            dbPwd = Config.getParam("postgres-passwd");
            // The password is deliberately not echoed to the console.
            System.out.println("urlPSQL = " + urlPSQL + ", dbUser = " + dbUser);
            Class.forName("org.postgresql.Driver");
            dbConn = DriverManager.getConnection(urlPSQL, dbUser, dbPwd);
            dbStmt = dbConn.createStatement();
            if (TinyDBMain.debug)
                System.out.println("connected to " + urlPSQL);
        } catch (Exception ex) {
            System.out.println("failed to connect to Postgres!\n");
            ex.printStackTrace();
            // Do not fall through to the success message below.
            return;
        }
        if (TinyDBMain.debug)
            System.out.println("Connected to Postgres!\n");
    }

    /**
     * Load the attribute, command and aggregate descriptions from the
     * task_attributes, task_commands and task_aggregates tables into the
     * corresponding static vectors. Any failure is printed and leaves the
     * vectors holding whatever was loaded so far.
     */
    private static void prefetchMetaData() {
        attributeInfos = new Vector();
        commandInfos = new Vector();
        aggregateInfos = new Vector();
        ResultSet rs = null;
        try {
            rs = dbStmt.executeQuery("SELECT name, typeid, power_cons, description FROM task_attributes");
            TASKAttributeInfo attrInfo;
            while (rs.next()) {
                attrInfo = new TASKAttributeInfo(rs.getString(1), rs.getInt(2), rs.getInt(3), rs.getString(4));
                attributeInfos.add(attrInfo);
            }
            rs.close();

            rs = dbStmt.executeQuery("SELECT name, return_type, num_args, arg_types, description FROM task_commands");
            TASKCommandInfo cmdInfo;
            while (rs.next()) {
                cmdInfo = new TASKCommandInfo(rs.getString(1), rs.getInt(2), getIntArray(rs.getString(4)), rs.getString(5));
                commandInfos.add(cmdInfo);
            }
            rs.close();

            rs = dbStmt.executeQuery("SELECT name, return_type, num_args, arg_type, description FROM task_aggregates");
            TASKAggInfo aggInfo;
            while (rs.next()) {
                aggInfo = new TASKAggInfo(rs.getString(1), rs.getInt(2), rs.getInt(3) - 1, rs.getInt(4), rs.getString(5));
                aggregateInfos.add(aggInfo);
            }
            rs.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Closing an already-closed result set is a no-op; this covers the failure path.
            closeResultSetQuietly(rs);
        }
    }

    /**
     * Convert a string representation of a PostgreSQL array
     * (for example "{1,2,3}") to an int[].
     *
     * The first and last characters are taken to be the enclosing braces.
     * Elements are separated by commas and may be surrounded by spaces.
     *
     * @param pgArrayStr the array text; null or a string shorter than two
     *                   characters yields an empty array
     * @return the parsed elements, in order
     * @throws NumberFormatException if an element is not a valid integer
     */
    public static int[] getIntArray(String pgArrayStr) {
        if (pgArrayStr == null || pgArrayStr.length() < 2)
            return new int[0];

        Vector v = new Vector();
        int lastPos = pgArrayStr.length() - 1; // index of the closing brace
        int curPos = 1;                        // skip the opening brace
        int endPos;
        while ((endPos = pgArrayStr.indexOf(',', curPos)) != -1) {
            v.add(Integer.valueOf(pgArrayStr.substring(curPos, endPos).trim()));
            curPos = endPos + 1;
            // Skip blanks after the comma without running off the end of the string.
            while (curPos < lastPos && pgArrayStr.charAt(curPos) == ' ')
                curPos++;
        }
        String intStr = (curPos < lastPos) ? pgArrayStr.substring(curPos, lastPos).trim() : "";
        if (intStr.length() > 0)
            v.add(Integer.valueOf(intStr));

        int[] ret = new int[v.size()];
        for (int i = 0; i < v.size(); i++)
            ret[i] = ((Integer) v.elementAt(i)).intValue();
        return ret;
    }

    /**
     * Read the current command_id from task_next_query_id and increment the
     * stored value, inside one transaction.
     *
     * @return the command id that was read, or -1 if none could be read
     */
    private int nextCommandId() {
        int commandId = -1;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            dbConn.setAutoCommit(false);
            rs = dbStmt.executeQuery("SELECT command_id FROM task_next_query_id");
            if (rs.next()) {
                commandId = rs.getInt(1);
            }
            rs.close();
            ps = dbConn.prepareStatement("UPDATE task_next_query_id SET command_id = command_id + 1");
            ps.executeUpdate();
            dbConn.commit();
        } catch (Exception e) {
            // Undo partial work first: re-enabling auto-commit would otherwise commit it.
            rollbackQuietly();
            e.printStackTrace();
        } finally {
            closeResultSetQuietly(rs);
            closeStatementQuietly(ps);
            restoreAutoCommitQuietly();
        }
        return commandId;
    }

    /**
     * Read the current query_id / tinydb_qid pair from task_next_query_id and
     * advance both stored values, inside one transaction.
     *
     * @return the ids that were read, or null if the lookup failed before
     *         they could be read
     */
    private QueryIds nextQueryId() {
        int queryId = TASKQuery.INVALID_QUERYID;
        QueryIds qids = null;
        short tinyDBQid = -1;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            dbConn.setAutoCommit(false);
            rs = dbStmt.executeQuery("SELECT query_id, tinydb_qid FROM task_next_query_id");
            if (rs.next()) {
                queryId = rs.getInt(1);
                tinyDBQid = rs.getShort(2);
            }
            rs.close();
            qids = new QueryIds(queryId, (byte) tinyDBQid);
            // Wrap the next id to 7 bits so it stays non-negative in a signed byte.
            tinyDBQid = (short) ((tinyDBQid + 1) & (short) 0x7F);
            ps = dbConn.prepareStatement("UPDATE task_next_query_id SET query_id = query_id + 1, tinydb_qid = ?");
            ps.setShort(1, tinyDBQid);
            ps.executeUpdate();
            dbConn.commit();
        } catch (Exception e) {
            // Undo partial work first: re-enabling auto-commit would otherwise commit it.
            rollbackQuietly();
            e.printStackTrace();
        } finally {
            closeResultSetQuietly(rs);
            closeStatementQuietly(ps);
            restoreAutoCommitQuietly();
        }
        return qids;
    }

    /** @return the port read from configuration (or the default) in main */
    public static int getServerPort() {
        return serverPort;
    }

    private Hashtable listeners = new Hashtable();

    // Database connection settings and shared JDBC handles (set by initDBConn).
    static private String urlPSQL;
    static private String dbUser;
    static private String dbPwd;
    static private Connection dbConn = null;
    static private Statement dbStmt = null;
    static private int serverPort;

    // Currently attached sensor / health queries (set by fetchRunningQueries).
    private TASKQuery sensorQuery;
    private TinyDBQuery sensorTinyDBQuery;
    private TASKQuery healthQuery;
    private TinyDBQuery healthTinyDBQuery;

    // Calibration query state; calibNodes holds the node ids still expected to report.
    private TinyDBQuery calibTinyDBQuery = null;
    private boolean calibQueryInProgress = false;
    private Vector calibNodes;

    // Metadata caches filled by prefetchMetaData.
    static private Vector attributeInfos;
    static private Vector commandInfos;
    static private Vector aggregateInfos;
}

/** Internal class that's responsible for enqueing incoming connections */
class SocketLoop implements Runnable {
    // note that vector is synchronized, which allows
    // us to get away with all sorts of things that look non-thread safe
    Vector connections;
    ServerSocket ss;

    /**
     * Start a thread that accepts connections on the given server socket.
     *
     * @param ss the listening socket to accept from
     */
    public SocketLoop(ServerSocket ss) {
        Thread t = new Thread(this);
        connections = new Vector();
        this.ss = ss;
        t.start();
    }

    /**
     * Accept connections forever, queueing each one unless the number of
     * queued connections already exceeds TASKServer.MAX_CLIENT_CONNECTIONS,
     * in which case the new socket is closed immediately.
     */
    public void run() {
        Socket sock;
        while (true) {
            try {
                sock = ss.accept();
                if (connections.size() > TASKServer.MAX_CLIENT_CONNECTIONS) {
                    System.out.println("Accepting And Closing Connetction");
                    sock.close();
                } else {
                    System.out.println("Adding new connection");
                    connections.addElement(sock);
                }
            } catch (java.io.InterruptedIOException e) {
                // just finished listening
            } catch (Exception e) {
                System.out.println("Error in accepting connection: " + e);
            }
        }
    }

    /**
     * Block until a connection has been queued, then remove and return the
     * oldest one. Polls every 10 ms and prints a dot to stderr every 1000
     * polls while waiting.
     *
     * @return the oldest queued socket
     */
    public Socket getConnection() {
        Socket sock;
        // loop forever
        int i = 0;
        while (connections.size() == 0) {
            i++;
            try {
                Thread.sleep(10);
            } catch (Exception e) {
                // who cares?
            }
            if (i % 1000 == 0)
                System.err.print(".");
        }
        sock = (Socket) connections.elementAt(0);
        connections.removeElementAt(0);
        return sock;
    }
}

/** Pair of ids handed out for a new query: the TASK query id and the TinyDB query id. */
class QueryIds {
    public QueryIds(int queryId, byte tinyDBQid) {
        this.queryId = queryId;
        this.tinyDBQid = tinyDBQid;
    }

    public int queryId;
    public byte tinyDBQid;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -