⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nntpspooler.java

📁 邮件服务器系统 支持SMTP POP3 是著名的Apache写 有一定的参考价值
💻 JAVA
字号:
/*********************************************************************** * Copyright (c) 2000-2004 The Apache Software Foundation.             * * All rights reserved.                                                * * ------------------------------------------------------------------- * * Licensed under the Apache License, Version 2.0 (the "License"); you * * may not use this file except in compliance with the License. You    * * may obtain a copy of the License at:                                * *                                                                     * *     http://www.apache.org/licenses/LICENSE-2.0                      * *                                                                     * * Unless required by applicable law or agreed to in writing, software * * distributed under the License is distributed on an "AS IS" BASIS,   * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or     * * implied.  See the License for the specific language governing       * * permissions and limitations under the License.                      
* ***********************************************************************/

package org.apache.james.nntpserver.repository;

import org.apache.avalon.excalibur.io.IOUtil;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.configuration.Configurable;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.context.Context;
import org.apache.avalon.framework.context.ContextException;
import org.apache.avalon.framework.context.Contextualizable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.logger.LogEnabled;
import org.apache.james.context.AvalonContextUtilities;
import org.apache.james.util.Lock;

import javax.mail.internet.MimeMessage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.util.Properties;
import java.util.StringTokenizer;

/**
 * Processes entries and sends to appropriate groups.
 * Eats up inappropriate entries.
 *
 * <p>The spooler owns a spool directory and a fixed number of worker
 * threads. Each worker periodically lists the directory and, for every file
 * it manages to lock, copies the article into the groups named in its
 * <code>Newsgroups</code> header and then removes the file from the spool.</p>
 */
class NNTPSpooler extends AbstractLogEnabled
        implements Contextualizable, Configurable, Initializable {

    /**
     * The spooler context
     */
    private Context context;

    /**
     * The array of spooler runnables, each associated with a Worker thread
     */
    private SpoolerRunnable[] worker;

    /**
     * The directory containing entries to be spooled.
     */
    private File spoolPath;

    /**
     * The String form of the spool directory.
     */
    private String spoolPathString;

    /**
     * The time the spooler threads sleep between processing
     */
    private int threadIdleTime = 0;

    /**
     * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
     */
    public void contextualize(final Context context)
            throws ContextException {
        this.context = context;
    }

    /**
     * Reads <code>threadCount</code> (default 1), <code>threadIdleTime</code>
     * (default 60000) and the mandatory <code>spoolPath</code> from the
     * configuration and sizes the worker array accordingly.
     *
     * @throws ConfigurationException if <code>spoolPath</code> is missing or
     *         if <code>threadCount</code> or <code>threadIdleTime</code> is negative
     * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
     */
    public void configure( Configuration configuration ) throws ConfigurationException {
        int threadCount = configuration.getChild("threadCount").getValueAsInteger(1);
        if (threadCount < 0) {
            // A negative array size would otherwise surface as an unchecked
            // NegativeArraySizeException below.
            throw new ConfigurationException("threadCount must not be negative: " + threadCount);
        }
        threadIdleTime = configuration.getChild("threadIdleTime").getValueAsInteger(60 * 1000);
        if (threadIdleTime < 0) {
            // Thread.sleep() rejects negative values, which would kill every worker.
            throw new ConfigurationException("threadIdleTime must not be negative: " + threadIdleTime);
        }
        spoolPathString = configuration.getChild("spoolPath").getValue();
        worker = new SpoolerRunnable[threadCount];
    }

    /**
     * Resolves the spool directory (creating it when absent), then creates
     * the workers and starts one thread per worker.
     *
     * @throws Exception if the spool path cannot be resolved, cannot be
     *         created, or exists but is not a directory
     * @see org.apache.avalon.framework.activity.Initializable#initialize()
     */
    public void initialize() throws Exception {
        try {
            spoolPath = AvalonContextUtilities.getFile(context, spoolPathString);
            if (!spoolPath.exists()) {
                // mkdirs() only reports failure through its return value; the
                // isDirectory() re-check tolerates a concurrent creation.
                if (!spoolPath.mkdirs() && !spoolPath.isDirectory()) {
                    StringBuffer errorBuffer =
                        new StringBuffer(128)
                            .append("Spool directory ")
                            .append(spoolPathString)
                            .append(" could not be created.");
                    throw new ConfigurationException(errorBuffer.toString());
                }
            } else if (!spoolPath.isDirectory()) {
                StringBuffer errorBuffer =
                    new StringBuffer(128)
                        .append("Spool directory is improperly configured.  The specified path ")
                        .append(spoolPathString)
                        .append(" is not a directory.");
                throw new ConfigurationException(errorBuffer.toString());
            }
        } catch (Exception e) {
            getLogger().fatalError(e.getMessage(), e);
            throw e;
        }

        for ( int i = 0 ; i < worker.length ; i++ ) {
            worker[i] = new SpoolerRunnable(threadIdleTime, spoolPath);
            // SpoolerRunnable extends AbstractLogEnabled, so it always accepts a logger.
            worker[i].enableLogging(getLogger());
        }

        // TODO: Replace this with a standard Avalon thread pool
        for ( int i = 0 ; i < worker.length ; i++ ) {
            new Thread(worker[i], "NNTPSpool-" + i).start();
        }
    }

    /**
     * Sets the repository used by this spooler.
     *
     * @param repo the repository to be used
     */
    void setRepository(NNTPRepository repo) {
        for ( int i = 0 ; i < worker.length ; i++ ) {
            worker[i].setRepository(repo);
        }
    }

    /**
     * Sets the article id repository used by this spooler.
     *
     * @param articleIDRepo the article id repository to be used
     */
    void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
        for ( int i = 0 ; i < worker.length ; i++ ) {
            worker[i].setArticleIDRepository(articleIDRepo);
        }
    }

    /**
     * Returns the spool directory resolved by {@link #initialize()}.
     *
     * @return the spool directory, or <code>null</code> before initialization
     */
    File getSpoolPath() {
        return spoolPath;
    }

    /**
     * A static inner class that provides the body for the spool
     * threads.
     */
    static class SpoolerRunnable extends AbstractLogEnabled implements Runnable {

        /**
         * Lock shared by all spooler threads; keyed by spool file name.
         */
        private static final Lock lock = new Lock();

        /**
         * The directory containing entries to be spooled.
         */
        private final File spoolPath;

        /**
         * The time the spooler thread sleeps between processing
         */
        private final int threadIdleTime;

        /**
         * The article ID repository used by this spooler thread.
         * Volatile because it is assigned by the configuring thread after
         * the spooler thread has already been started.
         */
        private volatile ArticleIDRepository articleIDRepo;

        /**
         * The NNTP repository used by this spooler thread.
         * Volatile for the same reason as {@link #articleIDRepo}.
         */
        private volatile NNTPRepository repo;

        SpoolerRunnable(int threadIdleTime, File spoolPath) {
            this.threadIdleTime = threadIdleTime;
            this.spoolPath = spoolPath;
        }

        /**
         * Sets the article id repository used by this spooler thread.
         *
         * @param articleIDRepo the article id repository to be used
         */
        void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
            this.articleIDRepo = articleIDRepo;
        }

        /**
         * Sets the repository used by this spooler thread.
         *
         * @param repo the repository to be used
         */
        void setRepository(NNTPRepository repo) {
            this.repo = repo;
        }

        /**
         * The threads race to grab a lock. if a thread wins it processes the article,
         * if it loses it tries to lock and process the next article.
         *
         * <p>The loop ends when the thread is interrupted, whether the
         * interrupt arrives while processing or while sleeping.</p>
         */
        public void run() {
            getLogger().debug(Thread.currentThread().getName() + " is the NNTP spooler thread.");
            try {
                while (!Thread.interrupted()) {
                    processSpool();
                    try {
                        Thread.sleep(threadIdleTime);
                    } catch (InterruptedException ex) {
                        // sleep() cleared the flag when it threw; re-assert it
                        // so the loop condition sees the interrupt and exits.
                        Thread.currentThread().interrupt();
                    }
                }
            } finally {
                // Leave the thread with a cleared interrupt status.
                Thread.interrupted();
            }
        }

        /**
         * Performs one pass over the spool directory, processing every file
         * whose name this thread manages to lock.
         */
        private void processSpool() {
            if (repo == null || articleIDRepo == null) {
                // The repositories are injected after the thread is started;
                // leave the spool untouched until both are available.
                return;
            }

            String[] list = spoolPath.list();
            if (list == null) {
                // File.list() returns null on an I/O error or when the path
                // is no longer a directory.
                getLogger().error("Could not list spool directory: " + spoolPath.getAbsolutePath());
                return;
            }
            if (list.length > 0) {
                getLogger().debug("Files to process: " + list.length);
            }

            for ( int i = 0 ; i < list.length ; i++ ) {
                final String name = list[i];
                if (!lock.lock(name)) {
                    continue; // another spooler thread owns this entry
                }
                final File f = new File(spoolPath, name);
                try {
                    if (!f.exists()) {
                        // The listing is stale: another thread finished and
                        // removed this entry before we obtained the lock.
                        continue;
                    }
                    getLogger().debug("Processing file: " + f.getAbsolutePath());
                    process(f.getAbsoluteFile());
                } catch (Throwable ex) {
                    getLogger().debug("Exception occured while processing file: " +
                                      f.getAbsolutePath(), ex);
                } finally {
                    lock.unlock(name);
                }
            }
        }

        /**
         * Process a file stored in the spool.
         *
         * <p>Adds a <code>Lines</code> header and a <code>Message-Id</code>
         * header when missing (rewriting the spool file if either was added),
         * drops the file if the article id is already known, otherwise adds
         * the article to every existing group named in the
         * <code>Newsgroups</code> header, registers the article id and
         * deletes the spool file.</p>
         *
         * @param spoolFile the spool file being processed
         * @throws Exception if the file cannot be read, parsed, rewritten or
         *         added to a group
         */
        private void process(File spoolFile) throws Exception {
            StringBuffer logBuffer =
                new StringBuffer(160)
                        .append("process: ")
                        .append(spoolFile.getAbsolutePath())
                        .append(",")
                        .append(spoolFile.getCanonicalPath());
            getLogger().debug(logBuffer.toString());

            // Get the message for copying to destination groups.
            final MimeMessage msg;
            FileInputStream fin = new FileInputStream(spoolFile);
            try {
                msg = new MimeMessage(null, fin);
            } finally {
                IOUtil.shutdownStream(fin);
            }

            // Non-null only when the Lines header had to be added.
            String lineCount = null;
            String[] lineCountHeader = msg.getHeader("Lines");
            if (lineCountHeader == null || lineCountHeader.length == 0) {
                lineCount = Integer.toString(countBodyLines(msg));
                msg.setHeader("Lines", lineCount);
            }

            // ensure no duplicates exist.
            String[] idheader = msg.getHeader("Message-Id");
            String articleID = ((idheader != null && (idheader.length > 0)) ? idheader[0] : null);
            if ((articleID != null) && (articleIDRepo.isExists(articleID))) {
                getLogger().debug("Message already exists: " + articleID);
                if (!spoolFile.delete()) {
                    getLogger().error("Could not delete duplicate message from spool: " + spoolFile.getAbsolutePath());
                }
                return;
            }

            // Rewrite the spool file when a header was added, so the groups
            // receive the amended message.
            if (articleID == null || lineCount != null) {
                if (articleID == null) {
                    articleID = articleIDRepo.generateArticleID();
                    msg.setHeader("Message-Id", articleID);
                }
                FileOutputStream fout = new FileOutputStream(spoolFile);
                try {
                    msg.writeTo(fout);
                } finally {
                    IOUtil.shutdownStream(fout);
                }
            }

            String[] headers = msg.getHeader("Newsgroups");
            Properties prop = new Properties();
            if (headers != null) {
                for ( int i = 0 ; i < headers.length ; i++ ) {
                    StringTokenizer tokenizer = new StringTokenizer(headers[i], ",");
                    while ( tokenizer.hasMoreTokens() ) {
                        String groupName = tokenizer.nextToken().trim();
                        getLogger().debug("Copying message to group: " + groupName);
                        NNTPGroup group = repo.getGroup(groupName);
                        if ( group == null ) {
                            getLogger().error("Couldn't add article with article ID " + articleID + " to group " + groupName + " - group not found.");
                            continue;
                        }

                        FileInputStream newsStream = new FileInputStream(spoolFile);
                        try {
                            NNTPArticle article = group.addArticle(newsStream);
                            prop.setProperty(group.getName(), article.getArticleNumber() + "");
                        } finally {
                            IOUtil.shutdownStream(newsStream);
                        }
                    }
                }
            }
            articleIDRepo.addArticle(articleID, prop);
            boolean delSuccess = spoolFile.delete();
            if (!delSuccess) {
                getLogger().error("Could not delete file: " + spoolFile.getAbsolutePath());
            }
        }

        /**
         * Counts the lines of the message content as returned by the
         * message's data handler.
         *
         * @param msg the message whose content is counted
         * @return the number of lines read from the content stream
         * @throws Exception if the content stream cannot be obtained or read
         */
        private int countBodyLines(MimeMessage msg) throws Exception {
            BufferedReader rdr =
                new BufferedReader(new InputStreamReader(msg.getDataHandler().getInputStream()));
            try {
                int lines = 0;
                while (rdr.readLine() != null) {
                    lines++;
                }
                return lines;
            } finally {
                // Close even when readLine() fails, so the stream is not leaked.
                rdr.close();
            }
        }
    } // class SpoolerRunnable
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -