📄 tailsource.java
字号:
package de.spieleck.app.jacson.source;
import java.io.RandomAccessFile;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import de.spieleck.util.ThreadUtil;
import de.spieleck.util.ThreadPool;
import de.spieleck.util.TaskQueue;
import de.spieleck.app.jacson.JacsonException;
/**
* Implement a {@link de.spieleck.app.jacson.JacsonChunkSource JacsonChunkSource}
* which is continuously running
* and monitoring files for changes.
* <BR /><b>Note1:</b> This works under the char = byte assumption, i.e.
* does not treat unicode right!
* <BR /><b>Note2:</b> This is Java, and there might be problems which
* are so platform dependent that there is no Java solution (e.g.
* I do not expect this to help with log rollovers and inodes...)
* @author fsn
*/
public class TailSource
extends ChunkSourceBase
{
/**
 * Key under which {@link #nextChunk()} stores the extra information of
 * the chunk it returns in the registry state. The extra is the second
 * constructor argument of the MarkedChunk, for which WatchedFile passes
 * its label (presumably getExtra() returns exactly that -- confirm).
 */
public final static String JACSON_STATE_CHUNK_TYPE
= "de.spieleck.app.jacson.source.TailSource/type";
/**
 * The thread watching ALL files for change.
 * Non-null is treated as "running" by setWorkers() and startWatching();
 * the watcher loop runs until this field becomes null.
 */
protected Thread metaThread = null;
/** A ThreadPool of workers executing the assignments to read files. */
protected ThreadPool threads;
/**
 * The queue feeding the above pool: the watcher thread adds a
 * WatchedFile task here whenever a file has unread data.
 */
protected TaskQueue tQueue;
/**
 * Delay between two scans of the watched files, handed to
 * ThreadUtil.sleep() (presumably milliseconds -- confirm).
 */
protected int deltaTime = 50;
/** The number of worker Threads, handed to the ThreadPool on start. */
protected int workers = 5;
/**
 * The watched resources (WatchedFile instances).
 * Additions and snapshots are done while synchronized on this list.
 */
protected List watched;
/**
 * The queue of chunks (MarkedChunk instances) to be returned by
 * nextChunk(). Also used as the monitor for wait/notify between the
 * file readers and nextChunk(). Created by startWatching().
 */
protected LinkedList chunkQueue;
/**
 * Create a source which does not watch any file yet.
 * Files are registered afterwards via addWatch().
 */
public TailSource()
{
    this.watched = new LinkedList();
}
/**
 * Configure how many worker threads the pool is created with.
 * Only allowed as long as the watcher thread does not exist.
 *
 * @param workers the number of worker threads to use
 * @throws IllegalStateException if the watcher thread already exists
 */
public void setWorkers(int workers)
{
    synchronized(this)
    {
        final boolean running = ( metaThread != null );
        if ( running )
        {
            throw new IllegalStateException("Cannot change worker while running.");
        }
        this.workers = workers;
    }
}
/**
 * Configure the pause between two consecutive scans of the watched files.
 * The value is stored as is; no validation takes place.
 *
 * @param deltaTime the new delay between file checks
 */
public void setDeltaTime(int deltaTime)
{
    final int newDelay = deltaTime;
    this.deltaTime = newDelay;
}
/**
 * Register a further file to be watched.
 * The file is opened immediately and positioned at its current end,
 * so only data appended after this call will be reported.
 *
 * @param label marker which is attached to every chunk read from the file
 * @param fName name of the file to watch
 * @throws IOException if the file cannot be opened or positioned
 */
public void addWatch(String label, String fName)
    throws IOException
{
    synchronized(watched)
    {
        final WatchedFile entry = new WatchedFile(label, fName);
        watched.add(entry);
    }
}
/**
 * Start watching on the files.
 * Creates the chunk queue, the task queue and the worker pool and then
 * starts the watcher thread.
 * <p>
 * The complete initialisation happens while holding the lock on this
 * object. nextChunk() checks its "not yet started" condition under the
 * same lock, so it can never observe a half initialised source (watcher
 * thread assigned, but pool or queues still missing).
 *
 * @throws IllegalStateException if watching has already been started
 */
public void startWatching()
{
    synchronized(this)
    {
        if ( metaThread != null )
            throw new IllegalStateException("Cannot start Watching twice");
        // Build everything the watcher and the workers rely on first;
        // if one of these fails the source stays in "not started" state.
        chunkQueue = new LinkedList();
        tQueue = new TaskQueue();
        threads = new ThreadPool(workers, tQueue);
        // Use a local reference for start(), the field may be reset
        // by somebody else as the signal to stop.
        Thread watcher = new MetaThread();
        metaThread = watcher;
        watcher.start();
    }
}
/**
 * Deliver the next line read from any of the watched files, blocking
 * until one is available.
 * Watching is started implicitly on the first call. As a side effect
 * the extra information of the delivered chunk is stored in the
 * registry state under {@link #JACSON_STATE_CHUNK_TYPE}.
 *
 * @return the next chunk, or <code>null</code> if the watcher has
 *   terminated and no chunk is left, or if the calling thread got
 *   interrupted while waiting (the interrupt status is kept set then).
 * @throws JacsonException declared by the signature; nothing in this
 *   method throws it explicitly.
 */
public String nextChunk()
    throws JacsonException
{
    // Init if not yet done.
    synchronized(this)
    {
        if ( threads == null )
            startWatching();
    }
    // Time to finish?!
    if ( metaThread == null )
        return null;
    MarkedChunk chunk;
    synchronized ( chunkQueue )
    {
        try
        {
            // Loop instead of a single check: wait() may return without
            // a chunk being available, either spuriously or because the
            // terminating watcher thread woke us up.
            while ( chunkQueue.isEmpty() )
            {
                if ( metaThread == null )
                    return null;
                chunkQueue.wait();
            }
        }
        catch(InterruptedException e)
        {
            e.printStackTrace();
            // Keep the interrupt visible for the caller.
            Thread.currentThread().interrupt();
            return null;
        }
        chunk = (MarkedChunk) chunkQueue.removeFirst();
    }
    getRegState().put(JACSON_STATE_CHUNK_TYPE, chunk.getExtra());
    return chunk.getChunk();
}
/**
 * Wait for finish.
 * Note that this is not a well defined notion, since logfiles may
 * grow forever. Therefore this method can exhibit both failures:
 * "never return" or "return prematurely", depending on the application.
 * It was good enough for the JUnit test though.
 * <p>
 * Two rounds are performed; each round first joins the worker pool and
 * then waits twice on the watcher thread's monitor. An interrupt while
 * waiting is printed and ends the current round only.
 * Must not be called before watching has been started, since the pool
 * and the watcher thread do not exist until then.
 */
public void join()
{
    int round = 0;
    while ( round < 2 )
    {
        threads.join();
        synchronized(metaThread)
        {
            try
            {
                // Two notifications make sure that every source has
                // been asked at least once.
                metaThread.wait();
                metaThread.wait();
            }
            catch ( InterruptedException e )
            {
                e.printStackTrace();
            }
        }
        round++;
    }
}
/**
 * The MetaThread enqueues the changing files.
 * It repeatedly takes a snapshot of the watched files and hands every
 * file which has unread data, and is not already being read, to the
 * task queue. After each complete scan all threads waiting on the
 * monitor of the watcher thread (see join()) are notified.
 * The loop ends as soon as the <code>metaThread</code> field of the
 * enclosing source is <code>null</code>.
 */
public class MetaThread
    extends Thread
{
    /** Create the watcher as a daemon thread. */
    public MetaThread()
    {
        setDaemon(true);
    }

    /**
     * The scan loop, see the class description.
     * On termination pool, task queue and watcher reference of the
     * enclosing source are cleared and all threads blocked on the
     * chunk queue are woken up.
     */
    public void run()
    {
        WatchedFile[] wfile;
        synchronized(watched)
        {
            // Same lock as addWatch(), so the size is read consistently.
            wfile = new WatchedFile[watched.size()];
        }
        //
        while ( metaThread != null )
        {
            // Take a snapshot of the current watched files.
            tQueue.setWorkExpected(true);
            synchronized(watched)
            {
                // toArray() reuses the array if it is large enough and
                // then puts a null behind the last element.
                wfile = (WatchedFile[]) watched.toArray(wfile);
            }
            for (int i = 0; i < wfile.length; i++)
            {
                WatchedFile w = wfile[i];
                if ( w == null )
                    break;
                synchronized(w)
                {
                    if ( !w.isInProgress() && w.isChanged() )
                    {
                        w.setInProgress(true);
                        tQueue.addTask(w);
                    }
                }
            }
            // Read the field only once: it may be set to null (the stop
            // signal) at any time, and synchronizing on null would kill
            // this thread with a NullPointerException.
            Thread monitor = metaThread;
            if ( monitor == null )
            {
                tQueue.setWorkExpected(false);
                break;
            }
            synchronized(monitor)
            {
                monitor.notifyAll();
                tQueue.setWorkExpected(false);
            }
            ThreadUtil.sleep(deltaTime);
        }
        threads = null;
        tQueue = null;
        metaThread = null;
        synchronized(chunkQueue)
        {
            // Wake up every consumer, not only one of them; otherwise
            // further consumers would stay blocked forever.
            chunkQueue.notifyAll();
        }
    }
}
/**
 * Encapsulate a file with the ability to check for new bytes
 * and a Runnable to fetch new bytes.
 * The file is read byte by byte, each byte is taken as one character.
 * Completed lines are appended to the chunk queue of the enclosing
 * source; an incomplete line stays in the line buffer until the rest
 * of it shows up.
 */
protected class WatchedFile
    implements Runnable
{
    /** A marker for abstraction purpose, attached to every chunk */
    private String label;

    /** The file underlying the Watch */
    private RandomAccessFile file;

    /** Are we in progress? Callers synchronize on this object. */
    private boolean inProgress = false;

    /** Stringbuffer, allocated once for the sake of speed */
    private StringBuffer lineBuffer = new StringBuffer(80);

    /** handle this stupid old MS-DOS cr lf thingie */
    private boolean lastWasCR = false;

    /**
     * Open the file read-only and position at its current end.
     *
     * @param label marker attached to every chunk of this file
     * @param fname name of the file to open
     * @throws IOException if the file cannot be opened or positioned;
     *   in the latter case the file is closed again before rethrowing
     */
    public WatchedFile(String label, String fname)
        throws IOException
    {
        this.label = label;
        RandomAccessFile raf = new RandomAccessFile(fname,"r");
        try
        {
            raf.seek(raf.length());
        }
        catch ( IOException e )
        {
            // Do not leak the file handle of an unusable file.
            try
            {
                raf.close();
            }
            catch ( IOException ignored )
            {
                // The original problem is the one worth reporting.
            }
            throw e;
        }
        file = raf;
    }

    /**
     * Check for unread data.
     *
     * @return true if the file is longer than the current read position;
     *   false otherwise, also if the check itself fails.
     */
    public boolean isChanged()
    {
        try
        {
            return (file.getFilePointer() < file.length());
        }
        catch(IOException e )
        {
            // We hope it will work again later!
            return false;
        }
    }

    /** Mark this file as (not) being scheduled or read. */
    public void setInProgress(boolean inProgress)
    {
        this.inProgress = inProgress;
    }

    /** Is this file currently scheduled or being read? */
    public boolean isInProgress()
    {
        return inProgress;
    }

    /**
     * Read everything available and release the in-progress mark.
     * Exceptions are printed and otherwise ignored, so that the file
     * is tried again with a later scan.
     */
    public void run()
    {
        try
        {
            readAhead();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            // Release the mark whatever happened, otherwise the file
            // would never be scheduled again.
            synchronized(this)
            {
                setInProgress(false);
            }
        }
    }

    /** Enqueue a finished line and clear the line buffer */
    protected void putAChunk()
    {
        synchronized(chunkQueue)
        {
            // JDK1.4.1 workarround to avoid creation of huge
            // Strings
            // was: chunkQueue.add(new MarkedChunk(lineBuffer.toString(),label));
            chunkQueue.add(new MarkedChunk(lineBuffer.substring(0),label));
            chunkQueue.notify();
        }
        lineBuffer.setLength(0);
    }

    /**
     * Read all bytes currently available and enqueue every completed
     * line. CR, LF and CR LF each terminate a line.
     * Modified handling of CR LF and EOF compared to Sun implementation.
     * Efficient handling of partial reads: an incomplete line is kept
     * in the buffer and continued by the next call.
     *
     * @throws IOException if accessing the file fails
     */
    public void readAhead()
        throws IOException
    {
        // XXX revisit the readLine() if this can be trusted...
        // Below has the advantage it can stop in the middle of the line
        // in case of slow feeds...
        long pending = file.length() - file.getFilePointer();
        while ( pending > 0 )
        {
            while ( pending-- > 0 )
            {
                int c = file.read();
                if ( c == -1 )
                {
                    // The file got shorter than announced. Hand out
                    // what we have and stop; going on would enqueue one
                    // empty chunk per remaining count. lastWasCR is left
                    // alone, no CR has been seen here.
                    if ( lineBuffer.length() > 0 )
                        putAChunk();
                    return;
                }
                if ( c == '\n' )
                {
                    // CR LF is the same as CR alone :-(
                    if ( !lastWasCR )
                        putAChunk();
                    lastWasCR = false;
                }
                else if ( c == '\r' )
                {
                    putAChunk();
                    lastWasCR = true;
                }
                else
                {
                    lineBuffer.append((char)c);
                    lastWasCR = false;
                }
            }
            // More data may have arrived while we were reading.
            pending = file.length() - file.getFilePointer();
        }
    }

    /** Label, progress flag, read position, length and change state. */
    public String toString()
    {
        try
        {
            return label+"["+isInProgress()+"/"+file.getFilePointer()+"/"+file.length()+"/"+isChanged()+"]";
        }
        catch ( IOException io )
        {
            return label+"["+isInProgress()+"/"+isChanged()+"/"+io+"]";
        }
    }
}
/**
 * Calls <code>begin("lines")</code> on the registered report and
 * names this source.
 *
 * @return the constant text "TailSource"
 */
public String message()
{
    final String sourceName = "TailSource";
    getRegReport().begin("lines");
    return sourceName;
}
}
//
// Jacson - Text Filtering with Java.
// Copyright (C) 2002 Frank S. Nestel (nestefan -at- users.sourceforge.net)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -