pullpushadapter.java
来自「JGRoups源码」· Java 代码 · 共 480 行 · 第 1/2 页
JAVA
480 行
else if(obj instanceof SetStateEvent) { SetStateEvent evt=(SetStateEvent)obj; String state_id=evt.getStateId(); if(listener != null) { try { if(listener instanceof ExtendedMessageListener && state_id!=null) { ((ExtendedMessageListener)listener).setState(state_id, evt.getArg()); } else { listener.setState(evt.getArg()); } } catch(ClassCastException cast_ex) { if(log.isErrorEnabled()) log.error("received SetStateEvent, but argument " + ((SetStateEvent)obj).getArg() + " is not serializable ! Discarding message."); } } } else if(obj instanceof StreamingGetStateEvent) { StreamingGetStateEvent evt=(StreamingGetStateEvent)obj; if(listener instanceof ExtendedMessageListener) { if(evt.getStateId()==null) { ((ExtendedMessageListener)listener).getState(evt.getArg()); } else { ((ExtendedMessageListener)listener).getState(evt.getStateId(),evt.getArg()); } } } else if(obj instanceof StreamingSetStateEvent) { StreamingSetStateEvent evt=(StreamingSetStateEvent)obj; if(listener instanceof ExtendedMessageListener) { if(evt.getStateId()==null) { ((ExtendedMessageListener)listener).setState(evt.getArg()); } else { ((ExtendedMessageListener)listener).setState(evt.getStateId(),evt.getArg()); } } } else if(obj instanceof View) { notifyViewChange((View)obj); } else if(obj instanceof SuspectEvent) { notifySuspect((Address)((SuspectEvent)obj).getMember()); } else if(obj instanceof BlockEvent) { notifyBlock(); if(transport instanceof Channel) { ((Channel)transport).blockOk(); } } else if(obj instanceof UnblockEvent) { notifyUnblock(); } } catch(ChannelNotConnectedException conn) { Address local_addr=((Channel)transport).getLocalAddress(); if(log.isTraceEnabled()) log.trace('[' + (local_addr == null ? "<null>" : local_addr.toString()) + "] channel not connected, exception is " + conn); Util.sleep(1000); receiver_thread=null; break; } catch(ChannelClosedException closed_ex) { Address local_addr=((Channel)transport).getLocalAddress(); if(log.isTraceEnabled()) log.trace('[' + (local_addr == null ? "<null>" : local_addr.toString()) + "] channel closed, exception is " + closed_ex); // Util.sleep(1000); receiver_thread=null; break; } catch(Throwable e) { } } } /** * Check whether the message has an identifier. If yes, lookup the MessageListener associated with the * given identifier in the hashtable and dispatch to it. Otherwise just use the main (default) message * listener */ protected void handleMessage(Message msg) { PullHeader hdr=(PullHeader)msg.getHeader(PULL_HEADER); Serializable identifier; MessageListener l; if(hdr != null && (identifier=hdr.getIdentifier()) != null) { l=(MessageListener)listeners.get(identifier); if(l == null) { if(log.isErrorEnabled()) log.error("received a messages tagged with identifier=" + identifier + ", but there is no registration for that identifier. Will drop message"); } else l.receive(msg); } else { if(listener != null) listener.receive(msg); } } protected void notifyViewChange(View v) { MembershipListener l; if(v == null) return; for(Iterator it=membership_listeners.iterator(); it.hasNext();) { l=(MembershipListener)it.next(); try { l.viewAccepted(v); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex); } } } protected void notifySuspect(Address suspected_mbr) { MembershipListener l; if(suspected_mbr == null) return; for(Iterator it=membership_listeners.iterator(); it.hasNext();) { l=(MembershipListener)it.next(); try { l.suspect(suspected_mbr); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex); } } } protected void notifyBlock() { MembershipListener l; for(Iterator it=membership_listeners.iterator(); it.hasNext();) { l=(MembershipListener)it.next(); try { l.block(); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex); } } } protected void notifyUnblock() { MembershipListener l; for(Iterator it=membership_listeners.iterator(); it.hasNext();) { l=(MembershipListener)it.next(); if(l instanceof ExtendedMembershipListener){ try { ((ExtendedMembershipListener)l).unblock(); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex); } } } } public void channelConnected(Channel channel) { if(log.isTraceEnabled()) log.trace("channel is connected"); } public void channelDisconnected(Channel channel) { if(log.isTraceEnabled()) log.trace("channel is disconnected"); } public void channelClosed(Channel channel) { } public void channelShunned() { if(log.isTraceEnabled()) log.trace("channel is shunned"); } public void channelReconnected(Address addr) { start(); } public static final class PullHeader extends Header { Serializable identifier=null; public PullHeader() { ; // used by externalization } public PullHeader(Serializable identifier) { this.identifier=identifier; } public Serializable getIdentifier() { return identifier; } public long size() { if(identifier == null) return 12; else return 64; } public String toString() { return "PullHeader"; } public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(identifier); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { identifier=(Serializable)in.readObject(); } } /** * @return Returns the listener. */ public MessageListener getListener() { return listener; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?