📄 udpccfragmessenger.java
字号:
* Only Called from NetworkCore * * @param msg * @param src * @param local * @param tries * @param wait_ms * @param est_rtt_ms */ public void recv(Object msg, InetSocketAddress src, InetSocketAddress local, int tries, long wait_ms, long est_rtt_ms) { try { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( msg, "Network Message Fragment Received", new Object[]{"t", String.valueOf(tries), "w", String.valueOf(wait_ms), "r", String.valueOf(est_rtt_ms)}, null)); } long currentTime = (long) LocalNode.myTimer.getCurrentTime() * 1000; if (msg instanceof Fragment) { Fragment fragment = (Fragment) msg; FragmentIdentifier identifier = new FragmentIdentifier(src, fragment.messageID); FragmentedMessage message = (FragmentedMessage) partialMessages.get(identifier); if (message == null) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( msg, "First Fragment Received", new Object[] { "s", src, "m", String.valueOf(fragment.messageID), "f", String.valueOf(fragment.totalFragments), "z", String.valueOf(fragment.totalLength), "n", String.valueOf(fragment.fragmentNum), "b", String.valueOf(fragment.start), "l", String.valueOf(fragment.fragmentLength), "h", String.valueOf(identifier.hashCode()) }, null)); } message = new FragmentedMessage(fragment.totalFragments, fragment.totalLength, currentTime); partialMessages.put(identifier, message); } else { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( msg, "Additional Fragment Received", new Object[] { "s", src, "m", String.valueOf(fragment.messageID), "f", String.valueOf(fragment.totalFragments), "z", String.valueOf(fragment.totalLength), "n", String.valueOf(fragment.fragmentNum), "b", String.valueOf(fragment.start), "l", String.valueOf(fragment.fragmentLength), "h", String.valueOf(identifier.hashCode()) }, null)); } } message.addFragment(fragment.fragmentNum, fragment.start, fragment.buffer, fragment.fragmentLength, currentTime); if (message.isComplete()) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( msg, "All Fragments Recevied", null, null)); } partialMessages.remove(identifier); Object payload = null; try { message.buffer.position(0); payload = super.deserialize(message.buffer); } catch (Exception exception) { logger.error( new StructuredLogMessage( msg, "Deserialization Error", null, null), exception); } UDPClient client = (UDPClient) listenersClients.get( new Integer(local.getPort())); if ((client != null) && (payload != null)) { eventCore.register_timer( 0, this, (new RunObject( client, local.getPort(), src, (Payload) payload))); } } else { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( msg, "More Fragments Required", null, null)); } } } } catch (Exception exception) { return; } } /** * Method timer_cb * * @param user_data */ public void timer_cb(Object user_data) { if (user_data.equals(SIGNAL_CLEANUP)) { cleanup(); } else { super.timer_cb(user_data); } } /** * Method cleanup */ protected void cleanup() { long currentTime = (long) LocalNode.myTimer.getCurrentTime() * 1000; Set partialsSet = partialMessages.entrySet(); Iterator partials = partialsSet.iterator(); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( partials, "Beginning Dead Message Cleanup", new Object[]{"z", String.valueOf(partialsSet.size())}, null)); } while (partials.hasNext()) { Map.Entry entry = (Map.Entry) partials.next(); FragmentedMessage message = (FragmentedMessage) entry.getValue(); if (message.receiveTime + maxTimeBetweenFragments < currentTime) { partials.remove(); FragmentIdentifier identifier = (FragmentIdentifier) entry.getKey(); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( identifier, "Removed Dead Message", new Object[]{"s", identifier.source, "m", String.valueOf(identifier.messageID), "t", String.valueOf( message.receiveTime)}, null)); } } } networkCore.register_timer(cleanupInterval, this, SIGNAL_CLEANUP); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( partials, "Completed Dead Message Cleanup", null, null)); } } /** * Class Fragment * */ public class Fragment { ByteBuffer buffer; public int start; public int fragmentLength; public int totalLength; public byte fragmentNum; public byte totalFragments; public int messageID; /** * Constructor Fragment * * @param buffer * @param start * @param fragmentLength * @param totalLength * @param fragmentNum * @param totalFragments * @param messageID */ public Fragment(ByteBuffer buffer, int start, int fragmentLength, int totalLength, byte fragmentNum, byte totalFragments, int messageID) { this.buffer = buffer; this.start = start; this.fragmentLength = fragmentLength; this.totalLength = totalLength; this.fragmentNum = fragmentNum; this.totalFragments = totalFragments; this.messageID = messageID; } } /** * Class FragmentInfo * */ public class FragmentInfo { public int numFragments; public int posAckedFragments; public int negAckedFragments; public AckObject ackObject; /** * Constructor FragmentInfo * * @param numFragments * @param ackObject */ public FragmentInfo(int numFragments, AckObject ackObject) { this.numFragments = numFragments; this.ackObject = ackObject; } } /** * Class FragmentedMessage * */ public class FragmentedMessage { public ByteBuffer buffer; public boolean[] fragments; public long receiveTime; /** * Constructor FragmentedMessage * * @param numFragments * @param totalSize * @param receiveTime */ public FragmentedMessage(int numFragments, int totalSize, long receiveTime) { this.buffer = ByteBuffer.allocate(totalSize); this.fragments = new boolean[numFragments]; this.receiveTime = receiveTime; } /** * Method addFragment * * @param fragmentNum * @param start * @param buffer * @param length * @param receiveTime */ public void addFragment(int fragmentNum, int start, ByteBuffer buffer, int length, long receiveTime) { try { this.buffer.position(start); buffer.limit(buffer.position() + length); this.buffer.put(buffer); this.receiveTime = receiveTime; fragments[fragmentNum] = true; } catch (Exception exception) { logger.error( new StructuredLogMessage( buffer, "Unable to add fragment to message", null, null), exception); } } /** * Method isComplete * @return */ public boolean isComplete() { for (int i = 0; i < fragments.length; i++) { if (fragments[i] == false) { return false; } } return true; } } /** * Class FragmentIdentifier * */ public class FragmentIdentifier { public InetSocketAddress source; public int messageID; public int hashcode; /** * Constructor FragmentIdentifier * * @param source * @param messageID */ public FragmentIdentifier(InetSocketAddress source, int messageID) { this.source = source; this.messageID = messageID; computeHash(); } /** * Method computeHash */ public void computeHash() { hashcode = 0; if (source != null) { hashcode += source.hashCode(); } hashcode += messageID; } /** * Method hashCode * @return */ public int hashCode() { return hashcode; } /** * Method equals * * @param other * @return */ public boolean equals(Object other) { if (other instanceof FragmentIdentifier) { FragmentIdentifier otherFragment = (FragmentIdentifier) other; if ((messageID == otherFragment.messageID) && (source.equals(otherFragment.source))) { return true; } else { return false; } } else { return false; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -