⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 taskserver.java

📁 nesC写的heed算法
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
		}    }	private void handleCalibrationResult(QueryResult qr)	{		Vector fieldValues = qr.getFieldValueObjs();		if (fieldValues.size() < 2)			return;		int nodeId = ((Integer)fieldValues.firstElement()).intValue();		byte[] calib = (byte[])(fieldValues.elementAt(1));		if (!calibQueryInProgress)		{			System.out.println("got calibration result from node " + nodeId + ", but the calibration query is no longer in progress.");			return;		}		int mote = -1;		int i = 0;		for (Iterator it = calibNodes.iterator(); it.hasNext(); i++)		{			if (((Integer)it.next()).intValue() == nodeId)			{				mote = i;				break;			}		}		PreparedStatement ps = null;		if (mote < 0)		{			System.out.println("already got calibration for node " + nodeId);			return;		}		else		{			System.out.println("got new calibration for node " + nodeId);			calibNodes.remove(mote);			if (calibNodes.isEmpty())			{				TinyDBMain.network.abortQuery(calibTinyDBQuery);				try				{					ps = dbConn.prepareStatement("UPDATE task_query_time_log SET stop_time = now() WHERE query_id = ? AND stop_time IS NULL");					ps.setInt(1, CALIBRATION_QUERY_ID);					ps.executeUpdate();					ps.close();				}				catch (SQLException e)				{					try					{						ps.close();					}					catch (Exception e1)					{					}					e.printStackTrace();				}				calibQueryInProgress = false;			}		}		try		{			ps = dbConn.prepareStatement("UPDATE task_mote_info SET calib = ? WHERE mote_id = ?");			ps.setBytes(1, calib);			ps.setInt(2, nodeId);			ps.executeUpdate();			ps.close();		}		catch (SQLException e)		{			try			{				ps.close();			}			catch (Exception e1)			{			}			e.printStackTrace();		}	}    /** Stop a currently running  query */    private boolean stopRunningQuery(TinyDBQuery q, int qid) {	TinyDBMain.network.abortQuery(q);	PreparedStatement ps = null;	try	    {		ps = dbConn.prepareStatement("UPDATE task_query_time_log SET stop_time = now() WHERE query_id = ? AND stop_time IS NULL");		ps.setInt(1, qid);		ps.executeUpdate();		ps.close();		return true;	    }	catch (SQLException e)	    {		try		    {			ps.close();		    }		catch (Exception e1)		    {		    }		e.printStackTrace();		return false;	    }    }    /** Create the server	Currently, only one parameter, "-sim", is understood,	indicating that the server should run in simulation	mode.    */    public static void main(String[] argv) {	TASKServer server = new TASKServer();	boolean sim;	System.out.println("TASK Server online, waiting for connections.");		if (argv.length == 1 && argv[0].equals("-sim")) 	    sim = true;	else	    sim = false;	try {	    TinyDBMain.simulate = sim;	    TinyDBMain.debug = true;	    TinyDBMain.initMain();		TinyDBStatus status = new TinyDBStatus(TinyDBMain.network, TinyDBMain.mif, false);		status.requestStatus(1000, 1);		String serverPortStr = Config.getParam("task-server-port");		if (serverPortStr == null)			serverPort = DEFAULT_SERVER_PORT;		else			serverPort = Integer.valueOf(serverPortStr).intValue();		initDBConn();		prefetchMetaData();		server.fetchRunningQueries(status.getQueryIds());	    server.doServer(serverPort);	} catch (Exception e) {	    System.out.println("SERVER ERROR: " + e);	    e.printStackTrace();	}    }	private void fetchRunningQueries(Vector qids)	{		short qid1 = -1, qid2 = -1;		if (qids.size() == 1)			qid1 = ((Integer)qids.firstElement()).shortValue();		else if (qids.size() >= 2)		{			qid1 = ((Integer)qids.firstElement()).shortValue();			qid2 = ((Integer)qids.elementAt(1)).shortValue();			if (qids.size() > 2)				System.out.println("more than two queries running in sensor network.");		}		System.out.println("feching running queries " + qid1 + " and " + qid2);		if (qid1 != -1)		{			PreparedStatement ps = null;			try			{				String sensorQueryStr = null, healthQueryStr = null, queryType;				byte sensorTinyDBQid = 0, healthTinyDBQid = 0;				int sensorQueryId = TASKQuery.INVALID_QUERYID, healthQueryId = TASKQuery.INVALID_QUERYID;				String sensorTableName = null, healthTableName = null;				ps = dbConn.prepareStatement("SELECT q.query_text, q.query_type, q.query_id, q.table_name FROM task_query_log q, task_query_time_log t WHERE q.tinydb_qid = ? AND q.query_id = t.query_id AND t.stop_time IS NULL;");				ps.setInt(1, qid1);				ResultSet rs = ps.executeQuery();				if (rs.next())				{					queryType = rs.getString(2);					if (queryType.equalsIgnoreCase("health"))					{						healthQueryStr = rs.getString(1);						healthTinyDBQid = (byte)qid1;						healthQueryId = rs.getInt(3);						healthTableName = rs.getString(4);					}					else if (queryType.equalsIgnoreCase("sensor"))					{						sensorQueryStr = rs.getString(1);						sensorTinyDBQid = (byte)qid1;						sensorQueryId = rs.getInt(3);						sensorTableName = rs.getString(4);						System.out.println("fetched sensor query " + sensorQueryId + ": " + sensorQueryStr);					}				}				rs.close();				if (qid2 != -1)				{					ps.setInt(1, qid2);					rs = ps.executeQuery();					if (rs.next())					{						queryType = rs.getString(2);						if (queryType.equalsIgnoreCase("health"))						{							healthQueryStr = rs.getString(1);							healthTinyDBQid = (byte)qid2;							healthQueryId = rs.getInt(3);							healthTableName = rs.getString(4);						}						else						{							sensorQueryStr = rs.getString(1);							sensorTinyDBQid = (byte)qid2;							sensorQueryId = rs.getInt(3);							sensorTableName = rs.getString(4);							System.out.println("fetched sensor query " + sensorQueryId + ": " + sensorQueryStr);						}					}					rs.close();				}				ps.close();				ps = null;				if (sensorQueryStr != null)				{					sensorTinyDBQuery = SensorQueryer.translateQuery(sensorQueryStr, sensorTinyDBQid);					sensorQuery = new TASKQuery(sensorTinyDBQuery, sensorQueryId, sensorTableName);					TinyDBMain.notifyAddedQuery(sensorTinyDBQuery);					TinyDBMain.network.addResultListener(this, true, sensorTinyDBQid);				}				if (healthQueryStr != null)				{					healthTinyDBQuery = SensorQueryer.translateQuery(healthQueryStr, healthTinyDBQid);					healthQuery = new TASKQuery(healthTinyDBQuery, healthQueryId, healthTableName);					TinyDBMain.notifyAddedQuery(healthTinyDBQuery);					TinyDBMain.network.addResultListener(this, true, healthTinyDBQid);				}			}			catch (Exception e)			{				if (ps != null)				{					try					{						ps.close();					}					catch (Exception e1)					{					}				}				e.printStackTrace();			}		}	}	public TASKServer()	{		calibQueryInProgress = false;	}    private static void initDBConn()    {		try 		{			urlPSQL = "jdbc:postgresql://" + Config.getParam("postgres-host") + "/" + Config.getParam("postgres-db");			dbUser = Config.getParam("postgres-user");			dbPwd = Config.getParam("postgres-passwd");			System.out.println("urlPSQL = " + urlPSQL + ", dbUser = " + dbUser + ", dbPwd = " + dbPwd);			Class.forName ( "org.postgresql.Driver" );			dbConn = DriverManager.getConnection(urlPSQL, dbUser, dbPwd);			dbStmt = dbConn.createStatement();			if (TinyDBMain.debug) System.out.println("connected to " + urlPSQL);		} 		catch (Exception ex) 		{			System.out.println("failed to connect to Postgres!\n");			ex.printStackTrace();		}		if (TinyDBMain.debug) System.out.println("Connected to Postgres!\n");    }	private static void prefetchMetaData()	{		attributeInfos = new Vector();		commandInfos = new Vector();		aggregateInfos = new Vector();		try		{			ResultSet rs = dbStmt.executeQuery("SELECT name, typeid, power_cons, description FROM task_attributes");			TASKAttributeInfo attrInfo;			while (rs.next())			{				attrInfo = new TASKAttributeInfo(rs.getString(1),												rs.getInt(2),												rs.getInt(3),												rs.getString(4));				attributeInfos.add(attrInfo);			}			rs.close();			rs = dbStmt.executeQuery("SELECT name, return_type, num_args, arg_types, description FROM task_commands");			TASKCommandInfo cmdInfo;			while (rs.next())			{				cmdInfo = new TASKCommandInfo(rs.getString(1),												rs.getInt(2),												getIntArray(rs.getString(4)),												rs.getString(5));				commandInfos.add(cmdInfo);			}			rs.close();			rs = dbStmt.executeQuery("SELECT name, return_type, num_args, arg_type, description FROM task_aggregates");			TASKAggInfo aggInfo;			while (rs.next())			{				aggInfo = new TASKAggInfo(rs.getString(1),												rs.getInt(2),												rs.getInt(3) - 1,												rs.getInt(4),												rs.getString(5));				aggregateInfos.add(aggInfo);			}			rs.close();		}		catch (Exception e)		{			e.printStackTrace();		}	}	/**	 * convert a string representation of a PostgreSQL array	 * to an int[]	 */	public static int[] getIntArray(String pgArrayStr)	{		Vector v = new Vector();		int curPos = 1;		int endPos;		int[] ret;		while ((endPos = pgArrayStr.indexOf(',', curPos)) != -1)		{			v.add(Integer.valueOf(pgArrayStr.substring(curPos, endPos)));			for (curPos = endPos + 1; pgArrayStr.charAt(curPos) == ' ';				 curPos++);		}		String intStr = pgArrayStr.substring(curPos, pgArrayStr.length() - 1);		if (intStr.length() > 0)			v.add(Integer.valueOf(intStr));		ret = new int[v.size()];		for (int i = 0; i < v.size(); i++)			ret[i] = ((Integer)v.elementAt(i)).intValue();		return ret;	}	private int nextCommandId()	{		int commandId = -1;		PreparedStatement ps = null;		try		{			dbConn.setAutoCommit(false);			ResultSet rs = dbStmt.executeQuery("SELECT command_id FROM task_next_query_id");			if (rs.next())			{				commandId = rs.getInt(1);			}			rs.close();			ps = dbConn.prepareStatement("UPDATE task_next_query_id SET command_id = command_id + 1");			ps.executeUpdate();			ps.close();			dbConn.commit();			dbConn.setAutoCommit(true);		}		catch (Exception e)		{			try			{				dbConn.setAutoCommit(true);				ps.close();			}			catch (Exception e1)			{			}			e.printStackTrace();		}		return commandId;	}	private QueryIds nextQueryId()	{		int queryId = TASKQuery.INVALID_QUERYID;		QueryIds qids = null;		short tinyDBQid = -1;		PreparedStatement ps = null;		try		{			dbConn.setAutoCommit(false);			ResultSet rs = dbStmt.executeQuery("SELECT query_id, tinydb_qid FROM task_next_query_id");			if (rs.next())			{				queryId = rs.getInt(1);				tinyDBQid = rs.getShort(2);			}			rs.close();			qids = new QueryIds(queryId, (byte)tinyDBQid);			tinyDBQid = (short)((tinyDBQid + 1) & (short)0x7F); // truncate to 1 byte			ps = dbConn.prepareStatement("UPDATE task_next_query_id SET query_id = query_id + 1, tinydb_qid = ?");			ps.setShort(1, tinyDBQid);			ps.executeUpdate();			ps.close();			dbConn.commit();			dbConn.setAutoCommit(true);		}		catch (Exception e)		{			try			{				dbConn.setAutoCommit(true);				ps.close();			}			catch (Exception e1)			{			}			e.printStackTrace();		}		return qids;	}	public static int getServerPort()	{		return serverPort;	}    private Hashtable listeners = new Hashtable();	static private String urlPSQL;	static private String dbUser;	static private String dbPwd;	static private Connection dbConn = null;	static private Statement dbStmt = null;	static private int serverPort;	private TASKQuery sensorQuery;	private TinyDBQuery sensorTinyDBQuery;	private TASKQuery healthQuery;	private TinyDBQuery healthTinyDBQuery;	private TinyDBQuery calibTinyDBQuery = null;	private boolean calibQueryInProgress = false;	private Vector calibNodes;	static private Vector attributeInfos;	static private Vector commandInfos;	static private Vector aggregateInfos;}/** Internal class that's responsible for enqueing incoming connections */class SocketLoop implements Runnable {    Vector connections; //note that vector is synchronized, which allows                        //us to get away with all sorts of things that look non-thread safe    ServerSocket ss;	public SocketLoop(ServerSocket ss) 	{		Thread t = new Thread(this);		connections = new Vector();		this.ss = ss;		t.start();	}	public void run() 	{		Socket sock;		int numConnections = 0;		long lastTime = System.currentTimeMillis();		int lastNumConnections = 0;		while (true) 		{			try 			{				lastTime = System.currentTimeMillis();				lastNumConnections = connections.size();				sock = ss.accept();				if (connections.size() > TASKServer.MAX_CLIENT_CONNECTIONS) 				{					System.out.println("Accepting And Closing Connetction");					sock.close();				}				else 				{					System.out.println("Adding new connection");					connections.addElement(sock);				}			} 			catch (java.io.InterruptedIOException e) 			{				//just finished listening			}			catch (Exception e) 			{				System.out.println("Error in accepting connection: " + e);			}		}	}	public Socket getConnection() 	{		Socket sock;		//loop forever		int i=0;		while (connections.size() == 0) 		{			i++;			try 			{				Thread.currentThread().sleep(10);			} 			catch (Exception e) 			{				//who cares?			}			if (i%1000==0) 				System.err.print(".");		}		sock = (Socket)connections.elementAt(0);		connections.removeElementAt(0);		return sock;	}}class QueryIds{	public QueryIds(int queryId, byte tinyDBQid)	{		this.queryId = queryId;		this.tinyDBQid = tinyDBQid;	}	public int		queryId;	public byte		tinyDBQid;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -