⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 merge.java

📁 实现数据库的storage manager 功能
💻 JAVA
字号:
/*
 * Copyright (c) 2000-2004, Rickard C鰏ter, Martin Svensson
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without 
 * modification, are permitted provided that the following conditions are met:
 * 
 * Redistributions of source code must retain the above copyright notice, 
 * this list of conditions and the following disclaimer. 
 * Redistributions in binary form must reproduce the above copyright notice, 
 * this list of conditions and the following disclaimer in the documentation 
 * and/or other materials provided with the distribution. 
 * Neither the name of SICS nor the names of its contributors 
 * may be used to endorse or promote products derived from this software 
 * without specific prior written permission. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 * POSSIBILITY OF SUCH DAMAGE.
 *
 *
 */

package com.mellowtech.disc.sort;

import com.mellowtech.disc.*;
import java.nio.*;
import java.nio.channels.*;
import java.io.*;
import java.util.zip.*;

/**
 * Merge a set of sorted files into one large file. This Merge
 * operates on ByteStorable objects.
 *
 * @author Martin Svensson
 * @version 1.0
 */
public class Merge{
  
  private static int mBlockSize = 4096;
  
  private static class Container implements Comparable{
    ByteStorable store;
    int node;
    
    public Container(ByteStorable bs, int node){
      store = bs;
      this.node = node;
    }
    
    public int compareTo(Object o){
      return this.store.compareTo(((Container)o).store);
    }

    public String toString(){
      return node+" "+store.toString();
    }
  }

  /**
   * Set the blocksize. How much data to read/write from disc. This
   * number should seldom be no more than 4096.
   *
   * @param blockSize the size
   */
  public static void setBlockSize(int blockSize){
    mBlockSize = blockSize;
  }

  /**
   * Merges a set of files into one large file. Merge will split the
   * input buffer into chuncks of size blockSize, each corresponding
   * to one input file. If the buffer is not large enough to hold a block
   * for each input file the merge will be carried out in several steps
   * (this to guarantee that sufficient data are read during every disc read).
   *
   * @param fNames[] the list of files to merge
   * @param template the type of objects to merge
   * @param input the input buffer
   * @param output the output buffer
   * @param outputChannel the channel to print the merged data
   * @param dir the dir where to find the input files
   * @param compressed true if the set of files are compressed using java.util.zip.DeflaterOutputStream.
   * @exception Exception if an error occurs
   */
  public static void merge(String fNames[], ByteStorable template, ByteBuffer input,
			   ByteBuffer output, WritableByteChannel outputChannel,
			   String dir, boolean compressed) throws Exception{

    ReadableByteChannel[] channels = new ReadableByteChannel[fNames.length];
    //FileChannel[] channels = new FileChannel[fNames.length];
    ByteBuffer[] buffers = new ByteBuffer[fNames.length];
    input.clear(); output.clear();
    Heap heap = new Heap(fNames.length);
    
    //fill local containers:
    for(int i = 0; i < fNames.length; i++){
      System.out.println(dir+"/"+fNames[i]);
      if (compressed)
	channels[i] 
	  = Channels.newChannel(new InflaterInputStream(new FileInputStream(dir+"/"+fNames[i]),
							new Inflater()));
      else
	channels[i] = (new FileInputStream(dir+"/"+fNames[i])).getChannel();
      //slice buffer:
      input.position(i * mBlockSize);
      input.limit((i+1) * mBlockSize);
      buffers[i] = input.slice();
      buffers[i].clear();
      buffers[i].flip();

      //input buffer:
      input(channels, buffers, i, heap, template);
    }

    //System.out.println(heap.get(0));
    //System.out.println(heap.get(1));


    //merge files:
    Container low;
    while(true){
      low = (Container) heap.delete();
      if(low == null)
	break;
      writeOutput(low.store, outputChannel, output);
      input(channels, buffers, low.node, heap, template);
    }
    //flush remaining:
    output.flip();
    outputChannel.write(output);
  }

  /**
   * Merges a set of files into one large file. This Merge does no conversion
   * from the input bytes to objects. Merge will split the
   * input buffer into chuncks of size blockSize, each corresponding
   * to one input file. If the buffer is not large enough to hold a block
   * for each input file the merge will be carried out in several steps
   * (this to guarantee that sufficient data are read during every disc read).
   *
   * @param fNames[] the list of files to merge
   * @param template the type of objects to merge
   * @param input the input buffer
   * @param output the output buffer
   * @param outputChannel the channel to print the merged data
   * @param bc The byte comparator
   * @param dir the dir where to find the input files
   * @param compressed true if the runs are compressed using java.util.zip.DeflaterOutputStream
   * @exception Exception if an error occurs
   */
  public static void merge(String fNames[], ByteStorable template, ByteBuffer input,
			   ByteBuffer output, WritableByteChannel outputChannel,
			   ByteComparable bc, String dir, boolean compressed) throws Exception{
    //create local containers:
    ReadableByteChannel[] channels = new ReadableByteChannel[fNames.length];
    ByteBuffer[] buffers = new ByteBuffer[fNames.length];
    //int[] offsets = new int[fNames.length];
    input.clear(); output.clear();
    BufferHeap bufferHeap;
    try{
      bufferHeap = new ByteHeap(input.array(), bc);
    }
    catch(Exception e){
      bufferHeap = new ByteBufferHeap(input, bc);
    }

    //fill local containers:
    input.clear();
    for(int i = 0; i < fNames.length; i++){
      
      if (compressed)
	channels[i] 
	  = Channels.newChannel(new InflaterInputStream(new FileInputStream(dir+"/"+fNames[i]),
							new Inflater()));
      else
	channels[i] = (new FileInputStream(dir+"/"+fNames[i])).getChannel();
      //slice buffer:
      input.position(i * mBlockSize);
      input.limit((i+1) * mBlockSize);
      buffers[i] = input.slice();
      buffers[i].clear();
      buffers[i].flip();

      //input buffer:
      input(channels, buffers, i*mBlockSize, bufferHeap, template, bc);
      
    }
    
    //merge files:
    int low;
    while(true){
      low = bufferHeap.delete();
      if(low == -1)
	break;
      writeOutput(low, input, outputChannel, output, template);
      input(channels, buffers, low, bufferHeap, template, bc);
    }
    //flush remaining:
    output.flip();
    outputChannel.write(output);
  }
  
  
  private static void writeOutput(int offset, ByteBuffer input, 
				  WritableByteChannel outChannel, ByteBuffer output,
				  ByteStorable template) throws Exception{
    input.position(offset);
    int size = template.byteSize(input);
    if(size > output.remaining()){
      output.flip();
      outChannel.write(output);
      output.clear();
    }
    input.limit(offset+size);
    output.put(input);
    input.limit(input.capacity());
  }

  private static void writeOutput(ByteStorable low, WritableByteChannel outChannel, 
				  ByteBuffer output) throws Exception{
    
    if(low.byteSize() > output.remaining()){
      output.flip();
      outChannel.write(output);
      output.clear();
    }
    low.toBytes(output);
  }

  private static void input(ReadableByteChannel[] channels, ByteBuffer[] buffers,
			    int node, Heap heap, ByteStorable template) throws Exception{
    if(!channels[node].isOpen())
      return;
    int slack = ByteStorable.slackOrSize(buffers[node], template);
    if(slack <= 0){
      ByteStorable.copyToBeginning(buffers[node], Math.abs(slack));
      if(channels[node].read(buffers[node])==-1){
	System.out.println("closing channel: "+slack);
	channels[node].close();
	return;
      }
      buffers[node].flip();
      //slack = template.byteSize(buffers[node]);
    }
    heap.insert(new Container(template.fromBytes(buffers[node]), node));
  }

  private static void input(ReadableByteChannel[] channels, ByteBuffer[] buffers,
			    int offset, BufferHeap heap, ByteStorable template, 
			    ByteComparable bc) throws Exception{
    
    int node = offset / mBlockSize; 
    int slack = ByteStorable.slackOrSize(buffers[node], template);

    if(slack <= 0){
      ByteStorable.copyToBeginning(buffers[node], Math.abs(slack));
      if(channels[node].read(buffers[node])==-1){
	channels[node].close();
	return;
      }
      buffers[node].flip();
      slack = template.byteSize(buffers[node]);
    }
    int pos = buffers[node].position();
    heap.insert((node * mBlockSize) + pos);
    buffers[node].position(pos+slack);
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -