⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 discbasedsort.java

📁 实现数据库的storage manager 功能
💻 JAVA
字号:
/*
 * Copyright (c) 2000-2004, Rickard C鰏ter, Martin Svensson
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without 
 * modification, are permitted provided that the following conditions are met:
 * 
 * Redistributions of source code must retain the above copyright notice, 
 * this list of conditions and the following disclaimer. 
 * Redistributions in binary form must reproduce the above copyright notice, 
 * this list of conditions and the following disclaimer in the documentation 
 * and/or other materials provided with the distribution. 
 * Neither the name of SICS nor the names of its contributors 
 * may be used to endorse or promote products derived from this software 
 * without specific prior written permission. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 * POSSIBILITY OF SUCH DAMAGE.
 *
 *
 */

package com.mellowtech.disc.sort;

import com.mellowtech.disc.*;

import java.nio.*;
import java.io.*;
import java.nio.channels.*;
import java.util.*;
import se.sics.util.*;

/**
 * DiscBased sort sorts large amounts of data by combining in-memory 
 * sorting with disc-based merging. It uses quicksort for the 
 * in-memory sorting step. This sort also uses overlapping 
 * IO-opertations to maximize speed (i.e. the creation of the array of 
 * objects to sort works in parallel with reading input data).
 * The Sorter operates on ByteStorable objects.
 * <p>
 * For sorting with an optimal memory utilization and 
 * minimal object conversion turn to EDiscBasedSort.
 * </p>
 *
 * @author Martin Svensson
 * @version 1.0
 * @see com.mellowtech.disc.sort.EDiscBasedSort
 */
public class DiscBasedSort{
  public static final String SORT_RUN_FILE = "disc_sort_d_run.";
  private static final String SEP 
    = System.getProperties().getProperty("file.separator");
  
  private static int blockSize = 1024;
  private ByteStorable template;
  private int complevel = 0;
  private String tempDir = null;
  /**
   * Set the blocksize. How much data to read/write from disc. This
   * number should seldom be no more than 4096.
   *
   * @param size block size. (a multiple of 1024)
   */
  public static void setBlockSize(int size){
    blockSize = size;
  }

  /**
   * Create a new DiscBased sorter that will operate
   * on a specific type of objects.
   *
   * @param template the type of object to sort
   * @param tempDir temporary directory for sort runs
   */
  public DiscBasedSort(ByteStorable template, String tempDir){
    this(template, 0, tempDir);
  }
  
  /**
   * Create a new DiscBased sorter that will operate
   * on a specific type of objects.
   *
   * @param template the type of object to sort
   * @param complevel the level of GZIP compression for runs (1-9, where 
   * 1 is fastest) 
   * @param tempDir temporary directory for sort runs
   */
  public DiscBasedSort(ByteStorable template, int complevel, String tempDir){
    this.template = template;
    this.complevel = complevel;
    this.tempDir = tempDir;
    try {
      File file = new File(tempDir);
      if (!file.isDirectory()) 
	throw new Exception("");
    }
    catch(Exception e) {
      System.out.println("Could not open temp dir.." + tempDir);
      tempDir = System.getProperty("java.io.tmpdir");
      System.out.println("Setting temp dir to ....." + tempDir);
    }
  }
  
  /**
   * Sorts an inputfile and prints it to a designated outputfile. If these
   * are the same the inputfile will be overwritten.
   *
   * @param fName File to sort
   * @param outputFile Ouputfile
   * @param memorySize The amount of memory that can be used for the in-memory sorting step
   * @return the number of objects sorted.
   */
  public int sort(String fName, String outputFile, int memorySize){
    try{
      FileChannel fc = (new FileInputStream(fName)).getChannel();
      FileChannel fo = (new FileOutputStream(outputFile)).getChannel();
      int ret = sort(fc, fo, memorySize);
      fc.close();
      fo.close();
      return ret;
    }
    catch(IOException e){System.out.println(e); return -1;}
  }

  /**
   * Sorts a byte channel and print the result to a designated byte channel. If
   * these are the same the input channel will be overwritten.
   *
   * @param input input channel
   * @param output output channel
   * @param memorySize the number of bytes that can be used for in-memory sorting
   * @return the number of objects sorted
   */
  public int sort(ReadableByteChannel input, WritableByteChannel output,
		  int memorySize){
    
    ByteBuffer ob = ByteBuffer.allocateDirect(blockSize);
    int inputSize = blockSize * 100;
    ByteBuffer large = ByteBuffer.allocate(inputSize);
    //String tmpDir = System.getProperty("java.io.tmpdir");
    //if(tmpDir == null)
    //tmpDir = ".";

    int numFiles = makeRuns(input, memorySize, large, ob, tempDir);

    if(numFiles <= 0)
    return -1;
    
    //now merge:
    File f = new File(tempDir);
    String[] fNames = f.list(new DBFFilter());
    try{
      Merge.merge(fNames, template, large, ob, output, tempDir,
		  complevel > 0 ? true : false);
    }
    catch(Exception e){e.printStackTrace();}
    try {
      System.out.println("SORT:sort():Removing temp files.");
      File fileDir = new File(tempDir);
      System.out.println(fileDir);
      File[] files = fileDir.listFiles(new FilenameFilter() {
	  public boolean accept(File file, String name) {
	    return name.indexOf(SORT_RUN_FILE) >= 0 ? true : false;
	  }
	});
      for (int i = 0; i < files.length; i++) {
	System.out.println("sort run " + files[i]);
      	System.out.println("delete   " + files[i].delete());
      }
    }
    catch(Exception e) {
      System.out.println(e);
    }
    return numFiles;
    
  }

  
  private int makeRuns(ReadableByteChannel input, int heapSize,
			ByteBuffer large, ByteBuffer ob, String tmpDir){
    try{
      DBSContainer hb = new DBSContainer(large, input, blockSize, template, heapSize);

      ByteStorable[] objs = new ByteStorable[10000];
      int i = 0;
      int numObjs = 0;
      
      while(true){
	i++;
	if(!hb.prepareRun())
	  break;
	DBSProducer p = new DBSProducer(hb);
	DBSConsumer c = new DBSConsumer(hb, objs);
	p.start();
	c.start();
	c.join();
		
	//done filling a file:
	objs = c.getObjects();
	numObjs += c.getNumObjs();
	sortRun(input, objs, c.getNumObjs(), i, tmpDir, ob);
      }
      return numObjs;
    }
    catch(Exception e){e.printStackTrace(); return -1;}
  }
  
  
  private int sortRun(ReadableByteChannel c, ByteStorable[] objs, int numObjs, int i,
		      String dir, ByteBuffer output) throws Exception{
        
    output.clear(); //clear output buffer:

    //Create output channel:
    FileChannel fc = (new FileOutputStream(dir+SEP+SORT_RUN_FILE+i)).getChannel();
    int size = 0, numBytes = 0;

    //sort offsets:
    long l = System.currentTimeMillis();
    Sorters.quickSort(objs, numObjs);
    
    
    for(int j = 0; j < numObjs; j++){
      //System.out.println(j + " " + objs[j]);
      if(objs[j].byteSize() > output.remaining()){
	output.flip();
	fc.write(output);
	output.clear();
      }
      objs[j].toBytes(output);
      numBytes += objs[j].byteSize();
    }
    //flush outputbuffer:
    output.flip();
    fc.write(output);
    fc.close();
    return numBytes;
  }
}

class DBSProducer extends Thread {
  private DBSContainer hb;
  
  public DBSProducer(DBSContainer hb){
    this.hb = hb;
  }
  
  public void run(){
    try{
      while(!hb.producedAll())
	hb.produce();
    }
    catch(Exception e){System.out.println(e);}
  }
}

class DBSConsumer extends Thread{
  private DBSContainer hb;
  private ByteStorable tmp;
  private ByteStorable[] objs;
  private int numObjs = 0;
  
  public DBSConsumer(DBSContainer hb, ByteStorable[] objs){
    this.objs = objs;
    this.hb = hb;
  }
  
  public int getNumObjs(){
    return numObjs;
  }
  
  public ByteStorable[] getObjects(){
    return objs;
  }
  
  public void run(){
    try{
      while(!hb.consumedAll()){
	tmp = hb.consume();
	if(tmp != null){
	  //just a test:
	  if(numObjs == objs.length){
	    objs = (ByteStorable[])ArrayUtils.setSize(objs, (int) (objs.length * 1.75));
	  }
	  objs[numObjs++] = tmp;
	}
      }
    }
    catch(Exception e){e.printStackTrace();}
    System.out.println("consumed all: "+hb.consumedAll());
  }
}


class DBSContainer{
  
  ByteBuffer buffer;
  ByteBuffer consumerBuffer;
  ReadableByteChannel c;
  boolean noMore = false, consumedAll = false, endOfStream = false; 
  int slack = -1, totConsumed, totProduced, blockSize, maxRead;
  ByteStorable template;
  
  public DBSContainer(ByteBuffer b, ReadableByteChannel c, int blockSize,
		      ByteStorable template, int maxRead){
    
    this.blockSize = blockSize;
    this.maxRead = maxRead;
    this.template = template;
    totConsumed = totProduced = 0;
    this.c = c;
    buffer = b;
    consumerBuffer = b.asReadOnlyBuffer();
    consumerBuffer.limit(buffer.position());
    System.out.println("consumer buffer limit: "+consumerBuffer.limit());
    
  }
  
  public boolean prepareRun(){
    System.out.println("slack: "+slack+" "+buffer.limit()+" "+buffer.capacity());
    if(endOfStream)
      return false;
    noMore = false;
    consumedAll = false;
    if(slack > 0){
      buffer.position(buffer.limit() - slack);
      ByteStorable.copyToBeginning(buffer, slack);
      totProduced = slack;
      buffer.position(slack);
      buffer.limit(slack);
      slack = -1;
    }
    else{
      buffer.clear();
      totProduced = 0;
      buffer.limit(buffer.position());
    }
    totConsumed = 0;
    consumerBuffer.position(0);
    consumerBuffer.limit(buffer.position());
    
    /*buffer.limit(buffer.capacity());
      if(slack > 0){
      buffer.position(buffer.capacity() - slack);
      ByteStorable.copyToBeginning(buffer, slack);
      totProduced = slack;
      }
      else{
      buffer.clear();
      totProduced = 0;
      }
      totConsumed = 0;
      consumerBuffer.limit(buffer.position());
      consumerBuffer.position(0);
    */
    return true;
  }
  
  public ByteBuffer getBuffer(){
    return buffer;
  }
  
  public boolean producedAll(){
    return noMore;
  }
  
  public int getTotalConsumed(){
    return totConsumed;
  }
  
  public int getTotalProduced(){
    return totProduced;
  }
  
  public boolean consumedAll(){
    return consumedAll;
  }
  
  public synchronized ByteStorable consume(){ //as much as possible
    try{
      if((noMore && slack >= 0) || consumedAll){
	notifyAll();
	consumedAll = true;
	return null;
      }
      while(slack != -1){
	wait();
      }
      ByteStorable tmp;
      //int left = consumerBuffer.remaining();
      int bSize = ByteStorable.slackOrSize(consumerBuffer, template);
      //System.out.println(bSize + " " + consumerBuffer.limit());
      if(bSize <= 0){
	slack = Math.abs(bSize);
	notifyAll();
	return null;
      }
      tmp = template.fromBytes(consumerBuffer);
      totConsumed += bSize;
      notifyAll();
      return tmp;
    }
    catch(InterruptedException e){
      ;
    }
    return null;
  }
  
  public synchronized int produce(){
    int read = -1;
    if(noMore){
      notifyAll();
      return -1;
    }
    try{
      int left = buffer.capacity() - buffer.position(); 
      int moveAhead = (left < blockSize) ? left : blockSize;
      while(slack == -1 && left == 0){
	wait();
      }
      if(left > 0){
	buffer.limit(buffer.position() + moveAhead);
	read = c.read(buffer);
      }
      else if(slack > 0){ //read buffer completely
	consumerBuffer.clear();
	buffer.position(buffer.capacity() - slack);
	ByteStorable.copyToBeginning(buffer, slack);
	left = buffer.capacity() - buffer.position();
	buffer.limit(left < blockSize ? left : buffer.position() + blockSize);
	read = c.read(buffer);
	slack = -1;
      }
      else{ //read buffer completely
	consumerBuffer.clear();
	buffer.clear();
	buffer.limit(blockSize);
	read = c.read(buffer);
      }
      
      if(read == -1){ //end of stream
	endOfStream = true;
	noMore = true;
      }
      else{
	totProduced += read;
	consumerBuffer.limit(buffer.position());
	if(totProduced >= maxRead)
	  noMore = true;
	slack = -1;
      }
      notifyAll();
      return read;
      
    }
    catch(Exception e){System.err.println(slack);} 
    return -1;
  }
  
  public ByteBuffer getDBSConsumerBuffer(){
    return consumerBuffer;
  }
}



class DBFFilter implements java.io.FilenameFilter{
  
  public boolean accept(File dir, String name){
    if(name.startsWith(DiscBasedSort.SORT_RUN_FILE))
      return true;
    return false;
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -