📄 taskserver.java
字号:
/* if (ls != null) { Iterator it = ls.iterator(); while (it.hasNext()) { ObjectOutputStream outStream = (ObjectOutputStream)it.next(); try { // XXX repeatedly sending fieldinfos is very wasteful! Vector fieldInfos = new Vector(); Vector fieldValues = qr.getFieldValueObjs(); TinyDBQuery q = qr.getQuery(); for (int i = 0; i < fieldValues.size(); i++) { QueryField qf = q.getField(i); fieldInfos.add(new TASKFieldInfo(qf)); } fieldValues.add(new Integer(qr.epochNo())); fieldInfos.add(new TASKFieldInfo("epoch", TASKTypes.UINT16)); TASKResult result = new TASKResult(queryId, fieldValues, fieldInfos); outStream.writeObject(result); outStream.flush(); System.out.println("TASKResult sent "+qr.epochNo()); } catch (IOException e) { System.out.println("Removing listener."); it.remove(); //delete this listener, since it died e.printStackTrace(); } } } */ }

    /**
     * Closes a prepared statement, tolerating a null reference and ignoring
     * any error raised by close().
     */
    private static void closeQuietly(PreparedStatement ps) {
        if (ps == null) {
            return;
        }
        try {
            ps.close();
        } catch (Exception ignored) {
            // best effort: nothing useful can be done if close() fails
        }
    }

    /**
     * Closes a result set, tolerating a null reference and ignoring
     * any error raised by close().
     */
    private static void closeQuietly(ResultSet rs) {
        if (rs == null) {
            return;
        }
        try {
            rs.close();
        } catch (Exception ignored) {
            // best effort: nothing useful can be done if close() fails
        }
    }

    /** Rolls back the current transaction on dbConn, ignoring any failure. */
    private static void rollbackQuietly() {
        try {
            dbConn.rollback();
        } catch (Exception ignored) {
            // the connection may be null or already back in auto-commit mode
        }
    }

    /** Switches dbConn back to auto-commit mode, ignoring any failure. */
    private static void restoreAutoCommit() {
        try {
            dbConn.setAutoCommit(true);
        } catch (Exception ignored) {
            // nothing more can be done here; the next statement will report the problem
        }
    }

    /**
     * Handles one result row of the calibration query.
     *
     * Field 0 of the result is read as the node id (an Integer) and field 1
     * as the calibration bytes. Results with fewer than two fields, results
     * that arrive when no calibration query is in progress, and results from
     * nodes that are not (or no longer) in calibNodes are ignored. Otherwise
     * the node is removed from calibNodes, the calibration query is stopped
     * once that list is empty, and the bytes are stored in task_mote_info.
     */
    private void handleCalibrationResult(QueryResult qr) {
        Vector fieldValues = qr.getFieldValueObjs();
        if (fieldValues.size() < 2) {
            return;
        }
        int nodeId = ((Integer) fieldValues.firstElement()).intValue();
        byte[] calib = (byte[]) fieldValues.elementAt(1);
        if (!calibQueryInProgress) {
            System.out.println("got calibration result from node " + nodeId
                    + ", but the calibration query is no longer in progress.");
            return;
        }

        // Find the position of this node among the nodes still awaited.
        int mote = -1;
        int i = 0;
        for (Iterator it = calibNodes.iterator(); it.hasNext(); i++) {
            if (((Integer) it.next()).intValue() == nodeId) {
                mote = i;
                break;
            }
        }
        if (mote < 0) {
            System.out.println("already got calibration for node " + nodeId);
            return;
        }

        System.out.println("got new calibration for node " + nodeId);
        calibNodes.remove(mote);
        if (calibNodes.isEmpty()) {
            // Last awaited node: abort the query and close its time-log entry.
            stopRunningQuery(calibTinyDBQuery, CALIBRATION_QUERY_ID);
            calibQueryInProgress = false;
        }

        PreparedStatement ps = null;
        try {
            ps = dbConn.prepareStatement("UPDATE task_mote_info SET calib = ? WHERE mote_id = ?");
            ps.setBytes(1, calib);
            ps.setInt(2, nodeId);
            ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(ps);
        }
    }

    /**
     * Stop a currently running query.
     *
     * Aborts the query in the network first, then stamps stop_time on the
     * still-open task_query_time_log rows of the given query id.
     *
     * @param q   the query to abort in the sensor network
     * @param qid the query id whose time-log rows are closed
     * @return true if the database update succeeded, false if it raised an
     *         SQLException (the network abort has been issued either way)
     */
    private boolean stopRunningQuery(TinyDBQuery q, int qid) {
        PreparedStatement ps = null;
        TinyDBMain.network.abortQuery(q);
        try {
            ps = dbConn.prepareStatement(
                    "UPDATE task_query_time_log SET stop_time = now() WHERE query_id = ? AND stop_time IS NULL");
            ps.setInt(1, qid);
            ps.executeUpdate();
            return true;
        } catch (SQLException e) {
            e.printStackTrace();
            return false;
        } finally {
            closeQuietly(ps);
        }
    }

    /**
     * Re-attaches this server to queries that are already running in the
     * sensor network.
     *
     * At most the first two ids in qids are considered. For each, the query
     * text, type, id and table name are looked up among the queries whose
     * time-log entry has no stop_time; the sensor and/or health query objects
     * are then rebuilt from the stored text and this object is registered as
     * their result listener. Any exception is printed and otherwise ignored.
     *
     * @param qids Vector of Integer TinyDB query ids
     */
    private void fetchRunningQueries(Vector qids) {
        short qid1 = -1, qid2 = -1;
        if (qids.size() >= 1) {
            qid1 = ((Integer) qids.firstElement()).shortValue();
        }
        if (qids.size() >= 2) {
            qid2 = ((Integer) qids.elementAt(1)).shortValue();
        }
        if (qids.size() > 2) {
            System.out.println("more than two queries running in sensor network.");
        }
        System.out.println("feching running queries " + qid1 + " and " + qid2);
        if (qid1 == -1) {
            return;
        }

        try {
            String sensorQueryStr = null, healthQueryStr = null, queryType;
            byte sensorTinyDBQid = 0, healthTinyDBQid = 0;
            int sensorQueryId = TASKQuery.INVALID_QUERYID, healthQueryId = TASKQuery.INVALID_QUERYID;
            String sensorTableName = null, healthTableName = null;

            PreparedStatement ps = null;
            ResultSet rs = null;
            try {
                ps = dbConn.prepareStatement(
                        "SELECT q.query_text, q.query_type, q.query_id, q.table_name"
                        + " FROM task_query_log q, task_query_time_log t"
                        + " WHERE q.tinydb_qid = ? AND q.query_id = t.query_id AND t.stop_time IS NULL;");

                ps.setInt(1, qid1);
                rs = ps.executeQuery();
                if (rs.next()) {
                    queryType = rs.getString(2);
                    if (queryType.equalsIgnoreCase("health")) {
                        healthQueryStr = rs.getString(1);
                        healthTinyDBQid = (byte) qid1;
                        healthQueryId = rs.getInt(3);
                        healthTableName = rs.getString(4);
                    } else if (queryType.equalsIgnoreCase("sensor")) {
                        sensorQueryStr = rs.getString(1);
                        sensorTinyDBQid = (byte) qid1;
                        sensorQueryId = rs.getInt(3);
                        sensorTableName = rs.getString(4);
                        System.out.println("fetched sensor query " + sensorQueryId + ": " + sensorQueryStr);
                    }
                }
                rs.close();

                if (qid2 != -1) {
                    ps.setInt(1, qid2);
                    rs = ps.executeQuery();
                    if (rs.next()) {
                        queryType = rs.getString(2);
                        if (queryType.equalsIgnoreCase("health")) {
                            healthQueryStr = rs.getString(1);
                            healthTinyDBQid = (byte) qid2;
                            healthQueryId = rs.getInt(3);
                            healthTableName = rs.getString(4);
                        } else {
                            // NOTE(review): unlike the first lookup, any non-"health" type is
                            // treated as a sensor query here; confirm whether that is intended.
                            sensorQueryStr = rs.getString(1);
                            sensorTinyDBQid = (byte) qid2;
                            sensorQueryId = rs.getInt(3);
                            sensorTableName = rs.getString(4);
                            System.out.println("fetched sensor query " + sensorQueryId + ": " + sensorQueryStr);
                        }
                    }
                    rs.close();
                }
            } finally {
                // Runs on both the normal and the exceptional path, so neither
                // the result set nor the statement is leaked.
                closeQuietly(rs);
                closeQuietly(ps);
            }

            if (sensorQueryStr != null) {
                sensorTinyDBQuery = SensorQueryer.translateQuery(sensorQueryStr, sensorTinyDBQid);
                sensorQuery = new TASKQuery(sensorTinyDBQuery, sensorQueryId, sensorTableName);
                TinyDBMain.notifyAddedQuery(sensorTinyDBQuery);
                TinyDBMain.network.addResultListener(this, true, sensorTinyDBQid);
            }
            if (healthQueryStr != null) {
                healthTinyDBQuery = SensorQueryer.translateQuery(healthQueryStr, healthTinyDBQid);
                healthQuery = new TASKQuery(healthTinyDBQuery, healthQueryId, healthTableName);
                TinyDBMain.notifyAddedQuery(healthTinyDBQuery);
                TinyDBMain.network.addResultListener(this, true, healthTinyDBQid);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Creates a server with no calibration query in progress. */
    public TASKServer() {
        calibQueryInProgress = false;
    }

    /**
     * Opens the PostgreSQL connection described by the "postgres-host",
     * "postgres-db", "postgres-user" and "postgres-passwd" configuration
     * parameters and creates the shared statement with a query timeout of 5.
     *
     * Failures are printed and not rethrown; in that case dbConn and dbStmt
     * are not assigned by this call.
     */
    private static void initDBConn() {
        try {
            urlPSQL = "jdbc:postgresql://" + Config.getParam("postgres-host") + "/" + Config.getParam("postgres-db");
            dbUser = Config.getParam("postgres-user");
            dbPwd = Config.getParam("postgres-passwd");
            // The password is deliberately left out of the log output.
            System.out.println("urlPSQL = " + urlPSQL + ", dbUser = " + dbUser);
            Class.forName("org.postgresql.Driver");
            dbConn = DriverManager.getConnection(urlPSQL, dbUser, dbPwd);
            dbStmt = dbConn.createStatement();
            dbStmt.setQueryTimeout(5);
            if (TinyDBMain.debug) {
                System.out.println("connected to " + urlPSQL);
                // Only reported once the connection really exists.
                System.out.println("Connected to Postgres!\n");
            }
        } catch (Exception ex) {
            System.out.println("failed to connect to Postgres!\n");
            ex.printStackTrace();
        }
    }

    /**
     * Loads the attribute, command and aggregate descriptions from the
     * task_attributes, task_commands and task_aggregates tables into
     * attributeInfos, commandInfos and aggregateInfos.
     *
     * The three vectors are replaced with fresh ones first. If an exception
     * occurs it is printed and loading stops, leaving whatever was read so far.
     */
    private static void prefetchMetaData() {
        attributeInfos = new Vector();
        commandInfos = new Vector();
        aggregateInfos = new Vector();
        ResultSet rs = null;
        try {
            rs = dbStmt.executeQuery("SELECT name, typeid, power_cons, description FROM task_attributes");
            while (rs.next()) {
                attributeInfos.add(new TASKAttributeInfo(rs.getString(1), rs.getInt(2),
                        rs.getInt(3), rs.getString(4)));
            }
            rs.close();

            // num_args (column 3) is selected but not passed on; the argument
            // types come from the array column.
            rs = dbStmt.executeQuery("SELECT name, return_type, num_args, arg_types, description FROM task_commands");
            while (rs.next()) {
                commandInfos.add(new TASKCommandInfo(rs.getString(1), rs.getInt(2),
                        getIntArray(rs.getString(4)), rs.getString(5)));
            }
            rs.close();

            rs = dbStmt.executeQuery("SELECT name, return_type, num_args, arg_type, description FROM task_aggregates");
            while (rs.next()) {
                // The stored num_args is reduced by one before being handed over.
                aggregateInfos.add(new TASKAggInfo(rs.getString(1), rs.getInt(2),
                        rs.getInt(3) - 1, rs.getInt(4), rs.getString(5)));
            }
            rs.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(rs);
        }
    }

    /**
     * convert a string representation of a PostgreSQL array
     * to an int[]
     *
     * The first and last characters are taken to be the enclosing braces and
     * the text between them is split on commas; whitespace around an element
     * is ignored.
     *
     * @param pgArrayStr array text such as "{1, 2, 3}"; may be null
     * @return the parsed values; an empty array for null, for strings shorter
     *         than two characters and for "{}"
     * @throws NumberFormatException if an element is not a decimal integer
     */
    public static int[] getIntArray(String pgArrayStr) {
        // A SQL NULL column arrives here as null; treat it like an empty array
        // instead of failing.
        if (pgArrayStr == null || pgArrayStr.length() < 2) {
            return new int[0];
        }

        Vector v = new Vector();
        int closePos = pgArrayStr.length() - 1; // position of the closing brace
        int curPos = 1;                         // skip the opening brace
        int endPos;
        while ((endPos = pgArrayStr.indexOf(',', curPos)) != -1) {
            v.add(Integer.valueOf(pgArrayStr.substring(curPos, endPos).trim()));
            curPos = endPos + 1;
        }
        // Whatever remains before the closing brace is the last element.
        if (curPos < closePos) {
            String intStr = pgArrayStr.substring(curPos, closePos).trim();
            if (intStr.length() > 0) {
                v.add(Integer.valueOf(intStr));
            }
        }

        int[] ret = new int[v.size()];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = ((Integer) v.elementAt(i)).intValue();
        }
        return ret;
    }

    /**
     * Reads the current command_id from task_next_query_id and increments the
     * stored value, both inside one transaction.
     *
     * @return the id that was read, or -1 if no id could be read
     */
    private int nextCommandId() {
        int commandId = -1;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            dbConn.setAutoCommit(false);
            rs = dbStmt.executeQuery("SELECT command_id FROM task_next_query_id");
            if (rs.next()) {
                commandId = rs.getInt(1);
            }
            rs.close();
            ps = dbConn.prepareStatement("UPDATE task_next_query_id SET command_id = command_id + 1");
            ps.executeUpdate();
            dbConn.commit();
        } catch (Exception e) {
            // Undo explicitly: re-enabling auto-commit in the middle of a
            // transaction would commit it.
            rollbackQuietly();
            e.printStackTrace();
            // NOTE(review): if the UPDATE failed, the id read above is still returned.
        } finally {
            closeQuietly(rs);
            closeQuietly(ps);
            restoreAutoCommit();
        }
        return commandId;
    }

    /**
     * Reads the current query_id / tinydb_qid pair from task_next_query_id
     * and advances both inside one transaction. query_id is incremented;
     * tinydb_qid is incremented and wrapped to the range 0..127.
     *
     * @return the pair that was read, or null if an exception occurred before
     *         the pair could be built
     */
    private QueryIds nextQueryId() {
        int queryId = TASKQuery.INVALID_QUERYID;
        QueryIds qids = null;
        short tinyDBQid = -1;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            dbConn.setAutoCommit(false);
            rs = dbStmt.executeQuery("SELECT query_id, tinydb_qid FROM task_next_query_id");
            if (rs.next()) {
                queryId = rs.getInt(1);
                tinyDBQid = rs.getShort(2);
            }
            rs.close();
            qids = new QueryIds(queryId, (byte) tinyDBQid);
            // Keep only the low 7 bits so the next id stays within 0..127.
            tinyDBQid = (short) ((tinyDBQid + 1) & (short) 0x7F);
            ps = dbConn.prepareStatement("UPDATE task_next_query_id SET query_id = query_id + 1, tinydb_qid = ?");
            ps.setShort(1, tinyDBQid);
            ps.executeUpdate();
            dbConn.commit();
        } catch (Exception e) {
            // Undo explicitly: re-enabling auto-commit in the middle of a
            // transaction would commit it.
            rollbackQuietly();
            e.printStackTrace();
            // NOTE(review): if the UPDATE failed, the ids read above are still returned.
        } finally {
            closeQuietly(rs);
            closeQuietly(ps);
            restoreAutoCommit();
        }
        return qids;
    }

    /** Returns the value of the static serverPort field. */
    public static int getServerPort() {
        return serverPort;
    }

    /**
     * Returns the sensor query when whichQuery equals SENSOR_QUERY and the
     * health query for every other value.
     */
    public TASKQuery getTASKQuery(short whichQuery) {
        return (whichQuery == SENSOR_QUERY) ? sensorQuery : healthQuery;
    }

    // The modifier below belongs to runTASKQuery, whose declaration continues
    // on the following line of the file and is left untouched here.
    public
int runTASKQuery(TASKQuery query, short whichQuery) throws IOException { String queryStr; TinyDBQuery tinyDBQuery = null; PreparedStatement ps = null; String tableName = null; int queryId = query.getQueryId(); boolean isNewQuery = (queryId == TASKQuery.INVALID_QUERYID); byte tinyDBQid; int error = TASKError.SUCCESS; if (isNewQuery) { QueryIds qids; // XXX should try to unify TASKQuery and TinyDBQuery queryStr = query.toSQL(); System.out.println("TASKServer got new query: " + queryStr); qids = nextQueryId(); queryId = qids.queryId; tinyDBQid = qids.tinyDBQid;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -