📄 notifier.java
字号:
} else {
h.put(p.getPrincipalName(), new Recipient(Recipient.USER, p.getPrincipalName()));
}
}
} else if (r.getRecipientType() == Recipient.ROLE) {
Role role = CoreServlet.getServlet().getUserDatabase().getRole(r.getRecipientAlias());
User[] users = CoreServlet.getServlet().getUserDatabase().listAllUsers("*");
for (int x = 0; x < users.length; x++) {
Role[] roles = users[x].getRoles();
for (int y = 0; y < roles.length; y++) {
if (roles[y].getPrincipalName().equals(role.getPrincipalName())) {
h.put(users[x].getPrincipalName(), new Recipient(Recipient.USER, users[x].getPrincipalName()));
break;
}
}
}
} else if (r.getRecipientType() == Recipient.ADMINS) {
User[] users = CoreServlet.getServlet().getUserDatabase().listAllUsers("*");
for (int j = 0; j < users.length; j++) {
if (CoreServlet.getServlet().getLogonController().isAdministrator(users[j])) {
h.put(users[j].getPrincipalName(), new Recipient(Recipient.USER, users[j].getPrincipalName()));
}
}
}
}
List l = new ArrayList();
for (Iterator i = h.entrySet().iterator(); i.hasNext();) {
Map.Entry entry = (Map.Entry) i.next();
l.add(entry.getValue());
}
return l;
}
/**
 * Looks up a registered message sink by name.
 *
 * @param sinkName name compared (via {@code equals}) against each sink's
 *                 {@code getName()}
 * @return the first sink in {@code sinks} whose name matches, or
 *         {@code null} when none does
 */
MessageSink getSink(String sinkName) {
    MessageSink match = null;
    Iterator candidates = sinks.iterator();
    // Stop at the first match; later sinks with the same name are never consulted.
    while (match == null && candidates.hasNext()) {
        MessageSink candidate = (MessageSink) candidates.next();
        if (candidate.getName().equals(sinkName)) {
            match = candidate;
        }
    }
    return match;
}
/**
 * Writes a message to the file {@code <id>.msg} under {@code queueDirectory}.
 *
 * <p>Fields are written with a {@link DataOutputStream} in this order:
 * id (long), sink name (UTF), urgent flag (boolean), subject (UTF), then one
 * (recipient type int, recipient alias UTF) pair per recipient followed by an
 * int {@code 0}; then for every parameter an int {@code 1}, the parameter
 * name (UTF) and its value (UTF), followed by an int {@code 0}; finally the
 * content (UTF) and the last message (UTF).
 *
 * @param message the message to write
 * @throws IOException if the file cannot be opened or written
 */
void write(Message message) throws IOException {
    if (log.isDebugEnabled()) {
        log.debug("Writing message " + message.getId() + " '" + message.getSubject() + "' to disk");
    }
    String fileName = String.valueOf(message.getId()) + ".msg";
    FileOutputStream fileOut = new FileOutputStream(new File(queueDirectory, fileName));
    try {
        DataOutputStream data = new DataOutputStream(fileOut);

        // Header fields.
        data.writeLong(message.getId());
        data.writeUTF(message.getSinkName());
        data.writeBoolean(message.isUrgent());
        data.writeUTF(message.getSubject());

        // Recipient list, closed by an int 0.
        Iterator recipients = message.getRecipients().iterator();
        while (recipients.hasNext()) {
            Recipient recipient = (Recipient) recipients.next();
            data.writeInt(recipient.getRecipientType());
            String alias = recipient.getRecipientAlias();
            if (alias == null) {
                // A missing alias is stored as the empty string.
                data.writeUTF("");
            } else {
                data.writeUTF(alias);
            }
        }
        data.writeInt(0);

        // Parameters: each entry is introduced by an int 1; the list is closed by an int 0.
        Iterator parameterNames = message.getParameterNames();
        while (parameterNames.hasNext()) {
            String name = (String) parameterNames.next();
            data.writeInt(1);
            data.writeUTF(name);
            data.writeUTF(message.getParameter(name));
        }
        data.writeInt(0);

        // Trailer fields.
        data.writeUTF(message.getContent());
        data.writeUTF(message.getLastMessage());
    } finally {
        // Always release the file handle, whether or not writing succeeded.
        fileOut.close();
    }
}
/**
 * Adds a message to the in-memory delivery queue.
 *
 * <p>An urgent message is inserted at the head of the queue and the consumer
 * is woken via {@link #queueNotify()}. Any other message is appended without
 * a notification, so it is picked up the next time the consumer checks the
 * queue (the consumer's wait uses a 30 second timeout).
 *
 * <p>The insertion is performed while holding the monitor of
 * {@code messages}: the consumer thread scans and removes entries under that
 * same monitor, so mutating the list without it could interleave with the
 * scan.
 *
 * @param message the message to queue
 */
void queue(Message message) {
    if (log.isDebugEnabled()) {
        log.debug("Queueing message " + message.getId() + " '" + message.getSubject() + "'");
    }
    MessageWrapper wrapper = new MessageWrapper(message);
    boolean urgent = message.isUrgent();
    synchronized (messages) {
        if (urgent) {
            messages.add(0, wrapper);
        } else {
            messages.add(wrapper);
        }
    }
    if (urgent) {
        // Wake the consumer immediately rather than waiting for its timeout.
        queueNotify();
    }
}
/**
 * Wakes one thread waiting on the {@code messages} monitor.
 *
 * <p>The notification is issued while holding that monitor, as
 * {@code Object.notify()} requires. Debug log lines are emitted before and
 * after the notification.
 */
void queueNotify() {
    if (log.isDebugEnabled()) {
        log.debug("Notify queue");
    }

    synchronized (messages) {
        messages.notify();
    }

    if (log.isDebugEnabled()) {
        log.debug("Queue notified");
    }
}
/**
 * Sets the sink name on the message and then sends it immediately.
 *
 * @param sinkName sink selector stored on the message before sending
 * @param message  the message to send; its sink name is overwritten
 * @return the result of {@code doSend(message)}
 */
public boolean doSend(String sinkName, Message message) {
    message.setSinkName(sinkName);
    final boolean delivered = doSend(message);
    return delivered;
}
/**
 * Sends a message through the sink(s) selected by its sink name.
 *
 * <p>The sink name is interpreted as follows:
 * <ul>
 * <li>{@code "*"} - offer the message to every enabled sink;</li>
 * <li>{@code "!a,b"} - offer the message to every enabled sink except those
 * named in the comma separated list after the {@code !};</li>
 * <li>{@code "^"} - offer the message to enabled sinks in order, stopping at
 * the first one that reports success;</li>
 * <li>anything else - treated as the name of a single sink, looked up with
 * {@code getSink}.</li>
 * </ul>
 *
 * <p>Sinks that are not enabled in {@code sinkEnabled} are skipped. An
 * exception thrown by a sink is logged and treated as "not sent"; only in the
 * single named sink case is the exception message also stored on the message
 * via {@code setLastMessage}.
 *
 * @param message the message to send; its sink name must already be set
 * @return {@code true} if at least one sink reported the message as sent
 */
boolean doSend(Message message) {
    if (log.isDebugEnabled()) {
        log.debug("Sending message with subject of " + message.getSubject() + " urgent = " + message.isUrgent());
        for (Iterator i = message.getRecipients().iterator(); i.hasNext();) {
            Recipient r = (Recipient) i.next();
            log.debug(" " + r.getRecipientType() + "/" + r.getRecipientAlias());
        }
        log.debug("Content = " + message.getContent());
        log.debug("Sink name = " + message.getSinkName());
    }

    String sinkName = message.getSinkName();
    boolean sent = false;

    if (sinkName.equals("*")) {
        // Broadcast: every enabled sink gets the message, even after a success.
        for (Iterator i = sinks.iterator(); i.hasNext();) {
            MessageSink sink = (MessageSink) i.next();
            if (trySendVia(sink, message, false)) {
                sent = true;
            }
        }
    } else if (sinkName.startsWith("!")) {
        // Broadcast with exclusions: names after the '!' are skipped.
        String[] except = sinkName.substring(1).split(",");
        for (Iterator i = sinks.iterator(); i.hasNext();) {
            MessageSink sink = (MessageSink) i.next();
            if (!isExcludedSink(sink.getName(), except) && trySendVia(sink, message, false)) {
                sent = true;
            }
        }
    } else if (sinkName.equals("^")) {
        // First available: stop as soon as one sink accepts the message.
        for (Iterator i = sinks.iterator(); !sent && i.hasNext();) {
            MessageSink sink = (MessageSink) i.next();
            sent = trySendVia(sink, message, false);
        }
    } else {
        // A single, explicitly named sink.
        MessageSink s = getSink(sinkName);
        if (s == null) {
            log.error("No message sink named " + sinkName);
        } else {
            sent = trySendVia(s, message, true);
        }
    }

    if (!sent) {
        log.error("No message sink sent message " + message.getId());
    }
    return sent;
}

/**
 * Offers a message to one sink if that sink is enabled.
 *
 * <p>The enabled flag is compared with {@code Boolean.TRUE.equals(...)} so
 * that any {@code Boolean} holding {@code true} counts as enabled, not just
 * the canonical {@code Boolean.TRUE} instance.
 *
 * @param sink          the sink to try
 * @param message       the message to send
 * @param recordFailure when {@code true}, an exception's message is stored on
 *                      the message via {@code setLastMessage}
 * @return {@code true} only if the sink is enabled and its {@code send}
 *         returned {@code true}; {@code false} if it is disabled, returned
 *         {@code false}, or threw
 */
private boolean trySendVia(MessageSink sink, Message message, boolean recordFailure) {
    if (!Boolean.TRUE.equals(sinkEnabled.get(sink.getName()))) {
        return false;
    }
    try {
        return sink.send(message);
    } catch (Exception e) {
        if (recordFailure) {
            message.setLastMessage(e.getMessage());
        }
        log.error("Failed to send message " + message.getId() + ".", e);
        return false;
    }
}

/**
 * Tells whether a sink name appears in an exclusion list.
 *
 * @param name     the sink name to look for
 * @param excluded the excluded sink names
 * @return {@code true} if {@code name} equals any element of {@code excluded}
 */
private boolean isExcludedSink(String name, String[] excluded) {
    for (int j = 0; j < excluded.length; j++) {
        if (name.equals(excluded[j])) {
            return true;
        }
    }
    return false;
}
/**
 * Background thread that takes messages off the {@code messages} queue and
 * sends them with {@code doSend}.
 *
 * <p>Each pass picks the first queued message that is either untried or whose
 * previous attempt was more than 60 seconds ago. A message that fails to send
 * is put back at the end of the queue for a later retry; a message that is
 * sent has its {@code <id>.msg} file under {@code queueDirectory} deleted.
 * The loop runs until the enclosing {@code stop} flag becomes {@code true}.
 */
class MessageConsumer extends Thread {

    MessageConsumer() {
        super("Notification Message Consumer");
    }

    /**
     * Runs the consume loop. Resets {@code stop} to {@code false} on entry and
     * exits once it is observed to be {@code true}.
     */
    public void run() {
        stop = false;
        MessageWrapper msg = null;
        int valid = -1;
        // Set when the previous pass found nothing ready to send; forces one
        // timed wait even though the queue is not empty, so that messages
        // awaiting their retry delay do not cause a busy loop.
        boolean waitOnce = false;
        while (!stop) {
            synchronized (messages) {
                // Wait (up to 30 seconds at a time) while there is nothing to do.
                while ((waitOnce || messages.size() == 0) && !stop) {
                    try {
                        messages.wait(30000);
                    } catch (InterruptedException ie) {
                        log.error("MessageConsumer interrupted.", ie);
                    }
                    waitOnce = false;
                }
                if (!stop) {
                    if (log.isDebugEnabled()) {
                        log.debug("Checking message queue");
                    }
                    int i = 0;
                    valid = -1;
                    while (i < messages.size()) {
                        msg = (MessageWrapper) messages.get(i);
                        if (log.isDebugEnabled()) {
                            log.debug("Checking if message " + msg.getMessage().getId() + " is valid");
                        }
                        // If the message has been attempted before, check
                        // whether it is ready for retry (60 seconds after the
                        // last attempt).
                        if (msg.attempt == 0 || (msg.attempt > 0 && (msg.time + 60000) < System.currentTimeMillis())) {
                            valid = i;
                            break;
                        } else {
                            i++;
                        }
                    }
                    if (valid != -1) {
                        // Take the chosen message off the queue while still
                        // holding the monitor; it is sent after releasing it.
                        messages.remove(valid);
                    }
                }
            }
            if (!stop) {
                boolean sent = false;
                if (valid != -1) {
                    sent = doSend(msg.message);
                    if (!sent) {
                        msg.attempt++;
                        msg.time = System.currentTimeMillis();
                        try {
                            Thread.sleep(1);
                        } catch (InterruptedException ignored) {
                            // Deliberately ignored: the loop is controlled by
                            // the stop flag, not by thread interruption.
                        }
                        log.error("Failed to send message. " + msg.getMessage().getLastMessage());
                        // Re-queue under the same monitor that guards every
                        // other access to the queue in this class.
                        synchronized (messages) {
                            messages.add(msg);
                        }
                    } else {
                        // Sent: remove the message's file from the queue directory.
                        new File(queueDirectory, msg.message.getId() + ".msg").delete();
                    }
                } else {
                    // Nothing was ready; wait once before scanning again.
                    waitOnce = true;
                }
            }
        }
    }
}
/**
 * Queue entry pairing a {@code Message} with retry bookkeeping: the number of
 * send attempts made so far and a timestamp (milliseconds, from
 * {@code System.currentTimeMillis()}).
 *
 * <p>The fields are intentionally not private: the consumer thread in this
 * file reads and updates them directly.
 */
public class MessageWrapper {

    /** The wrapped message. */
    Message message;

    /** Creation time initially; overwritten by the consumer after a failed send. */
    long time;

    /** Number of failed send attempts so far; starts at zero. */
    int attempt;

    /**
     * Wraps a message and stamps it with the current time.
     *
     * @param message the message to wrap
     */
    public MessageWrapper(Message message) {
        this.message = message;
        this.time = System.currentTimeMillis();
    }

    /** @return the wrapped message */
    public Message getMessage() {
        return this.message;
    }

    /** @return the number of attempts recorded for this message */
    public int getAttempt() {
        return this.attempt;
    }

    /** @return the timestamp currently recorded for this message */
    public long getTime() {
        return this.time;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -