// Multiplexer.java (scraped code-viewer page header removed)
package org.jgroups.mux;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;
import java.util.*;
/**
* Used for dispatching incoming messages. The Multiplexer implements UpHandler and registers with the associated
* JChannel (there can only be 1 Multiplexer per JChannel). When up() is called with a message, the header of the
* message is removed and the MuxChannel corresponding to the header's service ID is retrieved from the map,
* and MuxChannel.up() is called with the message.
* @author Bela Ban
* @version $Id: Multiplexer.java,v 1.35 2006/10/30 12:27:55 belaban Exp $
*/
public class Multiplexer implements UpHandler {
/** Map<String,MuxChannel>. Maintains the mapping between service IDs and their associated MuxChannels */
private final Map services=new HashMap();
// The JChannel this Multiplexer dispatches for; null only when the no-arg constructor was used
private final JChannel channel;
static final Log log=LogFactory.getLog(Multiplexer.class);
// Separator between service ID and substate ID in composite state-transfer keys, e.g. "svc::substate"
static final String SEPARATOR="::";
static final short SEPARATOR_LEN=(short)SEPARATOR.length();
// Header key under which the MuxHeader is attached to / read from messages (see up())
static final String NAME="MUX";
// Collects BLOCK_OK acks sent down by MuxChannels (see blockOk())
private final BlockOkCollector block_ok_collector=new BlockOkCollector();
// Holds a received MergeView until it is processed asynchronously (see VIEW_CHANGE handling in up())
private MergeView temp_merge_view=null;
// Whether the FLUSH protocol is present in the stack; set from isFlushPresent() in both constructors
private boolean flush_present=true;
// NOTE(review): not referenced anywhere in this chunk — presumably tracks the BLOCK/UNBLOCK phase; verify against the rest of the file
private boolean blocked=false;
/** Cluster view */
View view=null;
Address local_addr=null;
/** Map<String,Boolean>. Map of service IDs and booleans that determine whether getState() has already been called */
private final Map state_transfer_listeners=new HashMap();
/** Map<String,List<Address>>. A map of services as keys and lists of hosts as values */
private final Map service_state=new HashMap();
/** Used to wait on service state information */
private final Promise service_state_promise=new Promise();
/** Map<Address, Set<String>>. Keys are senders, values are a set of services hosted by that sender.
 * Used to collect responses to LIST_SERVICES_REQ */
private final Map service_responses=new HashMap();
// Timeout (ms) for collecting service responses; exposed via get/setServicesResponseTimeout()
private long SERVICES_RSP_TIMEOUT=10000;
/** No-arg constructor: leaves the channel null. NOTE(review): with a null channel,
 * send/getState paths would NPE — presumably only used for testing; confirm with callers. */
public Multiplexer() {
this.channel=null;
flush_present=isFlushPresent();
}
/** Attaches this Multiplexer to the given JChannel as its (single) UpHandler,
 * and enables BLOCK events so block/flush handling goes through this class. */
public Multiplexer(JChannel channel) {
this.channel=channel;
this.channel.setUpHandler(this);
this.channel.setOpt(Channel.BLOCK, Boolean.TRUE); // we want to handle BLOCK events ourselves
flush_present=isFlushPresent();
}
/**
 * Returns the set of registered service IDs.
 * @deprecated Use {@link #getServiceIds()} instead
 * @return The set of service IDs
 */
public Set getApplicationIds() {
// Delegate to the replacement method so both deprecated and current API stay in sync
return getServiceIds();
}
/**
 * Returns the set of registered service IDs.
 * Note: this is a live keySet() view of the internal services map, not a copy.
 * @return The set of service IDs
 */
public Set getServiceIds() {
// 'services' is a final field initialized at declaration, so it can never be null;
// the previous null-check was dead code
return services.keySet();
}
/** Returns the timeout (ms) used when waiting for service responses. */
public long getServicesResponseTimeout() {
return SERVICES_RSP_TIMEOUT;
}
/** Sets the timeout (ms) used when waiting for service responses. */
public void setServicesResponseTimeout(long services_rsp_timeout) {
this.SERVICES_RSP_TIMEOUT=services_rsp_timeout;
}
/** Returns a copy of the current view <em>minus</em> the nodes on which service service_id is <em>not</em> running
 *
 * @param service_id the service whose view is requested
 * @return The service view, or null if no hosts are known for the service
 */
public View getServiceView(String service_id) {
List hosts;
// service_state is replaced wholesale under its own monitor in fetchServiceInformation();
// reading it unsynchronized risks seeing a partially populated HashMap, so lock for the lookup
synchronized(service_state) {
hosts=(List)service_state.get(service_id);
}
if(hosts == null) return null;
return generateServiceView(hosts);
}
/** Returns true if at least one service has registered for state transfer. */
public boolean stateTransferListenersPresent() {
if(state_transfer_listeners == null)
return false;
return !state_transfer_listeners.isEmpty();
}
/**
 * Called by a MuxChannel when BLOCK_OK is sent down
 */
public void blockOk() {
// Tally one more BLOCK_OK ack; presumably a waiter on block_ok_collector proceeds
// once all services have acked — confirm against BlockOkCollector's implementation
block_ok_collector.increment();
}
/** Registers a service (and optional substate) as a pending state-transfer listener.
 * The key is "appl_id" or "appl_id::substate_id"; the entry stays FALSE until
 * getState() marks it TRUE. */
public synchronized void registerForStateTransfer(String appl_id, String substate_id) {
boolean has_substate=substate_id != null && substate_id.length() > 0;
String key=has_substate? appl_id + SEPARATOR + substate_id : appl_id;
state_transfer_listeners.put(key, Boolean.FALSE);
}
/**
 * Marks the state-transfer listener matching {@code id} as "getState() called"; once
 * <em>all</em> registered listeners have been marked, fetches the state for every
 * registered key (under a flush, released in the finally block).
 * @param target member to fetch state from
 * @param id service ID, matched against the part of each key before "::"
 * @param timeout max time (ms) per individual state fetch
 * @return false if no listeners are registered; true (pseudo) while other listeners are
 *         still pending; otherwise the combined result of all state fetches
 */
public synchronized boolean getState(Address target, String id, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
if(state_transfer_listeners == null)
return false;
Map.Entry entry;
String key;
// Mark the first registered key whose service-ID part matches 'id'.
// NOTE(review): only the FIRST match is marked (break below) — if several substates
// were registered under the same service ID, one getState() call marks just one of
// them; presumably each substate triggers its own call. Confirm with callers.
for(Iterator it=state_transfer_listeners.entrySet().iterator(); it.hasNext();) {
entry=(Map.Entry)it.next();
key=(String)entry.getKey();
int index=key.indexOf(SEPARATOR);
boolean match;
if(index > -1) {
// Composite key "svc::substate": compare only the service-ID prefix
String tmp=key.substring(0, index);
match=id.equals(tmp);
}
else {
match=id.equals(key);
}
if(match) {
entry.setValue(Boolean.TRUE);
break;
}
}
Collection values=state_transfer_listeners.values();
boolean all_true=Util.all(values, Boolean.TRUE);
if(!all_true)
return true; // pseudo-success: other registered listeners have not called getState() yet
boolean rc=false;
try {
// Flush suspends message traffic while all pending states are fetched
startFlush();
// Copy the keys: fetchServiceStates() must not see concurrent modification,
// and the map is cleared right after the fetch
Set keys=new HashSet(state_transfer_listeners.keySet());
rc=fetchServiceStates(target, keys, timeout);
state_transfer_listeners.clear();
}
finally {
// Always release the flush, even if the fetch threw
stopFlush();
}
return rc;
}
/** Fetches the app states for all service IDs in keys.
 * The keys are a duplicate list, so it cannot be modified by the caller of this method
 * @param keys set of service IDs (Strings) whose state should be fetched
 * @return true only if every individual channel.getState() call succeeded
 */
private boolean fetchServiceStates(Address target, Set keys, long timeout) throws ChannelClosedException, ChannelNotConnectedException {
boolean all_rcs=true;
for(Iterator it=keys.iterator(); it.hasNext();) {
String service_id=(String)it.next();
// A single failed fetch makes the overall result false, but we still try the rest
if(!channel.getState(target, service_id, timeout))
all_rcs=false;
}
return all_rcs;
}
/**
 * Fetches the map of services and hosts from the coordinator (Multiplexer). No-op if we are the coordinator
 */
public void fetchServiceInformation() throws Exception {
// Retries forever until either a response (even a null one) arrives or we become coordinator
while(true) {
Address coord=getCoordinator(), local_address=channel != null? channel.getLocalAddress() : null;
boolean is_coord=coord != null && local_address != null && local_address.equals(coord);
if(is_coord) {
if(log.isTraceEnabled())
log.trace("I'm coordinator, will not fetch service state information");
break;
}
// Send a STATE_REQ to the coordinator; the reply is delivered via service_state_promise
ServiceInfo si=new ServiceInfo(ServiceInfo.STATE_REQ, null, null, null);
MuxHeader hdr=new MuxHeader(si);
Message state_req=new Message(coord, null, null);
state_req.putHeader(NAME, hdr);
// Reset BEFORE sending so a fast reply cannot be lost between send and wait
service_state_promise.reset();
channel.send(state_req);
try {
// NOTE(review): waits a hard-coded 2000ms per attempt rather than
// SERVICES_RSP_TIMEOUT — presumably a deliberate retry granularity; confirm
byte[] state=(byte[])service_state_promise.getResultWithTimeout(2000);
if(state != null) {
Map new_state=(Map)Util.objectFromByteBuffer(state);
// Replace the whole service map atomically under its monitor
synchronized(service_state) {
service_state.clear();
service_state.putAll(new_state);
}
if(log.isTraceEnabled())
log.trace("service state was set successfully (" + service_state.size() + " entries)");
}
else {
if(log.isWarnEnabled())
log.warn("received service state was null");
}
// A response (even null) ends the retry loop; only a timeout retries
break;
}
catch(TimeoutException e) {
if(log.isTraceEnabled())
log.trace("timed out waiting for service state from " + coord + ", retrying");
}
}
}
/** Broadcasts a SERVICE_UP notification for the given service/host pair.
 * When host is the local member, the local service state is additionally
 * updated directly (without re-sending). */
public void sendServiceUpMessage(String service, Address host) throws Exception {
sendServiceMessage(ServiceInfo.SERVICE_UP, service, host);
boolean is_local=local_addr != null && host != null && local_addr.equals(host);
if(is_local)
handleServiceUp(service, host, false);
}
/** Broadcasts a SERVICE_DOWN notification for the given service/host pair.
 * When host is the local member, the local service state is additionally
 * updated directly (without re-sending). */
public void sendServiceDownMessage(String service, Address host) throws Exception {
sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host);
boolean is_local=local_addr != null && host != null && local_addr.equals(host);
if(is_local)
handleServiceDown(service, host, false);
}
/**
 * UpHandler entry point: demultiplexes incoming events.
 * MSG events are routed to the MuxChannel registered under the MuxHeader's service ID
 * (or handled as a service-state request); VIEW_CHANGE and SUSPECT maintain service views.
 * NOTE(review): this method is truncated in this view — the remaining cases and the
 * method's closing braces are not visible here.
 */
public void up(Event evt) {
// remove header and dispatch to correct MuxChannel
MuxHeader hdr;
switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
hdr=(MuxHeader)msg.getHeader(NAME);
if(hdr == null) {
log.error("MuxHeader not present - discarding message " + msg);
return;
}
if(hdr.info != null) { // it is a service state request - not a default multiplex request
try {
handleServiceStateRequest(hdr.info, msg.getSrc());
}
catch(Exception e) {
if(log.isErrorEnabled())
log.error("failure in handling service state request", e);
}
break;
}
// Regular multiplexed message: route to the service's MuxChannel by ID
MuxChannel mux_ch=(MuxChannel)services.get(hdr.id);
if(mux_ch == null) {
// NOTE(review): "messgage" is a typo in this log string; left unchanged here since
// it is runtime output, not a comment
log.warn("service " + hdr.id + " not currently running, discarding messgage " + msg);
return;
}
mux_ch.up(evt);
break;
case Event.VIEW_CHANGE:
// Compute which members left by diffing the old and new membership lists
Vector old_members=view != null? view.getMembers() : null;
view=(View)evt.getArg();
Vector new_members=view != null? view.getMembers() : null;
Vector left_members=Util.determineLeftMembers(old_members, new_members);
if(view instanceof MergeView) {
temp_merge_view=(MergeView)view.clone();
if(log.isTraceEnabled())
log.trace("received a MergeView: " + temp_merge_view + ", adjusting the service view");
// Without FLUSH in the stack, the merge must be handled on a separate thread
// (with FLUSH present we are blocked from sending anyway — see else branch)
if(!flush_present && temp_merge_view != null) {
try {
if(log.isTraceEnabled())
log.trace("calling handleMergeView() from VIEW_CHANGE (flush_present=" + flush_present + ")");
Thread merge_handler=new Thread() {
public void run() {
try {
handleMergeView(temp_merge_view);
}
catch(Exception e) {
if(log.isErrorEnabled())
log.error("problems handling merge view", e);
}
}
};
merge_handler.setName("merge handler view_change");
merge_handler.setDaemon(false);
merge_handler.start();
}
catch(Exception e) {
if(log.isErrorEnabled())
log.error("failed handling merge view", e);
}
}
else {
; // don't do anything because we are blocked sending messages anyway
}
}
else { // regular view
// A regular view invalidates any partially collected LIST_SERVICES responses
synchronized(service_responses) {
service_responses.clear();
}
}
if(left_members.size() > 0)
adjustServiceViews(left_members);
break;
case Event.SUSPECT:
// Record a null response for the suspected member so collectors waiting on
// service_responses are not blocked forever by it
Address suspected_mbr=(Address)evt.getArg();
synchronized(service_responses) {
service_responses.put(suspected_mbr, null);
service_responses.notifyAll();
}
passToAllMuxChannels(evt);
break;
// NOTE(review): the remainder of up() and the rest of the Multiplexer class were
// truncated here by the page scrape (code-viewer keyboard-shortcut chrome removed).