// ediscbasedsort.java
// (The line "Font size:" that followed here was code-viewer page residue, not part of the source.)
/*
* Copyright (c) 2000-2004, Rickard Cöster, Martin Svensson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* Neither the name of SICS nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*
*/
package com.mellowtech.disc.sort;
import com.mellowtech.disc.*;
import java.nio.*;
import java.io.*;
import java.nio.channels.*;
import java.util.*;
import java.util.zip.*;
import se.sics.util.*;
/**
* DiscBasedSort sorts large amounts of data by combining in-memory sorting
* with disc-based merging. It uses quicksort for the in-memory sorting
* step. This sort also uses overlapping I/O operations to maximize
* speed (i.e. the creation of the array of objects to sort works in
* parallel with reading input data).
* <p>
* The Sorter operates on ByteStorable objects that can be compared on a
* byte level.
* <p>
* The EDiscBased sort is optimal in the sense that it only operates on
* one large buffer of bytes all through the sort process by making all
* comparisons on a byte level
* (i.e. by only sorting offset in the buffer of bytes). This has two main
* advantages:
* <p>1. Speed - actual objects are never created in the sort.</p>
* <p>2. Memory - we have full control over memory utilization.</p>
* <p>
* If the ByteStorable objects that are to be sorted can be compared at a byte
* level this sort is highly preferred over ordinary DiscBasedSort
*
* @author Martin Svensson
* @version 1.0
* @see com.mellowtech.disc.ByteComparable
*/
public class EDiscBasedSort{
  /** File-name prefix of the temporary sort-run files; the run number is appended. */
  public static final String SORT_RUN_FILE = "disc_sort_e_run.";
  private static final String SEP
    = System.getProperties().getProperty("file.separator");
  private int blockSize = 4096;
  private ByteStorable template;
  private ByteComparable bc;
  private int complevel = 0;
  private String tempDir = null;

  /**
   * Set the block size, i.e. how much data is read from/written to disc
   * at a time. This value should seldom need to be larger than 4096.
   *
   * @param size block size in bytes (a multiple of 1024)
   */
  public void setBlockSize(int size){
    blockSize = size;
  }

  /**
   * Get the block size, i.e. how much data is read from/written to disc
   * at a time.
   *
   * @return the block size, in bytes
   */
  public int getBlockSize(){
    return blockSize;
  }

  /**
   * Create a new sorter that will operate on a specific type of objects
   * with a specific ByteComparable object. Sort runs are not compressed.
   *
   * @param template the type of object to sort
   * @param bc the object that lets the sort compare the ByteStorable
   * objects at a byte level.
   * @param tempDir temporary directory for sort runs
   */
  public EDiscBasedSort(ByteStorable template, ByteComparable bc,
                        String tempDir){
    this(template, bc, 0, tempDir);
  }

  /**
   * Create a new sorter that will operate on a specific type of objects
   * with a specific ByteComparable object.
   * <p>
   * If <code>tempDir</code> is null or is not an existing directory, the
   * directory named by the <code>java.io.tmpdir</code> system property is
   * used instead.
   *
   * @param template the type of object to sort
   * @param bc the object that lets the sort compare the ByteStorable
   * objects at a byte level.
   * @param complevel the Deflater compression level for sort runs (1-9,
   * where 1 is fastest and 9 is highest compression); a value of 0 or less
   * leaves the runs uncompressed
   * @param tempDir temporary directory for sort runs
   */
  public EDiscBasedSort(ByteStorable template,
                        ByteComparable bc,
                        int complevel,
                        String tempDir){
    this.bc = bc;
    this.template = template;
    this.complevel = complevel;
    this.tempDir = tempDir;
    if (tempDir == null || !new File(tempDir).isDirectory()) {
      System.out.println("Could not open temp dir.." + tempDir);
      // Assign the FIELD: the fallback must survive the constructor.
      this.tempDir = System.getProperty("java.io.tmpdir");
      System.out.println("Setting temp dir to ....." + this.tempDir);
    }
  }

  /**
   * Sorts an input file and writes the result to a designated output file.
   * <p>
   * NOTE(review): the output file is opened (and therefore truncated)
   * before any input is read, so passing the same path for both arguments
   * looks unsafe - confirm before relying on in-place sorting.
   *
   * @param fName file to sort
   * @param outputFile output file
   * @param memorySize the amount of memory that can be used for the
   * in-memory operations; raised to the block size if smaller.
   * @return the number of objects sorted, or -1 if the sort failed or
   * nothing was sorted.
   */
  public int sort(String fName, String outputFile, int memorySize){
    FileChannel fc = null;
    FileChannel fo = null;
    try{
      fc = (new FileInputStream(fName)).getChannel();
      fo = (new FileOutputStream(outputFile)).getChannel();
      int ret = sort(fc, fo, memorySize);
      // Close inside the try so that a failing close is reported as -1.
      fc.close();
      fo.close();
      return ret;
    }
    catch(IOException e){
      System.out.println(e);
      return -1;
    }
    finally{
      // Closing an already closed channel has no effect; this only
      // matters when an exception skipped the closes above.
      closeQuietly(fc);
      closeQuietly(fo);
    }
  }

  /**
   * Sorts an input stream and writes the result to a designated output
   * stream. This method simply creates channels for the input and output
   * and calls sort on channels.
   *
   * @param input input stream
   * @param output output stream
   * @param memorySize number of bytes used for in-memory sorting;
   * raised to the block size if smaller.
   * @return number of objects sorted, or -1 on failure
   * @see #sort(ReadableByteChannel, WritableByteChannel, int)
   */
  public int sort(InputStream input, OutputStream output,
                  int memorySize){
    return sort(Channels.newChannel(input), Channels.newChannel(output),
                memorySize);
  }

  /**
   * Sorts a byte channel and writes the result to a designated byte
   * channel. The input is first split into sorted run files in the
   * temporary directory, the runs are then merged into the output, and
   * finally the run files are removed (also when the sort fails, so that
   * stale runs are not picked up by a later sort in the same directory).
   *
   * @param input input channel
   * @param output output channel; it is not closed by this method
   * @param memorySize the number of bytes that can be used for
   * in-memory sorting; raised to the block size if smaller.
   * @return the number of objects sorted, or -1 if no objects were sorted
   * or if creating or merging the runs failed
   */
  public int sort(ReadableByteChannel input, WritableByteChannel output,
                  int memorySize){
    if (memorySize < blockSize)
      memorySize = blockSize;
    ByteBuffer ob = ByteBuffer.allocateDirect(blockSize);
    ByteBuffer large = ByteBuffer.allocate(memorySize);
    if(tempDir == null)
      tempDir = ".";
    try{
      System.out.println("SORT:sort():Making runs.");
      int numSorted = makeRuns(input, large, ob, tempDir);
      if(numSorted <= 0)
        return -1;
      //now merge:
      String[] fNames = new File(tempDir).list(new FFilter());
      if(fNames == null){
        // File.list returns null when the path is not a directory or an
        // I/O error occurs.
        System.out.println("Could not list sort runs in " + tempDir);
        return -1;
      }
      try{
        System.out.println("SORT:sort():Merging runs");
        Merge.merge(fNames, template, large, ob, output, bc, tempDir,
                    complevel > 0);
      }
      catch(Exception e){
        // A failed merge leaves the output incomplete; report it instead
        // of returning a count as if the sort had succeeded.
        e.printStackTrace();
        return -1;
      }
      return numSorted;
    }
    finally{
      removeRunFiles();
    }
  }

  /**
   * Deletes every sort-run file in the temporary directory. The same
   * filter that selects runs for merging is used, so only files whose
   * name starts with {@link #SORT_RUN_FILE} are removed.
   */
  private void removeRunFiles(){
    try {
      System.out.println("SORT:sort():Removing temp files.");
      File fileDir = new File(tempDir);
      System.out.println(fileDir);
      File[] files = fileDir.listFiles(new FFilter());
      if (files == null)
        return; // not a directory, or an I/O error occurred
      for (int i = 0; i < files.length; i++) {
        System.out.println("sort run " + files[i]);
        System.out.println("delete " + files[i].delete());
      }
    }
    catch(Exception e) {
      System.out.println(e);
    }
  }

  /**
   * Splits the input into sorted run files. For every run a producer
   * thread fills the large buffer from the input while a consumer thread
   * collects the record offsets; the offsets are then sorted and the
   * records written to a run file.
   *
   * @param input channel to read from
   * @param large the in-memory sort buffer
   * @param ob block-sized buffer used when writing a run
   * @param tempDir directory in which the run files are created
   * @return the total number of records written to runs, or -1 on failure
   */
  private int makeRuns(ReadableByteChannel input,
                       ByteBuffer large, ByteBuffer ob, String tempDir){
    try{
      EDBSContainer hb = new EDBSContainer(large, input, blockSize,
                                           template, bc);
      int[] offsets = new int[10000];
      int run = 0;
      int numSorts = 0;
      while(hb.prepareRun()){
        run++; // run files are numbered from 1
        EDBSProducer p = new EDBSProducer(hb);
        EDBSConsumer c = new EDBSConsumer(hb, offsets);
        p.start();
        c.start();
        c.join();
        // Also wait for the producer so that it can no longer touch the
        // buffer while the run is sorted and the next run is prepared.
        p.join();
        //done filling a file:
        offsets = c.getOffsets();
        numSorts += c.getNumOffsets();
        sortRun(hb.getBuffer(), input, offsets, c.getNumOffsets(), run,
                tempDir, ob);
      }
      return numSorts;
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      e.printStackTrace();
      return -1;
    }
    catch(Exception e){e.printStackTrace(); return -1;}
  }

  /**
   * Sorts the record offsets of one run and writes the records, in sorted
   * order, to the run file <code>dir/SORT_RUN_FILE + i</code>. The file is
   * deflated when the compression level is greater than 0.
   *
   * @param bb buffer holding the records
   * @param c the input channel (not used here)
   * @param offsets start offsets of the records in <code>bb</code>
   * @param numOffsets number of valid entries in <code>offsets</code>
   * @param i run number, used as file-name suffix
   * @param dir directory in which the run file is created
   * @param output block buffer used to batch the writes
   * @return the number of bytes put into the run (before compression)
   * @throws Exception if sorting or writing fails; the run file is closed
   * in either case
   */
  private int sortRun(ByteBuffer bb, ReadableByteChannel c, int[] offsets,
                      int numOffsets, int i, String dir,
                      ByteBuffer output) throws Exception{
    output.clear(); //clear output buffer:
    FileOutputStream fos = new FileOutputStream(dir+SEP+SORT_RUN_FILE+i);
    Deflater deflater = null;
    WritableByteChannel fc = null;
    try{
      //Create output channel:
      if (complevel > 0){
        deflater = new Deflater(complevel);
        fc = Channels.newChannel(new DeflaterOutputStream(fos, deflater));
      }
      else
        fc = fos.getChannel();
      int size = 0, numBytes = 0;
      //sort offsets:
      if(bb.hasArray())
        Sorters.quickSort(offsets, bc, bb.array(), numOffsets);
      else
        Sorters.quickSort(offsets, bc, bb, numOffsets);
      for(int j = 0; j < numOffsets; j++){
        bb.limit(bb.capacity());
        bb.position(offsets[j]);
        size = template.byteSize(bb);
        if(size > output.remaining()){
          output.flip();
          writeFully(fc, output);
          output.clear();
        }
        bb.limit(offsets[j]+size);
        output.put(bb);
        numBytes += size;
      }
      //flush outputbuffer:
      output.flip();
      writeFully(fc, output);
      // Closing here (not only in finally) finishes the deflater stream
      // and lets a failure propagate to the caller.
      fc.close();
      return numBytes;
    }
    finally{
      closeQuietly(fc);
      try{
        fos.close(); // covers the case where no channel was created
      }
      catch(IOException e){
        System.out.println(e);
      }
      if(deflater != null)
        deflater.end(); // a caller-supplied Deflater is not ended by the stream
    }
  }

  /** Writes all remaining bytes of <code>src</code> to <code>ch</code>. */
  private static void writeFully(WritableByteChannel ch, ByteBuffer src)
    throws IOException{
    while(src.hasRemaining())
      ch.write(src);
  }

  /** Closes a channel if it is non-null, printing instead of propagating any IOException. */
  private static void closeQuietly(java.nio.channels.Channel ch){
    if(ch == null)
      return;
    try{
      ch.close();
    }
    catch(IOException e){
      System.out.println(e);
    }
  }
}
/**
 * Thread that keeps asking the shared container to produce data until the
 * container reports that everything for the current run has been produced.
 */
class EDBSProducer extends Thread {
  private EDBSContainer hb;

  /**
   * @param hb the container shared with the consumer thread
   */
  public EDBSProducer(EDBSContainer hb){
    super();
    this.hb = hb;
  }

  /**
   * Calls produce() on the container until producedAll() is true. Any
   * exception ends the loop and is printed as a stack trace.
   */
  public void run(){
    try{
      for(;;){
        if(hb.producedAll()){
          break;
        }
        hb.produce();
      }
    }
    catch(Exception failure){
      failure.printStackTrace();
    }
  }
}
/**
 * Thread that collects the record offsets handed out by the shared
 * container for one run. After the thread has finished, the offsets and
 * their count are available through getOffsets() and getNumOffsets().
 */
class EDBSConsumer extends Thread{
  private EDBSContainer hb;
  private int[] offsets;
  private int numOffsets = 0;

  /**
   * @param hb the container shared with the producer thread
   * @param offsets array to store offsets in; it is replaced by a larger
   * array when it fills up, so callers must fetch the result with
   * getOffsets()
   */
  public EDBSConsumer(EDBSContainer hb, int[] offsets){
    this.offsets = offsets;
    this.hb = hb;
  }

  /**
   * @return the number of valid entries in the offsets array
   */
  public int getNumOffsets(){
    return numOffsets;
  }

  /**
   * @return the (possibly re-allocated) offsets array
   */
  public int[] getOffsets(){
    return offsets;
  }

  /**
   * Calls consume() on the container until consumedAll() is true and
   * stores every offset that is not -1. Any exception ends the loop and
   * is printed as a stack trace.
   */
  public void run(){
    try{
      while(!hb.consumedAll()){
        int offset = hb.consume();
        if(offset != -1){
          if(numOffsets == offsets.length){
            // Grow by 75%, but always by at least one slot: for arrays of
            // length 0 or 1 the truncated product would not grow at all.
            int grown = (int) (offsets.length * 1.75);
            if(grown <= offsets.length)
              grown = offsets.length + 1;
            // presumably returns a copy of the array with the new length -
            // confirm against se.sics.util.ArrayUtils
            offsets = ArrayUtils.setSize(offsets, grown);
          }
          offsets[numOffsets++] = offset;
        }
      }
    }
    catch(Exception e){e.printStackTrace();}
    System.out.println("SORT:consumer.run(): consumed all: "+hb.consumedAll()+" "+numOffsets);
  }
}
/**
 * State shared by an EDBSProducer and an EDBSConsumer that work on the same
 * byte buffer: produce() reads channel data into the buffer and consume()
 * hands out the start offsets of the records found in it. Both methods are
 * synchronized on this object and signal each other with notifyAll().
 * <p>
 * NOTE(review): producedAll() and consumedAll() read their flags without
 * synchronization and the flags are not volatile - confirm that this is
 * acceptable for the threads polling them.
 */
class EDBSContainer{
// Buffer that produce() reads channel data into.
ByteBuffer buffer;
// Read-only view of buffer (asReadOnlyBuffer); consume() parses records from it.
ByteBuffer consumerBuffer;
// Stored by the constructor; not referenced anywhere else in this class.
ByteComparable bc;
// Channel that produce() reads from.
ReadableByteChannel c;
// noMore:      set by produce() at end of stream or when the buffer is full.
// consumedAll: set by consume() once noMore is set and slack is non-negative.
// endOfStream: set by produce() when the channel returns -1; makes prepareRun() return false.
boolean noMore = false, consumedAll = false, endOfStream = false;
// slack: -1 until consume() gets a non-positive result from slackOrSize, then the
//        absolute value of that result; reset to -1 by produce() after every read.
// totConsumed/totProduced: byte counters for the current run.
// read: result of the latest channel read in produce().
int slack = -1, totConsumed, totProduced, blockSize, read;
// Passed to ByteStorable.slackOrSize when parsing records.
ByteStorable template;
/**
 * @param b buffer to fill; a read-only view of it is created for the consumer
 * @param c channel to read input from
 * @param blockSize maximum number of bytes requested per read
 * @param template passed to ByteStorable.slackOrSize in consume()
 * @param bc stored only
 */
public EDBSContainer(ByteBuffer b, ReadableByteChannel c, int blockSize,
ByteStorable template, ByteComparable bc){
this.blockSize = blockSize;
this.bc = bc;
this.template = template;
totConsumed = totProduced = 0;
this.c = c;
buffer = b;
consumerBuffer = b.asReadOnlyBuffer();
// The consumer may only see what has been written so far.
consumerBuffer.limit(buffer.position());
System.out.println("SORT:container():consumer buffer limit: "+consumerBuffer.limit());
}
/**
 * Resets the state for a new run.
 *
 * @return false if the end of the input stream has been seen, true otherwise
 */
public boolean prepareRun(){
if(endOfStream)
return false;
noMore = false;
consumedAll = false;
buffer.limit(buffer.capacity());
if(slack > 0){
// Carry the last slack bytes of the previous run over to this run.
// presumably copyToBeginning moves them to the start of the buffer and
// leaves the position just after them - confirm against ByteStorable
buffer.position(buffer.capacity() - slack);
ByteStorable.copyToBeginning(buffer, slack);
totProduced = slack;
}
else{
buffer.clear();
totProduced = 0;
}
totConsumed = 0;
consumerBuffer.limit(buffer.position());
consumerBuffer.position(0);
return true;
}
/** @return the buffer that is being filled */
public ByteBuffer getBuffer(){
return buffer;
}
/** @return true once produce() has hit end of stream or filled the buffer */
public boolean producedAll(){
return noMore;
}
/** @return number of bytes handed out by consume() in the current run */
public int getTotalConsumed(){
return totConsumed;
}
/** @return number of bytes in the buffer for the current run */
public int getTotalProduced(){
return totProduced;
}
/** @return true once consume() has decided that the run is finished */
public boolean consumedAll(){
return consumedAll;
}
/** @return the read-only view used by consume() */
public ByteBuffer getEDBSConsumerBuffer(){
return consumerBuffer;
}
/**
 * Tries to hand out the next record in the buffer.
 *
 * @return the start offset of the next record, or -1 if no record could
 * be handed out in this call (run finished, more data needed, or the wait
 * was interrupted)
 */
public synchronized int consume(){ //as much as possible
try{
// Nothing more will be produced and the last parse attempt already
// recorded slack: the run is finished.
if((noMore && slack >= 0) || consumedAll){
notifyAll();
consumedAll = true;
return -1;
}
// A previous call recorded slack, so wait until notified; produce()
// resets slack to -1 and calls notifyAll() after each read.
// NOTE(review): a single wait() without a loop is not protected
// against spurious wakeups - confirm this is acceptable.
if(slack != -1){
wait();
}
consumerBuffer.position(totConsumed);
consumerBuffer.mark();
// presumably returns the record size when a complete record is visible,
// otherwise a non-positive value whose magnitude is the number of
// leftover bytes - confirm against ByteStorable.slackOrSize
int bSize = ByteStorable.slackOrSize(consumerBuffer, template);
consumerBuffer.reset();
if(bSize <= 0){
slack = Math.abs(bSize);
notifyAll();
return -1;
}
totConsumed += bSize;
notifyAll();
// Offset at which the record just counted starts.
return totConsumed - bSize;
}
catch(InterruptedException e){
// Interruption is ignored; the method falls through and returns -1.
;
}
return -1;
}
/**
 * Reads at most blockSize bytes (less if the buffer is nearly full) from
 * the channel into the buffer and makes them visible to the consumer.
 *
 * @return the number of bytes read, or -1 at end of stream, when nothing
 * more is to be produced, or when the read failed
 */
public synchronized int produce(){
read = -1;
//slack = -1;
if(noMore){
notifyAll();
return -1;
}
try{
int left = buffer.capacity() - buffer.position();
int moveAhead = (left < blockSize) ? left : blockSize;
// Limit the read to one block or to the space that is left.
buffer.limit(buffer.position() + moveAhead);
read = c.read(buffer);
if(read == -1){ //end of stream
noMore = true;
endOfStream = true;
slack = -1;
}
else{
totProduced += read;
// Expose the newly read bytes to the consumer view.
consumerBuffer.limit(buffer.position());
if(totProduced == buffer.capacity())
noMore = true;
slack = -1;
}
notifyAll();
return read;
}
catch(Exception e){e.printStackTrace();}
return -1;
}
}
/**
 * File-name filter that accepts the temporary sort-run files, i.e. names
 * that start with EDiscBasedSort.SORT_RUN_FILE.
 */
class FFilter implements java.io.FilenameFilter{
  /**
   * @param dir directory containing the file (not used)
   * @param name name of the file
   * @return true if the name starts with the sort-run prefix
   */
  public boolean accept(File dir, String name){
    final boolean isRunFile = name.startsWith(EDiscBasedSort.SORT_RUN_FILE);
    return isRunFile;
  }
}
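/*
 * Code-viewer keyboard shortcuts (web page residue, not part of the program):
 *   Copy code:           Ctrl + C
 *   Search code:         Ctrl + F
 *   Full-screen mode:    F11
 *   Toggle theme:        Ctrl + Shift + D
 *   Show shortcuts:      ?
 *   Increase font size:  Ctrl + =
 *   Decrease font size:  Ctrl + -
 */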