📄 agentmanagerimp.java
字号:
} }

    /**
     * Starts monitoring agent {@code han} through the comm server located at
     * {@code host}, delivering monitor events to {@code obs}.
     *
     * @param han  handle of the agent to monitor
     * @param host address used to resolve the comm server handle
     * @param obs  observer stored for the new monitor
     * @throws RequestFailedException if the request is refused, interrupted,
     *         or fails with an I/O error
     */
    public void monitorAgent( Handle han, InetAddress host, AgentObserver obs )
        throws RequestFailedException
    {
        try {
            _monitorAgent( han, Request.commServerAt( host ), obs );
        }
        catch ( InterruptedException exc ) {
            throw new RequestFailedException( Request.icmMonitorAgent, exc );
        }
        catch ( java.io.IOException exc ) {
            throw new RequestFailedException( Request.icmMonitorAgent, exc );
        }
    }

    /**
     * Sends a monitor request for {@code target} to comm server {@code cs}
     * on behalf of the self agent, waits for the reply, and on success
     * registers a {@code MonitorHandler} for subsequent messages matching
     * the same pattern.
     *
     * Bookkeeping is kept in self-agent properties:
     * {@code id + ".observer"}, {@code obs + ".monitor-id"} and
     * {@code id + ".handler"}.
     *
     * @throws RequestFailedException if the comm server answers with
     *         {@code Request.icmFailed}
     */
    void _monitorAgent( Handle target, Handle cs, AgentObserver obs )
        throws java.io.IOException, InterruptedException, RequestFailedException
    {
        // ICM bug workaround: Monitoring agent must be registered with the cs
        // to avoid segfaults at agent registration
        if ( !self_.commServers().contains( cs ) ) {
            self_.register( cs.getHome() );
        }

        // make the request and add a reactor rule to self for streaming monitor events
        Handle id = EngineUtilities.makeIdentifier( self_.getHandle() );
        MessagePattern ptrn = RequestPattern
            .createListPattern( cs, Request.icmMonitorAgent,
                                EngineUtilities.makeIdentifierTest( id ) );
        self_.send( Request.createMonitorRequest( id, target, cs ) );
        List resp = (List)self_.recv( ptrn ).getContent();
        if ( Request.icmFailed.equals( resp.get( 1 ) ) ) {
            // Report the request that actually failed (was icmLookupLocation).
            throw new RequestFailedException( Request.icmMonitorAgent,
                                              "Refused by comm server" );
        }

        // ok, set-up the reaction rule
        self_.setProperty( id + ".observer", obs );
        self_.setProperty( obs + ".monitor-id", id );
        Agent.MessageHandler hdlr = new MonitorHandler();
        self_.setProperty( id + ".handler", hdlr );
        self_.registerHandler( ptrn, hdlr );
    }

    /**
     * Cancels the monitor previously set up for {@code obs}, using the
     * default comm server handle {@code Request.commServer}.
     *
     * @param han handle of the monitored agent
     * @param obs observer whose monitor should be removed
     * @throws RequestFailedException if no monitor is recorded for
     *         {@code obs}, or the request is refused, interrupted, or fails
     *         with an I/O error
     */
    public void unmonitorAgent( Handle han, AgentObserver obs )
        throws RequestFailedException
    {
        try {
            _unmonitorAgent( han, Request.commServer, obs );
        }
        catch ( InterruptedException exc ) {
            // Tag the failure with the un-monitor request (was icmMonitorAgent).
            throw new RequestFailedException( Request.icmUnMonitorAgent, exc );
        }
        catch ( java.io.IOException exc ) {
            throw new RequestFailedException( Request.icmUnMonitorAgent, exc );
        }
    }

    /**
     * Cancels the monitor previously set up for {@code obs}, using the comm
     * server located at {@code host}.
     *
     * @param han  handle of the monitored agent
     * @param host address used to resolve the comm server handle
     * @param obs  observer whose monitor should be removed
     * @throws RequestFailedException if no monitor is recorded for
     *         {@code obs}, or the request is refused, interrupted, or fails
     *         with an I/O error
     */
    public void unmonitorAgent( Handle han, InetAddress host, AgentObserver obs )
        throws RequestFailedException
    {
        try {
            _unmonitorAgent( han, Request.commServerAt( host ), obs );
        }
        catch ( InterruptedException exc ) {
            // Tag the failure with the un-monitor request (was icmMonitorAgent).
            throw new RequestFailedException( Request.icmUnMonitorAgent, exc );
        }
        catch ( java.io.IOException exc ) {
            throw new RequestFailedException( Request.icmUnMonitorAgent, exc );
        }
    }

    /**
     * Sends an un-monitor request for {@code target} to comm server
     * {@code cs}, waits for the reply, and on success unregisters the
     * message handler and clears the three bookkeeping properties written
     * by {@code _monitorAgent}.
     *
     * @throws RequestFailedException if no monitor id is stored under
     *         {@code obs + ".monitor-id"}, or the comm server answers with
     *         {@code Request.icmFailed}
     */
    void _unmonitorAgent( Handle target, Handle cs, AgentObserver obs )
        throws java.io.IOException, InterruptedException, RequestFailedException
    {
        Handle id = (Handle)self_.getProperty( obs + ".monitor-id" );
        if ( id == null ) {
            throw new RequestFailedException( Request.icmUnMonitorAgent,
                                              "No such monitor" );
        }

        MessagePattern ptrn = RequestPattern
            .createListPattern( cs, Request.icmUnMonitorAgent,
                                EngineUtilities.makeIdentifierTest( id ) );
        self_.send( Request.createUnMonitorRequest( id, target, cs ) );
        List resp = (List)self_.recv( ptrn ).getContent();
        if ( Request.icmFailed.equals( resp.get( 1 ) ) ) {
            // Report the request that actually failed (was icmLookupLocation).
            throw new RequestFailedException( Request.icmUnMonitorAgent,
                                              "Refused by comm server" );
        }

        // remove the reaction rule
        self_.unregisterHandler( (Agent.MessageHandler)self_
                                 .getProperty( id + ".handler" ) );

        // clean-up properties
        self_.clearProperty( id + ".observer" );
        // Must be the same key that was set and looked up above; clearing
        // obs + ".id" left the stale monitor id behind.
        self_.clearProperty( obs + ".monitor-id" );
        self_.clearProperty( id + ".handler" );
    }

    /**
     * Handles a critical failure raised on {@code thread}. The owning agent
     * is looked up by the thread's thread group; if it is still registered,
     * diagnostics are printed to stderr and the agent is destroyed. If
     * {@code destroy()} itself throws, the agent is torn down directly via
     * {@code _destroyAgent}.
     *
     * @param thread thread on which the failure occurred
     * @param t      the failure
     */
    public synchronized void onError( Thread thread, Throwable t )
    {
        AgentImp owner = (AgentImp)thread_to_agent_.get( thread.getThreadGroup() );
        if ( owner != null && registered_agents_.containsKey( owner ) ) {
            t.printStackTrace();
            System.err.println( "Critical condition at " + thread );
            System.err.println( "Owner Agent: " + owner.getHandle() );
            System.err.println( "Killing Agent..." );
            try {
                owner.destroy();
            }
            catch ( Exception exc ) {
                exc.printStackTrace();
                _destroyAgent( owner );
            }
        }
        // else agent is probably already dead?
    }

    /**
     * Reports a non-critical exception raised on {@code thread}: prints the
     * stack trace and the owning agent's handle to stderr. Does nothing if
     * no agent is mapped to the thread's thread group.
     *
     * @param thread thread on which the exception occurred
     * @param t      the exception
     */
    public synchronized void onException( Thread thread, Throwable t )
    {
        Agent owner = (Agent)thread_to_agent_.get( thread.getThreadGroup() );
        if ( owner != null ) {
            // agent is still alive
            // print the stack trace of the exception
            t.printStackTrace();
            System.err.println( "Non critical exception at " + thread );
            System.err.println( "Owner Agent: " + owner.getHandle() );
        }
    }

    /** Treats a critical exception exactly like {@link #onError}. */
    public synchronized void onCriticalException( Thread thread, Throwable t )
    {
        onError( thread, t );
    }

    /** Notifies the observer of {@code ag} with a {@code DISPOSING} event. */
    public synchronized void onDestroying( AgentImp ag )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.DISPOSING, null ) );
    }

    /**
     * Tears down {@code ag} and then notifies its observer with a
     * {@code DISPOSED} event. The observer is fetched first because
     * {@code _destroyAgent} removes it from {@code agent_observers_}.
     */
    public synchronized void onDestroyed( AgentImp ag )
    {
        AgentObserver obs = (AgentObserver)agent_observers_.get( ag );
        _destroyAgent( ag );
        obs.onAgentEvent( new AgentEvent( ag, AgentEvent.DISPOSED, null ) );
    }

    /** Notifies the observer of {@code ag} with a {@code REGISTERING} event carrying {@code at}. */
    public synchronized void onRegistering( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.REGISTERING, at ) );
    }

    /** Notifies the observer of {@code ag} with a {@code REGISTERED} event carrying {@code at}. */
    public synchronized void onRegistered( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.REGISTERED, at ) );
    }

    /** Notifies the observer of {@code ag} with a {@code DEREGISTERING} event carrying {@code at}. */
    public synchronized void onDeregistering( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.DEREGISTERING, at ) );
    }

    /** Notifies the observer of {@code ag} with a {@code DEREGISTERED} event carrying {@code at}. */
    public synchronized void onDeregistered( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.DEREGISTERED, at ) );
    }

    /** Notifies the observer of {@code ag} with a {@code DETACHING} event carrying {@code at}. */
    public synchronized void onDetaching( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.DETACHING, at ) );
    }

    /** Notifies the observer of {@code ag} with a {@code DETACHED} event carrying {@code at}. */
    public synchronized void onDetached( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.DETACHED, at ) );
    }

    /** Notifies the observer of {@code ag} with an {@code ATTACHING} event carrying {@code at}. */
    public synchronized void onAttaching( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.ATTACHING, at ) );
    }

    /** Notifies the observer of {@code ag} with an {@code ATTACHED} event carrying {@code at}. */
    public synchronized void onAttached( AgentImp ag, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.ATTACHED, at ) );
    }

    /**
     * Notifies the observer of {@code ag} with a {@code FORWARDING} event
     * whose payload is the array {@code {to, at}}.
     */
    public synchronized void onFwding( AgentImp ag, Handle to, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.FORWARDING,
                                           new Object[]{to, at} ) );
    }

    /**
     * Notifies the observer of {@code ag} with a {@code FORWARDED} event
     * whose payload is the array {@code {to, at}}.
     */
    public synchronized void onFwded( AgentImp ag, Handle to, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.FORWARDED,
                                           new Object[]{to, at} ) );
    }

    /**
     * Notifies the observer of {@code ag} with a {@code CANCELING_FWD} event
     * whose payload is the array {@code {to, at}}.
     */
    public synchronized void onUnFwding( AgentImp ag, Handle to, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.CANCELING_FWD,
                                           new Object[]{to, at} ) );
    }

    /**
     * Notifies the observer of {@code ag} with a {@code CANCELED_FWD} event
     * whose payload is the array {@code {to, at}}.
     */
    public synchronized void onUnFwded( AgentImp ag, Handle to, InetAddress at )
    {
        ((AgentObserver)agent_observers_.get( ag ))
            .onAgentEvent( new AgentEvent( ag, AgentEvent.CANCELED_FWD,
                                           new Object[]{to, at} ) );
    }

    // Lifetime stuff

    /**
     * Initializes the manager: creates the thread group, the lookup maps and
     * the router reference, then creates the self agent, named from the
     * {@code "agent.name"} property followed by {@code "-"} and this
     * object's {@code hashCode()}.
     *
     * @param prop configuration; only {@code "agent.name"} is read here
     * @throws CheckedException wrapping a {@code RequestFailedException}
     *         raised while creating the self agent
     */
    public void load( java.util.Properties prop )
    {
        try {
            // init objects
            tg_ = new AgentThreadGroup( this );
            thread_to_agent_ = new HashMap();
            agent_observers_ = new HashMap();
            registered_agents_ = new HashMap();
            router_ = MessageRouter.instance;

            // start self-agent
            self_ = createAgent( prop.getProperty( "agent.name" ) + "-" + hashCode(),
                                 new jacomma.platform.core.AgentObserverAdapter() );
        }
        catch ( RequestFailedException exc ) {
            throw new CheckedException( exc );
        }
    }

    /**
     * Shuts the engine down: calls {@code onExit} on the observer of every
     * registered agent except the self agent, then destroys the self agent.
     * If destroying the self agent fails, the stack trace is printed and the
     * agent is torn down directly via {@code _destroyAgent}.
     */
    public synchronized void stopEngine()
    {
        // We really need to do a security check here, but I don't bother
        // for the moment

        // make a safe copy first
        List ags = new ArrayList( registered_agents_.keySet() );
        ags.remove( self_ );
        for ( Iterator it = ags.iterator(); it.hasNext(); ) {
            Agent ag = (Agent)it.next();
            ((AgentObserver)agent_observers_.get( ag )).onExit( ag );
        }

        try {
            self_.destroy();
        }
        catch ( RequestFailedException exc ) {
            exc.printStackTrace();
            _destroyAgent( (AgentImp)self_ );
        }
    }

    // Internal stuff

    /**
     * Builds a new agent for handle {@code han}: opens a connection, wires
     * up its message queue, connection buffer and dispatcher, records the
     * observer, registers the agent with the router, starts its threads and
     * finally adds it to {@code registered_agents_}.
     *
     * @param han handle of the new agent
     * @param obs observer stored for the new agent
     * @return the newly created agent
     * @throws java.io.IOException declared by this method; presumably raised
     *         by {@code Connection.open()} - verify against that class
     */
    synchronized AgentImp _createAgent( Handle han, AgentObserver obs )
        throws java.io.IOException
    {
        Connection conn = Connection.open();
        MessageQueue mq = new MessageQueue();
        ConnectionBuffer cb = new ConnectionBuffer( conn, mq );
        MessageDispatcher mdsp = new MessageDispatcher( mq );
        AgentImp ag = new AgentImp( han, mq, router_, mdsp, cb, this );
        mdsp.agent_ = ag;
        agent_observers_.put( ag, obs );

        // set routing info and start the threads
        router_.registerAgent( ag.getHandle(), ag.que_, ag.cb_ );

        // Initialize the thread group
        _startThreads( ag );
        registered_agents_.put( ag, ag.getHandle() );
        return ag;
    }

    /**
     * Tears down {@code ag}: removes it from {@code registered_agents_},
     * stops its threads, closes its connection (a failure to close is only
     * printed), unregisters it from the router and drops its observer.
     */
    synchronized void _destroyAgent( AgentImp ag )
    {
        registered_agents_.remove( ag );
        _stopThreads( ag );
        try {
            ag.cb_.conn_.close();
        }
        catch ( Exception exc ) {
            // best effort: keep going so the router and observer are still cleaned up
            exc.printStackTrace();
        }
        router_.unregisterAgent( ag.getHandle() );
        agent_observers_.remove( ag );
    }

    /**
     * Creates the agent's own thread group as a child of {@code tg_}, maps
     * it to the agent in {@code thread_to_agent_}, and starts the
     * connection reader and the message dispatcher.
     */
    void _startThreads( AgentImp ag )
    {
        ag.tg_ = new ThreadGroup( tg_, ag.toString() );
        thread_to_agent_.put( ag.tg_, ag );
        // NOTE(review): reader and dispatcher are handed tg_, not ag.tg_,
        // while onError/onException look agents up by ag.tg_ - confirm this
        // is intended before changing it.
        ag.cb_.startReader( tg_ );
        ag.dsp_.startDispatcher( tg_ );
    }

    /**
     * Stops the agent's dispatcher and reader, interrupts the agent's thread
     * group and removes the group from {@code thread_to_agent_}.
     */
    void _stopThreads( AgentImp ag )
    {
        ag.dsp_.stopDispatcher();
        ag.cb_.stopReader();
        ag.tg_.interrupt();
        thread_to_agent_.remove( ag.tg_ );
    }

    /**
     * Debug aid: passes a {@code DebugVisitor} to the self agent's
     * {@code try_accept}.
     */
    public void dumpInternalQueue()
        throws InterruptedException
    {
        self_.try_accept( new DebugVisitor() );
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -