partitioner.java

来自「JGRoups源码」· Java 代码 · 共 175 行

JAVA
175
字号
// $Id: PARTITIONER.java,v 1.5 2005/05/30 14:31:07 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Header;import org.jgroups.Message;import org.jgroups.stack.Protocol;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * This layer can be put on top of the bottommost layer and is useful to simulate partitions. * It simply adds a header with its partition number and discards Messages with other partition numbers.<br> * If it receives an Event of type Event.SET_PARTITIONS it sends a Header of type COMMAND with the Hashtable * contained in the Event argument to set the partitions of ALL processes (not just processes of the current view but * every process with the same group address that receives the message. */public class PARTITIONER extends Protocol {    final Vector   members=new Vector();    Address  local_addr=null;    int      my_partition=1;    /** All protocol names have to be unique ! */    public String  getName() {return "PARTITIONER";}    public boolean setProperties(Properties props) {        String     str;        super.setProperties(props);        if(props.size() > 0) {            log.error("EXAMPLE.setProperties(): these properties are not recognized: " + props);            return false;        }        return true;    }    /** Just remove if you don't need to reset any state */    public void reset() {}    /**     * Discards Messages with the wrong partition number and sets local partition number if     * it receives a COMMAND Header     */    public void up(Event evt) {        Message msg;        Integer num;        PartitionerHeader partHead=null;        switch(evt.getType()) {        case Event.SET_LOCAL_ADDRESS:	    local_addr=(Address) evt.getArg();	     if(log.isInfoEnabled()) log.info("local address is " + local_addr);	    break;        case Event.MSG:            msg=(Message)evt.getArg();            partHead=(PartitionerHeader) msg.removeHeader(getName());            if (partHead.type == PartitionerHeader.COMMAND) {		num = (Integer) partHead.Destinations.get(local_addr);		if (num == null) return;		 if(log.isInfoEnabled()) log.info("new partition = " + num);		my_partition =num.intValue();		return;            }            if (partHead.type == PartitionerHeader.NORMAL && partHead.partition != my_partition ) return;            break;        }        passUp(evt);            // Pass up to the layer above us    }    /**     * Adds to Messages a Header with the local partitin number and if receives a SET_PARTITIONS Event sends     * a new Message with a PartitionerHeader set to COMMAND that carries the Hashtable     */    public void down(Event evt) {        Message msg;        Event newEvent;        PartitionerHeader partHeader;        switch(evt.getType()) {        case Event.SET_PARTITIONS:	    //Sends a partitioning message	     if(log.isInfoEnabled()) log.info("SET_PARTITIONS received, argument " + evt.getArg().toString());	    msg = new Message(null,null,null);	    partHeader = new PartitionerHeader(PartitionerHeader.COMMAND);	    partHeader.Destinations = (Hashtable) evt.getArg();	    msg.putHeader(getName(), partHeader);	    passDown(new Event(Event.MSG,msg));	    break;        case Event.MSG:            msg=(Message)evt.getArg();            msg.putHeader(getName(), new PartitionerHeader(PartitionerHeader.NORMAL,my_partition));            // Do something with the event, e.g. add a header to the message            // Optionally pass down            break;        }        passDown(evt);          // Pass on to the layer below us    }/** * The Partitioner header normally (type = NORMAL) contains just the partition number that is checked to discard messages * received from other partitions. * If type is COMMAND Destination contains an Hashtable where keys are of type Address and represent process (channel) * addresses and values are Integer representing the partition that shuold be assigned to each Address. */    public static class PartitionerHeader extends Header {	// your variables	static final int NORMAL=0; //normal header (do nothing)	static final int COMMAND=1; //set partition vector	int type=0,partition=1;	Hashtable Destinations=null;		public PartitionerHeader () {} // used for externalization	public PartitionerHeader (int type) { this.type= type;    }	public PartitionerHeader (int type,int partition) { this.type= type; this.partition = partition;  }		public String toString() {	    switch (type) {	    case NORMAL: return "NORMAL ->partition :" + partition;	    case COMMAND: return "COMMAND ->hashtable :" + Destinations;	    default: return "<unknown>";			    }	}		public void writeExternal(ObjectOutput out) throws IOException {	    out.writeInt(type);	    out.writeInt(partition);	    out.writeObject(Destinations);	}				public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {	    type=in.readInt();	    partition=in.readInt();	    Destinations=(Hashtable)in.readObject();	}}    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?