📄 conversationeventsqueue.java
字号:
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution, or a commercial license
* agreement with Jive.
*/
package org.jivesoftware.openfire.archive;
import org.jivesoftware.openfire.archive.cluster.SendConversationEventsTask;
import org.jivesoftware.openfire.reporting.util.TaskEngine;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.cache.CacheFactory;
import java.util.*;
/**
 * Queues conversation events generated by this JVM and sends them to the senior cluster
 * member every 3 seconds. This is an optimization to reduce traffic between the cluster
 * nodes, especially when under heavy conversation load.
 *
 * <p>Each pending-event map is guarded by synchronizing on the map itself, both when
 * events are added and when the periodic task drains it.
 *
 * @author Gaston Dombiak
 */
public class ConversationEventsQueue {

    /** Consulted on every flush to decide whether one-to-one chat events may be trimmed. */
    private final ConversationManager conversationManager;

    /**
     * Chat events that are pending to be sent to the senior cluster member.
     * Key: Conversation Key; Value: List of conversation events.
     */
    private final Map<String, List<ConversationEvent>> chatEvents =
            new HashMap<String, List<ConversationEvent>>();

    /**
     * Group chat events that are pending to be sent to the senior cluster member.
     * Key: Conversation Key; Value: List of conversation events.
     */
    private final Map<String, List<ConversationEvent>> roomEvents =
            new HashMap<String, List<ConversationEvent>>();

    /**
     * Creates the queue and schedules the periodic task that forwards queued events to the
     * senior cluster member.
     *
     * @param conversationManager manager queried for whether message archiving is enabled.
     * @param taskEngine engine used to schedule the periodic flush.
     */
    public ConversationEventsQueue(ConversationManager conversationManager, TaskEngine taskEngine) {
        this.conversationManager = conversationManager;

        // Periodically forward whatever has been queued to the senior cluster member.
        TimerTask sendTask = new TimerTask() {
            @Override
            public void run() {
                List<ConversationEvent> eventsToSend = drainPendingEvents();
                if (eventsToSend.isEmpty()) {
                    // Nothing was queued since the last flush; skip the cluster call so an
                    // idle node does not generate traffic every period.
                    return;
                }
                // Send the drained events to the senior cluster member
                CacheFactory.doClusterTask(new SendConversationEventsTask(eventsToSend),
                        ClusterManager.getSeniorClusterMember().toByteArray());
            }
        };
        taskEngine.scheduleAtFixedRate(sendTask, JiveConstants.SECOND * 3, JiveConstants.SECOND * 3);
    }

    /**
     * Queues the one-to-one chat event to be later sent to the senior cluster member.
     *
     * @param conversationKey unique key that identifies the conversation.
     * @param event conversation event.
     */
    public void addChatEvent(String conversationKey, ConversationEvent event) {
        enqueue(chatEvents, conversationKey, event);
    }

    /**
     * Queues the group chat event to be later sent to the senior cluster member.
     *
     * @param conversationKey unique key that identifies the conversation.
     * @param event conversation event.
     */
    public void addGroupChatEvent(String conversationKey, ConversationEvent event) {
        enqueue(roomEvents, conversationKey, event);
    }

    /**
     * Moves every queued chat and group chat event into a new list and empties both queues.
     * Chat events are collected first, then group chat events.
     *
     * @return the events that were pending; empty when nothing was queued.
     */
    private List<ConversationEvent> drainPendingEvents() {
        List<ConversationEvent> drained = new ArrayList<ConversationEvent>();
        synchronized (chatEvents) {
            for (List<ConversationEvent> conversationEvents : chatEvents.values()) {
                // When messages are not being archived only the first and last event of a
                // conversation are forwarded; the events in between are dropped.
                if (!conversationManager.isMessageArchivingEnabled() && conversationEvents.size() > 2) {
                    drained.add(conversationEvents.get(0));
                    drained.add(conversationEvents.get(conversationEvents.size() - 1));
                }
                else {
                    // Send all events
                    drained.addAll(conversationEvents);
                }
            }
            // Everything has been copied out, so the queue can be emptied
            chatEvents.clear();
        }
        synchronized (roomEvents) {
            // Group chat events are always forwarded in full
            for (List<ConversationEvent> conversationEvents : roomEvents.values()) {
                drained.addAll(conversationEvents);
            }
            // Everything has been copied out, so the queue can be emptied
            roomEvents.clear();
        }
        return drained;
    }

    /**
     * Appends the event to the list stored under the conversation key in the given queue,
     * creating that list when the key has no pending events yet. The queue's own monitor is
     * held for the whole operation.
     *
     * @param queue pending-event map to add to.
     * @param conversationKey unique key that identifies the conversation.
     * @param event conversation event.
     */
    private static void enqueue(Map<String, List<ConversationEvent>> queue, String conversationKey,
            ConversationEvent event) {
        synchronized (queue) {
            List<ConversationEvent> events = queue.get(conversationKey);
            if (events == null) {
                events = new ArrayList<ConversationEvent>();
                queue.put(conversationKey, events);
            }
            events.add(event);
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -