📄 getcontentrequest.java
字号:
// message.setBytes(CMS.RANGE_END, Long.toString(rangeEnd).getBytes()); messenger.sendMessage(message); } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not send GET request", e); return false; } return true; } public void processIncomingMessage( Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("processIncomingMessage()"); if (message != null) { // Handle the message String messageType = null; String mRid = null; String mCid = null; try { messageType = CMS.popString(message, CMS.MESSAGE_TYPE); mRid = CMS.popString(message, CMS.REQUEST_ID); mCid = CMS.popString(message, CMS.CONTENT_ID); TargetItem ti = (TargetItem) targetTable.get(mRid); if (CMS.GET_RESULT.equals(messageType) && ti != null && cAdv.getContentId().toString().equals(mCid)) { ti.timestamp = System.currentTimeMillis(); ti.errorcount = 0; handleMessage(message); } } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not handle message", e); } } } public void cancel() { isDone = true; if (null != thread) { thread.interrupt(); } group.getEndpointService().removeIncomingMessageListener( address.getServiceName(),address.getServiceParameter()); trimFile(); } private synchronized void trimFile() { if (hasFailed) { return; } hasFailed = true; if (bitSet == null) { return; } if (numberReceived >= numberChunks) { return; } int count = 0; while (bitSet.get(count)) { count++; } if (count * chunkSize > contentLength) { return; } try { RandomAccessFile op = new RandomAccessFile(destFile, "rw"); op.setLength(count * chunkSize); op.close(); } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not trim file " + destFile, e); destFile.delete(); return; } } public boolean isDone() { return isDone; } public boolean hasFailed() { return hasFailed; } public int getPercentDone() { if (0 == numberChunks) { return 0; } return (numberReceived * 100) / numberChunks; } /** * These notify methods are to be overridden by a subclass that * wants to track the progress of the request */ public void notifyDone() { } public void notifyFailure() { } public void notifyUpdate(int percentage) { } public File getFile() { return destFile; } private synchronized void handleMessage(Message message) { long length = 0; long offset = 0; int size = 0; // extract the content length try { length = popLong(message, CMS.CONTENT_LENGTH); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not pop CONTENT_LENGTH tag", e); return; } catch (NumberFormatException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not parse CONTENT_LENGTH tag", e); return; } // zero-sized content (special case) if (length == 0) { try { RandomAccessFile op = new RandomAccessFile(destFile, "rw"); op.setLength(0); op.close(); } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not create file " + destFile, e); return; } isDone = true; notifyDone(); group.getEndpointService().removeIncomingMessageListener( address.getServiceName(),address.getServiceParameter()); return; } // extract the content offset try { offset = popLong(message, CMS.CHUNK_OFFSET); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not pop CHUNK_OFFSET tag", e); return; } catch (NumberFormatException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not parse CHUNK_OFFSET tag", e); return; } // extract the chunk size try { size = popInt(message, CMS.CHUNK_SIZE); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not pop CHUNK_SIZE tag", e); return; } catch (NumberFormatException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not parse CHUNK_SIZE tag", e); return; } // check if the bitSet has been created if (bitSet == null) { numberChunks = (int) (length / size); // check for case where there is a particle last chunk if (size * numberChunks < length) { numberChunks++; } // this is the first chunk bitSet = new BitSet(numberChunks); contentLength = length; chunkSize = size; // check received chunks long fileLength = destFile.length(); for (int i = 0;(i + 1) * chunkSize <= fileLength; i++) { numberReceived++; bitSet.set(i); } // for request bitSetReq = new BitSet(numberChunks); } if (contentLength != length) { // not compatible if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("incompatible content length"); return; } // check if this chunk has been received already int chunkNumber = (int) (offset / chunkSize); if (bitSet.get(chunkNumber)) { // already seen this chunk return; } if (chunkNumber < numberChunks - 1 && chunkSize != size) { // not compatible if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("incompatible chunk size"); return; } if (chunkNumber * chunkSize != offset) { // not compatible if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("invalid offset"); return; } if (isDone) { return; } InputStream chunkStream = null; try { chunkStream = message.getMessageElement(CMS.CHUNK_DATA).getStream(); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("could not pop CHUNK_DATA tag", e); } return; } RandomAccessFile op = null; try { op = new RandomAccessFile(destFile, "rw"); } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not create file " + destFile, e); return; } try { int available = chunkStream.available(); byte[] buffer = new byte[available]; int res = chunkStream.read(buffer); op.seek(offset); op.write(buffer, 0, res); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not write to file " + destFile, e); return; } try { op.close(); chunkStream.close(); } catch (IOException e) { if (LOG.isEnabledFor(Level.DEBUG)) LOG.debug("could not close file " + destFile, e); } // set the chunk as seen bitSet.set(chunkNumber); numberReceived++; notifyUpdate(getPercentDone()); if (numberReceived >= numberChunks) { isDone = true; notifyDone(); group.getEndpointService().removeIncomingMessageListener( address.getServiceName(),address.getServiceParameter()); } } private static long popLong(Message message, String tag) throws IOException, NumberFormatException { String str = CMS.popString(message, tag); return Long.parseLong(str); } private static int popInt(Message message, String tag) throws IOException, NumberFormatException { String str = CMS.popString(message, tag); return Integer.parseInt(str); } public void finalize() { cancel(); } private int countBitSet(BitSet bs) { int c = 0; for (int i = 0; i < bs.length(); i++) { if (bs.get(i)) { c++; } } return c; } private void clearBitSetReq(BitSet bs) { for (int i = 0; i < bs.length(); i++) { if (bs.get(i)) { bs.clear(i); if (bitSetReq != null && bitSetReq != bs) { bitSetReq.clear(i); } } } } // to clear received chunk // to find a chunk hole, better way? private void gcBitSetReq(BitSet bs) { for (int i = 0; i < bs.length(); i++) { if (bitSet.get(i)) { bs.clear(i); if (bitSetReq != null && bitSetReq != bs) { bitSetReq.clear(i); } } else if ( bs == bitSetReq && bitSet.get(i + 1) && i < numberReceived) { bitSetReq.clear(i); } } } class TargetItem { ContentAdvertisement cAdv = null; long timestamp = 0; int errorcount = 0; BitSet bsReq = new BitSet(); TargetItem(ContentAdvertisement c) { cAdv = c; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -