⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractmailthreads.java

📁 基于java的电子邮件群发系统,基于java的电子邮件群发系统
💻 JAVA
字号:
package com.cwq.batchmail.mail;

import java.io.File;
import java.io.IOException;
import java.net.Proxy;
import java.util.Iterator;
import java.util.Vector;

import com.cwq.batchmail.DetailPacket;
import com.cwq.batchmail.log.Logger;
import com.cwq.batchmail.util.LoadEmails;
import com.cwq.batchmail.util.LoadProxies;
import com.cwq.batchmail.util.LoadSmtpUsers;

/**
 * Base class for bulk mail senders: hands out batches of recipient addresses to
 * short-lived worker threads, each of which opens one SMTP connection and sends
 * one message to its batch.
 *
 * <p>Subclasses decide which SMTP account a worker uses by implementing
 * {@link #nextUser()}. Addresses whose delivery attempt fails are kept in an
 * internal retry queue and are handed out again by {@link #nextEmails(int)}.
 *
 * <p>All shared mutable state (address iterator, retry queue, worker counter) is
 * only touched from methods synchronized on this instance.
 */
public abstract class AbstractMailThreads implements MailThreads {

	/** Pattern (lower-case only) an address must match to be recorded as sent or failed. */
	public static final String mailRegex = "([a-z0-9+_]|\\-|\\.)+@(([a-z0-9_]|\\-|\\.)+\\.)+[a-z]{2,6}";

	/** Pause, in milliseconds, between two polls of the dispatcher / wait loops. */
	private static final long POLL_INTERVAL_MS = 50;

	private Iterator<String> emails;
	private Vector<Proxy> proxies;
	private Vector<SmtpUser> smtpusers;
	private Vector<String> succ_mails;
	private Vector<String> fail_mails;

	private Iterator<SmtpUser> it_smtpuser;

	/** Number of addresses taken from {@link #emails} so far. */
	private int current_email = 0;
	/** Number of worker threads that have been started and have not finished yet. */
	private int current_thread_count = 0;

	private boolean useproxy = false;
	private boolean useLocalsmtpserver = false;

	/** Optional logger; every use is null-guarded through {@link #logInfo(String)}. */
	private Logger logger = null;

	/**
	 * Creates the sender with empty success and retry lists.
	 *
	 * @throws IOException never thrown here; kept so existing subclasses and callers still compile
	 */
	AbstractMailThreads() throws IOException {
		succ_mails = new Vector<String>();
		fail_mails = new Vector<String>();
	}

	/**
	 * Starts the local SMTP server and switches this sender to use it.
	 *
	 * @param domain domain passed to the {@code SmtpServer} constructor
	 * @throws IOException if the server reports that it is not running
	 */
	void runSmtpServer(String domain) throws IOException {
		logInfo("正在启动本地Smtp服务器 ...");
		SmtpServer smtpServer = new SmtpServer(logger, domain);
		if (!smtpServer.isRunning()) {
			logInfo("本地Smtp服务器运行出现错误 !");
			throw new IOException("本地Smtp服务器运行出现错误 !");
		}
		logInfo("本地Smtp服务器启动成功。");
		useLocalsmtpserver = true;
	}

	/**
	 * Loads the recipient addresses.
	 *
	 * @param emails      file to read the addresses from
	 * @param limitemails limit forwarded to {@code LoadEmails.load}
	 * @throws IOException if loading fails or yields no address
	 */
	void loadEMails(File emails, int limitemails) throws IOException {
		this.setEmails(new LoadEmails().load(emails, limitemails).iterator());
		if (!this.getEmails().hasNext()) {
			logInfo("加载EMAIL文件错误, 没有数据。");
			throw new IOException("加载EMAIL文件错误, 没有数据。");
		}
	}

	/**
	 * Loads the proxy list and enables proxy usage.
	 *
	 * @param proxies file to read the proxies from
	 * @throws IOException if loading fails or yields no proxy
	 */
	public void loadProxies(File proxies) throws IOException {
		this.setProxies(new LoadProxies().load(proxies));
		if (this.getProxies().size() == 0) {
			logInfo("加载代理服务器文件错误, 没有数据。");
			throw new IOException("加载代理服务器文件错误, 没有数据。");
		}
		useproxy = true;
	}

	/**
	 * Loads the SMTP accounts and switches this sender away from the local SMTP server.
	 *
	 * @param smtpusers file to read the accounts from
	 * @throws IOException if loading fails or yields no account
	 */
	void loadSmtpusers(File smtpusers) throws IOException {
		this.setSmtpusers(LoadSmtpUsers.load(smtpusers));
		this.setIt_smtpuser(this.getSmtpusers().iterator());
		if (this.getSmtpusers().size() == 0) {
			logInfo("加载SMTP服务器格户配置错误, 没有数据。");
			throw new IOException("加载SMTP服务器格户配置错误, 没有数据。");
		}
		useLocalsmtpserver = false;
	}

	/**
	 * Sends {@code subject}/{@code body} to every loaded address, using at most
	 * {@code max_thread_count} concurrent worker threads, and blocks until all
	 * workers have finished.
	 *
	 * <p>Dispatching continues while {@code DetailPacket.isRunning} is true; a worker
	 * clears that flag when no address, no account or no proxy is left. Interrupts
	 * received while polling do not abort the run; the interrupt status is restored
	 * before this method returns.
	 *
	 * @param subject          message subject
	 * @param body             message body
	 * @param max_thread_count maximum number of workers alive at the same time
	 */
	public void run(final String subject, final String body, final int max_thread_count) {
		boolean interrupted = false;

		DetailPacket.isRunning = true;
		while (DetailPacket.isRunning) {
			if (getCurrentThreadCount() < max_thread_count) {
				Thread worker = new Thread() {
					@Override
					public void run() {
						try {
							sendBatch(subject, body, this.hashCode());
						} catch (Exception ex) {
							ex.printStackTrace();
						} finally {
							// Always give the slot back, whatever path the worker took.
							adjustCurrentThreadCount(-1);
						}
					}
				};
				// Count the worker before it starts so it can never decrement first.
				adjustCurrentThreadCount(1);
				worker.start();
			}

			try {
				Thread.sleep(POLL_INTERVAL_MS);
			} catch (InterruptedException ex) {
				interrupted = true; // keep going, restore the flag on exit
			}
		}

		DetailPacket.isRunning = false;

		logInfo("......操作已经完成,请等待所有线程结束......");

		// Every worker decrements the counter exactly once, so zero means all are done.
		while (getCurrentThreadCount() > 0) {
			try {
				Thread.sleep(POLL_INTERVAL_MS);
			} catch (InterruptedException ex) {
				interrupted = true;
			}
		}

		logInfo("......完成,所有线程已经结束......");

		if (interrupted) {
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Body of one worker: takes an account and a batch of addresses, connects
	 * (retrying on errors) and sends one message to the batch.
	 *
	 * <p>Batches that could not be sent are put back on the retry queue. If an
	 * unexpected exception escapes, the connection is still released, but the batch
	 * is neither recorded as sent nor re-queued.
	 *
	 * @param subject  message subject
	 * @param body     message body
	 * @param threadId identifier of the worker, used in log output only
	 */
	private void sendBatch(String subject, String body, int threadId) {
		// The account must be checked before it is asked for its batch size.
		SmtpUser smtpuser = nextUser();
		if (smtpuser == null) {
			DetailPacket.isRunning = false;
			logInfo("没有可用的用户了,退出。");
			return;
		}

		String mails = nextEmails(smtpuser.getMaxRcpt());
		if (mails == null) {
			// Every address has been handed out. Workers still in flight may yet fail;
			// their addresses then stay on the retry queue and count as not sent.
			DetailPacket.isRunning = false;
			return;
		}

		Proxy proxy = null;
		if (useproxy) {
			proxy = rndProxy();
			if (proxy == null) {
				stopWithoutProxy(mails);
				return;
			}
		}

		SmtpConnection smtp = new SmtpConnection(smtpuser.getSmtp(), smtpuser.getPort());
		try {
			logInfo("线程(" + threadId + ") 用户:" + smtpuser.toString());

			int r;
			int loopKnowErrorsCount = 3;  // retries left after a connection timeout
			int loopOtherErrorsCount = 3; // retries left after any other error

			while ((r = smtp.open(smtpuser.getEmail(), smtpuser.getPassword(), proxy, !useLocalsmtpserver))
					!= SmtpConnection.STATE_AUTH_SUCC) {
				logInfo("出现错误<" + r + ">,用户" + smtpuser.getEmail() + ",错误为" + smtp.getErrorMessage());

				if (r == SmtpConnection.STATE_CONNECTION_TIMEOUT) {
					if (useproxy) {
						// The proxy timed out: drop it and try another one.
						proxies.remove(proxy);
						logInfo("代理:" + proxy.toString() + " 链接超时.");
						proxy = rndProxy();
						if (proxy == null) {
							stopWithoutProxy(mails);
							return;
						}
					} else if (--loopKnowErrorsCount < 0) {
						putFailMails(mails);
						return;
					}
				} else if (r == SmtpConnection.STATE_AUTH_FAIL) {
					logInfo("用户:" + smtpuser.getEmail() + " 登录SMTP服务器失败,请检查用户名和密码.");
					putFailMails(mails);
					return;
				} else if (--loopOtherErrorsCount < 0) {
					putFailMails(mails);
					return;
				}
			}

			// Connected and authenticated: send the message to the whole batch.
			r = smtp.sendMessage(new Message(
					smtpuser.getEmail(),
					mails,
					"text/html; charset=iso-8859-1",
					subject,
					body,
					null
			));
			if (r == SmtpConnection.STATE_SEND_OK) {
				putSuccMails(mails);
				logInfo("用户:" + smtpuser.toString() + " ,发送到 " + mails + " 的邮件成功.");
			} else {
				putFailMails(mails);
				logInfo("发送失败,用户" + smtpuser.getEmail() + ",错误为" + smtp.getErrorMessage());
			}
		} finally {
			// Release the connection on every path, including exceptions.
			smtp.finalize();
		}
	}

	/**
	 * Stops dispatching because proxies are required but none is left, and puts the
	 * already taken batch back on the retry queue.
	 */
	private void stopWithoutProxy(String mails) {
		DetailPacket.isRunning = false;
		logInfo("没有可用的代理服务器了,退出。");
		putFailMails(mails);
	}

	/** Logs {@code message} at info level if a logger has been set. */
	private void logInfo(String message) {
		if (logger != null) {
			logger.info(message);
		}
	}

	public void setLogger(Logger logger) {
		this.logger = logger;
	}

	/** @return number of addresses recorded as successfully sent */
	public int getSuccEmailsCount() {
		return succ_mails.size();
	}

	private synchronized int getCurrentThreadCount() {
		return current_thread_count;
	}

	/** Atomically adds {@code delta} to the number of live workers. */
	private synchronized void adjustCurrentThreadCount(int delta) {
		this.current_thread_count += delta;
	}

	/**
	 * Picks a random proxy from the loaded list.
	 *
	 * @return a proxy, or {@code null} if no proxy list is set or the list is empty
	 */
	protected synchronized Proxy rndProxy() {
		Vector<Proxy> pool = this.proxies;
		if (pool == null) {
			return null;
		}
		// Workers remove timed-out proxies without holding this instance's lock, so
		// hold the vector's own monitor to keep size() and get() consistent.
		synchronized (pool) {
			int size = pool.size();
			if (size == 0) {
				return null;
			}
			return pool.get((int) (size * Math.random()));
		}
	}

	/**
	 * Supplies the SMTP account the next worker should use.
	 *
	 * @return the account, or {@code null} when none is available, in which case
	 *         {@link #run(String, String, int)} stops dispatching
	 */
	protected abstract SmtpUser nextUser();

	/**
	 * Takes the next batch of up to {@code count} addresses: first from the loaded
	 * addresses, then, once those are exhausted, from the retry queue.
	 *
	 * @param count maximum number of addresses in the batch; expected to be positive
	 * @return the addresses, each followed by a comma, or {@code null} when both the
	 *         loaded addresses and the retry queue are empty
	 */
	protected synchronized String nextEmails(final int count) {
		if (!this.emails.hasNext() && this.fail_mails.isEmpty()) {
			return null;
		}

		StringBuilder batch = new StringBuilder();
		int taken = 0;
		while (taken < count && this.emails.hasNext()) {
			batch.append(this.emails.next()).append(",");
			taken++;
		}
		this.current_email += taken;

		// Top the batch up with previously failed addresses, oldest first.
		while (taken < count && !this.fail_mails.isEmpty()) {
			batch.append(this.fail_mails.remove(0)).append(",");
			taken++;
		}

		return batch.toString();
	}

	/** Puts every valid address of the comma-separated {@code mails} on the retry queue. */
	private synchronized void putFailMails(final String mails) {
		addValidMails(mails, this.fail_mails);
	}

	/** Records every valid address of the comma-separated {@code mails} as sent. */
	private synchronized void putSuccMails(final String mails) {
		addValidMails(mails, this.succ_mails);
	}

	/**
	 * Splits {@code mails} on commas and appends each non-empty entry that matches
	 * {@link #mailRegex} to {@code target}. Entries are trimmed before matching.
	 */
	private static void addValidMails(String mails, Vector<String> target) {
		String[] parts = mails.split(",");
		for (int i = 0; i < parts.length; i++) {
			String mail = parts[i].trim();
			if (mail.length() > 0 && mail.matches(mailRegex)) {
				target.add(mail);
			}
		}
	}

	protected void setUseLocalsmtpserver(boolean useLocalsmtpserver) {
		this.useLocalsmtpserver = useLocalsmtpserver;
	}

	protected void setUseproxy(boolean useproxy) {
		this.useproxy = useproxy;
	}

	protected void setEmails(Iterator<String> emails) {
		this.emails = emails;
	}

	protected Iterator<String> getEmails() {
		return emails;
	}

	protected Vector<Proxy> getProxies() {
		return proxies;
	}

	protected void setProxies(Vector<Proxy> proxies) {
		this.proxies = proxies;
	}

	protected Vector<SmtpUser> getSmtpusers() {
		return smtpusers;
	}

	protected void setSmtpusers(Vector<SmtpUser> smtpusers) {
		this.smtpusers = smtpusers;
	}

	public Logger getLogger() {
		return logger;
	}

	protected Iterator<SmtpUser> getIt_smtpuser() {
		return it_smtpuser;
	}

	protected void setIt_smtpuser(Iterator<SmtpUser> it_smtpuser) {
		this.it_smtpuser = it_smtpuser;
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -