📄 diskaccesscontrollerinstance.java
字号:
request_sem.release();
if ( thread == null ){
thread =
new AEThread("DiskAccessController:requestDispatcher[" + index + "]", true )
{
public void
runSupport()
{
tls.set( this );
while( true ){
if ( request_sem.reserve( 30000 )){
DiskAccessRequestImpl request;
List aggregated = null;
synchronized( requests ){
request = (DiskAccessRequestImpl)requests.remove(0);
if ( enable_aggregation && request.getPriority() < 0 ){
CacheFile file = request.getFile();
Map file_map = (Map)request_map.get( file );
file_map.remove( new Long( request.getOffset()));
if ( !request.isCancelled()){
DiskAccessRequestImpl current = request;
long aggregated_bytes = 0;
try{
while( true ){
int current_size = current.getSize();
long end = current.getOffset() + current_size;
// doesn't matter if we remove from this and don't end up using it
DiskAccessRequestImpl next = (DiskAccessRequestImpl)file_map.remove( new Long( end ));
if ( next == null || next.isCancelled() ||
!next.canBeAggregatedWith( request )){
break;
}
requests.remove( next );
if ( !request_sem.reserve( 30000 )){
// semaphore should already be > 0 as we've removed an element...
Debug.out( "shouldn't happen" );
}
if ( aggregated == null ){
aggregated = new ArrayList(8);
aggregated.add( current );
aggregated_bytes += current_size;
}
aggregated.add( next );
aggregated_bytes += next.getSize();
if ( aggregated.size() > aggregation_request_limit || aggregated_bytes >= aggregation_byte_limit ){
break;
}
current = next;
}
}finally{
if ( aggregated != null ){
total_aggregated_requests_made++;
/*
System.out.println(
"aggregated read: requests=" + aggregated.size() +
", size=" + aggregated_bytes +
", a_reqs=" + requests.size() +
", f_reqs=" + file_map.size());
*/
}else{
total_single_requests_made++;
}
}
}
}
}
try{
long io_start = SystemTime.getHighPrecisionCounter();
if ( aggregated == null ){
try{
request.runRequest();
}finally{
long io_end = SystemTime.getHighPrecisionCounter();
io_time += ( io_end - io_start );
total_single_bytes += request.getSize();
releaseSpaceAllowance( request );
}
}else{
DiskAccessRequestImpl[] requests = (DiskAccessRequestImpl[])aggregated.toArray( new DiskAccessRequestImpl[ aggregated.size()]);
try{
DiskAccessRequestImpl.runAggregated( request, requests );
}finally{
long io_end = SystemTime.getHighPrecisionCounter();
io_time += ( io_end - io_start );
for (int i=0;i<requests.length;i++){
DiskAccessRequestImpl r = requests[i];
total_aggregated_bytes += r.getSize();
releaseSpaceAllowance( r );
}
}
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}else{
synchronized( requests ){
if ( requests.size() == 0 ){
thread = null;
break;
}
}
}
}
}
};
thread.start();
}
}
}
}
protected long
getLastRequestTime()
{
return( last_request_time );
}
protected void
setLastRequestTime(
long l )
{
last_request_time = l;
}
protected int
size()
{
return( requests.size());
}
}
protected static class
groupSemaphore
{
private int value;
private List waiters = new LinkedList();
private long blocks;
protected
groupSemaphore(
int _value )
{
value = _value;
}
protected long
getBlockCount()
{
return( blocks );
}
protected void
reserveGroup(
int num )
{
mutableInteger wait;
synchronized( this ){
// for fairness we only return immediately if we can and there are no waiters
if ( num <= value && waiters.size() == 0 ){
value -= num;
return;
}else{
blocks++;
wait = new mutableInteger( num - value );
value = 0;
waiters.add( wait );
}
}
wait.reserve();
}
protected void
releaseGroup(
int num )
{
synchronized( this ){
if ( waiters.size() == 0 ){
// no waiters we just increment the value
value += num;
}else{
// otherwise we share num out amongst the waiters in order
while( waiters.size() > 0 ){
mutableInteger wait = (mutableInteger)waiters.get(0);
int wait_num = wait.getValue();
if ( wait_num <= num ){
// we've got enough now to release this waiter
wait.release();
waiters.remove(0);
num -= wait_num;
}else{
wait.setValue( wait_num - num );
num = 0;
break;
}
}
// if we have any left over then save it
value = num;
}
}
}
protected static class
mutableInteger
{
private int i;
private boolean released;
protected
mutableInteger(
int _i )
{
i = _i;
}
protected int
getValue()
{
return( i );
}
protected void
setValue(
int _i )
{
i = _i;
}
protected void
release()
{
synchronized( this ){
released = true;
notify();
}
}
protected void
reserve()
{
synchronized( this ){
if ( released ){
return;
}
try{
int spurious_count = 0;
while( true ){
wait();
if ( released ){
break;
}else{
spurious_count++;
if ( spurious_count > 1024 ){
Debug.out( "DAC::mutableInteger: spurious wakeup limit exceeded" );
throw( new RuntimeException( "die die die" ));
}else{
Debug.out("DAC::mutableInteger: spurious wakeup, ignoring" );
}
}
}
}catch( InterruptedException e ){
throw( new RuntimeException("Semaphore: operation interrupted" ));
}
}
}
}
}
public static void
main(
String[] args )
{
final groupSemaphore sem = new groupSemaphore( 9 );
for (int i=0;i<10;i++){
new Thread()
{
public void
run()
{
int count = 0;
while( true ){
int group =RandomUtils.generateRandomIntUpto( 10 );
System.out.println( Thread.currentThread().getName() + " reserving " + group );
sem.reserveGroup( group );
try{
Thread.sleep(5 + RandomUtils.generateRandomIntUpto(5));
}catch( Throwable e ){
}
sem.releaseGroup( group );
count++;
if ( count %100 == 0 ){
System.out.println( Thread.currentThread().getName() + ": " + count + " ops" );
}
}
}
}.start();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -