📄 abstractmailthreads.java
字号:
package com.cwq.batchmail.mail;
import java.io.File;
import java.io.IOException;
import java.net.Proxy;
import java.util.Iterator;
import java.util.Vector;
import com.cwq.batchmail.DetailPacket;
import com.cwq.batchmail.log.Logger;
import com.cwq.batchmail.util.LoadEmails;
import com.cwq.batchmail.util.LoadProxies;
import com.cwq.batchmail.util.LoadSmtpUsers;
public abstract class AbstractMailThreads implements MailThreads {
public static final String mailRegex = "([a-z0-9+_]|\\-|\\.)+@(([a-z0-9_]|\\-|\\.)+\\.)+[a-z]{2,6}";
private Iterator<String> emails;
private Vector<Proxy> proxies;
private Vector<SmtpUser> smtpusers;
private Vector<String> succ_mails;
private Vector<String> fail_mails;
private Iterator<SmtpUser> it_smtpuser;
private int current_email = 0;
private int current_thread_count = 0;
private boolean useproxy = false;
private boolean useLocalsmtpserver = false;
private Logger logger = null;
AbstractMailThreads() throws IOException {
succ_mails = new Vector<String>();
fail_mails = new Vector<String>();
}
void runSmtpServer(String domain) throws IOException {
logger.info("正在启动本地Smtp服务器 ...");
SmtpServer smtpServer = new SmtpServer(logger, domain);
if( !smtpServer.isRunning() ) {
if(logger != null)
logger.info("本地Smtp服务器运行出现错误 !");
throw new IOException("本地Smtp服务器运行出现错误 !");
}
logger.info("本地Smtp服务器启动成功。");
useLocalsmtpserver = true;
}
void loadEMails(File emails, int limitemails) throws IOException {
this.setEmails(new LoadEmails().load(emails, limitemails).iterator());
if( !this.getEmails().hasNext() ) {
if(logger != null)
logger.info("加载EMAIL文件错误, 没有数据。");
throw new IOException("加载EMAIL文件错误, 没有数据。");
}
}
public void loadProxies(File proxies) throws IOException {
this.setProxies(new LoadProxies().load(proxies));
if(this.getProxies().size() == 0) {
if(logger != null)
logger.info("加载代理服务器文件错误, 没有数据。");
throw new IOException("加载代理服务器文件错误, 没有数据。");
}
useproxy = true;
}
void loadSmtpusers(File smtpusers) throws IOException {
this.setSmtpusers(LoadSmtpUsers.load(smtpusers));
this.setIt_smtpuser(this.getSmtpusers().iterator());
if(this.getSmtpusers().size() == 0) {
if(logger != null)
logger.info("加载SMTP服务器格户配置错误, 没有数据。");
throw new IOException("加载SMTP服务器格户配置错误, 没有数据。");
}
useLocalsmtpserver = false;
}
public void run(final String subject, final String body, final int max_thread_count) {
DetailPacket.isRunning = true;
//System.out.println("max_thread_count " + max_thread_count);
while (DetailPacket.isRunning) {
if(getCurrentThreadCount() < max_thread_count) {
//System.out.println("--------- " + getCurrentThreadCount());
new Thread() {
private void end(SmtpConnection smtp, String mails) {
smtp.finalize();
putFailMails(mails); // 记下这些已经取出,但又未发的email
setCurrentThreadCount(getCurrentThreadCount() - 1); // 线程结束
return ;
}
public void run() {
try {
SmtpUser smtpuser = nextUser();
String mails = nextEmails(smtpuser.getMaxRcpt());
if(mails == null) {
// 所有email已经取出并发送了,可以退出循环了
// 在这里有另一种状态,就是当所有email都取出后,就不会再新建线程了,但是此时还未结束的线程有可能执行不成功,
// 这时候,那些线程的email也放到fail的emails中,看成不成功的
DetailPacket.isRunning = false;
//if(logger != null) logger.info("群发完成,正在结束线程。");
return ;
}
if(smtpuser == null) {
DetailPacket.isRunning = false;
if(logger != null)
logger.info("没有可用的用户了,退出。");
return ;
}
// 开始当前线程的发信 ---------------------------------
Proxy proxy = null;
if(useproxy) proxy = rndProxy();
SmtpConnection smtp = new SmtpConnection(smtpuser.getSmtp(), smtpuser.getPort());
if(logger != null)
logger.info("线程(" + this.hashCode() + ") 用户:" + smtpuser.toString());
int r = -1;
int loopKnowErrorsCount = 3; // 已经错误,最多重试3次
int loopOtherErrorsCount = 3; // 如果是其它错误,重试loopOtherErrorsCount次
while ( (r = smtp.open(smtpuser.getEmail(), smtpuser.getPassword(), proxy, !useLocalsmtpserver)) != SmtpConnection.STATE_AUTH_SUCC) {
if(logger != null)
logger.info("出现错误<" + r + ">,用户" + smtpuser.getEmail() + ",错误为" + smtp.getErrorMessage());
if(r == SmtpConnection.STATE_CONNECTION_TIMEOUT) {
if(useproxy) {
proxies.remove(proxy); // 代理超时,从vector中删除,
if(logger != null) logger.info("代理:" + proxy.toString() + " 链接超时.");
proxy = rndProxy(); // 下一个代理
}
else {
loopKnowErrorsCount --;
if(loopKnowErrorsCount < 0) {
end(smtp, mails);
return ;
}
}
}
else if(r == SmtpConnection.STATE_AUTH_FAIL) {
//smtpusers.remove(smtpuser); // 登录不成功, 从vector中删除
if(logger != null) logger.info("用户:" + smtpuser.getEmail() + " 登录SMTP服务器失败,请检查用户名和密码.");
end(smtp, mails);
return ;
}
else {
loopOtherErrorsCount --;
if(loopOtherErrorsCount < 0) {
end(smtp, mails);
return ;
}
// other errors
}
} // 在链接并登录成功后,开始发送邮件
if(SmtpConnection.STATE_AUTH_SUCC == r) {
r = smtp.sendMessage(new Message(
smtpuser.getEmail(),
mails,
"text/html; charset=iso-8859-1",
subject,
body,
null
));
if(r == SmtpConnection.STATE_SEND_OK) {
putSuccMails(mails);
if(logger != null) logger.info("用户:" + smtpuser.toString() + " ,发送到 " + mails + " 的邮件成功.");
}
else {
putFailMails(mails);
if(logger != null) logger.info("发送失败,用户" + smtpuser.getEmail() + ",错误为" + smtp.getErrorMessage());
}
}
smtp.finalize();
// 结束当前线程的发信 ---------------------------------
}
catch(Exception ex){ ex.printStackTrace(); }
setCurrentThreadCount(getCurrentThreadCount() - 1); // 线程结束
}
}.start();
setCurrentThreadCount(getCurrentThreadCount() + 1);
}
try { Thread.currentThread().sleep(50); } catch(Exception ex) {}
}
DetailPacket.isRunning = false;
if(logger != null) logger.info("......操作已经完成,请等待所有线程结束......");
// wait for all threads to dead, 1 because current_thread_count begin from 0
while (getCurrentThreadCount() > 1) {
//System.out.println("ff: " + getCurrentThreadCount());
try { Thread.currentThread().sleep(50); } catch(Exception ex) {}
}
if(logger != null) logger.info("......完成,所有线程已经结束......");
}
public void setLogger(Logger logger) {
this.logger = logger;
}
public int getSuccEmailsCount() {
return succ_mails.size();
}
synchronized private int getCurrentThreadCount() {
return current_thread_count;
}
synchronized private void setCurrentThreadCount(int current_thread_count) {
this.current_thread_count = current_thread_count;
}
synchronized protected Proxy rndProxy() {
//System.out.println("2: " + System.currentTimeMillis());
int rnd = (int) (((this.proxies.size()-1) - 0 + 1) *
java.lang.Math.random()) + 0;
return this.proxies.get(rnd);
}
abstract protected SmtpUser nextUser();
/*
synchronized protected SmtpUser nextUser() {
//System.out.println("1: " + System.currentTimeMillis());
if(! it_smtpuser.hasNext())
it_smtpuser = this.smtpusers.iterator();
if(! it_smtpuser.hasNext())
return null;
return it_smtpuser.next();
}
*/
synchronized protected String nextEmails(final int count) {
//System.out.println("3: " + System.currentTimeMillis());
if(!this.emails.hasNext()
&& this.fail_mails.isEmpty())
return null;
StringBuffer buf = new StringBuffer("");
int c = 0;
for(; this.emails.hasNext() && (c ++) < count; )
buf.append(this.emails.next()).append(",");
this.current_email = this.current_email + c;
// 已经取完,而且不够count个email,从fail_mails中取出,补足count个
for( ; c < (count - 1); c ++) {
try {
// 如果email少话,有可能出现ArrayIndexOutofRound错误,主要是因为这个时候,fail_mails中没有任何数据
String s = this.fail_mails.remove(c);
buf.append(s).append(",");
//System.out.println("4: " + System.currentTimeMillis());
} catch(Exception ex) {
}
}
return buf.toString();
}
synchronized private void putFailMails(final String mails) {
String[] s = mails.split(",");
for(int c=0; c<s.length; c++) {
if(s[c].trim().length() > 0 &&
s[c].matches(mailRegex))
this.fail_mails.add(s[c]);
}
}
synchronized private void putSuccMails(final String mails) {
String[] s = mails.split(",");
for(int c=0; c<s.length; c++) {
if(s[c].trim().length() > 0 &&
s[c].matches(mailRegex))
this.succ_mails.add(s[c]);
}
}
protected void setUseLocalsmtpserver(boolean useLocalsmtpserver) {
this.useLocalsmtpserver = useLocalsmtpserver;
}
protected void setUseproxy(boolean useproxy) {
this.useproxy = useproxy;
}
protected void setEmails(Iterator<String> emails) {
this.emails = emails;
}
protected Iterator<String> getEmails() {
return emails;
}
protected Vector<Proxy> getProxies() {
return proxies;
}
protected void setProxies(Vector<Proxy> proxies) {
this.proxies = proxies;
}
protected Vector<SmtpUser> getSmtpusers() {
return smtpusers;
}
protected void setSmtpusers(Vector<SmtpUser> smtpusers) {
this.smtpusers = smtpusers;
}
public Logger getLogger() {
return logger;
}
protected Iterator<SmtpUser> getIt_smtpuser() {
return it_smtpuser;
}
protected void setIt_smtpuser(Iterator<SmtpUser> it_smtpuser) {
this.it_smtpuser = it_smtpuser;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -