⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiplexer.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
package org.jgroups.mux;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;

import java.util.*;

/**
 * Used for dispatching incoming messages. The Multiplexer implements UpHandler and registers with the associated
 * JChannel (there can only be 1 Multiplexer per JChannel). When up() is called with a message, the header of the
 * message is removed and the MuxChannel corresponding to the header's service ID is retrieved from the map,
 * and MuxChannel.up() is called with the message.
 * @author Bela Ban
 * @version $Id: Multiplexer.java,v 1.35 2006/10/30 12:27:55 belaban Exp $
 */
public class Multiplexer implements UpHandler {
    /** Map<String,MuxChannel>. Maintains the mapping between service IDs and their associated MuxChannels */
    private final Map services=new HashMap();
    /** The channel this Multiplexer is attached to; null when the no-arg constructor was used */
    private final JChannel channel;
    static final Log log=LogFactory.getLog(Multiplexer.class);
    /** Separates the service ID from the optional substate ID in the keys of state_transfer_listeners */
    static final String SEPARATOR="::";
    /** Length of SEPARATOR, in characters */
    static final short SEPARATOR_LEN=(short)SEPARATOR.length();
    /** Name under which the MuxHeader is attached to (and looked up from) a message */
    static final String NAME="MUX";
    /** Incremented by blockOk() */
    private final BlockOkCollector block_ok_collector=new BlockOkCollector();

    /** Clone of the last MergeView received in up(); null until a MergeView has been seen */
    private MergeView temp_merge_view=null;

    /** Defaults to true, but is overwritten in both constructors with the result of isFlushPresent().
     * When false, up() handles a MergeView directly from the VIEW_CHANGE event on a separate thread */
    private boolean flush_present=true;
    // NOTE(review): 'blocked' is not read or written in the visible part of this file - confirm where it is used
    private boolean blocked=false;



    /** Cluster view */
    View view=null;

    /** Compared against the host argument in sendServiceUpMessage()/sendServiceDownMessage() to detect
     * services hosted locally. NOTE(review): the assignment is not visible in this part of the file */
    Address local_addr=null;

    /** Map<String,Boolean>. Keys are service IDs, optionally followed by SEPARATOR and a substate ID
     * (see registerForStateTransfer()); values determine whether getState() has already been called */
    private final Map state_transfer_listeners=new HashMap();

    /** Map<String,List<Address>>. A map of services as keys and lists of hosts as values */
    private final Map service_state=new HashMap();

    /** Used to wait on service state information */
    private final Promise service_state_promise=new Promise();

    /** Map<Address, Set<String>>. Keys are senders, values are a set of services hosted by that sender.
     * Used to collect responses to LIST_SERVICES_REQ */
    private final Map service_responses=new HashMap();

    /** Exposed through getServicesResponseTimeout()/setServicesResponseTimeout().
     * Presumably in milliseconds - TODO confirm against the code that waits on service_responses */
    private long SERVICES_RSP_TIMEOUT=10000;




    /**
     * Creates a Multiplexer which is not attached to any channel (the channel reference stays null).
     * The flush_present flag is taken from isFlushPresent().
     */
    public Multiplexer() {
        channel=null;
        this.flush_present=isFlushPresent();
    }

    /**
     * Creates a Multiplexer on top of the given channel and registers itself as the channel's UpHandler.
     * @param channel The channel to attach to; must not be null, as it is dereferenced immediately
     */
    public Multiplexer(JChannel channel) {
        this.channel=channel;
        channel.setUpHandler(this);
        // BLOCK events have to be delivered to us: we want to handle them ourselves
        channel.setOpt(Channel.BLOCK, Boolean.TRUE);
        this.flush_present=isFlushPresent();
    }

    /**
     * Returns the IDs of all registered services. Kept for backwards compatibility only; simply delegates to
     * {@link #getServiceIds()}.
     * @deprecated Use {@link #getServiceIds()} instead
     * @return The set of service IDs
     */
    public Set getApplicationIds() {
        return getServiceIds();
    }

    /**
     * Returns the IDs of all services currently registered with this Multiplexer.
     * The result is the key set of the internal services map (see {@link Map#keySet()}), i.e. a view backed
     * by that map rather than a copy.
     * @return The set of service IDs, never null
     */
    public Set getServiceIds() {
        // services is final and initialized at its declaration, so it can never be null here
        return services.keySet();
    }


    /**
     * @return The currently configured services response timeout (presumably in milliseconds - TODO confirm)
     */
    public long getServicesResponseTimeout() {
        return this.SERVICES_RSP_TIMEOUT;
    }

    /**
     * Sets the services response timeout. The value is stored as is, without any validation.
     * @param services_rsp_timeout The new timeout (presumably in milliseconds - TODO confirm)
     */
    public void setServicesResponseTimeout(long services_rsp_timeout) {
        SERVICES_RSP_TIMEOUT=services_rsp_timeout;
    }

    /**
     * Returns a copy of the current view <em>minus</em> the nodes on which service service_id is <em>not</em>
     * running. The hosts of the service are looked up in the service state map and handed to
     * generateServiceView().
     *
     * @param service_id The ID of the service whose view is requested
     * @return The service view, or null if no hosts are recorded for service_id
     */
    public View getServiceView(String service_id) {
        List service_hosts=(List)service_state.get(service_id);
        return service_hosts != null? generateServiceView(service_hosts) : null;
    }

    /**
     * @return true if at least one service is registered for state transfer, false otherwise
     */
    public boolean stateTransferListenersPresent() {
        return !(state_transfer_listeners == null || state_transfer_listeners.isEmpty());
    }

    /**
     * Called by a MuxChannel when BLOCK_OK is sent down. Each call bumps the BLOCK_OK collector by one.
     */
    public void blockOk() {
        this.block_ok_collector.increment();
    }

    /**
     * Registers a service (and optionally one of its substates) for state transfer. The listener is stored
     * under the key <code>appl_id</code>, or <code>appl_id + "::" + substate_id</code> if a non-empty
     * substate ID is given, and is initially marked as not having called getState() yet.
     * @param appl_id The service ID
     * @param substate_id The substate ID; null or empty if the entire state is meant
     */
    public synchronized void registerForStateTransfer(String appl_id, String substate_id) {
        boolean has_substate=substate_id != null && substate_id.length() > 0;
        String listener_key=has_substate? appl_id + SEPARATOR + substate_id : appl_id;
        state_transfer_listeners.put(listener_key, Boolean.FALSE);
    }

    /**
     * Records that service <code>id</code> has asked for its state. Once <em>all</em> registered state transfer
     * listeners have done so, the states for all registered keys are fetched from <code>target</code>,
     * bracketed by startFlush()/stopFlush(), and the registrations are cleared.
     * Per call, only the first registered key whose service part equals <code>id</code> is marked.
     * @param target The member to fetch the states from (handed to the channel as is)
     * @param id The ID of the service requesting its state; must not be null
     * @param timeout The timeout handed to the channel for each individual state fetch
     * @return true if some listeners have not yet requested their state (nothing is fetched in this case);
     *         otherwise whether all of the individual state fetches returned true
     */
    public synchronized boolean getState(Address target, String id, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
        if(state_transfer_listeners == null)
            return false;

        // mark the first listener registered for this service (with or without substate) as ready
        Iterator listeners=state_transfer_listeners.entrySet().iterator();
        while(listeners.hasNext()) {
            Map.Entry listener=(Map.Entry)listeners.next();
            String registered_key=(String)listener.getKey();
            int separator_pos=registered_key.indexOf(SEPARATOR);
            String service_part=separator_pos > -1? registered_key.substring(0, separator_pos) : registered_key;
            if(id.equals(service_part)) {
                listener.setValue(Boolean.TRUE);
                break;
            }
        }

        // as long as somebody hasn't asked for its state yet, we only pretend the call succeeded
        if(!Util.all(state_transfer_listeners.values(), Boolean.TRUE))
            return true; // pseudo

        try {
            startFlush();
            // iterate over a copy of the keys, the map itself is cleared right after the fetch
            Set pending_keys=new HashSet(state_transfer_listeners.keySet());
            boolean all_fetched=fetchServiceStates(target, pending_keys, timeout);
            state_transfer_listeners.clear();
            return all_fetched;
        }
        finally {
            stopFlush();
        }
    }

    /**
     * Fetches the application states for all service IDs in keys, one channel.getState() call per key.
     * A failed fetch does not stop the iteration: the remaining keys are still fetched.
     * The keys are a duplicate list, so it cannot be modified by the caller of this method
     * @param target The member to fetch the states from
     * @param keys The (copied) set of state IDs to fetch
     * @param timeout The timeout handed to every single getState() call
     * @return true if every getState() call returned true (also if keys is empty), false otherwise
     */
    private boolean fetchServiceStates(Address target, Set keys, long timeout) throws ChannelClosedException, ChannelNotConnectedException {
        boolean every_fetch_ok=true;
        Iterator ids=keys.iterator();
        while(ids.hasNext()) {
            String state_id=(String)ids.next();
            if(!channel.getState(target, state_id, timeout))
                every_fetch_ok=false;
        }
        return every_fetch_ok;
    }


    /**
     * Fetches the map of services and hosts from the coordinator (Multiplexer). No-op if we are the coordinator.
     * Otherwise a STATE_REQ is sent to the coordinator and the response is awaited on service_state_promise
     * with a timeout of 2000. When the wait times out, the whole procedure (including the lookup of the
     * coordinator) is repeated, without an upper bound on the number of attempts. A received state replaces
     * the contents of the local service state map; a null state is logged and otherwise ignored.
     * NOTE(review): if getCoordinator() returns null, the request is created with a null destination, and
     * a null channel leads to a NullPointerException on send - confirm that callers rule out both cases.
     * @throws Exception If sending the request or unmarshalling the response fails
     */
    public void fetchServiceInformation() throws Exception {
        for(;;) {
            Address coord=getCoordinator();
            Address local_address=null;
            if(channel != null)
                local_address=channel.getLocalAddress();

            if(coord != null && local_address != null && local_address.equals(coord)) {
                if(log.isTraceEnabled())
                    log.trace("I'm coordinator, will not fetch service state information");
                return;
            }

            MuxHeader request_hdr=new MuxHeader(new ServiceInfo(ServiceInfo.STATE_REQ, null, null, null));
            Message request=new Message(coord, null, null);
            request.putHeader(NAME, request_hdr);
            // the promise has to be reset before the request goes out, or else a fast response could be lost
            service_state_promise.reset();
            channel.send(request);

            try {
                byte[] marshalled_state=(byte[])service_state_promise.getResultWithTimeout(2000);
                if(marshalled_state == null) {
                    if(log.isWarnEnabled())
                        log.warn("received service state was null");
                }
                else {
                    Map fetched_state=(Map)Util.objectFromByteBuffer(marshalled_state);
                    synchronized(service_state) {
                        service_state.clear();
                        service_state.putAll(fetched_state);
                    }
                    if(log.isTraceEnabled())
                        log.trace("service state was set successfully (" + service_state.size() + " entries)");
                }
                return;
            }
            catch(TimeoutException e) {
                // no response in time: start over
                if(log.isTraceEnabled())
                    log.trace("timed out waiting for service state from " + coord + ", retrying");
            }
        }
    }



    /**
     * Announces through sendServiceMessage() that a service was started on a host (SERVICE_UP). If the host
     * is the local address, the event is additionally applied locally via handleServiceUp().
     * @param service The ID of the service that was started
     * @param host The address of the member hosting the service
     * @throws Exception Propagated from sendServiceMessage() or handleServiceUp()
     */
    public void sendServiceUpMessage(String service, Address host) throws Exception {
        sendServiceMessage(ServiceInfo.SERVICE_UP, service, host);

        boolean hosted_locally=local_addr != null && host != null && local_addr.equals(host);
        if(hosted_locally)
            handleServiceUp(service, host, false);
    }


    /**
     * Announces through sendServiceMessage() that a service was stopped on a host (SERVICE_DOWN). If the host
     * is the local address, the event is additionally applied locally via handleServiceDown().
     * @param service The ID of the service that was stopped
     * @param host The address of the member which hosted the service
     * @throws Exception Propagated from sendServiceMessage() or handleServiceDown()
     */
    public void sendServiceDownMessage(String service, Address host) throws Exception {
        sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host);

        boolean hosted_locally=local_addr != null && host != null && local_addr.equals(host);
        if(hosted_locally)
            handleServiceDown(service, host, false);
    }





    public void up(Event evt) {
        // remove header and dispatch to correct MuxChannel
        MuxHeader hdr;

        switch(evt.getType()) {
            case Event.MSG:
                Message msg=(Message)evt.getArg();
                hdr=(MuxHeader)msg.getHeader(NAME);
                if(hdr == null) {
                    log.error("MuxHeader not present - discarding message " + msg);
                    return;
                }

                if(hdr.info != null) { // it is a service state request - not a default multiplex request
                    try {
                        handleServiceStateRequest(hdr.info, msg.getSrc());
                    }
                    catch(Exception e) {
                        if(log.isErrorEnabled())
                            log.error("failure in handling service state request", e);
                    }
                    break;
                }

                MuxChannel mux_ch=(MuxChannel)services.get(hdr.id);
                if(mux_ch == null) {
                    log.warn("service " + hdr.id + " not currently running, discarding messgage " + msg);
                    return;
                }
                mux_ch.up(evt);
                break;

            case Event.VIEW_CHANGE:
                Vector old_members=view != null? view.getMembers() : null;
                view=(View)evt.getArg();
                Vector new_members=view != null? view.getMembers() : null;
                Vector left_members=Util.determineLeftMembers(old_members, new_members);

                if(view instanceof MergeView) {
                    temp_merge_view=(MergeView)view.clone();
                    if(log.isTraceEnabled())
                        log.trace("received a MergeView: " + temp_merge_view + ", adjusting the service view");
                    if(!flush_present && temp_merge_view != null) {
                        try {
                            if(log.isTraceEnabled())
                                log.trace("calling handleMergeView() from VIEW_CHANGE (flush_present=" + flush_present + ")");
                            Thread merge_handler=new Thread() {
                                public void run() {
                                    try {
                                        handleMergeView(temp_merge_view);
                                    }
                                    catch(Exception e) {
                                        if(log.isErrorEnabled())
                                            log.error("problems handling merge view", e);
                                    }
                                }
                            };
                            merge_handler.setName("merge handler view_change");
                            merge_handler.setDaemon(false);
                            merge_handler.start();
                        }
                        catch(Exception e) {
                            if(log.isErrorEnabled())
                                log.error("failed handling merge view", e);
                        }
                    }
                    else {
                        ; // don't do anything because we are blocked sending messages anyway
                    }
                }
                else { // regular view
                    synchronized(service_responses) {
                        service_responses.clear();
                    }
                }
                if(left_members.size() > 0)
                    adjustServiceViews(left_members);
                break;

            case Event.SUSPECT:
                Address suspected_mbr=(Address)evt.getArg();

                synchronized(service_responses) {
                    service_responses.put(suspected_mbr, null);
                    service_responses.notifyAll();
                }
                passToAllMuxChannels(evt);
                break;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -