📄 merge.java
字号:
/*
* Copyright (c) 2000-2004, Rickard Cöster, Martin Svensson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* Neither the name of SICS nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*
*/
package com.mellowtech.disc.sort;
import com.mellowtech.disc.*;
import java.nio.*;
import java.nio.channels.*;
import java.io.*;
import java.util.zip.*;
/**
 * Merges a set of sorted files into one large sorted file. This Merge
 * operates on ByteStorable objects.
 *
 * <p>The block size is a static setting shared by all callers; this class
 * performs no synchronization.
 *
 * @author Martin Svensson
 * @version 1.0
 */
public class Merge {

  /** Default number of bytes read from each input file per disc read. */
  private static final int DEFAULT_BLOCK_SIZE = 4096;

  /** Number of bytes of the input buffer reserved for each file being merged. */
  private static int blockSize = DEFAULT_BLOCK_SIZE;

  /** Heap entry: a decoded object together with the index of the file it came from. */
  private static class Container implements Comparable {
    final ByteStorable store;
    final int node;

    public Container(ByteStorable bs, int node) {
      this.store = bs;
      this.node = node;
    }

    /** Orders containers by the ordering of their stored objects. */
    public int compareTo(Object o) {
      return this.store.compareTo(((Container) o).store);
    }

    public String toString() {
      return node + " " + store.toString();
    }
  }

  /**
   * Sets the block size, i.e. how many bytes are read from each input file
   * per disc read. The input buffer given to {@code merge} must hold one
   * block per input file. The block size should seldom need to be larger
   * than 4096.
   *
   * @param blockSize the size in bytes; must be positive
   * @exception IllegalArgumentException if {@code blockSize <= 0}
   */
  public static void setBlockSize(int blockSize) {
    if (blockSize <= 0) {
      throw new IllegalArgumentException("blockSize must be positive: " + blockSize);
    }
    Merge.blockSize = blockSize;
  }

  /**
   * Merges a set of sorted files into one large file. The input buffer is
   * split into chunks of {@code blockSize} bytes, one per input file; each
   * file is read through its own chunk, decoded with {@code template}, and
   * the smallest object is repeatedly written to {@code outputChannel}
   * through the {@code output} buffer.
   *
   * <p>The input buffer must be large enough to hold one block per file;
   * multi-pass merging is not performed.
   *
   * @param fNames the names of the sorted files to merge
   * @param template the type of objects to merge (used to decode them)
   * @param input the input buffer; capacity must be at least
   *        {@code fNames.length * blockSize} bytes
   * @param output the output buffer used to batch writes
   * @param outputChannel the channel that receives the merged data
   * @param dir the directory where the input files are found
   * @param compressed true if the files were written using
   *        java.util.zip.DeflaterOutputStream
   * @exception IllegalArgumentException if {@code input} cannot hold one
   *        block per file
   * @exception Exception if opening, reading, decoding or writing fails
   */
  public static void merge(String[] fNames, ByteStorable template, ByteBuffer input,
      ByteBuffer output, WritableByteChannel outputChannel,
      String dir, boolean compressed) throws Exception {
    checkInputCapacity(fNames.length, input);
    ReadableByteChannel[] channels = new ReadableByteChannel[fNames.length];
    ByteBuffer[] buffers = new ByteBuffer[fNames.length];
    input.clear();
    output.clear();
    Heap heap = new Heap(fNames.length);
    try {
      // Open every file and prime the heap with its first object.
      for (int i = 0; i < fNames.length; i++) {
        channels[i] = openRun(dir, fNames[i], compressed);
        buffers[i] = sliceBlock(input, i);
        input(channels, buffers, i, heap, template);
      }
      // Emit the smallest object, then refill from the file it came from.
      while (true) {
        Container low = (Container) heap.delete();
        if (low == null) {
          break;
        }
        writeOutput(low.store, outputChannel, output);
        input(channels, buffers, low.node, heap, template);
      }
      // Flush whatever is still buffered.
      output.flip();
      writeFully(outputChannel, output);
    } finally {
      closeQuietly(channels);
    }
  }

  /**
   * Merges a set of sorted files into one large file. This Merge does no
   * conversion from the input bytes to objects: records stay in the input
   * buffer, are ordered by a heap built with {@code bc}, and are copied to
   * the output as raw bytes. {@code template} is only used to determine
   * record sizes. The input buffer is split into chunks of
   * {@code blockSize} bytes, one per input file.
   *
   * <p>The input buffer must be large enough to hold one block per file;
   * multi-pass merging is not performed.
   *
   * @param fNames the names of the sorted files to merge
   * @param template the type of objects to merge (used for record sizes)
   * @param input the input buffer; capacity must be at least
   *        {@code fNames.length * blockSize} bytes
   * @param output the output buffer used to batch writes
   * @param outputChannel the channel that receives the merged data
   * @param bc the byte comparator handed to the heap
   * @param dir the directory where the input files are found
   * @param compressed true if the files were written using
   *        java.util.zip.DeflaterOutputStream
   * @exception IllegalArgumentException if {@code input} cannot hold one
   *        block per file
   * @exception Exception if opening, reading, decoding or writing fails
   */
  public static void merge(String[] fNames, ByteStorable template, ByteBuffer input,
      ByteBuffer output, WritableByteChannel outputChannel,
      ByteComparable bc, String dir, boolean compressed) throws Exception {
    checkInputCapacity(fNames.length, input);
    ReadableByteChannel[] channels = new ReadableByteChannel[fNames.length];
    ByteBuffer[] buffers = new ByteBuffer[fNames.length];
    input.clear();
    output.clear();
    // ByteHeap is handed the raw backing array, so presumably it treats heap
    // offsets as array indices; only use it when those coincide with buffer
    // indices (accessible array, zero offset). Otherwise work on the buffer.
    BufferHeap bufferHeap;
    if (input.hasArray() && input.arrayOffset() == 0) {
      bufferHeap = new ByteHeap(input.array(), bc);
    } else {
      bufferHeap = new ByteBufferHeap(input, bc);
    }
    input.clear();
    try {
      // Open every file and prime the heap with the offset of its first record.
      for (int i = 0; i < fNames.length; i++) {
        channels[i] = openRun(dir, fNames[i], compressed);
        buffers[i] = sliceBlock(input, i);
        input(channels, buffers, i * blockSize, bufferHeap, template, bc);
      }
      // Emit the smallest record, then refill from the file it came from.
      while (true) {
        int low = bufferHeap.delete();
        if (low == -1) {
          break;
        }
        writeOutput(low, input, outputChannel, output, template);
        input(channels, buffers, low, bufferHeap, template, bc);
      }
      // Flush whatever is still buffered.
      output.flip();
      writeFully(outputChannel, output);
    } finally {
      closeQuietly(channels);
    }
  }

  /** Fails fast when {@code input} cannot hold one block per file. */
  private static void checkInputCapacity(int files, ByteBuffer input) {
    long needed = (long) files * blockSize; // long: avoid int overflow in the check
    if (needed > input.capacity()) {
      throw new IllegalArgumentException("input buffer holds " + input.capacity()
          + " bytes but merging " + files + " files needs " + needed
          + " (one block of " + blockSize + " bytes per file)");
    }
  }

  /**
   * Opens one input file as a channel. Compressed files are wrapped in an
   * InflaterInputStream that uses its default Inflater, so closing the
   * channel also releases the Inflater's native resources.
   */
  private static ReadableByteChannel openRun(String dir, String name, boolean compressed)
      throws IOException {
    FileInputStream in = new FileInputStream(dir + "/" + name);
    if (compressed) {
      return Channels.newChannel(new InflaterInputStream(in));
    }
    return in.getChannel();
  }

  /**
   * Returns a view of block {@code i} of {@code input}, initially empty
   * (position == limit == 0). Leaves {@code input} limited to the end of
   * that block.
   */
  private static ByteBuffer sliceBlock(ByteBuffer input, int i) {
    input.clear(); // reset limit first so position(i * blockSize) is always legal
    input.position(i * blockSize);
    input.limit((i + 1) * blockSize);
    ByteBuffer block = input.slice();
    block.clear();
    block.flip();
    return block;
  }

  /**
   * Copies the record starting at {@code offset} in {@code input} to the
   * output, flushing the output buffer to {@code outChannel} when full.
   * A record larger than the whole output buffer is written directly.
   */
  private static void writeOutput(int offset, ByteBuffer input,
      WritableByteChannel outChannel, ByteBuffer output,
      ByteStorable template) throws Exception {
    input.position(offset);
    int size = template.byteSize(input);
    if (size > output.remaining()) {
      output.flip();
      writeFully(outChannel, output);
      output.clear();
    }
    input.limit(offset + size);
    if (size > output.remaining()) {
      // Still does not fit in an empty output buffer: bypass it.
      writeFully(outChannel, input);
    } else {
      output.put(input);
    }
    input.limit(input.capacity());
  }

  /**
   * Serializes {@code low} into the output buffer, flushing it to
   * {@code outChannel} when full. An object larger than the whole output
   * buffer is serialized into a temporary buffer and written directly.
   */
  private static void writeOutput(ByteStorable low, WritableByteChannel outChannel,
      ByteBuffer output) throws Exception {
    int size = low.byteSize();
    if (size > output.remaining()) {
      output.flip();
      writeFully(outChannel, output);
      output.clear();
    }
    if (size > output.remaining()) {
      ByteBuffer tmp = ByteBuffer.allocate(size);
      low.toBytes(tmp);
      tmp.flip();
      writeFully(outChannel, tmp);
    } else {
      low.toBytes(output);
    }
  }

  /**
   * Decodes the next object of file {@code node} and inserts it into the
   * heap, refilling that file's block from its channel when the next object
   * is not completely buffered. Closes the channel at end-of-stream.
   */
  private static void input(ReadableByteChannel[] channels, ByteBuffer[] buffers,
      int node, Heap heap, ByteStorable template) throws Exception {
    ReadableByteChannel channel = channels[node];
    if (!channel.isOpen()) {
      return;
    }
    ByteBuffer block = buffers[node];
    // Presumably: > 0 is the size of a fully buffered next object; <= 0 means
    // the next object is incomplete and -slack of its bytes are buffered.
    int slack = ByteStorable.slackOrSize(block, template);
    if (slack <= 0) {
      ByteStorable.copyToBeginning(block, Math.abs(slack));
      if (readFully(channel, block) == -1) {
        channel.close();
        return;
      }
      block.flip();
    }
    heap.insert(new Container(template.fromBytes(block), node));
  }

  /**
   * Inserts the absolute input-buffer offset of the next record of the file
   * owning {@code offset} into the heap, refilling that file's block when the
   * next record is not completely buffered. Closes the channel at
   * end-of-stream.
   */
  private static void input(ReadableByteChannel[] channels, ByteBuffer[] buffers,
      int offset, BufferHeap heap, ByteStorable template,
      ByteComparable bc) throws Exception {
    int node = offset / blockSize;
    ReadableByteChannel channel = channels[node];
    ByteBuffer block = buffers[node];
    int size = ByteStorable.slackOrSize(block, template);
    if (size <= 0) {
      ByteStorable.copyToBeginning(block, Math.abs(size));
      if (readFully(channel, block) == -1) {
        channel.close();
        return;
      }
      block.flip();
      size = template.byteSize(block);
    }
    int pos = block.position();
    heap.insert((node * blockSize) + pos);
    block.position(pos + size); // skip past the record just queued
  }

  /**
   * Reads from {@code channel} until {@code buffer} is full or end-of-stream,
   * since a single read() may return fewer bytes than requested.
   *
   * @return the number of bytes read, or -1 if end-of-stream was reached
   *         before any byte was read
   */
  private static int readFully(ReadableByteChannel channel, ByteBuffer buffer)
      throws IOException {
    int total = 0;
    while (buffer.hasRemaining()) {
      int n = channel.read(buffer);
      if (n == -1) {
        return total == 0 ? -1 : total;
      }
      total += n;
    }
    return total;
  }

  /** Writes every remaining byte of {@code buffer}; a single write() may be partial. */
  private static void writeFully(WritableByteChannel channel, ByteBuffer buffer)
      throws IOException {
    while (buffer.hasRemaining()) {
      channel.write(buffer);
    }
  }

  /**
   * Closes every opened input channel. Channels that reached end-of-stream
   * are already closed, and closing them again has no effect. Close failures
   * are ignored so they cannot mask an exception thrown by the merge itself.
   * These are read-only input channels, so a failed close loses no data.
   */
  private static void closeQuietly(ReadableByteChannel[] channels) {
    for (int i = 0; i < channels.length; i++) {
      if (channels[i] == null) {
        continue;
      }
      try {
        channels[i].close();
      } catch (IOException ignored) {
        // best-effort cleanup; see method comment
      }
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -