⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ediscbasedsort.java

📁 实现数据库的storage manager 功能
💻 JAVA
字号:
/*
 * Copyright (c) 2000-2004, Rickard Cöster, Martin Svensson
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without 
 * modification, are permitted provided that the following conditions are met:
 * 
 * Redistributions of source code must retain the above copyright notice, 
 * this list of conditions and the following disclaimer. 
 * Redistributions in binary form must reproduce the above copyright notice, 
 * this list of conditions and the following disclaimer in the documentation 
 * and/or other materials provided with the distribution. 
 * Neither the name of SICS nor the names of its contributors 
 * may be used to endorse or promote products derived from this software 
 * without specific prior written permission. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 * POSSIBILITY OF SUCH DAMAGE.
 *
 *
 */

package com.mellowtech.disc.sort;

import com.mellowtech.disc.*;
import java.nio.*;
import java.io.*;
import java.nio.channels.*;
import java.util.*;
import java.util.zip.*;
import se.sics.util.*;


/**
 * EDiscBasedSort sorts large amounts of data by combining in-memory sorting 
 * with disc-based merging. It uses quicksort for the in-memory sorting 
 * step. Reading the input and locating the records in it are done by 
 * separate producer and consumer threads, so building the array of record 
 * offsets runs in parallel with reading input data.
 * <p>
 * The sorter operates on ByteStorable objects that can be compared on a 
 * byte level.
 * <p>
 * EDiscBasedSort is efficient in the sense that it operates on one large 
 * buffer of bytes throughout the sort process and makes all comparisons at 
 * the byte level (i.e. it only sorts offsets into the buffer of bytes). 
 * This has two main advantages:
 * <p>1. Speed - actual objects are never created in the sort.</p>
 * <p>2. Memory - we have full control over memory utilization.</p>
 * <p>
 * If the ByteStorable objects to be sorted can be compared at the byte 
 * level, this sort is highly preferred over the ordinary DiscBasedSort.
 * <p>
 * Sort runs are written to the temporary directory as files named 
 * {@link #SORT_RUN_FILE} followed by a run number. Every file in that 
 * directory whose name starts with this prefix is merged, and files whose 
 * name contains it are deleted afterwards, so two sorts must not share a 
 * temporary directory at the same time.
 *
 * @author Martin Svensson
 * @version 1.0
 * @see com.mellowtech.disc.ByteComparable
 */
public class EDiscBasedSort{
  /** File-name prefix of the sort-run files written to the temp directory. */
  public static final String SORT_RUN_FILE = "disc_sort_e_run.";
  private static final String SEP 
    = System.getProperties().getProperty("file.separator");
  
  private int blockSize = 4096;
  private ByteStorable template;
  private ByteComparable bc;
  private int complevel = 0;
  private String tempDir = null;
  
  /**
   * Set the block size, i.e. how much data is read from or written to disc 
   * at a time. The default is 4096 bytes; larger values are seldom needed.
   *
   * @param size block size in bytes (preferably a multiple of 1024)
   * @throws IllegalArgumentException if size is not positive (a zero-sized 
   * block would make the reader loop without progress)
   */
  public void setBlockSize(int size){
    if (size <= 0)
      throw new IllegalArgumentException("block size must be positive: " + size);
    blockSize = size;
  }
  
  /**
   * Get the block size, i.e. how much data is read from or written to disc 
   * at a time.
   *
   * @return the block size, in bytes
   */
  public int getBlockSize(){
    return blockSize;
  }

  /**
   * Create a new sorter that operates on a specific type of objects with a 
   * specific ByteComparable object. Runs are written uncompressed.
   *
   * @param template the type of object to sort
   * @param bc the object that lets the sort compare the ByteStorable 
   * objects at a byte level.
   * @param tempDir temporary directory for sort runs; if it is not an 
   * existing directory, the system property java.io.tmpdir is used instead
   */
  public EDiscBasedSort(ByteStorable template, ByteComparable bc, 
                        String tempDir){
    this(template, bc, 0, tempDir);
  }
  
  /**
   * Create a new sorter that operates on a specific type of objects with a 
   * specific ByteComparable object.
   *
   * @param template the type of object to sort
   * @param bc the object that lets the sort compare the ByteStorable 
   * objects at a byte level.
   * @param complevel compression level for the run files, passed to 
   * {@link java.util.zip.Deflater} (1 is fastest, 9 gives the highest 
   * compression); 0 or less disables compression
   * @param tempDir temporary directory for sort runs; if it is not an 
   * existing directory, the system property java.io.tmpdir is used instead
   */
  public EDiscBasedSort(ByteStorable template, 
                        ByteComparable bc, 
                        int complevel, 
                        String tempDir){
    this.bc = bc;
    this.template = template;
    this.complevel = complevel;
    if (!isDirectory(tempDir)) {
      System.out.println("Could not open temp dir.." + tempDir);
      tempDir = System.getProperty("java.io.tmpdir");
      System.out.println("Setting temp dir to ....." + tempDir);
    }
    // Assign the (possibly replaced) directory to the field; previously the 
    // fallback only changed the parameter and was lost.
    this.tempDir = tempDir;
  }

  /** Returns true if dir names an existing, accessible directory. */
  private static boolean isDirectory(String dir){
    try {
      return dir != null && new File(dir).isDirectory();
    }
    catch (SecurityException e) {
      return false;
    }
  }
  
  /**
   * Sorts an input file and writes the result to an output file. The output 
   * file may be the input file itself, in which case the input is 
   * overwritten with the sorted data.
   * <p>
   * The output file is opened without truncation so that, when it is also 
   * the input, its contents can still be read while the runs are made (all 
   * input is consumed before any output is written). Any bytes left beyond 
   * the sorted data are cut off afterwards.
   *
   * @param fName file to sort
   * @param outputFile output file
   * @param memorySize the number of bytes that can be used for the 
   * in-memory operations; values smaller than the block size are raised 
   * to the block size
   * @return the number of objects sorted, or -1 on failure or empty input
   */
  public int sort(String fName, String outputFile, int memorySize){
    FileChannel fc = null;
    FileChannel fo = null;
    try{
      fc = (new FileInputStream(fName)).getChannel();
      fo = (new RandomAccessFile(outputFile, "rw")).getChannel();
      int ret = sort(fc, fo, memorySize);
      if (fo.isOpen()) {
        long end = fo.position();
        if (fo.size() > end)
          fo.truncate(end); // drop stale bytes of a longer pre-existing file
        fo.close();         // close here so close errors are reported
      }
      return ret;
    }
    catch(IOException e){
      System.out.println(e); 
      return -1;
    }
    finally{
      closeQuietly(fc);
      closeQuietly(fo);
    }
  }

  /** Closes ch if non-null, reporting but not propagating close errors. */
  private static void closeQuietly(Channel ch){
    if (ch == null)
      return;
    try {
      ch.close();
    }
    catch (IOException e) {
      System.out.println(e);
    }
  }

  /**
   * Sorts an input stream and writes the result to an output stream. This 
   * method simply wraps the streams in channels and calls sort on channels. 
   * The streams are not closed.
   *
   * @param input input stream
   * @param output output stream
   * @param memorySize number of bytes used for in-memory sorting; values 
   * smaller than the block size are raised to the block size
   * @return number of objects sorted, or -1 on failure or empty input
   * @see #sort(ReadableByteChannel, WritableByteChannel, int)
   */
  public int sort(InputStream input, OutputStream output,
                  int memorySize){
    return sort(Channels.newChannel(input), Channels.newChannel(output),
                memorySize);
  }

  /**
   * Sorts a byte channel and writes the result to another byte channel. 
   * The whole input is read and split into sorted run files before any 
   * output is written. Afterwards the runs are merged into output. Run files 
   * are removed whether or not the sort succeeds. The channels are not 
   * closed.
   *
   * @param input input channel
   * @param output output channel
   * @param memorySize the number of bytes that can be used for in-memory 
   * sorting; values smaller than the block size are raised to the block size
   * @return the number of objects sorted, or -1 if making runs or merging 
   * failed, or the input contained no objects
   */
  public int sort(ReadableByteChannel input, WritableByteChannel output,
                  int memorySize){
    if (memorySize < blockSize)
      memorySize = blockSize;
    ByteBuffer ob = ByteBuffer.allocateDirect(blockSize);
    ByteBuffer large = ByteBuffer.allocate(memorySize);
    
    if(tempDir == null)
      tempDir = ".";
    try{
      System.out.println("SORT:sort():Making runs.");
      int numObjects = makeRuns(input, large, ob, tempDir);
      if(numObjects <= 0)
        return -1;

      //now merge:
      String[] fNames = new File(tempDir).list(new FFilter());
      if (fNames == null) {
        System.out.println("Could not list temp dir.." + tempDir);
        return -1;
      }
      try{
        System.out.println("SORT:sort():Merging runs");
        Merge.merge(fNames, template, large, ob, output, bc, tempDir, 
                    complevel > 0);
      }
      catch(Exception e){
        e.printStackTrace();
        return -1; // do not report a successful sort when the merge failed
      }
      return numObjects;
    }
    finally{
      // Always clean up: leftover run files would otherwise be merged into 
      // the output of the next sort that uses the same directory.
      removeRunFiles();
    }
  }

  /**
   * Deletes every file in the temp directory whose name contains 
   * SORT_RUN_FILE. Failures are printed and otherwise ignored.
   */
  private void removeRunFiles(){
    try {
      System.out.println("SORT:sort():Removing temp files.");
      File fileDir = new File(tempDir);
      System.out.println(fileDir);
      File[] files = fileDir.listFiles(new FilenameFilter() {
          public boolean accept(File file, String name) {
            return name.indexOf(SORT_RUN_FILE) >= 0;
          }
        });
      if (files == null)
        return;
      for (int i = 0; i < files.length; i++) {
        System.out.println("sort run " + files[i]);
        System.out.println("delete   " + files[i].delete());
      }
    }
    catch(Exception e) {
      System.out.println(e);
    }
  }

  /**
   * Reads all of input, one buffer-full at a time. For each buffer-full, a 
   * producer thread reads data while a consumer thread collects record 
   * offsets. The offsets are then sorted and written as run file number 
   * 1, 2, ... in tempDir.
   *
   * @return the total number of records written to runs, or -1 on error
   */
  private int makeRuns(ReadableByteChannel input,
                       ByteBuffer large, ByteBuffer ob, String tempDir){
    try{
      EDBSContainer hb = new EDBSContainer(large, input, blockSize, 
                                           template, bc);
      int[] offsets = new int[10000];
      int run = 0;
      int numSorts = 0;
      while(true){
        run++;
        if(!hb.prepareRun())
          break;
        EDBSProducer p = new EDBSProducer(hb);
        EDBSConsumer c = new EDBSConsumer(hb, offsets);
        p.start();
        c.start();
        c.join();
        // Make sure the producer has stopped before its buffer is sorted 
        // and reset for the next run.
        p.join();

        //done filling a file:
        offsets = c.getOffsets();
        numSorts += c.getNumOffsets();
        sortRun(hb.getBuffer(), input, offsets, c.getNumOffsets(), run, 
                tempDir, ob);
      }
      return numSorts;
    }
    catch(InterruptedException e){
      Thread.currentThread().interrupt();
      return -1;
    }
    catch(Exception e){
      e.printStackTrace();
      return -1;
    }
  }

  /**
   * Sorts the first numOffsets record offsets of bb and writes the records 
   * in that order to run file dir/SORT_RUN_FILE+i, compressed with a 
   * Deflater when complevel &gt; 0. The parameter c is unused.
   *
   * @return the number of bytes of record data written
   */
  private int sortRun(ByteBuffer bb, ReadableByteChannel c, int[] offsets, 
                      int numOffsets, int i, String dir, 
                      ByteBuffer output) throws Exception{
    output.clear(); //clear output buffer:

    //sort offsets:
    if(bb.hasArray())
      Sorters.quickSort(offsets, bc, bb.array(), numOffsets);
    else
      Sorters.quickSort(offsets, bc, bb, numOffsets);

    Deflater deflater = (complevel > 0) ? new Deflater(complevel) : null;
    try{
      WritableByteChannel fc = openRunChannel(dir + SEP + SORT_RUN_FILE + i, 
                                              deflater);
      try{
        int numBytes = writeRecords(bb, offsets, numOffsets, output, fc);
        fc.close(); // finishes the compressed stream; errors propagate
        return numBytes;
      }
      finally{
        if (fc.isOpen())
          closeQuietly(fc);
      }
    }
    finally{
      // DeflaterOutputStream.close() does not end a caller-supplied 
      // Deflater, so release its native resources here.
      if (deflater != null)
        deflater.end();
    }
  }

  /** Opens a run file, wrapping it in a DeflaterOutputStream if deflater is non-null. */
  private WritableByteChannel openRunChannel(String fileName, Deflater deflater)
    throws IOException{
    FileOutputStream fos = new FileOutputStream(fileName);
    if (deflater == null)
      return fos.getChannel();
    return Channels.newChannel(new DeflaterOutputStream(fos, deflater));
  }

  /**
   * Copies the records at the given offsets of bb, in order, to ch. Records 
   * are staged through output; a record larger than output is written 
   * straight from bb.
   *
   * @return the number of bytes written
   */
  private int writeRecords(ByteBuffer bb, int[] offsets, int numOffsets,
                           ByteBuffer output, WritableByteChannel ch)
    throws Exception{
    int numBytes = 0;
    for(int j = 0; j < numOffsets; j++){
      int start = offsets[j];
      bb.limit(bb.capacity());
      bb.position(start);
      int size = template.byteSize(bb);

      if(size > output.remaining()){
        output.flip();
        writeFully(ch, output);
        output.clear();
      }
      bb.limit(start + size);
      if(size > output.capacity())
        writeFully(ch, bb); // would overflow the staging buffer
      else
        output.put(bb);
      numBytes += size;
    }
    //flush output buffer:
    output.flip();
    writeFully(ch, output);
    return numBytes;
  }

  /** Writes all remaining bytes of buf; a single write() may be partial. */
  private static void writeFully(WritableByteChannel ch, ByteBuffer buf)
    throws IOException{
    while (buf.hasRemaining())
      ch.write(buf);
  }
}

/**
 * Producer half of the run-building pipeline: a thread that keeps asking an
 * EDBSContainer to produce (read) more input until the container reports
 * that everything for the current run has been produced.
 */
class EDBSProducer extends Thread {
    /** The shared container whose produce() this thread drives. */
    private EDBSContainer container;

    /**
     * @param container the shared container to fill
     */
    public EDBSProducer(EDBSContainer container){
      this.container = container;
    }

    /**
     * Calls produce() until producedAll() returns true. Any exception is
     * printed and terminates the thread.
     */
    public void run(){
      try{
        boolean done = container.producedAll();
        while(!done){
          container.produce();
          done = container.producedAll();
        }
      }
      catch(Exception e){
        e.printStackTrace();
      }
    }
}
  
  /**
   * Consumer half of the run-building pipeline. It repeatedly asks an
   * EDBSContainer for the offset of the next complete record and appends
   * each offset to a growable int array. Read the results with getOffsets()
   * and getNumOffsets() only after the thread has terminated (e.g. after
   * join()).
   */
  class EDBSConsumer extends Thread{
    /** Factor by which the offset array grows when it is full. */
    private static final double GROWTH_FACTOR = 1.75;

    private EDBSContainer hb;
    private int[] offsets;
    private int numOffsets = 0;

    /**
     * @param hb container to consume record offsets from
     * @param offsets initial offset array, reused until it fills up; null is
     * treated as an empty array
     */
    public EDBSConsumer(EDBSContainer hb, int[] offsets){
      this.offsets = (offsets != null) ? offsets : new int[0];
      this.hb = hb;
    }

    /** @return the number of valid entries at the start of getOffsets() */
    public int getNumOffsets(){
      return numOffsets;
    }

    /**
     * @return the offset array, which may be a larger copy of the array
     * passed to the constructor; only the first getNumOffsets() entries are
     * valid
     */
    public int[] getOffsets(){
      return offsets;
    }

    /**
     * Collects offsets until the container reports that everything has been
     * consumed. consume() returns -1 when no complete record is available
     * yet; those results are skipped. Exceptions are printed and end the
     * loop.
     */
    public void run(){
      try{
        while(!hb.consumedAll()){
          int offset = hb.consume();
          if(offset != -1){
            if(numOffsets == offsets.length)
              offsets = grow(offsets);
            offsets[numOffsets++] = offset;
          }
        }
      }
      catch(Exception e){e.printStackTrace();}
      System.out.println("SORT:consumer.run(): consumed all: "+hb.consumedAll()+" "+numOffsets);
    }

    /**
     * Returns a copy of a enlarged by GROWTH_FACTOR, and always by at least
     * one element: (int)(0 * 1.75) and (int)(1 * 1.75) would not grow the
     * array, causing an ArrayIndexOutOfBoundsException on the next store.
     */
    private static int[] grow(int[] a){
      int newLength = (int) (a.length * GROWTH_FACTOR);
      if(newLength <= a.length)
        newLength = a.length + 1;
      int[] bigger = new int[newLength];
      System.arraycopy(a, 0, bigger, 0, a.length);
      return bigger;
    }
  }
  
  
  /**
   * Coordinator shared by one EDBSProducer and one EDBSConsumer while a
   * single sort run is built.
   * <p>
   * The producer reads from the channel c into buffer, up to blockSize
   * bytes at a time (produce()). The consumer walks consumerBuffer, a
   * read-only view of the same bytes, and obtains the start offset of each
   * complete record (consume()). consume() and produce() are synchronized on
   * this object and signal each other with wait()/notifyAll().
   * <p>
   * NOTE(review): produce() performs the blocking channel read while
   * holding the monitor, so consume() cannot run during a read.
   */
  class EDBSContainer{
    
    // buffer: the large sort buffer filled from the channel.
    ByteBuffer buffer;
    // consumerBuffer: read-only view of buffer; its limit tracks how many
    // bytes have been produced for the current run.
    ByteBuffer consumerBuffer;
    // bc: stored by the constructor but not used within this class.
    ByteComparable bc;
    // c: the input channel being read.
    ReadableByteChannel c;
    // noMore: nothing more will be produced for this run (buffer full or
    // end of stream). consumedAll: the consumer has finished this run.
    // endOfStream: the channel has returned -1; no further runs.
    boolean noMore = false, consumedAll = false, endOfStream = false; 
    // slack: -1 when no incomplete record is pending. Otherwise it is the
    // number of trailing bytes of an incomplete record found by consume().
    // prepareRun() copies that many bytes from the end of the buffer to the
    // start of the next run.
    // totConsumed: bytes of this run already handed out as records, i.e.
    // the offset of the next record.
    // totProduced: bytes present in buffer for this run.
    // read: result of the last channel read.
    int slack = -1, totConsumed, totProduced, blockSize, read;
    // template: used to measure record sizes via ByteStorable.slackOrSize.
    ByteStorable template;
    
    /**
     * @param b the buffer that runs are built in; consumption starts at its
     * current position as the initial limit
     * @param c channel to read input from
     * @param blockSize maximum number of bytes to read per produce() call
     * @param template object used to measure records
     * @param bc comparator (only stored here)
     */
    public EDBSContainer(ByteBuffer b, ReadableByteChannel c, int blockSize,
			 ByteStorable template, ByteComparable bc){
      
      this.blockSize = blockSize;
      this.bc = bc;
      this.template = template;
      totConsumed = totProduced = 0;
      this.c = c;
      buffer = b;
      consumerBuffer = b.asReadOnlyBuffer();
      consumerBuffer.limit(buffer.position());
      System.out.println("SORT:container():consumer buffer limit: "+consumerBuffer.limit());
      
    }
    
    /**
     * Resets the container for the next run. It must be called before
     * starting a new producer/consumer pair.
     *
     * @return false once the end of the input stream has been reached
     * (there is nothing left for another run), true otherwise
     */
    public boolean prepareRun(){
      if(endOfStream)
	return false;
      noMore = false;
      consumedAll = false;
      buffer.limit(buffer.capacity());
      if(slack > 0){
	// Carry the incomplete record at the end of the previous run over to
	// the start of this run. Assumes copyToBeginning leaves buffer's
	// position just after the copied bytes -- TODO confirm.
	buffer.position(buffer.capacity() - slack);
	ByteStorable.copyToBeginning(buffer, slack);
	totProduced = slack;
      }
      else{
	buffer.clear();
	totProduced = 0;
      }
      totConsumed = 0;
      // slack is not reset here. Any value other than -1 makes the first
      // consume() of this run wait until produce() has read more data.
      consumerBuffer.limit(buffer.position());
      consumerBuffer.position(0);
      return true;
    }
    
    /** @return the underlying (writable) sort buffer */
    public ByteBuffer getBuffer(){
      return buffer;
    }
    
    /** @return true when nothing more will be produced for this run */
    public boolean producedAll(){
      return noMore;
    }
    
    /** @return bytes of this run already returned as records */
    public int getTotalConsumed(){
      return totConsumed;
    }
    
    /** @return bytes present in the buffer for this run */
    public int getTotalProduced(){
      return totProduced;
    }
    
    /** @return true when the consumer has finished this run */
    public boolean consumedAll(){
      return consumedAll;
    }
    
    /** @return the read-only view the consumer reads records from */
    public ByteBuffer getEDBSConsumerBuffer(){
      return consumerBuffer;
    }
    
    /**
     * Returns the start offset of the next complete record and advances
     * totConsumed past it. Returns -1 when no complete record is available
     * right now, or when the run is finished (then consumedAll becomes true).
     * <p>
     * Assumes ByteStorable.slackOrSize returns the size of the record at the
     * current position. Otherwise it returns a non-positive value whose
     * magnitude is the number of leftover bytes -- TODO confirm against
     * ByteStorable.
     */
    public synchronized int consume(){ //returns at most one record per call
      try{
	// Finished once production has stopped and the tail has been examined.
	if((noMore && slack >= 0) || consumedAll){
	  notifyAll();
	  consumedAll = true;
	  return -1;
	}

	// A previous call hit an incomplete record: wait for more data.
	// NOTE(review): single wait() rather than a loop; after a spurious
	// wakeup the record is simply re-measured below.
	if(slack != -1){
	  wait();
	}

	consumerBuffer.position(totConsumed);
	consumerBuffer.mark();
	int bSize = ByteStorable.slackOrSize(consumerBuffer, template);
	consumerBuffer.reset();
	if(bSize <= 0){
	  slack = Math.abs(bSize);
	  notifyAll();
	  return -1;
	}
	totConsumed += bSize;
	notifyAll();
	return totConsumed - bSize;
      }
      catch(InterruptedException e){
	; // NOTE(review): interrupt status is not restored
      }
      return -1;
    }
    
    /**
     * Reads up to blockSize bytes (less if the buffer is nearly full) from
     * the channel into the buffer and exposes them to the consumer. Sets
     * noMore when the buffer is full or the stream has ended (the latter also
     * sets endOfStream).
     *
     * @return the number of bytes read, or -1 at end of stream, when nothing
     * more is to be produced, or on error
     */
    public synchronized int produce(){
      read = -1;
      //slack = -1;
      if(noMore){
	notifyAll();
	return -1;
      }
      try{
	int left = buffer.capacity() - buffer.position(); 
	int moveAhead = (left < blockSize) ? left : blockSize;
	buffer.limit(buffer.position() + moveAhead);
	read = c.read(buffer);
	if(read == -1){ //end of stream
	  noMore = true;
	  endOfStream = true;
	  slack = -1;
	}
	else{
	  totProduced += read;
	  consumerBuffer.limit(buffer.position());
	  if(totProduced == buffer.capacity())
	    noMore = true;
	  // New data arrived: let a waiting consumer re-examine the tail.
	  slack = -1;
	}
	notifyAll();
	return read;
      }
      // NOTE(review): on an exception noMore is not set and no notifyAll()
      // is issued, so EDBSProducer.run() will keep calling produce() and a
      // waiting consumer may never wake up.
      catch(Exception e){e.printStackTrace();} 
      return -1;
    }
  }
  

/**
 * Filename filter that selects sort-run files: those whose names begin
 * with EDiscBasedSort.SORT_RUN_FILE. The directory argument is ignored.
 */
class FFilter implements java.io.FilenameFilter{

  /**
   * @param dir ignored
   * @param name file name to test
   * @return true if name starts with the sort-run prefix
   */
  public boolean accept(File dir, String name){
    return name.startsWith(EDiscBasedSort.SORT_RUN_FILE);
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -