⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 notificationbus.java

📁 JGRoups源码
💻 JAVA
字号:
// $Id: NotificationBus.java,v 1.11 2006/05/25 12:10:18 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Promise;import org.jgroups.util.Util;import java.io.Serializable;import java.util.Vector;/** * This class provides notification sending and handling capability.  * Producers can send notifications to all registered consumers. * Provides hooks to implement shared group state, which allows an * application programmer to maintain a local cache which is replicated  * by all instances. NotificationBus sits on  * top of a channel, however it creates its channel itself, so the  * application programmers do not have to provide their own channel.  * * @author Bela Ban */public class NotificationBus implements Receiver {    final Vector members=new Vector();    Channel channel=null;    Address local_addr=null;    Consumer consumer=null; // only a single consumer allowed    String bus_name="notification_bus";    final Promise get_cache_promise=new Promise();    final Object cache_mutex=new Object();    protected final Log log=LogFactory.getLog(getClass());    String props=null;    public interface Consumer {        void handleNotification(Serializable n);        /** Called on the coordinator to obtains its cache */        Serializable getCache();        void memberJoined(Address mbr);        void memberLeft(Address mbr);    }    public NotificationBus() throws Exception {        this((Channel)null, null);    }    public NotificationBus(String bus_name) throws Exception {        this(bus_name, null);    }    public NotificationBus(String bus_name, String properties) throws Exception {        if(bus_name != null) this.bus_name=bus_name;        if(properties != null) props=properties;        channel=new JChannel(props);        channel.setReceiver(this);    }    public NotificationBus(Channel channel, String bus_name) throws Exception {        if(bus_name != null) this.bus_name=bus_name;        this.channel=channel;        channel.setReceiver(this);    }    public void setConsumer(Consumer c) {        consumer=c;    }    public Address getLocalAddress() {        if(local_addr != null) return local_addr;        if(channel != null)            local_addr=channel.getLocalAddress();        return local_addr;    }    /**     * Returns a reference to the real membership: don't modify.      * If you need to modify, make a copy first !     * @return Vector of Address objects     */    public Vector getMembership() {        return members;    }    /**      * Answers the Channel.     * Used to operate on the underlying channel directly, e.g. perform operations that are not     * provided using only NotificationBus. Should be used sparingly.     * @return underlying Channel     */    public Channel getChannel() {        return channel;    }    public boolean isCoordinator() {        Object first_mbr=null;        synchronized(members) {            first_mbr=members.size() > 0 ? members.elementAt(0) : null;            if(first_mbr == null)                return true;        }        if(getLocalAddress() != null)            return getLocalAddress().equals(first_mbr);        return false;    }    public void start() throws Exception {        channel.connect(bus_name);    }    public void stop() {        if(channel != null) {            channel.close();  // disconnects from channel and closes it            channel=null;        }    }    /** Pack the argument in a Info, serialize that one into the message buffer and send the message */    public void sendNotification(Serializable n) {        sendNotification(null, n);    }        /** Pack the argument in a Info, serialize that one into the message buffer and send the message */    public void sendNotification(Address dest, Serializable n) {        Message msg=null;        byte[] data=null;        Info info;        try {            if(n == null) return;            info=new Info(Info.NOTIFICATION, n);            data=Util.objectToByteBuffer(info);            msg=new Message(dest, null, data);            if(channel == null) {                if(log.isErrorEnabled()) log.error("channel is null. Won't send notification");                return;            }            channel.send(msg);        }        catch(Throwable ex) {            if(log.isErrorEnabled()) log.error("error sending notification", ex);        }    }    /**     Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),     null will be returned. Used only internally by NotificationBus.     @param timeout Max number of msecs until the call returns     @param max_tries Max number of attempts to fetch the cache from the coordinator     */    public Serializable getCacheFromCoordinator(long timeout, int max_tries) {        return getCacheFromMember(null, timeout, max_tries);    }    /**     Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),     null will be returned. Used only internally by NotificationBus.     @param mbr The address of the member from which to fetch the state. If null, the current coordinator     will be asked for the state     @param timeout Max number of msecs until the call returns - if timeout elapses     null will be returned     @param max_tries Max number of attempts to fetch the cache from the coordinator (will be set to 1 if < 1)     */    public Serializable getCacheFromMember(Address mbr, long timeout, int max_tries) {        Serializable cache=null;        int num_tries=0;        Info info=new Info(Info.GET_CACHE_REQ);        Message msg;        Address dst=mbr;  // member from which to fetch the cache        long start, stop; // +++ remove        if(max_tries < 1) max_tries=1;        get_cache_promise.reset();        while(num_tries <= max_tries) {            if(mbr == null) {  // mbr == null means get cache from coordinator                dst=determineCoordinator();                if(dst == null || dst.equals(getLocalAddress())) { // we are the first member --> empty cache                    if(log.isInfoEnabled()) log.info("[" + getLocalAddress() +                                                     "] no coordinator found --> first member (cache is empty)");                    return null;                }            }            // +++ remove            if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + "] dst=" + dst +                                             ", timeout=" + timeout + ", max_tries=" + max_tries + ", num_tries=" + num_tries);            info=new Info(Info.GET_CACHE_REQ);            msg=new Message(dst, null, info);            channel.down(new Event(Event.MSG, msg));            start=System.currentTimeMillis();            cache=(Serializable) get_cache_promise.getResult(timeout);            stop=System.currentTimeMillis();            if(cache != null) {                if(log.isInfoEnabled()) log.info("got cache from " +                                                 dst + ": cache is valid (waited " + (stop - start) + " msecs on get_cache_promise)");                return cache;            }            else {                if(log.isErrorEnabled()) log.error("received null cache; retrying (waited " +                                                   (stop - start) + " msecs on get_cache_promise)");            }            Util.sleep(500);            ++num_tries;        }        if(cache == null)            if(log.isErrorEnabled()) log.error("[" + getLocalAddress() +                                               "] cache is null (num_tries=" + num_tries + ')');        return cache;    }    /**     Don't multicast this to all members, just apply it to local consumers.     */    public void notifyConsumer(Serializable n) {        if(consumer != null && n != null)            consumer.handleNotification(n);    }    /* -------------------------------- Interface MessageListener -------------------------------- */    public void receive(Message msg) {        Info info=null;        Object obj;        if(msg == null || msg.getLength() == 0) return;        try {            obj=msg.getObject();            if(!(obj instanceof Info)) {                    if(log.isErrorEnabled()) log.error("expected an instance of Info (received " +                                                             obj.getClass().getName() + ')');                return;            }            info=(Info) obj;            switch(info.type) {                case Info.NOTIFICATION:                    notifyConsumer(info.data);                    break;                case Info.GET_CACHE_REQ:                    handleCacheRequest(msg.getSrc());                    break;                case Info.GET_CACHE_RSP:                    // +++ remove                    if(log.isDebugEnabled()) log.debug("[GET_CACHE_RSP] cache was received from " + msg.getSrc());                    get_cache_promise.setResult(info.data);                    break;                default:                    if(log.isErrorEnabled()) log.error("type " + info.type + " unknown");                    break;            }        }        catch(Throwable ex) {                if(log.isErrorEnabled()) log.error("exception=" + ex);        }    }    public byte[] getState() {        return null;    }    public void setState(byte[] state) {    }    /* ----------------------------- End of Interface MessageListener ---------------------------- */    /* ------------------------------- Interface MembershipListener ------------------------------ */    public synchronized void viewAccepted(View new_view) {        Vector joined_mbrs, left_mbrs, tmp;        Object tmp_mbr;        if(new_view == null) return;        tmp=new_view.getMembers();        synchronized(members) {            // get new members            joined_mbrs=new Vector();            for(int i=0; i < tmp.size(); i++) {                tmp_mbr=tmp.elementAt(i);                if(!members.contains(tmp_mbr))                    joined_mbrs.addElement(tmp_mbr);            }            // get members that left            left_mbrs=new Vector();            for(int i=0; i < members.size(); i++) {                tmp_mbr=members.elementAt(i);                if(!tmp.contains(tmp_mbr))                    left_mbrs.addElement(tmp_mbr);            }            // adjust our own membership            members.removeAllElements();            members.addAll(tmp);        }        if(consumer != null) {            if(joined_mbrs.size() > 0)                for(int i=0; i < joined_mbrs.size(); i++)                    consumer.memberJoined((Address) joined_mbrs.elementAt(i));            if(left_mbrs.size() > 0)                for(int i=0; i < left_mbrs.size(); i++)                    consumer.memberLeft((Address) left_mbrs.elementAt(i));        }    }    public void suspect(Address suspected_mbr) {    }    public void block() {    }    /* ----------------------------- End of Interface MembershipListener ------------------------- */    /* ------------------------------------- Private Methods ------------------------------------- */    Address determineCoordinator() {        Vector v=channel != null ? channel.getView().getMembers() : null;        return v != null ? (Address) v.elementAt(0) : null;    }    void handleCacheRequest(Address sender) {        Serializable cache=null;        Message msg;        Info info;        if(sender == null) {            // +++ remove            //            if(log.isErrorEnabled()) log.error("sender is null");            return;        }        synchronized(cache_mutex) {            cache=getCache(); // get the cache from the consumer            info=new Info(Info.GET_CACHE_RSP, cache);            msg=new Message(sender, null, info);            if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + "] returning cache to " + sender);            channel.down(new Event(Event.MSG, msg));        }    }    public Serializable getCache() {        return consumer != null ? consumer.getCache() : null;    }    /* --------------------------------- End of Private Methods ---------------------------------- */    private static class Info implements Serializable {        public final static int NOTIFICATION=1;        public final static int GET_CACHE_REQ=2;        public final static int GET_CACHE_RSP=3;        int type=0;        Serializable data=null;  // if type == NOTIFICATION data is notification, if type == GET_CACHE_RSP, data is cache        private static final long serialVersionUID = -7198723001828406107L;        public Info(int type) {            this.type=type;        }        public Info(int type, Serializable data) {            this.type=type;            this.data=data;        }        public String toString() {            StringBuffer sb=new StringBuffer();            sb.append("type= ");            if(type == NOTIFICATION)                sb.append("NOTIFICATION");            else if(type == GET_CACHE_REQ)                sb.append("GET_CACHE_REQ");            else if(type == GET_CACHE_RSP)                sb.append("GET_CACHE_RSP");            else                sb.append("<unknown>");            if(data != null) {                if(type == NOTIFICATION)                    sb.append(", notification=" + data);                else if(type == GET_CACHE_RSP) sb.append(", cache=" + data);            }            return sb.toString();        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -