📄 blockingmessenger.java
字号:
setStateLock(stateMachine); } /** * {@inheritDoc} */ public int getState() { return BlockingMessenger.this.getState(); } /** * {@inheritDoc} */ public void resolve() { BlockingMessenger.this.resolve(); } /** * {@inheritDoc} */ public void close() { BlockingMessenger.this.close(); } /** * {@inheritDoc} * * <p/> * Address rewriting done here. */ public boolean sendMessageN(Message msg, String service, String serviceParam) { return BlockingMessenger.this.sendMessageN(msg, effectiveService(service), effectiveParam(service, serviceParam)); } /** * {@inheritDoc} * * <p/> * Address rewriting done here. */ public void sendMessageB(Message msg, String service, String serviceParam) throws IOException { BlockingMessenger.this.sendMessageB(msg, effectiveService(service), effectiveParam(service, serviceParam)); } /** * {@inheritDoc} * * <p/> * We're supposed to return the complete destination, including * service and param specific to that channel. It is not clear, whether * this should include the cross-group mangling, though. For now, let's * say it does not. */ public EndpointAddress getLogicalDestinationAddress() { EndpointAddress rawLogical = getLogicalDestinationImpl(); if (rawLogical == null) { return null; } return new EndpointAddress(rawLogical, origService, origServiceParam); } // Check if it is worth staying registered public void itemChanged(Object changedObject) { if (!notifyChange()) { if (haveListeners()) { return; } BlockingMessenger.this.unregisterListener(this); if (!haveListeners()) { return; } // Ooops collision. We should not have unregistered. Next time, then. In case of collision, the end result // is always to stay registered. There's no harm in staying registered. BlockingMessenger.this.registerListener(this); } } /** * {@inheritDoc} * <p/> * Always make sure we're registered with the shared messenger. */ @Override protected void registerListener(SimpleSelectable l) { BlockingMessenger.this.registerListener(this); super.registerListener(l); } } private void storeCurrent(Message msg, String service, String param) { currentMessage = msg; currentService = service; currentParam = param; currentThrowable = null; } /** * Constructor. * <p/> * We start in the CONNECTED state, we pretend to have a queue of size 1, and we can never re-connect. Although this * messenger fully respects the asynchronous semantics, it is saturated as soon as one msg is being send, and if not * saturated, send is actually performed by the invoker thread. So return is not completely immediate. This is a barely * acceptable implementation, but this is also a transition adapter that is bound to disappear one release from now. The main * goal is to get things going until transports are adapted. * * @param homeGroupID the group that this messenger works for. This is the group of the endpoint service or transport * that created this messenger. * @param dest where messages should be addressed to * @param selfDestruct true if this messenger must self close destruct when idle. <b>Warning:</b> If selfDestruct is used, * this messenger will remained referenced for as long as isIdleImpl returns false. */ public BlockingMessenger(PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct) { super(dest); this.homeGroupID = homeGroupID; // We tell our superclass that we synchronize our state on the stateMachine object. Logic would dictate that we pass it // to super(), but it is not itself constructed until super() returns. No way around it. setStateLock(stateMachine); /* * Sets up a timer task that will close this messenger if it says to have become idle. It will keep it referenced * until then. * <p/> * As long as this timer task is scheduled, this messenger is not subject to GC. Therefore, its owner, if any, which is strongly * referenced, is not subject to GC either. This avoids prematurely closing open connections just because a destination is * not currently in use, which we would have to do if CanonicalMessengers could be GCed independantly (and which would * force us to use finalizers, too).<p/> * * Such a mechanism is usefull only if this blocking messenger is expensive to make or holds system resources that require * an explicit invocation of the close method. Else, it is better to let it be GC'ed along with any refering canonical * messenger when memory is tight.<p/> * */ // // FIXME 20040413 jice : we trust transports to implement isIdle reasonably, which may be a leap of faith. We // should probably superimpose a time limit of our own. // if (selfDestruct) { selfDestructTask = new TimerTask() { /** * {@inheritDoc} */ @Override public void run() { try { if (isIdleImpl()) { close(); } else { return; } } catch (Throwable uncaught) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in selfDescructTask. ", uncaught); } } cancel(); } }; timer.schedule(selfDestructTask, TimeUtils.AMINUTE, TimeUtils.AMINUTE); } else { selfDestructTask = null; } } /** * Sets an owner for this blocking messenger. Owners are normally canonical messengers. The goal of registering the owner is * to keep that owner reachable as long as this blocking messenger is. Canonical messengers are otherwise softly referenced, * and so, may be deleted whenever memory is tight. * <p/> * We do not want to use finalizers or the equivalent reference queue mechanism; so we have no idea when a blocking messenger * is no-longer referenced by any canonical. In addition it may be expensive to make and so we want to keep it for a while * anyway. As a result, instead of keeping a blocking messenger open as long as there is a canonical, we do the opposite: we * keep the canonical (owner, here) as long as the blocking messenger is open (and usually beyond, memory allowing). How long * a blocking messenger will stay around depends upon that messenger's implementation. That may even be left up to the GC, in * the end (if close is not needed AND the messenger is cheap to make). In that case, the owner is likely the only referrer, * and so both will have the same lifetime. * * @param owner The object that should be kept referenced at least as long as this one. */ public void setOwner(Object owner) { this.owner = owner; } /** * Assemble a destination address for a message based upon the messenger * default destination address and the optional provided overrides. * * @param service The destination service or {@code null} to use default. * @param serviceParam The destination service parameter or {@code null} to * use default. */ protected EndpointAddress getDestAddressToUse(String service, String serviceParam) { EndpointAddress defaultAddress = getDestinationAddress(); EndpointAddress result; if(null == service) { if(null == serviceParam) { // Use default service name and service params result = defaultAddress; } else { // use default service name, override service params result = new EndpointAddress(defaultAddress, defaultAddress.getServiceName(), serviceParam); } } else { if(null == serviceParam) { // override service name, use default service params (this one is pretty weird and probably not useful) result = new EndpointAddress(defaultAddress, service, defaultAddress.getServiceParameter()); } else { // override service name, override service params result = new EndpointAddress(defaultAddress, service, serviceParam); } } return result; } /** * A transport may call this to cause an orderly closure of its messengers. */ protected final void shutdown() { DeferredAction action; synchronized (stateMachine) { stateMachine.shutdownEvent(); action = eventCalled(); } // We called an event. State may have changed. notifyChange(); performDeferredAction(action); } /** * {@inheritDoc} * <p/> * We overload isClosed because many messengers still use super.isClosed() * as part of their own implementation or don't override it at all. They * expect it to be true only when all is shutdown; not while we're closing * gently. * * FIXME - jice@jxta.org 20040413: transports should get a deeper retrofit eventually. */ @Override public boolean isClosed() { return (!lieToOldTransports) && super.isClosed(); } /** * {@inheritDoc} * <p/> * getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end). * For a blocking messenger it's easy. We're born resolved. So, just ask the implementor what it is. */ public final EndpointAddress getLogicalDestinationAddress() { return getLogicalDestinationImpl(); } /** * {@inheritDoc} * * <p/> Some transports historically overload the close method of BlockingMessenger. * The final is there to make sure we know about it. However, there should be no * harm done if the unchanged code is renamed to closeImpl; even if it calls super.close(). * The real problem, however, is transports calling close (was their own, but now it means this one), when * they want to break. It will make things look like someone just called close, but it will not * actually break anything. However, that will cause the state machine to go through the close process. * this will end up calling closeImpl(). That will do. */ public final void close() { DeferredAction action; synchronized (stateMachine) { stateMachine.closeEvent(); action = eventCalled(); } // We called an event. State may have changed. notifyChange();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -