📄 tailsource.java.old
字号:
package de.spieleck.app.jacson.source;
import java.io.*;
import java.util.LinkedList;
import java.util.List;
import de.spieleck.util.ThreadUtil;
import de.spieleck.app.jacson.*;
/**
* Implement a Source for {@link Jacson} which is continuously running
* and monitoring files for changes.
* <BR /><b>Note1:</b> This works under the char = byte assumption, i.e.
* it does not treat Unicode correctly!
* <BR /><b>Note2:</b> This is Java, and there might be problems which
* are so platform dependent that there is no Java solution (e.g.
* I do not expect this to help with log rollovers and inodes...)
*/
public class TailSource
    implements JacsonChunkSource
{
    /** Key under which the label of the originating file is put into the state. */
    public final static String JACSON_STATE_CHUNK_TYPE
        = "de.spieleck.app.jacson.source.TailSource/type";

    /**
     * The thread watching ALL files for change.
     * Volatile because the watcher loop polls this field without holding
     * a lock; setting it back to <code>null</code> ends that loop.
     */
    protected volatile Thread metaThread = null;

    /** Delay between inspections of queue */
    protected int deltaTimeQueue = 1000;

    /** The number of worker Threads */
    protected int workers = 5;

    /** The watched Resources, guarded by its own monitor */
    protected List watched;

    /** The queue of Files which need to be processed, guarded by its own monitor */
    protected LinkedList fileQueue;

    /** The queue of Chunks to be returned, guarded by its own monitor */
    protected LinkedList chunkQueue;

    /**
     * Create a source without any watched file.
     * The queues are created here (not in {@link #startWatching()}) so that
     * no thread can ever observe a started source with missing queues.
     */
    public TailSource()
    {
        watched = new LinkedList();
        fileQueue = new LinkedList();
        chunkQueue = new LinkedList();
    }

    /**
     * Add file to be watched.
     *
     * @param label marker delivered together with every chunk of this file
     * @param fName name of the file to open
     * @throws IOException if the file cannot be opened or positioned
     */
    public void addWatch(String label, String fName)
        throws IOException
    {
        // Open the file before taking the lock, no need to block the
        // watcher thread during file system access.
        WatchedFile added = new WatchedFile(label, fName);
        synchronized(watched)
        {
            watched.add(added);
        }
    }

    /**
     * Start watching on the files.
     *
     * @throws IllegalStateException if watching was already started
     */
    public void startWatching()
    {
        synchronized(this)
        {
            if ( metaThread != null )
                throw new IllegalStateException("Cannot start Watching twice");
            Thread watcher = new MetaThread(workers);
            metaThread = watcher;
            // Start while still holding the lock: whoever sees the thread
            // through a synchronized block also sees it started.
            watcher.start();
        }
    }

    /**
     * Deliver the next chunk (line) found in any watched file, starting
     * the watcher on first use. Polls the chunk queue every
     * {@link #deltaTimeQueue} until a chunk shows up.
     *
     * @param state receives the label of the originating file under
     *              {@link #JACSON_STATE_CHUNK_TYPE}
     * @return the next chunk, or <code>null</code> when the watcher thread
     *         is no longer alive and nothing is queued
     */
    public String nextChunk(JacsonState state)
        throws JacsonException
    {
        Thread watcher;
        // Init if not yet done.
        synchronized(this)
        {
            if ( metaThread == null )
                startWatching();
            // Work on a snapshot, the field may be reset concurrently.
            watcher = metaThread;
        }
        String chunk = fetchChunk(state);
        while ( chunk == null && watcher != null && watcher.isAlive() )
        {
            ThreadUtil.sleep(deltaTimeQueue);
            chunk = fetchChunk(state);
        }
        return chunk;
    }

    /**
     * Get a chunk properly dequeued.
     *
     * @param state receives the extra information of the dequeued chunk
     * @return the chunk text or <code>null</code> if the queue is empty
     */
    protected String fetchChunk(JacsonState state)
    {
        synchronized( chunkQueue )
        {
            if ( !chunkQueue.isEmpty() )
            {
                MarkedChunk chunk = (MarkedChunk) chunkQueue.removeFirst();
                state.put(JACSON_STATE_CHUNK_TYPE, chunk.getExtra());
                return chunk.getChunk();
            }
        }
        return null;
    }

    /**
     * The MetaThread is actually both a Thread and a ThreadPool.
     * It periodically scans the watched files and hands changed ones
     * to its worker threads via the file queue.
     */
    public class MetaThread
        extends Thread
    {
        /** The pool of workers, created in the constructor, started in run() */
        Thread[] threads;

        /**
         * @param workers number of worker threads to create
         */
        public MetaThread(int workers)
        {
            setDaemon(true);
            threads = new WorkerThread[workers];
            for(int i = 0; i < threads.length; i++ )
            {
                threads[i] = new WorkerThread(this);
                threads[i].setDaemon(true);
            }
        }

        /**
         * Start the workers, then enqueue changed files until the
         * enclosing source resets its <code>metaThread</code> field.
         */
        public void run()
        {
            for(int i = 0; i < threads.length; i++ )
            {
                threads[i].start();
            }
            // toArray() allocates a bigger array when needed and reuses
            // this one otherwise.
            WatchedFile[] wfile = new WatchedFile[0];
            try
            {
                while ( metaThread != null )
                {
                    // Take a snapshot of the current watched files.
                    synchronized(watched)
                    {
                        wfile = (WatchedFile[]) watched.toArray(wfile);
                    }
                    for (int i = 0; i < wfile.length; i++)
                    {
                        // toArray() null-terminates a reused, oversized array.
                        if ( wfile[i] == null )
                            break;
                        synchronized(wfile[i])
                        {
                            if ( !wfile[i].isEnqueued() && wfile[i].isChanged() )
                            {
                                // Mark first; the worker can only reset the
                                // flag after we release the file's monitor.
                                wfile[i].setEnqueued(true);
                                synchronized(fileQueue)
                                {
                                    fileQueue.addLast(wfile[i]);
                                    fileQueue.notify();
                                }
                            }
                        }
                    }
                    ThreadUtil.sleep(deltaTimeQueue);
                }
            }
            finally
            {
                // Release the workers however the loop ended.
                finalize();
            }
        }

        /**
         * Block until a task is available.
         *
         * @return the next task, or <code>null</code> if the calling thread
         *         was interrupted while waiting (its interrupt status is
         *         restored in that case)
         */
        public Runnable getTask()
        {
            try
            {
                synchronized(fileQueue)
                {
                    while ( fileQueue.isEmpty() )
                    {
                        fileQueue.wait();
                    }
                    return (Runnable) fileQueue.removeFirst();
                }
            }
            catch(InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        /**
         * Interrupt all workers; called explicitly at the end of run().
         */
        protected void finalize()
        {
            for(int i = 0; i < threads.length; i++)
                threads[i].interrupt();
        }
    }

    /**
     * A pool member: fetches tasks from its MetaThread and runs them
     * until no more task is delivered.
     */
    protected static class WorkerThread
        extends Thread
    {
        private MetaThread meta;

        public WorkerThread(MetaThread meta)
        {
            super();
            this.meta = meta;
        }

        public void run()
        {
            while ( true )
            {
                Runnable task = meta.getTask();
                // null signals an interrupt, i.e. the pool shuts down.
                if ( task == null )
                    break;
                task.run();
            }
        }
    }

    /**
     * Encapsulate a file with the ability to check for new bytes
     * and a Runnable to fetch new bytes.
     */
    protected class WatchedFile
        implements Runnable
    {
        /** A marker for abstraction purpose */
        private String label;

        /** The file underlying the Watch */
        private RandomAccessFile file;

        /** Are we enqueued? */
        private boolean enqueued = false;

        /** Stringbuffer, allocated once for the sake of speed */
        private StringBuffer lineBuffer = new StringBuffer(80);

        /** handle this stupid old MS-DOS cr lf thingie */
        private boolean lastWasCR = false;

        /**
         * Open the file and position at its current end, so only bytes
         * appended later are reported.
         *
         * @throws IOException if opening or positioning fails; the file
         *         handle is closed again in the latter case
         */
        public WatchedFile(String label, String fname)
            throws IOException
        {
            this.label = label;
            file = new RandomAccessFile(fname,"r");
            boolean positioned = false;
            try
            {
                file.seek(file.length());
                positioned = true;
            }
            finally
            {
                if ( !positioned )
                {
                    try
                    {
                        file.close();
                    }
                    catch(IOException ignored)
                    {
                        // The original failure is the one worth reporting.
                    }
                }
            }
        }

        /**
         * @return true if the file is longer than what has been read so far
         */
        public boolean isChanged()
        {
            try
            {
                return (file.getFilePointer() < file.length());
            }
            catch(IOException e )
            {
                // We hope it will work again later!
                return false;
            }
        }

        public void setEnqueued(boolean enqueued)
        {
            this.enqueued = enqueued;
        }

        public boolean isEnqueued()
        {
            return enqueued;
        }

        /**
         * Read everything currently available, then mark this file as no
         * longer enqueued, whatever the outcome of the read.
         */
        public void run()
        {
            try
            {
                readAhead();
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                synchronized(this)
                {
                    setEnqueued(false);
                }
            }
        }

        /** Enqueue a finished line and clear the line buffer */
        protected void putAChunk()
        {
            synchronized(chunkQueue)
            {
                chunkQueue.add(new MarkedChunk(lineBuffer.toString(),label));
                lineBuffer.setLength(0);
            }
        }

        /**
         * Modified handling of CR LF and EOF compared to Sun implementation.
         * Efficient handling of partial reads: an unterminated line stays
         * in the buffer until its terminator arrives.
         * An LF, a CR or a CR LF pair each end exactly one line.
         *
         * @throws IOException if reading the file fails
         */
        public void readAhead()
            throws IOException
        {
            // XXX revisit the readLine() if this can be trusted...
            // This has the advantage it can stop in the middle of the line
            // in case of slow feeds...
            long remaining = file.length() - file.getFilePointer();
            while ( remaining > 0 )
            {
                while ( remaining-- > 0 )
                {
                    int c = file.read();
                    if ( c == -1 )
                    {
                        // The file got shorter under our feet. Flush once,
                        // as before, but stop reading: every further read
                        // would only return -1 and enqueue an empty chunk.
                        putAChunk();
                        lastWasCR = true;
                        return;
                    }
                    if ( c == '\n' )
                    {
                        // CR LF is the same as CR alone :-(
                        if ( !lastWasCR )
                            putAChunk();
                        lastWasCR = false;
                    }
                    else if ( c == '\r' )
                    {
                        putAChunk();
                        lastWasCR = true;
                    }
                    else
                    {
                        lineBuffer.append((char)c);
                        lastWasCR = false;
                    }
                }
                // More bytes may have arrived while we were reading.
                remaining = file.length() - file.getFilePointer();
            }
        }

        public String toString()
        {
            return label;
        }
    }

    /**
     * @return the constant name of this source
     */
    public String message(JacsonReport jsr)
    {
        return "TailSource";
    }

    /** Nothing to summarize for this source. */
    public void summary(JacsonReport jsr)
    {
    }
}
//
// Jacson - Text Filtering with Java.
// Copyright (C) 2002 Frank S. Nestel (nestefan -at- users.sourceforge.net)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -