pullpushadapter.java

来自「JGroups源码」· Java 代码 · 共 480 行 · 第 1/2 页

JAVA
480
字号
                else if(obj instanceof SetStateEvent) {                    SetStateEvent evt=(SetStateEvent)obj;                    String state_id=evt.getStateId();                    if(listener != null) {                        try {                            if(listener instanceof ExtendedMessageListener && state_id!=null) {                                ((ExtendedMessageListener)listener).setState(state_id, evt.getArg());                            }                            else {                                listener.setState(evt.getArg());                            }                        }                        catch(ClassCastException cast_ex) {                            if(log.isErrorEnabled()) log.error("received SetStateEvent, but argument " +                                    ((SetStateEvent)obj).getArg() + " is not serializable ! Discarding message.");                        }                    }                }                else if(obj instanceof StreamingGetStateEvent) {                	StreamingGetStateEvent evt=(StreamingGetStateEvent)obj;                	if(listener instanceof ExtendedMessageListener)                	{                		if(evt.getStateId()==null)                		{                			((ExtendedMessageListener)listener).getState(evt.getArg());	                		}                		                		else                		{                			((ExtendedMessageListener)listener).getState(evt.getStateId(),evt.getArg());                		}                	}                }                else if(obj instanceof StreamingSetStateEvent) {                	StreamingSetStateEvent evt=(StreamingSetStateEvent)obj;                	if(listener instanceof ExtendedMessageListener)                	{                		if(evt.getStateId()==null)                		{                			((ExtendedMessageListener)listener).setState(evt.getArg());                		}                		else                			                		{                			
((ExtendedMessageListener)listener).setState(evt.getStateId(),evt.getArg());                		}                	}                }                else if(obj instanceof View) {                    notifyViewChange((View)obj);                }                else if(obj instanceof SuspectEvent) {                    notifySuspect((Address)((SuspectEvent)obj).getMember());                }                else if(obj instanceof BlockEvent) {                    notifyBlock();                    if(transport instanceof Channel) {                       ((Channel)transport).blockOk();                    }                }                else if(obj instanceof UnblockEvent) {                   notifyUnblock();                }            }            catch(ChannelNotConnectedException conn) {                Address local_addr=((Channel)transport).getLocalAddress();                if(log.isTraceEnabled()) log.trace('[' + (local_addr == null ? "<null>" : local_addr.toString()) +                        "] channel not connected, exception is " + conn);                Util.sleep(1000);                receiver_thread=null;                break;            }            catch(ChannelClosedException closed_ex) {                Address local_addr=((Channel)transport).getLocalAddress();                if(log.isTraceEnabled()) log.trace('[' + (local_addr == null ? "<null>" : local_addr.toString()) +                        "] channel closed, exception is " + closed_ex);                // Util.sleep(1000);                receiver_thread=null;                break;            }            catch(Throwable e) {            }        }    }    /**     * Check whether the message has an identifier. If yes, lookup the MessageListener associated with the     * given identifier in the hashtable and dispatch to it. 
Otherwise just use the main (default) message     * listener     */    protected void handleMessage(Message msg) {        PullHeader hdr=(PullHeader)msg.getHeader(PULL_HEADER);        Serializable identifier;        MessageListener l;        if(hdr != null && (identifier=hdr.getIdentifier()) != null) {            l=(MessageListener)listeners.get(identifier);            if(l == null) {                if(log.isErrorEnabled()) log.error("received a messages tagged with identifier=" +                        identifier + ", but there is no registration for that identifier. Will drop message");            }            else                l.receive(msg);        }        else {            if(listener != null)                listener.receive(msg);        }    }    protected void notifyViewChange(View v) {        MembershipListener l;        if(v == null) return;        for(Iterator it=membership_listeners.iterator(); it.hasNext();) {            l=(MembershipListener)it.next();            try {                l.viewAccepted(v);            }            catch(Throwable ex) {                if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex);            }        }    }    protected void notifySuspect(Address suspected_mbr) {        MembershipListener l;        if(suspected_mbr == null) return;        for(Iterator it=membership_listeners.iterator(); it.hasNext();) {            l=(MembershipListener)it.next();            try {                l.suspect(suspected_mbr);            }            catch(Throwable ex) {                if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex);            }        }    }    protected void notifyBlock() {        MembershipListener l;        for(Iterator it=membership_listeners.iterator(); it.hasNext();) {            l=(MembershipListener)it.next();            try {                l.block();            }            catch(Throwable ex) {                if(log.isErrorEnabled()) log.error("exception 
notifying " + l + ": " + ex);            }        }    }        protected void notifyUnblock() {       MembershipListener l;       for(Iterator it=membership_listeners.iterator(); it.hasNext();) {           l=(MembershipListener)it.next();           if(l instanceof ExtendedMembershipListener){                           try {                 ((ExtendedMembershipListener)l).unblock();              }              catch(Throwable ex) {                  if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex);              }           }       }   }    public void channelConnected(Channel channel) {        if(log.isTraceEnabled())            log.trace("channel is connected");    }    public void channelDisconnected(Channel channel) {        if(log.isTraceEnabled())            log.trace("channel is disconnected");    }    public void channelClosed(Channel channel) {    }    public void channelShunned() {        if(log.isTraceEnabled())            log.trace("channel is shunned");    }    public void channelReconnected(Address addr) {        start();    }    public static final class PullHeader extends Header {        Serializable identifier=null;        public PullHeader() {            ; // used by externalization        }        public PullHeader(Serializable identifier) {            this.identifier=identifier;        }        public Serializable getIdentifier() {            return identifier;        }        public long size() {            if(identifier == null)                return 12;            else                return 64;        }        public String toString() {            return "PullHeader";        }        public void writeExternal(ObjectOutput out) throws IOException {            out.writeObject(identifier);        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {            identifier=(Serializable)in.readObject();        }    }	/**	 * @return Returns the listener.	 
*/
	public MessageListener getListener() {
		// Returns the field itself. This is the same default listener that handleMessage()
		// uses for messages without an identifier; handleMessage() null-checks it, so
		// callers should expect that it may be null.
		return listener;
	}
} // closes the enclosing class, whose header lies outside this excerpt

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?