📄 ddbaseimpl.java
字号:
/*
* Created on 18-Feb-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package org.gudy.azureus2.pluginsimpl.local.ddb;
import java.net.InetSocketAddress;
import java.util.*;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.SHA1Simple;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.plugins.PluginInterface;
import org.gudy.azureus2.plugins.ddb.*;
import com.aelitis.azureus.core.AzureusCore;
import com.aelitis.azureus.plugins.dht.DHTPlugin;
import com.aelitis.azureus.plugins.dht.DHTPluginContact;
import com.aelitis.azureus.plugins.dht.DHTPluginKeyStats;
import com.aelitis.azureus.plugins.dht.DHTPluginOperationListener;
import com.aelitis.azureus.plugins.dht.DHTPluginProgressListener;
import com.aelitis.azureus.plugins.dht.DHTPluginTransferHandler;
import com.aelitis.azureus.plugins.dht.DHTPluginValue;
/**
* @author parg
*
*/
public class
DDBaseImpl
implements DistributedDatabase
{
// Shared instance handed out by getSingleton(); in the visible code it is only
// assigned while class_mon is held.
private static DDBaseImpl singleton;
// Class-wide monitor: guards creation of the singleton and the DHT plugin
// lookup performed in grabDHT().
protected static AEMonitor class_mon = new AEMonitor( "DDBaseImpl:class");
// NOTE(review): not referenced in the visible part of this file; presumably
// tracks registered transfer types/handlers - confirm against its users.
protected static Map transfer_map = new HashMap();
// Created in the constructor and passed to addTransferHandler() from grabDHT()
// as both arguments.
private static DDBaseTTTorrent torrent_transfer;
/**
 * Returns the shared DDBaseImpl instance, creating it on the first call.
 * Creation is serialised through class_mon.
 *
 * @param azureus_core core used to construct the instance; ignored once an
 *                     instance already exists
 * @return the shared instance
 */
public static DistributedDatabase
getSingleton(
    AzureusCore azureus_core )
{
    try{
        class_mon.enter();

        if ( singleton != null ){

            return( singleton );
        }

        singleton = new DDBaseImpl( azureus_core );

        return( singleton );

    }finally{

        class_mon.exit();
    }
}
// Core this database was constructed with; used to locate the DHT plugin.
private AzureusCore azureus_core;
// Cached DHT plugin, resolved lazily by grabDHT(). Declared volatile because
// grabDHT() reads it on its fast path without holding class_mon, so a write
// made under the monitor by one thread must be visible to unlocked readers.
private volatile DHTPlugin dht_use_accessor;
/**
 * Builds the database on top of the given core, creates the torrent transfer
 * type and makes a first attempt at resolving the DHT plugin.
 *
 * @param _azureus_core core used for plugin lookup
 */
protected
DDBaseImpl(
    final AzureusCore _azureus_core )
{
    this.azureus_core = _azureus_core;

    // has to exist before grabDHT() runs, since grabDHT() registers it
    DDBaseImpl.torrent_transfer = new DDBaseTTTorrent( this.azureus_core, this );

    grabDHT();
}
/**
 * Returns the DHT plugin, looking it up through the plugin manager the first
 * time it is needed and caching it afterwards. When the plugin is found and
 * reports itself enabled, the torrent transfer handler is registered; a
 * failure to register is printed via Debug and otherwise ignored.
 *
 * @return the DHT plugin, or null if no plugin interface for DHTPlugin exists
 */
protected DHTPlugin
grabDHT()
{
    // only take the monitor while the plugin has not been resolved yet
    if ( dht_use_accessor == null ){

        try{
            class_mon.enter();

            // re-test now that the monitor is held
            if ( dht_use_accessor == null ){

                PluginInterface plugin_interface =
                    azureus_core.getPluginManager().getPluginInterfaceByClass( DHTPlugin.class );

                if ( plugin_interface != null ){

                    DHTPlugin plugin = (DHTPlugin)plugin_interface.getPlugin();

                    // cache before registering the handler, as in the original ordering
                    dht_use_accessor = plugin;

                    if ( plugin.isEnabled()){

                        try{
                            addTransferHandler( torrent_transfer, torrent_transfer );

                        }catch( Throwable e ){

                            Debug.printStackTrace(e);
                        }
                    }
                }
            }
        }finally{

            class_mon.exit();
        }
    }

    return( dht_use_accessor );
}
/**
 * Reports whether the database can be used.
 *
 * @return true only if the DHT plugin was found and says it is enabled
 */
public boolean
isAvailable()
{
    final DHTPlugin plugin = grabDHT();

    return( plugin != null && plugin.isEnabled());
}
/**
 * Reports whether the DHT plugin permits extended use.
 *
 * @return false if no DHT plugin was found, otherwise whatever the plugin's
 *         isExtendedUseAllowed() returns
 */
public boolean
isExtendedUseAllowed()
{
    final DHTPlugin plugin = grabDHT();

    return( plugin != null && plugin.isExtendedUseAllowed());
}
/**
 * Guard used at the top of public operations.
 *
 * @throws DistributedDatabaseException if isAvailable() is false
 */
protected void
throwIfNotAvailable()
    throws DistributedDatabaseException
{
    if ( isAvailable()){

        return;
    }

    throw( new DistributedDatabaseException( "DHT not available" ));
}
/**
 * Returns the DHT plugin after checking that it is available.
 *
 * @return the result of grabDHT() once the availability check has passed
 * @throws DistributedDatabaseException if the DHT is not available
 */
protected DHTPlugin
getDHT()
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    final DHTPlugin plugin = grabDHT();

    return( plugin );
}
/**
 * Forwards a message to the DHT plugin's log; does nothing when no DHT
 * plugin has been found.
 *
 * @param str message to log
 */
protected void
log(
    String str )
{
    final DHTPlugin plugin = grabDHT();

    if ( plugin == null ){

        return;
    }

    plugin.log( str );
}
/**
 * Wraps an object as a database key.
 *
 * @param key object to wrap
 * @return a new DDBaseKeyImpl built from key
 * @throws DistributedDatabaseException if the DHT is not available
 */
public DistributedDatabaseKey
createKey(
    Object key )
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    final DistributedDatabaseKey created = new DDBaseKeyImpl( key );

    return( created );
}
/**
 * Wraps an object as a database key carrying a description.
 *
 * @param key         object to wrap
 * @param description text handed to the DDBaseKeyImpl constructor alongside key
 * @return a new DDBaseKeyImpl built from key and description
 * @throws DistributedDatabaseException if the DHT is not available
 */
public DistributedDatabaseKey
createKey(
    Object key,
    String description )
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    final DistributedDatabaseKey created = new DDBaseKeyImpl( key, description );

    return( created );
}
/**
 * Wraps an object as a database value originating from the local contact.
 * The value is constructed with a contact built from the DHT's local
 * address, the current SystemTime and -1 as the last constructor argument
 * (the meaning of that argument is defined by DDBaseValueImpl).
 *
 * @param value object to wrap
 * @return a new DDBaseValueImpl
 * @throws DistributedDatabaseException if the DHT is not available
 */
public DistributedDatabaseValue
createValue(
    Object value )
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    final DHTPlugin plugin = getDHT();

    final DDBaseContactImpl local_contact = new DDBaseContactImpl( this, plugin.getLocalAddress());

    final long now = SystemTime.getCurrentTime();

    return( new DDBaseValueImpl( local_contact, value, now, -1 ));
}
/**
 * Imports a contact by socket address through the DHT plugin.
 *
 * @param address address of the contact to import
 * @return a DDBaseContactImpl wrapping the imported DHT contact
 * @throws DistributedDatabaseException if the DHT is not available or the
 *         plugin returns null for the address
 */
public DistributedDatabaseContact
importContact(
    InetSocketAddress address )
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    final DHTPluginContact imported = getDHT().importContact( address );

    if ( imported != null ){

        return( new DDBaseContactImpl( this, imported ));
    }

    throw( new DistributedDatabaseException( "import of '" + address + "' failed" ));
}
/**
 * Writes a single value under a key by delegating to the array form of
 * write with a one-element array.
 *
 * @param listener receiver of write events
 * @param key      key to write under
 * @param value    value to write
 * @throws DistributedDatabaseException as thrown by the array form
 */
public void
write(
    DistributedDatabaseListener listener,
    DistributedDatabaseKey key,
    DistributedDatabaseValue value )
    throws DistributedDatabaseException
{
    final DistributedDatabaseValue[] single = { value };

    write( listener, key, single );
}
/**
 * Writes zero, one or several values under a key.
 * <ul>
 * <li>no values: delegates to delete( listener, key )</li>
 * <li>one value: a single put flagged DHTPlugin.FLAG_SINGLE_VALUE</li>
 * <li>several values: the values are packed into one or more payloads, each
 *     put with DHTPlugin.FLAG_MULTI_VALUE. The first payload goes under the
 *     key's own bytes; every further payload goes under the SHA1 hash of the
 *     previous payload's key.</li>
 * </ul>
 * Multi-value payload layout: byte 0 is a continuation marker (1 = another
 * payload follows, 0 = last payload), followed by one entry per value made of
 * a two-byte big-endian length and the value bytes.
 * <p>
 * All size checks run before anything is put, so an oversized value can
 * neither leave a partially written chain behind nor stall the packing loop.
 *
 * @param listener receiver of ET_VALUE_WRITTEN events
 * @param key      key to write under
 * @param values   values to write
 * @throws DistributedDatabaseException if the DHT is not available, if a value
 *         exceeds DDBaseValueImpl.MAX_VALUE_SIZE, or - when several values are
 *         written - if a value is too large to fit into an empty payload
 */
public void
write(
    final DistributedDatabaseListener listener,
    final DistributedDatabaseKey key,
    final DistributedDatabaseValue values[] )
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    // encode every value exactly once; the bytes are reused below
    final byte[][] encoded = new byte[values.length][];

    for (int i=0;i<values.length;i++){

        encoded[i] = ((DDBaseValueImpl)values[i]).getBytes();

        if ( encoded[i].length > DDBaseValueImpl.MAX_VALUE_SIZE ){

            throw( new DistributedDatabaseException("Value size limited to " + DDBaseValueImpl.MAX_VALUE_SIZE + " bytes" ));
        }
    }

    if ( values.length == 0 ){

        delete( listener, key );

    }else if ( values.length == 1 ){

        getDHT().put(
            ((DDBaseKeyImpl)key).getBytes(),
            key.getDescription(),
            encoded[0],
            DHTPlugin.FLAG_SINGLE_VALUE,
            new listenerMapper( listener, DistributedDatabaseEvent.ET_VALUE_WRITTEN, key, 0, false, false ));

    }else{

        // TODO: optimise re-publishing to avoid republishing everything each time

        byte[] current_key = ((DDBaseKeyImpl)key).getBytes();

        // format is: <continuation> <len><len><data>

        byte[] payload = new byte[DHTPlugin.MAX_VALUE_SIZE];

        // Largest value the packing test below admits into an empty payload
        // (payload_length == 1). Anything bigger would be flushed around and
        // retried forever, so reject it before any put is issued.
        final int max_packed_len = payload.length - 4;

        for (int i=0;i<encoded.length;i++){

            if ( encoded[i].length > max_packed_len ){

                throw( new DistributedDatabaseException("Value size limited to " + max_packed_len + " bytes when writing multiple values" ));
            }
        }

        int payload_length = 1;

        int pos = 0;

        while( pos < encoded.length ){

            byte[] bytes = encoded[pos];

            int len = bytes.length;

            if ( payload_length + len < payload.length - 2 ){

                payload[payload_length++] = (byte)(( len & 0x0000ff00 ) >> 8);
                payload[payload_length++] = (byte) ( len & 0x000000ff );

                System.arraycopy( bytes, 0, payload, payload_length, len );

                payload_length += len;

                pos++;

            }else{

                // current payload is full: publish it marked "more to come"
                // and continue under the next key in the chain

                putMultiValuePayload( listener, key, current_key, payload, payload_length, true );

                payload_length = 1;

                current_key = new SHA1Simple().calculateHash( current_key );
            }
        }

        if ( payload_length > 1 ){

            // final payload of the chain

            putMultiValuePayload( listener, key, current_key, payload, payload_length, false );
        }
    }
}

// Stamps the continuation marker into payload[0] and puts a copy of the first
// payload_length bytes under dht_key as a multi-value entry.
private void
putMultiValuePayload(
    DistributedDatabaseListener listener,
    DistributedDatabaseKey key,
    byte[] dht_key,
    byte[] payload,
    int payload_length,
    boolean more_follow )
    throws DistributedDatabaseException
{
    payload[0] = (byte)( more_follow?1:0 );

    byte[] copy = new byte[payload_length];

    System.arraycopy( payload, 0, copy, 0, copy.length );

    getDHT().put(
        dht_key,
        key.getDescription(),
        copy,
        DHTPlugin.FLAG_MULTI_VALUE,
        new listenerMapper( listener, DistributedDatabaseEvent.ET_VALUE_WRITTEN, key, 0, false, false ));
}
/**
 * Reads the values stored under a key using no special options; delegates to
 * the four-argument read with OP_NONE.
 *
 * @param listener receiver of read events
 * @param key      key to read
 * @param timeout  timeout handed through to the DHT get
 * @throws DistributedDatabaseException as thrown by the four-argument read
 */
public void
read(
    DistributedDatabaseListener listener,
    DistributedDatabaseKey key,
    long timeout )
    throws DistributedDatabaseException
{
    final int default_options = OP_NONE;

    read( listener, key, timeout, default_options );
}
/**
 * Reads the values stored under a key.
 *
 * @param listener receiver of ET_VALUE_READ events
 * @param key      key to read
 * @param timeout  timeout handed through to the DHT get
 * @param options  bit set; OP_EXHAUSTIVE_READ and OP_PRIORITY_HIGH are the
 *                 bits examined here
 * @throws DistributedDatabaseException if the DHT is not available
 */
public void
read(
    final DistributedDatabaseListener listener,
    final DistributedDatabaseKey key,
    final long timeout,
    int options )
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    final boolean read_all = ( options & OP_EXHAUSTIVE_READ ) != 0;
    final boolean urgent   = ( options & OP_PRIORITY_HIGH ) != 0;

    final DHTPlugin plugin = getDHT();

    // TODO: max values?

    plugin.get(
        ((DDBaseKeyImpl)key).getBytes(),
        key.getDescription(),
        (byte)0,
        256,
        timeout,
        read_all,
        urgent,
        new listenerMapper( listener, DistributedDatabaseEvent.ET_VALUE_READ, key, timeout, read_all, urgent ));
}
/**
 * Requests statistics for a key: issues a DHT get flagged
 * DHTPlugin.FLAG_STATS, non-exhaustive and not high priority, and reports
 * results as ET_KEY_STATS_READ events.
 *
 * @param listener receiver of ET_KEY_STATS_READ events
 * @param key      key whose stats are wanted
 * @param timeout  timeout handed through to the DHT get
 * @throws DistributedDatabaseException if the DHT is not available
 */
public void
readKeyStats(
    DistributedDatabaseListener listener,
    DistributedDatabaseKey key,
    long timeout )
    throws DistributedDatabaseException
{
    throwIfNotAvailable();

    final DHTPlugin plugin = getDHT();

    plugin.get(
        ((DDBaseKeyImpl)key).getBytes(),
        key.getDescription(),
        DHTPlugin.FLAG_STATS,
        256,
        timeout,
        false,
        false,
        new listenerMapper( listener, DistributedDatabaseEvent.ET_KEY_STATS_READ, key, timeout, false, false ));
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -