📄 indexserver.java
字号:
package it.unimi.dsi.mg4j.index.remote;

/*
 * MG4J: Managing Gigabytes for Java
 *
 * Copyright (C) 2006-2007 Sebastiano Vigna
 *
 *  This library is free software; you can redistribute it and/or modify it
 *  under the terms of the GNU Lesser General Public License as published by the Free
 *  Software Foundation; either version 2.1 of the License, or (at your option)
 *  any later version.
 *
 *  This library is distributed in the hope that it will be useful, but
 *  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 *  or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
 *  for more details.
 *
 *  You should have received a copy of the GNU Lesser General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 */

import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.mg4j.index.BitStreamIndex;
import it.unimi.dsi.mg4j.index.Index;
import it.unimi.dsi.mg4j.index.CompressionFlags.Coding;
import it.unimi.dsi.Util;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;

import com.martiansoftware.jsap.FlaggedOption;
import com.martiansoftware.jsap.JSAP;
import com.martiansoftware.jsap.JSAPException;
import com.martiansoftware.jsap.JSAPResult;
import com.martiansoftware.jsap.Parameter;
import com.martiansoftware.jsap.SimpleJSAP;
import com.martiansoftware.jsap.Switch;
import com.martiansoftware.jsap.UnflaggedOption;

/** A daemon exposing an index remotely.
 *
 * <p>MG4J provides remote access to indices using a client/server pair. A thread
 * running the {@link #start(Index, InetAddress, int, boolean)} method of this
 * class acts as a server.
 *
 * <p>Once a server is up and running, clients can connect by calling the
 * {@link #getIndex(String, int, boolean, boolean)} method, which sends to
 * the server a {@link #GET_INDEX} command and returns a (serialised) {@link it.unimi.dsi.mg4j.index.Index}
 * that will handle all communication with the server transparently. The static
 * factory methods in {@link it.unimi.dsi.mg4j.index.Index} (such as {@link it.unimi.dsi.mg4j.index.Index#getInstance(CharSequence)})
 * will turn URIs such as <samp>mg4j://localhost</samp> into calls
 * to {@link #getIndex(String, int, boolean, boolean)}.
 *
 * <p>Presently there are two main kinds of remote indices: those exposing the local
 * bitstream remotely, and those exposing the results of document iterators remotely.
 * In the first case, the bitstream is passed over the net, and it is decoded locally
 * (it is <em>very</em> advisable not to use a compression method requiring document sizes in that case,
 * as the size list will be serialised into the returned {@link it.unimi.dsi.mg4j.index.Index} instance).
 * When started, an index server will by default try to expose the local bitstream,
 * if possible, but it can be forced not to do so with a suitable parameter. Depending
 * on the kind of access requested, the index server will return instances of
 * {@link it.unimi.dsi.mg4j.index.remote.RemoteBitStreamIndex}
 * or instances of {@link it.unimi.dsi.mg4j.index.remote.RemoteIndex}. The two approaches vary
 * wildly in performance, and some profiling with specific applications is advisable
 * before choosing one method over the other.
 *
 * <p>All other server commands available spawn a new thread that will handle a specific
 * data structure over the newly created socket. Thus, after writing the command
 * {@link #GET_SIZE_LIST} the server will start a {@link it.unimi.dsi.mg4j.index.remote.RemoteSizeList.ServerThread}
 * and answer queries about the size list, whereas after writing the command
 * {@link #GET_INDEX_READER} the server will start a
 * {@link it.unimi.dsi.mg4j.index.remote.RemoteIndexReader.ServerThread}
 * and answer queries about the underlying local {@link it.unimi.dsi.mg4j.index.IndexReader}.
 *
 * <p>For simplicity and ease of code editing, every client class starting a thread
 * (e.g., {@link it.unimi.dsi.mg4j.index.remote.RemoteTermMap}, {@link it.unimi.dsi.mg4j.index.remote.RemoteIndexReader},
 * &hellip;) sports a nested static class called <code>ServerThread</code>
 * containing the code of the corresponding server thread. In this
 * way client and server code sit in the same source file, making editing and debugging simpler.
 *
 * @author Alessandro Arrabito
 * @author Sebastiano Vigna
 */
public class IndexServer {
	/** The port used when the client specifies -1 and the default of the command-line <code>port</code> option. */
	public final static int DEFAULT_PORT = 9090;

	private static final Logger LOGGER = Util.getLogger( IndexServer.class );

	/** Command: send back a serialised {@link RemoteBitStreamIndex} or {@link RemoteIndex}. */
	public static final byte GET_INDEX = 0;
	/** Command: start a {@link RemoteIndexReader.ServerThread} on the connection. */
	public static final byte GET_INDEX_READER = 1;
	/** Command: start a {@link RemoteTermMap.ServerThread} on the connection. */
	public static final byte GET_TERM_MAP = 2;
	/** Command: start a {@link RemotePrefixMap.ServerThread} on the connection. */
	public static final byte GET_PREFIX_MAP = 3;
	/** Command: start a {@link RemoteSizeList.ServerThread} on the connection. */
	public static final byte GET_SIZE_LIST = 4;
	/** Command: start a {@link RemoteOffsetList.ServerThread} on the connection. */
	public static final byte GET_OFFSET_LIST = 5;
	/** Command: start a {@link RemoteInputStream.ServerThread} on the connection. */
	public static final byte GET_CLIENT_INPUT_STREAM = 6;

	/** Returns an index object corresponding to a given index server specified by host and port.
	 *
	 * <p>The connection is used only to download the serialised index and is closed
	 * before this method returns, whether it completes normally or with an exception.
	 *
	 * @param host the server host.
	 * @param port the server port, or -1 for {@link #DEFAULT_PORT}.
	 * @param randomAccess whether the index should be accessible randomly.
	 * @param documentSizes if true, document sizes will be loaded (note that sometimes document sizes
	 * might be loaded anyway because the compression method for positions requires it).
	 * @return the index deserialised from the server's answer.
	 * @throws IOException if the connection cannot be established or the transfer fails.
	 * @throws ClassNotFoundException if the class of the serialised index cannot be found.
	 */
	public static Index getIndex( final String host, final int port, final boolean randomAccess, final boolean documentSizes ) throws IOException, ClassNotFoundException {
		// Resolve the default once, so that the log reports the port actually used.
		final int actualPort = port == -1 ? DEFAULT_PORT : port;
		final Socket socket = new Socket( host, actualPort );
		try {
			LOGGER.debug( "Accessing remote index at " + host + ":" + actualPort + "..." );
			final DataOutputStream outputStream = new DataOutputStream( socket.getOutputStream() );
			outputStream.writeByte( GET_INDEX );
			outputStream.writeBoolean( randomAccess );
			outputStream.writeBoolean( documentSizes );
			outputStream.flush();
			final Index index = (Index)BinIO.loadObject( socket.getInputStream() );
			LOGGER.debug( "Index at " + socket + " downloaded: " + index );
			return index;
		}
		finally {
			// Release the connection even when the handshake or the deserialisation fails.
			socket.close();
		}
	}

	/** Starts an index server.
	 *
	 * <p>This method loops forever accepting connections on the given socket, and
	 * returns only by throwing an exception (e.g., when accepting a connection fails).
	 * An I/O error on a single client connection is logged and does not stop the server.
	 *
	 * @param index the underlying index.
	 * @param serverSocket the daemon socket.
	 * @param forceRemoteIndex force the thread to use a {@link RemoteIndex} instead
	 * of a {@link RemoteBitStreamIndex}, even if the latter would be usable.
	 * @throws IOException if accepting a connection fails.
	 */
	public static void start( final Index index, final ServerSocket serverSocket, boolean forceRemoteIndex ) throws IOException {
		final SocketAddress localSocketAddress = serverSocket.getLocalSocketAddress();
		LOGGER.info( "Index server started at " + localSocketAddress );
		final ExecutorService threadPool = Executors.newCachedThreadPool();

		try {
			for ( ;; ) {
				final Socket socket = serverSocket.accept();
				// True once a server thread has been handed the socket and is responsible for it.
				boolean handedOff = false;
				try {
					handedOff = dispatch( index, socket, localSocketAddress, forceRemoteIndex, threadPool );
				}
				catch ( IOException e ) {
					// A misbehaving client must not bring down the daemon.
					LOGGER.warn( "I/O error while serving " + socket, e );
				}
				finally {
					if ( ! handedOff ) closeQuietly( socket );
				}
			}
		}
		finally {
			// Running server threads are left to complete; idle pool threads are released.
			threadPool.shutdown();
		}
	}

	/** Reads the command byte from a freshly accepted connection and serves it.
	 *
	 * @param index the underlying index.
	 * @param socket the accepted connection.
	 * @param localSocketAddress the address of the daemon socket, embedded in the remote objects sent to clients.
	 * @param forceRemoteIndex whether a {@link RemoteIndex} must be used even if a {@link RemoteBitStreamIndex} would be usable.
	 * @param threadPool the executor running server threads.
	 * @return true if the socket has been handed to a server thread; false if the caller must close it.
	 */
	private static boolean dispatch( final Index index, final Socket socket, final SocketAddress localSocketAddress, final boolean forceRemoteIndex, final ExecutorService threadPool ) throws IOException {
		final int command = socket.getInputStream().read();
		LOGGER.debug( "Remote command: " + command );

		switch( command ) {
		case GET_INDEX:
			// Served synchronously: the answer is a single serialised object.
			sendIndex( index, socket, localSocketAddress, forceRemoteIndex );
			return false;

		case GET_INDEX_READER:
			threadPool.execute( new RemoteIndexReader.ServerThread( socket, index ) );
			return true;

		case GET_SIZE_LIST:
			threadPool.execute( new RemoteSizeList.ServerThread( socket, index.sizes ) );
			return true;

		case GET_TERM_MAP:
		case GET_PREFIX_MAP:
		case GET_OFFSET_LIST:
		case GET_CLIENT_INPUT_STREAM: {
			// These commands need members that only a BitStreamIndex provides.
			if ( ! ( index instanceof BitStreamIndex ) ) {
				LOGGER.warn( "Remote command " + command + " from " + socket + " requires a bitstream index; closing connection" );
				return false;
			}
			final BitStreamIndex bitStreamIndex = (BitStreamIndex)index;
			if ( command == GET_TERM_MAP ) threadPool.execute( new RemoteTermMap.ServerThread( socket, bitStreamIndex.termMap ) );
			else if ( command == GET_PREFIX_MAP ) threadPool.execute( new RemotePrefixMap.ServerThread( socket, bitStreamIndex.prefixMap ) );
			else if ( command == GET_OFFSET_LIST ) threadPool.execute( new RemoteOffsetList.ServerThread( socket, bitStreamIndex.offsets ) );
			else threadPool.execute( new RemoteInputStream.ServerThread( socket, bitStreamIndex.getInputStream() ) );
			return true;
		}

		default:
			// Includes -1, that is, a connection closed before any command byte was sent.
			LOGGER.warn( "Unknown remote command " + command + " from " + socket + "; closing connection" );
			return false;
		}
	}

	/** Serves a {@link #GET_INDEX} command: reads the two boolean options sent by
	 * {@link #getIndex(String, int, boolean, boolean)} and writes back a serialised remote index.
	 *
	 * @param index the underlying index.
	 * @param socket the client connection; it is <em>not</em> closed by this method.
	 * @param localSocketAddress the address of the daemon socket, embedded in the remote objects sent to the client.
	 * @param forceRemoteIndex whether a {@link RemoteIndex} must be used even if a {@link RemoteBitStreamIndex} would be usable.
	 */
	private static void sendIndex( final Index index, final Socket socket, final SocketAddress localSocketAddress, final boolean forceRemoteIndex ) throws IOException {
		final DataInputStream dis = new DataInputStream( socket.getInputStream() );
		final DataOutputStream dos = new DataOutputStream( socket.getOutputStream() );
		boolean randomAccess = dis.readBoolean();
		final boolean documentSizes = dis.readBoolean();

		if ( index instanceof BitStreamIndex && ! forceRemoteIndex ) {
			final BitStreamIndex localIndex = (BitStreamIndex)index;
			if ( randomAccess && localIndex.offsets == null ) {
				randomAccess = false;
				LOGGER.warn( "Random access will not be available for index " + localIndex );
			}
			/* Note that in case of Golomb or interpolative position coding
			 * we are forced to serialise and transfer the entire size list,
			 * or decoding would be too slow. */
			BinIO.storeObject(
				new RemoteBitStreamIndex( localSocketAddress,
					index.numberOfDocuments, index.numberOfTerms,
					index.numberOfPostings, index.numberOfOccurrences,
					index.maxCount,
					localIndex.payload,
					localIndex.frequencyCoding, localIndex.pointerCoding,
					localIndex.countCoding,
					localIndex.positionCoding,
					localIndex.quantum,
					localIndex.height,
					localIndex.bufferSize,
					localIndex.termProcessor,
					localIndex.field,
					localIndex.properties,
					localIndex.termMap != null ? new RemoteTermMap( localSocketAddress, index.numberOfTerms ) : null,
					localIndex.prefixMap != null ? new RemotePrefixMap( localSocketAddress, index.numberOfTerms ) : null,
					localIndex.positionCoding == Coding.GOLOMB || localIndex.positionCoding == Coding.INTERPOLATIVE ? localIndex.sizes :
						( documentSizes ? new RemoteSizeList( localSocketAddress, localIndex.numberOfDocuments ) : null ),
					randomAccess ? new RemoteOffsetList( localSocketAddress, localIndex.offsets.size() ) : null
				), dos );
		}
		else BinIO.storeObject(
				new RemoteIndex( localSocketAddress,
					index.numberOfDocuments, index.numberOfTerms,
					index.numberOfPostings, index.numberOfOccurrences,
					index.maxCount,
					index.payload,
					index.hasCounts,
					index.hasPositions,
					index.termProcessor,
					index.field,
					( documentSizes ? new RemoteSizeList( localSocketAddress, index.numberOfDocuments ) : null ),
					index.properties
				), dos );
		dos.flush();
	}

	/** Closes a socket, logging (rather than propagating) any failure.
	 *
	 * @param socket the socket to be closed.
	 */
	private static void closeQuietly( final Socket socket ) {
		try {
			socket.close();
		}
		catch ( IOException e ) {
			LOGGER.debug( "Cannot close " + socket, e );
		}
	}

	/** Starts an index server on a new daemon socket bound to the given address and port.
	 *
	 * <p>This method creates the {@link ServerSocket} and then delegates to
	 * {@link #start(Index, ServerSocket, boolean)}; like that method, it serves
	 * connections in the calling thread and does not return normally.
	 *
	 * @param index the underlying index.
	 * @param address the IP address for the daemon socket.
	 * @param port the IP port for the daemon socket.
	 * @param forceRemoteIndex force the thread to use a {@link RemoteIndex} instead
	 * of a {@link RemoteBitStreamIndex}, even if the latter would be usable.
	 * @throws IOException if the daemon socket cannot be created or accepting a connection fails.
	 */
	public static void start( final Index index, final InetAddress address, final int port, boolean forceRemoteIndex ) throws IOException {
		start( index, new ServerSocket( port, 0, address ), forceRemoteIndex );
	}

	/** Command-line entry point: parses the options, loads the index with the given
	 * basename or URI and serves it on the given address and port.
	 *
	 * @param arg the command-line arguments.
	 */
	public static void main( final String[] arg ) throws ConfigurationException, IOException, URISyntaxException, ClassNotFoundException, JSAPException, SecurityException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {

		SimpleJSAP jsap = new SimpleJSAP( IndexServer.class.getName(), "Starts a server index daemon.",
			new Parameter[] {
				// The default is derived from DEFAULT_PORT so that the two cannot drift apart.
				new FlaggedOption( "port", JSAP.INTEGER_PARSER, Integer.toString( DEFAULT_PORT ), JSAP.NOT_REQUIRED, 'p', "port", "The server port." ),
				new Switch( "forceremote", 'f', "force-remote", "Forces a remote index instead of a remote bitstream index." ),
				new UnflaggedOption( "ipaddr", JSAP.INETADDRESS_PARSER, JSAP.REQUIRED, "The server address." ),
				new UnflaggedOption( "basename", JSAP.STRING_PARSER, JSAP.REQUIRED, "The basename or uri of the index" )
			} );

		JSAPResult jsapResult = jsap.parse( arg );
		if ( jsap.messagePrinted() ) return;

		int port = jsapResult.getInt( "port" );
		String basename = jsapResult.getString( "basename" );
		boolean forceRemote = jsapResult.getBoolean( "forceremote" );
		start( Index.getInstance( basename ), jsapResult.getInetAddress( "ipaddr" ), port, forceRemote );
	}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -