📄 multiplexer.java
字号:
service_state_promise.setResult(info.state);
break;
case ServiceInfo.SERVICE_UP:
handleServiceUp(info.service, info.host, true);
break;
case ServiceInfo.SERVICE_DOWN:
handleServiceDown(info.service, info.host, true);
break;
case ServiceInfo.LIST_SERVICES_RSP:
handleServicesRsp(sender, info.state);
break;
default:
if(log.isErrorEnabled())
log.error("service request type " + info.type + " not known");
break;
}
}
/**
 * Handles a LIST_SERVICES_RSP: unmarshals the Object[] contained in state and merges its elements into the
 * set kept for sender in service_responses. All threads waiting on service_responses are woken up afterwards.
 * @param sender the member which sent the response
 * @param state the marshalled Object[] of services offered by sender
 * @throws Exception presumably propagated from Util.objectFromByteBuffer() when state cannot be unmarshalled
 */
private void handleServicesRsp(Address sender, byte[] state) throws Exception {
    Object[] announced=(Object[])Util.objectFromByteBuffer(state);
    Set received=new HashSet();
    int idx=0;
    while(idx < announced.length) {
        received.add(announced[idx]);
        idx++;
    }

    synchronized(service_responses) {
        Set known=(Set)service_responses.get(sender);
        if(known == null) {
            known=new HashSet();
        }
        known.addAll(received);
        service_responses.put(sender, known);
        if(log.isTraceEnabled()) {
            log.trace("received service response: " + sender + "(" + received + ")");
        }
        // wake up whoever is waiting on service_responses for more responses
        service_responses.notifyAll();
    }
}
/**
 * Removes host from the list of hosts registered for service. If host was in the list, a service view is
 * generated from the remaining hosts and passed up to the MuxChannel registered for service. If host is the
 * local address (as returned by getLocalAddress()), the service is unregistered as well.
 * @param service the ID of the service
 * @param host the host on which the service went down
 * @param received true if this was triggered by a received message; such a message is discarded when host
 *                 equals local_addr
 */
private void handleServiceDown(String service, Address host, boolean received) {
    // discard a received message which we sent ourselves
    if(received) {
        if(host != null && local_addr != null && local_addr.equals(host))
            return;
    }

    List remaining=null; // snapshot of the hosts left for the service; stays null if host was not registered
    synchronized(service_state) {
        List registered=(List)service_state.get(service);
        if(registered == null)
            return;
        if(registered.remove(host))
            remaining=new ArrayList(registered); // copy, so generateServiceView() doesn't touch the real list
    }

    if(remaining != null) {
        View service_view=generateServiceView(remaining);
        if(service_view != null) {
            MuxChannel target=(MuxChannel)services.get(service);
            if(target == null) {
                if(log.isTraceEnabled())
                    log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
            }
            else {
                target.up(new Event(Event.VIEW_CHANGE, service_view));
            }
        }
    }

    Address me=getLocalAddress();
    if(me != null && host != null && host.equals(me))
        unregister(service);
}
/**
 * Adds host to the list of hosts registered for service, creating the list if the service is not yet known.
 * If host was not in the list before, a service view is generated from the updated list and passed up to the
 * MuxChannel registered for service.
 * @param service the ID of the service
 * @param host the host on which the service came up
 * @param received true if this was triggered by a received message; such a message is discarded when host
 *                 equals local_addr
 */
private void handleServiceUp(String service, Address host, boolean received) {
    // discard a received message which we sent ourselves
    if(received) {
        if(host != null && local_addr != null && local_addr.equals(host))
            return;
    }

    List snapshot;
    synchronized(service_state) {
        List registered=(List)service_state.get(service);
        if(registered == null) {
            registered=new ArrayList();
            service_state.put(service, registered);
        }
        if(registered.contains(host))
            return; // host already known for this service: nothing changed, no view to emit
        registered.add(host);
        snapshot=new ArrayList(registered); // copy, so generateServiceView() doesn't touch the real list
    }

    View service_view=generateServiceView(snapshot);
    if(service_view == null)
        return;

    MuxChannel target=(MuxChannel)services.get(service);
    if(target != null) {
        target.up(new Event(Event.VIEW_CHANGE, service_view));
    }
    else {
        if(log.isTraceEnabled())
            log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
    }
}
/**
 * Fetches the service states from everyone else in the cluster. Sends our own service state, then waits until
 * a response from every member of the view has been recorded in service_responses, or until
 * SERVICES_RSP_TIMEOUT milliseconds have elapsed, whichever comes first. The responses received so far are
 * then merged into service_state by mergeServiceState(), which passes a service view (a copy of the MergeView)
 * up to each affected service.
 * <p>
 * If waiting for the responses fails (e.g. the thread is interrupted), the failure is logged, the interrupt
 * status is restored and the merge is skipped. service_responses is cleared and temp_merge_view is reset in
 * every case.
 * @param view the MergeView to be handled
 * @throws Exception presumably propagated from sendServiceState()
 */
private void handleMergeView(MergeView view) throws Exception {
    final int num_members=view.size(); // include myself
    Map copy=null;

    sendServiceState();

    synchronized(service_responses) {
        final long start=System.currentTimeMillis();
        long time_to_wait=SERVICES_RSP_TIMEOUT;
        try {
            while(time_to_wait > 0 && numResponses(service_responses) < num_members) {
                service_responses.wait(time_to_wait);
                // Always derive the remaining time from the fixed start time. Subtracting the total elapsed
                // time from the already reduced value would count earlier waits again on every wake-up and
                // end the wait long before the timeout.
                time_to_wait=SERVICES_RSP_TIMEOUT - (System.currentTimeMillis() - start);
            }
            copy=new HashMap(service_responses);
        }
        catch(Exception ex) {
            if(ex instanceof InterruptedException)
                Thread.currentThread().interrupt(); // don't lose the interrupt for our callers
            if(log.isErrorEnabled())
                log.error("failed fetching a list of services from other members in the cluster, cannot handle merge view " + view, ex);
        }
    }

    try {
        if(copy != null) { // null means fetching the responses failed: there is nothing we can merge
            if(log.isTraceEnabled())
                log.trace("merging service state, my service_state: " + service_state + ", received responses: " + copy);

            // merges service_responses with service_state and emits MergeViews for the services affected (MuxChannel)
            mergeServiceState(view, copy);
        }
    }
    finally {
        // handleServicesRsp() modifies service_responses under this lock, so clear it under the same lock
        synchronized(service_responses) {
            service_responses.clear();
        }
        temp_merge_view=null;
    }
}
/**
 * Counts the entries of m which have a non-null value.
 * @param m the map to inspect; keys are ignored
 * @return the number of values in m which are not null
 */
private int numResponses(Map m) {
    int answered=0;
    Iterator responses=m.values().iterator();
    while(responses.hasNext()) {
        Object rsp=responses.next();
        if(rsp == null)
            continue;
        answered++;
    }
    return answered;
}
/**
 * Merges the service lists received from other members into service_state and passes a service view up to
 * the MuxChannel of every service whose host list changed. The service view is a clone of view whose members
 * are restricted to the hosts registered for that service.
 * <p>
 * A host is only added to a service's host list (and the service only counted as modified) if it is not in
 * the list yet. Services for which no MuxChannel is registered are skipped.
 * @param view the MergeView from which the per-service views are derived
 * @param copy Map of Address (sender) to Set of String (services offered by that sender); entries with a null
 *             value are ignored, a null map is treated as empty
 */
private void mergeServiceState(MergeView view, Map copy) {
    if(copy == null || copy.isEmpty())
        return;

    Set modified_services=new HashSet();
    Map host_snapshots=new HashMap(); // service (String) -> copy of its host list, taken under the lock

    synchronized(service_state) {
        for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry=(Map.Entry)it.next();
            Address host=(Address)entry.getKey(); // address of the sender
            Set service_list=(Set)entry.getValue(); // Set<String> of services
            if(service_list == null)
                continue;

            for(Iterator it2=service_list.iterator(); it2.hasNext();) {
                String service=(String)it2.next();
                List my_services=(List)service_state.get(service);
                if(my_services == null) {
                    my_services=new ArrayList();
                    service_state.put(service, my_services);
                }

                // List.add() always returns true, so its result cannot tell us whether the host is new:
                // check explicitly, otherwise hosts get duplicated and every service is seen as modified
                if(!my_services.contains(host)) {
                    my_services.add(host);
                    modified_services.add(service);
                }
            }
        }

        // copy the host lists while we still hold the lock, so the views below are built from a stable state
        for(Iterator it=modified_services.iterator(); it.hasNext();) {
            Object service=it.next();
            host_snapshots.put(service, new ArrayList((List)service_state.get(service)));
        }
    }

    // now emit MergeViews for all services which were modified
    for(Iterator it=host_snapshots.entrySet().iterator(); it.hasNext();) {
        Map.Entry entry=(Map.Entry)it.next();
        String service=(String)entry.getKey();
        List hosts=(List)entry.getValue();

        MergeView v=(MergeView)view.clone();
        v.getMembers().retainAll(hosts);

        MuxChannel ch=(MuxChannel)services.get(service);
        if(ch == null) { // a service offered by other members only: nobody to deliver the view to
            if(log.isTraceEnabled())
                log.trace("service " + service + " not found, cannot dispatch service view " + v);
            continue;
        }
        ch.up(new Event(Event.VIEW_CHANGE, v));
    }
}
/**
 * Calls adjustServiceView() for every address in left_members. A failure for one address is logged and does
 * not prevent the remaining addresses from being processed.
 * @param left_members Vector of Address; if null, nothing is done
 */
private void adjustServiceViews(Vector left_members) {
    if(left_members == null)
        return;

    int idx=0;
    while(idx < left_members.size()) {
        try {
            Address left=(Address)left_members.elementAt(idx);
            adjustServiceView(left);
        }
        catch(Throwable failure) {
            if(log.isErrorEnabled())
                log.error("failed adjusting service views", failure);
        }
        idx++;
    }
}
/**
 * Removes host from the host list of every service in service_state. For each service whose list actually
 * contained host, a service view is generated from the remaining hosts and passed up to the MuxChannel
 * registered for that service. If host is the local address (as returned by getLocalAddress()), every service
 * with a non-null host list is unregistered as well.
 * <p>
 * service_state is only locked while the host lists are updated. Views are dispatched and services are
 * unregistered after the lock has been released (as in handleServiceDown()), so that neither MuxChannel.up()
 * nor unregister() runs while service_state is locked and being iterated.
 * @param host the address to be removed from all services
 */
private void adjustServiceView(Address host) {
    Address local_address=getLocalAddress();
    boolean is_local=local_address != null && host != null && host.equals(local_address);

    List affected_services=new ArrayList(); // service names (String)
    List host_snapshots=new ArrayList();    // parallel list: copy of the remaining hosts, or null if host was not removed

    synchronized(service_state) {
        for(Iterator it=service_state.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry=(Map.Entry)it.next();
            List hosts=(List)entry.getValue();
            if(hosts == null)
                continue;
            boolean removed=hosts.remove(host);
            if(removed || is_local) {
                affected_services.add(entry.getKey());
                // make a copy so we don't modify hosts in generateServiceView()
                host_snapshots.add(removed? new ArrayList(hosts) : null);
            }
        }
    }

    for(int i=0; i < affected_services.size(); i++) {
        String service=(String)affected_services.get(i);
        List hosts_copy=(List)host_snapshots.get(i);

        if(hosts_copy != null) {
            View service_view=generateServiceView(hosts_copy);
            if(service_view != null) {
                MuxChannel ch=(MuxChannel)services.get(service);
                if(ch != null) {
                    Event view_evt=new Event(Event.VIEW_CHANGE, service_view);
                    ch.up(view_evt);
                }
                else {
                    if(log.isTraceEnabled())
                        log.trace("service " + service + " not found, cannot dispatch service view " + service_view);
                }
            }
        }

        if(is_local)
            unregister(service);
    }
}
/**
 * Creates a copy of the current view which contains only those members which are also present in hosts. The
 * view ID is taken from the current view. This method only builds the view; passing it up to the MuxChannel
 * of the service is left to the caller.
 * @param hosts List of Address: the hosts registered for the service
 * @return the service view (a filtered copy of the real view), or null if there is no current view
 */
private View generateServiceView(List hosts) {
    View current=view; // read the field once, so view ID and members are taken from the same view
    if(current == null)
        return null;

    // filter our own copy of the members before creating the view, so the result does not depend on whether
    // View.getMembers() hands out the view's internal Vector
    Vector members=new Vector(current.getMembers());
    members.retainAll(hosts);
    return new View(current.getVid(), members);
}
/** Sends a SUSPEND event down the underlying channel to start the flush protocol; the event is meant to be handled by FLUSH */
private void startFlush() {
    Event suspend=new Event(Event.SUSPEND);
    channel.down(suspend);
}
/** Sends a RESUME event down the underlying channel to stop the flush and resume message sending; the event is meant to be handled by FLUSH */
private void stopFlush() {
    Event resume=new Event(Event.RESUME);
    channel.down(resume);
}
/**
 * Registers ch under id in services, unless a channel is already registered under that ID, in which case
 * nothing is changed.
 * @param id the ID of the service
 * @param ch the channel to register for the service
 */
public void addServiceIfNotPresent(String id, MuxChannel ch) {
    synchronized(services) {
        MuxChannel existing=(MuxChannel)services.get(id);
        if(existing != null)
            return;
        services.put(id, ch);
    }
}
/**
 * Counter on which a thread can block until a given count has been reached. All access to the counter is
 * synchronized on the collector itself.
 */
private static class BlockOkCollector {
    int num_block_oks=0;

    /** Sets the counter back to 0 */
    synchronized void reset() {
        num_block_oks=0;
    }

    /** Increments the counter by 1 and wakes up all threads blocked in waitUntil() so they re-check the count */
    synchronized void increment() {
        num_block_oks++;
        // without this notification a thread blocked in waitUntil() never learns that the count changed
        notifyAll();
    }

    /**
     * Blocks the caller until the counter is at least num. An interrupt does not end the wait; it is
     * remembered and the thread's interrupt status is restored before returning.
     * @param num the count to wait for; returns immediately if the counter is already >= num
     */
    synchronized void waitUntil(int num) {
        boolean interrupted=false;
        try {
            while(num_block_oks < num) {
                try {
                    this.wait();
                }
                catch(InterruptedException e) {
                    // keep waiting; re-interrupting here would make the next wait() throw right away
                    interrupted=true;
                }
            }
        }
        finally {
            if(interrupted)
                Thread.currentThread().interrupt();
        }
    }

    /** @return the current count as a string */
    public synchronized String toString() {
        return String.valueOf(num_block_oks);
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -