📄 fc.java.new
字号:
public void up(Event evt) { switch(evt.getType()) { case Event.MSG: Message msg=(Message)evt.getArg(); FcHeader hdr=(FcHeader)msg.removeHeader(name); if(hdr != null) { switch(hdr.type) { case FcHeader.REPLENISH: num_credit_responses_received++; handleCredit(msg.getSrc()); break; case FcHeader.CREDIT_REQUEST: num_credit_requests_received++; Address sender=msg.getSrc(); if(trace) log.trace("received credit request from " + sender + ": sending credits"); received.put(sender, max_credits_constant); sendCredit(sender); break; default: log.error("header type " + hdr.type + " not known"); break; } return; // don't pass message up } else { adjustCredit(msg); } break; case Event.VIEW_CHANGE: handleViewChange(((View)evt.getArg()).getMembers()); break; } passUp(evt); } private void handleDownMessage(Event evt) { Message msg=(Message)evt.getArg(); int length=msg.getLength(); Address dest=msg.getDest(); if(Util.acquire(lock)) { try { if(lowest_credit <= length) { determineCreditors(dest, length); insufficient_credit=true; num_blockings++; start_blocking=System.currentTimeMillis(); while(insufficient_credit && running) { try {mutex.timedwait(max_block_time);} catch(InterruptedException e) {} if(insufficient_credit && running) { Util.release(lock); try { if(trace) log.trace("timeout occurred waiting for credits; sending credit request to " + creditors); for(int i=0; i < creditors.size(); i++) { sendCreditRequest((Address)creditors.get(i)); } } finally { Util.acquire(lock); } } } stop_blocking=System.currentTimeMillis(); long block_time=stop_blocking - start_blocking; if(trace) log.trace("total time blocked: " + block_time + " ms"); total_time_blocking+=block_time; last_blockings.add(new Long(block_time)); } else { long tmp=decrementCredit(sent, dest, length); if(tmp != -1) lowest_credit=Math.min(tmp, lowest_credit); } } finally { Util.release(lock); } } // send message - either after regular processing, or after blocking (when enough credits available again) passDown(evt); } /** * Checks whether one member (unicast msg) or all members (multicast msg) have enough credits. Add those * that don't to the creditors list * @param dest * @param length */ private void determineCreditors(Address dest, int length) { boolean multicast=dest == null || dest.isMulticastAddress(); Address mbr; Long credits; if(multicast) { Map.Entry entry; for(Iterator it=sent.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); mbr=(Address)entry.getKey(); credits=(Long)entry.getValue(); if(credits.longValue() <= length) { if(!creditors.contains(mbr)) creditors.add(mbr); } } } else { credits=(Long)sent.get(dest); if(credits != null && credits.longValue() <= length) { if(!creditors.contains(dest)) creditors.add(dest); } } } /** * Decrements credits from a single member, or all members in sent_msgs, depending on whether it is a multicast * or unicast message. No need to acquire mutex (must already be held when this method is called) * @param dest * @param credits * @return The lowest number of credits left, or -1 if a unicast member was not found */ private long decrementCredit(Map m, Address dest, long credits) { boolean multicast=dest == null || dest.isMulticastAddress(); long lowest=max_credits, tmp; Long val; if(multicast) { if(m.size() == 0) return -1; Map.Entry entry; for(Iterator it=m.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); val=(Long)entry.getValue(); tmp=val.longValue(); tmp-=credits; entry.setValue(new Long(tmp)); lowest=Math.min(tmp, lowest); } return lowest; } else { val=(Long)m.get(dest); if(val != null) { lowest=val.longValue(); lowest-=credits; m.put(dest, new Long(lowest)); return lowest; } } return -1; } private void handleCredit(Address sender) { if(sender == null) return; StringBuffer sb=null; if(Util.acquire(lock)) { try { if(trace) { Long old_credit=(Long)sent.get(sender); sb=new StringBuffer(); sb.append("received credit from ").append(sender).append(", old credit was "). append(old_credit).append(", new credits are ").append(max_credits). append(".\nCreditors before are: ").append(creditors); } sent.put(sender, max_credits_constant); lowest_credit=computeLowestCredit(sent); if(creditors.size() > 0) { // we are blocked because we expect credit from one or more members creditors.remove(sender); if(trace) { sb.append("\nCreditors after removal of ").append(sender).append(" are: ").append(creditors); log.trace(sb.toString()); } } if(insufficient_credit && lowest_credit > 0 && creditors.size() == 0) { insufficient_credit=false; mutex.broadcast(); } } finally { Util.release(lock); } } } private static long computeLowestCredit(Map m) { Collection credits=m.values(); // List of Longs (credits) Long retval=(Long)Collections.min(credits); return retval.longValue(); } /** * Check whether sender has enough credits left. If not, send him some more * @param msg */ private void adjustCredit(Message msg) { Address src=msg.getSrc(); long length=msg.getLength(); // we don't care about headers for the purpose of flow control if(src == null) { if(log.isErrorEnabled()) log.error("src is null"); return; } if(length == 0) return; // no effect if(decrementCredit(received, src, length) <= min_credits) { received.put(src, max_credits_constant); if(trace) log.trace("sending replenishment message to " + src); sendCredit(src); } } private void sendCredit(Address dest) { Message msg=new Message(dest, null, null); msg.putHeader(name, REPLENISH_HDR); passDown(new Event(Event.MSG, msg)); num_credit_responses_sent++; } private void sendCreditRequest(final Address dest) { Message msg=new Message(dest, null, null); msg.putHeader(name, CREDIT_REQUEST_HDR); passDown(new Event(Event.MSG, msg)); num_credit_requests_sent++; } private void handleViewChange(Vector mbrs) { Address addr; if(mbrs == null) return; if(trace) log.trace("new membership: " + mbrs); if(Util.acquire(lock)) { try { // add members not in membership to received and sent hashmap (with full credits) for(int i=0; i < mbrs.size(); i++) { addr=(Address) mbrs.elementAt(i); if(!received.containsKey(addr)) received.put(addr, max_credits_constant); if(!sent.containsKey(addr)) sent.put(addr, max_credits_constant); } // remove members that left for(Iterator it=received.keySet().iterator(); it.hasNext();) { addr=(Address) it.next(); if(!mbrs.contains(addr)) it.remove(); } // remove members that left for(Iterator it=sent.keySet().iterator(); it.hasNext();) { addr=(Address)it.next(); if(!mbrs.contains(addr)) it.remove(); // modified the underlying map } // remove all creditors which are not in the new view for(int i=0; i < creditors.size(); i++) { Address creditor=(Address)creditors.get(i); if(!mbrs.contains(creditor)) creditors.remove(creditor); } if(trace) log.trace("creditors are " + creditors); if(insufficient_credit && creditors.size() == 0) { lowest_credit=computeLowestCredit(sent); insufficient_credit=false; mutex.broadcast(); } } finally { Util.release(lock); } } } private static String printMap(Map m) { Map.Entry entry; StringBuffer sb=new StringBuffer(); for(Iterator it=m.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); } return sb.toString(); } public static class FcHeader extends Header implements Streamable { public static final byte REPLENISH = 1; public static final byte CREDIT_REQUEST = 2; // the sender of the message is the requester byte type = REPLENISH; public FcHeader() { } public FcHeader(byte type) { this.type=type; } public long size() { return Global.BYTE_SIZE; } public void writeExternal(ObjectOutput out) throws IOException { out.writeByte(type); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { type=in.readByte(); } public void writeTo(DataOutputStream out) throws IOException { out.writeByte(type); } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type=in.readByte(); } public String toString() { switch(type) { case REPLENISH: return "REPLENISH"; case CREDIT_REQUEST: return "CREDIT_REQUEST"; default: return "<invalid type>"; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -