📄 beeptransport.java
字号:
} catch (ClassNotFoundException e) {
throw new Exception("Class " + classname + " not found");
} catch (ClassCastException e) {
throw new Exception("class " + classname + " does not " +
"implement the org.beepcore.beep.profile.Profile interface");
}
p.SetEndpoint( endpoint );
reg.addStartChannelListener( JxtaBeepProfile.JXTA_URI,
p.init( JxtaBeepProfile.JXTA_URI, profileConfig ), null );
}
catch( Exception caught ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "BeepTransport.init failure", caught );
return;
}
(new IncomingBeepSeesionDaemon( this )).start();
(new BeepSeesionIdleOutDaemon( this )).start();
}
/**
* Creates and return an EndpointMessanger.
*
* @param dst the destination address for this messenger
* @param peer peer address for the destination.
*/
public EndpointMessenger getMessenger( EndpointAddress dst)
throws IOException {
if( !dst.getProtocolName().equals( getProtocolName() ) ) {
throw new IllegalArgumentException( "Not a BEEP EndpointService address!" );
}
String dstAddr = dst.getProtocolAddress();
BeepSession dstSession = null;
// See if we have a session for this dest or need to make one.
synchronized( activeSessions ) {
if( activeSessions.containsKey( dstAddr ) ) {
dstSession = (BeepSession) activeSessions.get( dstAddr );
dstSession.updateLastUsed();
}
else {
dstSession = new BeepSession( this, dst );
activeSessions.put( dstAddr, dstSession );
}
}
return new BeepNonBlockingMessenger( dstSession );
}
/**
* Propagates a TransportMessage on this TransportProtocol
*
* @param message the TransportMessage to be propagated
* @param pName the name of a service
* @param pParams parameters for this message.
* @param prunePeer ????
* @exception IOException thrown if the message could not be sent for some reason.
*/
public synchronized void propagate(Message message,
String pName,
String pParams,
String prunePeer) throws IOException {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("propagate not supported");
}
/**
* Return the protocol name we support
**/
public String getProtocolName() {
return getSupportedProtocolName();
}
/**
* Return the protocol name we support
**/
public static String getSupportedProtocolName() {
return "BEEP";
}
/**
* closes this TransportEndpoint.
*/
public void close() {
synchronized( activeSessions ) {
for( Iterator eachActive = activeSessions.values().iterator();
eachActive.hasNext(); ) {
BeepSession session = (BeepSession) eachActive.next();
session.close();
session = null;
eachActive.remove();
}
}
activeSessions = null;
}
/**
* returns who we are
*
* @return EndpointAddress Our public address
**/
public EndpointAddress getPublicAddress() {
return ourAddress;
}
/**
* Returns true if the endpoint protocol can be used by the EndpointRouter
*
* @return boolean true if the protocol can be used by the EndpointRouter
*/
public boolean allowRouting() {
return true;
}
/**
* Returns true if this is a connected Transport, vs. a polled mailbox type
* @return boolean true if connected, otherwise false.
**/
public boolean isConnectionOriented() {
return true;
}
/**
* Returns true if the address can be reached.
*
* @return boolean true if endpoint can be reached, otherwise false.
**/
public boolean ping( EndpointAddress addr ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("ping to " + addr.toString());
// FIXME 20010723 bondolo@jxta.org haven't decided if we should
// do this with beep or raw tcp. ICMP ping for connectivity check is
// all we really want.
boolean res = true;
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("ping (always) returns " + res);
return res;
}
/**
* A daemon thread which waits for incoming sessions.
**/
private class IncomingBeepSeesionDaemon extends Thread {
/**
* The transport we are receiving sessions for.
**/
private BeepTransport ourMaster;
IncomingBeepSeesionDaemon( BeepTransport ourMaster ) {
super( "BeepTransport Incoming Session Daemon : " + ourMaster.toString() );
this.ourMaster = ourMaster;
}
/**
* Asynchronously wait for session start requests.
**/
public void run() {
if (LOG.isEnabledFor(Priority.INFO)) LOG.info( "IncomingBeepSeesionDaemon.run starting listener - " +
ourMaster.usingInterface.getHostAddress() + ":" + ourMaster.useTcpPort );
while (true) {
try {
// Loop listening for new Sessions
Session session =
AutomatedTCPSessionCreator.listen( ourMaster.usingInterface,
ourMaster.useTcpPort,
ourMaster.reg);
synchronized( ourMaster.activeSessions ) {
BeepSession newSession = new BeepSession( ourMaster, session );
ourMaster.activeSessions.put( newSession.getDestEndpoint(), newSession );
}
} catch (BEEPException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Listener exception : " + e.getMessage(), e);
} catch (Throwable e) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable, Listener " +
Thread.currentThread().getName() + " exiting", e);
break;
}
}
}
}
/**
* A daemon thread which monitors active sessions and closes them when they
* become idle.
**/
private class BeepSeesionIdleOutDaemon extends Thread {
/**
* How often we poll for idle sessions in seconds.
**/
private final static int POLLINGINTERVAL = 10;
/**
* How long a Session needs to be idle before being closed in seconds.
**/
private final static int IDLEINTERVAL = 2 * 60;
private BeepTransport ourMaster;
BeepSeesionIdleOutDaemon( BeepTransport ourMaster ) {
super( "BeepTransport Session Idle Closer Daemon : " + ourMaster.toString() );
this.ourMaster = ourMaster;
}
/**
* Asynchronously Close Idle Sessions
**/
public void run() {
if (LOG.isEnabledFor(Priority.INFO)) LOG.info( "BeepSeesionIdleOutDaemon.run" );
do {
try {
try {
sleep( POLLINGINTERVAL * 1000 );
} catch ( InterruptedException woken ) {
Thread.currentThread().interrupted();
// XXX 20010725 bondolo@jxta.org should establish
// terminating conditions here.
}
if (LOG.isEnabledFor(Priority.INFO)) LOG.info( "BeepSeesionIdleOutDaemon polling for idle session" );
synchronized( ourMaster.activeSessions ) {
for( Iterator eachSession =
ourMaster.activeSessions.values().iterator();
eachSession.hasNext(); ) {
BeepSession aSession = (BeepSession) eachSession.next();
// check if the session is idle.
if( aSession.getLastUsed() + (IDLEINTERVAL * 1000) <=
System.currentTimeMillis() ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Closing session with " + aSession.getDestEndpoint() );
eachSession.remove();
aSession.close();
aSession = null;
}
}
}
} catch (Throwable e) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable, Listener " +
Thread.currentThread().getName() + " exiting", e);
break;
}
}
while (true);
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -