📄 flush.java
字号:
{ boolean successfulBlock = false; blockok_promise.reset(); new Thread(Util.getGlobalThreadGroup(), new Runnable() { public void run() { passUp(new Event(Event.BLOCK)); } }, "FLUSH block").start(); try { blockok_promise.getResultWithTimeout(btimeout); successfulBlock = true; } catch (TimeoutException e) { log.warn("Blocking of channel using BLOCK event timed out after " + btimeout + " msec."); } return successfulBlock; } private boolean isCurrentFlushMessage(FlushHeader fh) { return fh.viewID == currentViewId(); } private long currentViewId() { long viewId = -1; synchronized (sharedLock) { ViewId view = currentView.getVid(); if (view != null) { viewId = view.getId(); } } return viewId; } private boolean onViewChange(View view) { boolean amINewCoordinator = false; if (receivedFirstView) { receivedMoreThanOneView = true; } if (!receivedFirstView) { receivedFirstView = true; } synchronized (sharedLock) { suspected.retainAll(view.getMembers()); currentView = view; amINewCoordinator = flushCoordinator != null && !view.getMembers().contains(flushCoordinator) && localAddress.equals(view.getMembers().get(0)); } //If coordinator leaves, its STOP FLUSH message will be discarded by //other members at NAKACK layer. Remaining members will be hung, waiting //for STOP_FLUSH message. If I am new coordinator I will complete the //FLUSH and send STOP_FLUSH on flush callers behalf. if (amINewCoordinator) { if (log.isDebugEnabled()) log.debug("Coordinator left, " + localAddress + " will complete flush"); onResume(); } if (log.isDebugEnabled()) log.debug("Installing view at " + localAddress + " view is " + view); return receivedFirstView && !receivedMoreThanOneView; } private void onStopFlush() { if (stats) { long stopFlushTime = System.currentTimeMillis(); totalTimeInFlush += (stopFlushTime - startFlushTime); if (numberOfFlushes > 0) { averageFlushDuration = totalTimeInFlush / (double)numberOfFlushes; } } if (!shouldReturnLastFromFlush) { if (log.isDebugEnabled()) log.debug("At " + localAddress + " unblocking FLUSH.down() and sending UNBLOCK up"); synchronized (blockMutex) { isBlockState = false; blockMutex.notifyAll(); } passUp(new Event(Event.UNBLOCK)); } //ack this STOP_FLUSH Message msg = new Message(null, localAddress, null); msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH_OK,currentViewId())); passDown(new Event(Event.MSG, msg)); if (log.isDebugEnabled()) log.debug("Received STOP_FLUSH and sent STOP_FLUSH_OK from " + localAddress); } private void onSuspend(View view) { Message msg = null; Collection participantsInFlush = null; synchronized (sharedLock) { //start FLUSH only on group members that we need to flush if (view != null) { participantsInFlush = new ArrayList(view.getMembers()); participantsInFlush.retainAll(currentView.getMembers()); } else { participantsInFlush = new ArrayList(currentView.getMembers()); } msg = new Message(null, localAddress, null); msg.putHeader(getName(), new FlushHeader(FlushHeader.START_FLUSH, currentViewId(), participantsInFlush)); } if (participantsInFlush.isEmpty()) { passUp(new Event(Event.SUSPEND_OK)); passDown(new Event(Event.SUSPEND_OK)); } else { passDown(new Event(Event.MSG, msg)); if (log.isDebugEnabled()) log.debug("Received SUSPEND at " + localAddress + ", sent START_FLUSH to " + participantsInFlush); } } private void onResume() { long viewID = currentViewId(); Message msg = new Message(null, localAddress, null); msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH,viewID)); passDown(new Event(Event.MSG, msg)); if (log.isDebugEnabled()) log.debug("Received RESUME at " + localAddress + ", sent STOP_FLUSH to all"); } private void onStartFlush(Address flushStarter, FlushHeader fh) { if (stats) { startFlushTime = System.currentTimeMillis(); numberOfFlushes += 1; } synchronized (sharedLock) { flushCoordinator = flushStarter; flushMembers.clear(); if(fh.flushParticipants!=null) { flushMembers.addAll(fh.flushParticipants); } flushMembers.removeAll(suspected); } Message msg = new Message(null, localAddress, null); msg.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_OK, fh.viewID)); passDown(new Event(Event.MSG, msg)); if (log.isDebugEnabled()) log.debug("Received START_FLUSH at " + localAddress + " responded with FLUSH_OK"); } private void onFlushOk(Address address, long viewID) { boolean flushOkCompleted = false; Message m = null; synchronized (sharedLock) { flushOkSet.add(address); flushOkCompleted = flushOkSet.containsAll(flushMembers); if (flushOkCompleted) { m = new Message(flushCoordinator, localAddress, null); } } if (log.isDebugEnabled()) log.debug("At " + localAddress + " FLUSH_OK from " + address + ",completed " + flushOkCompleted + ", flushOkSet " + flushOkSet.toString()); if (flushOkCompleted) { isBlockState = true; m.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_COMPLETED, viewID)); passDown(new Event(Event.MSG, m)); if (log.isDebugEnabled()) log.debug(localAddress + " is blocking FLUSH.down(). Sent FLUSH_COMPLETED message to " + flushCoordinator); } } private void onStopFlushOk(Address address, long viewID) { boolean stopFlushOkCompleted = false; synchronized (sharedLock) { stopFlushOkSet.add(address); TreeSet membersCopy = new TreeSet(currentView.getMembers()); membersCopy.removeAll(suspected); stopFlushOkCompleted = stopFlushOkSet.containsAll(membersCopy); } if (log.isDebugEnabled()) log.debug("At " + localAddress + " STOP_FLUSH_OK from " + address + ",completed " + stopFlushOkCompleted + ", stopFlushOkSet " + stopFlushOkSet.toString()); if (stopFlushOkCompleted) { if (shouldReturnLastFromFlush) { if (log.isDebugEnabled()) log.debug("At " + localAddress + " unblocking FLUSH.down() and sending UNBLOCK up"); synchronized (blockMutex) { isBlockState = false; blockMutex.notifyAll(); } passUp(new Event(Event.UNBLOCK)); } synchronized (sharedLock) { flushCompletedSet.clear(); flushOkSet.clear(); stopFlushOkSet.clear(); flushMembers.clear(); suspected.clear(); flushCoordinator = null; } shouldReturnLastFromFlush = false; } } private void onFlushCompleted(Address address) { boolean flushCompleted = false; synchronized (sharedLock) { flushCompletedSet.add(address); flushCompleted = flushCompletedSet.containsAll(flushMembers); } if (log.isDebugEnabled()) log.debug("At " + localAddress + " FLUSH_COMPLETED from " + address + ",completed " + flushCompleted + ",flushCompleted " + flushCompletedSet.toString()); if (flushCompleted) { //needed for jmx operation startFlush(timeout); flush_promise.setResult(Boolean.TRUE); passUp(new Event(Event.SUSPEND_OK)); passDown(new Event(Event.SUSPEND_OK)); if (log.isDebugEnabled()) log.debug("All FLUSH_COMPLETED received at " + localAddress + " sent SUSPEND_OK down/up"); } } private void onSuspect(Address address) { boolean flushOkCompleted = false; Message m = null; long viewID = 0; synchronized (sharedLock) { suspected.add(address); flushMembers.removeAll(suspected); viewID = currentViewId(); flushOkCompleted = !flushOkSet.isEmpty() && flushOkSet.containsAll(flushMembers); if (flushOkCompleted) { m = new Message(flushCoordinator, localAddress, null); } } if (flushOkCompleted) { m.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_COMPLETED, viewID)); passDown(new Event(Event.MSG, m)); if (log.isDebugEnabled()) log.debug(localAddress + " sent FLUSH_COMPLETED message to " + flushCoordinator); } if (log.isDebugEnabled()) log.debug("Suspect is " + address + ",completed " + flushOkCompleted + ", flushOkSet " + flushOkSet + " flushMembers " + flushMembers); } private boolean isFlushRunning() { return isBlockState; } public static class FlushHeader extends Header implements Streamable { public static final byte START_FLUSH = 0; public static final byte FLUSH_OK = 1; public static final byte STOP_FLUSH = 2; public static final byte FLUSH_COMPLETED = 3; public static final byte STOP_FLUSH_OK = 4; byte type; long viewID; Collection flushParticipants; public FlushHeader() { this(START_FLUSH,0); } // used for externalization public FlushHeader(byte type, long viewID) { this(type, viewID, null); } public FlushHeader(byte type, long viewID, Collection flushView) { this.type = type; this.viewID = viewID; this.flushParticipants = flushView; } public String toString() { switch (type) { case START_FLUSH : return "FLUSH[type=START_FLUSH,viewId=" + viewID + ",members=" + flushParticipants + "]"; case FLUSH_OK : return "FLUSH[type=FLUSH_OK,viewId=" + viewID + "]"; case STOP_FLUSH : return "FLUSH[type=STOP_FLUSH,viewId=" + viewID + "]"; case STOP_FLUSH_OK : return "FLUSH[type=STOP_FLUSH_OK,viewId=" + viewID + "]"; case FLUSH_COMPLETED : return "FLUSH[type=FLUSH_COMPLETED,viewId=" + viewID + "]"; default : return "[FLUSH: unknown type (" + type + ")]"; } } public void writeExternal(ObjectOutput out) throws IOException { out.writeByte(type); out.writeLong(viewID); out.writeObject(flushParticipants); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { type = in.readByte(); viewID = in.readLong(); flushParticipants = (Collection) in.readObject(); } public void writeTo(DataOutputStream out) throws IOException { out.writeByte(type); out.writeLong(viewID); if (flushParticipants != null && !flushParticipants.isEmpty()) { out.writeShort(flushParticipants.size()); for (Iterator iter = flushParticipants.iterator(); iter.hasNext();) { Address address = (Address) iter.next(); Util.writeAddress(address, out); } } else { out.writeShort(0); } } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type = in.readByte(); viewID = in.readLong(); int flushParticipantsSize = in.readShort(); if (flushParticipantsSize > 0) { flushParticipants = new ArrayList(flushParticipantsSize); for (int i = 0; i < flushParticipantsSize; i++) { flushParticipants.add(Util.readAddress(in)); } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -