📄 roundrobindispatcher.java
字号:
/**
* $RCSfile$
* $Revision: 32833 $
* $Date: 2006-08-02 15:52:36 -0700 (Wed, 02 Aug 2006) $
*
* Copyright (C) 2004-2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution, or a commercial license
* agreement with Jive.
*/
package org.jivesoftware.xmpp.workgroup.dispatcher;
import org.jivesoftware.openfire.fastpath.util.TaskEngine;
import org.jivesoftware.openfire.fastpath.util.WorkgroupUtils;
import org.jivesoftware.xmpp.workgroup.*;
import org.jivesoftware.xmpp.workgroup.request.Request;
import org.jivesoftware.xmpp.workgroup.request.UserRequest;
import org.jivesoftware.xmpp.workgroup.spi.JiveLiveProperties;
import org.jivesoftware.xmpp.workgroup.spi.dispatcher.DbDispatcherInfoProvider;
import org.jivesoftware.util.*;
import org.xmpp.component.ComponentManagerFactory;
import org.xmpp.component.Log;
import java.util.*;
import java.util.LinkedList;
/**
* <p>Implements simple round robin dispatching of offers to agents.</p>
* <p>Agents are offered requests one at a time with no agent being offer
* the same request twice (unless their current-chats status changes).</p>
*
* @author Derek DeMoro
* @author Iain Shigeoka
*/
public class RoundRobinDispatcher implements Dispatcher, AgentSessionListener {
/**
* <p>The circular list of agents in the pool.</p>
*/
private List<AgentSession> agentList;
private RequestQueue queue;
/**
* <p>Prop manager for the dispatcher.</p>
*/
private JiveLiveProperties properties;
private DispatcherInfo info;
private DispatcherInfoProvider infoProvider = new DbDispatcherInfoProvider();
private AgentSelector agentSelector = WorkgroupUtils.getAvailableAgentSelectors().get(0);
/**
* A set of all outstanding offers in the workgroup<p>
*
* Let's the server route offer responses to the correct offer.
*/
private ConcurrentHashSet<Offer> offers = new ConcurrentHashSet<Offer>();
private Log logger = ComponentManagerFactory.getComponentManager().getLog();
/**
* Creates a new dispatcher for the queue. The dispatcher will have a Timer with a unique task
* that will get the requests from the queue and will try to send an offer to the agents.
*
* @param queue the queue that contains the requests and the agents that may attend the
* requests.
*/
public RoundRobinDispatcher(RequestQueue queue) {
this.queue = queue;
agentList = new LinkedList<AgentSession>();
properties = new JiveLiveProperties("fpDispatcherProp", queue.getID());
try {
info = infoProvider.getDispatcherInfo(queue.getWorkgroup(), queue.getID());
}
catch (NotFoundException e) {
logger.error("Queue ID " + queue.getID(), e);
}
// Recreate the agentSelector to use for selecting the best agent to receive the offer
loadAgentSelector();
// Fill the list of AgentSessions that are active in the queue. Once the list has been
// filled this dispatcher will be notified when new AgentSessions join the queue or leave
// the queue
fillAgentsList();
TaskEngine.getInstance().scheduleAtFixedRate(new TimerTask() {
public void run() {
checkForNewRequests();
}
}, 2000, 2000);
}
private void checkForNewRequests() {
for(Request request : queue.getRequests()){
// While there are requests pendings try to dispatch an offer for the request to an agent
// Skip this request if there exists an offer for this requests that is being processed
if (request.getOffer() != null && offers.contains(request.getOffer())) {
continue;
}
injectRequest(request);
}
}
public void injectRequest(Request request) {
// Create a new Offer for the request and add it to the list of active offers
final Offer offer = new Offer(request, queue, getAgentRejectionTimeout());
offer.setTimeout(info.getOfferTimeout());
offers.add(offer);
// Process this offer in another thread
Thread offerThread = new Thread("Dispatch offer - queue: " + queue.getName()) {
public void run() {
dispatch(offer);
// Remove this offer from the list of active offers
offers.remove(offer);
}
};
offerThread.start();
}
/**
* Dispatch the given request to one or more agents in the agent pool.<p>
*
* If this method returns, it is assumed that the request was properly
* dispatched.The only exception is if an agent is not in the pool for routing
* within the agent timeout period, the dispatch will throw an AgentNotFoundException
* so the request can be re-routed.
*
* @param offer the offer to send to the best agent available.
*/
public void dispatch(Offer offer) {
// The time when the request should timeout
long timeoutTime = System.currentTimeMillis() + info.getRequestTimeout();
final Request request = offer.getRequest();
boolean canBeInQueue = request instanceof UserRequest;
Map<String,List<String>> map = request.getMetaData();
String initialAgent = map.get("agent") == null || map.get("agent").isEmpty() ? null : map.get("agent").get(0);
String ignoreAgent = map.get("ignore") == null || map.get("ignore").isEmpty() ? null : map.get("ignore").get(0);
// Log debug trace
logger.debug("RR - Dispatching request: " + request + " in queue: " + queue.getAddress());
// Send the offer to the best agent. While the offer has not been accepted send it to the
// next best agent. If there aren't any agent available then skip this section and proceed
// to overflow the current request
if (!agentList.isEmpty()) {
for (long timeRemaining = timeoutTime - System.currentTimeMillis();
!offer.isAccepted() && timeRemaining > 0 && !offer.isCancelled();
timeRemaining = timeoutTime - System.currentTimeMillis()) {
try {
AgentSession session = getBestNextAgent(initialAgent, ignoreAgent, offer);
if (session == null && agentList.isEmpty()) {
// Stop looking for an agent since there are no more agent available
break;
}
else if (session == null || offer.isRejector(session)) {
initialAgent = null;
Thread.sleep(1000);
}
else {
// Recheck for changed maxchat setting
Workgroup workgroup = request.getWorkgroup();
if (session.getCurrentChats(workgroup) < session.getMaxChats(workgroup)) {
// Set the timeout of the offer based on the remaining time of the
// initial request and the default offer timeout
timeRemaining = timeoutTime - System.currentTimeMillis();
offer.setTimeout(timeRemaining < info.getOfferTimeout() ?
timeRemaining : info.getOfferTimeout());
// Make the offer and wait for a resolution to the offer
if (!request.sendOffer(session, queue)) {
// Log debug trace
logger.debug("RR - Offer for request: " + offer.getRequest() +
" FAILED TO BE SENT to agent: " +
session.getJID());
continue;
}
// Log debug trace
logger.debug("RR - Offer for request: " + offer.getRequest() + " SENT to agent: " +
session.getJID());
offer.waitForResolution();
// If the offer was accepted, we send out the invites
// and reset the offer
if (offer.isAccepted()) {
// Get the first agent that accepted the offer
AgentSession selectedAgent = offer.getAcceptedSessions().get(0);
// Log debug trace
logger.debug("RR - Agent: " + selectedAgent.getJID() +
" ACCEPTED request: " +
request);
// Create the room and send the invitations
offer.invite(selectedAgent);
// Notify the agents that accepted the offer that the offer process
// has finished
for (AgentSession agent : offer.getAcceptedSessions()) {
agent.removeOffer(offer);
}
if (canBeInQueue) {
// Remove the user from the queue since his request has
// been accepted
queue.removeRequest((UserRequest) request);
}
}
}
else {
// Log debug trace
logger.debug("RR - Selected agent: " + session.getJID() +
" has reached max number of chats");
}
}
}
catch (Exception e) {
logger.error(e);
}
}
}
if (!offer.isAccepted() && !offer.isCancelled()) {
// Calculate the maximum time limit for an unattended request before cancelling it
long limit = request.getCreationTime().getTime() +
(info.getRequestTimeout() * (getOverflowTimes() + 1));
if (limit - System.currentTimeMillis() <= 0 || !canBeInQueue) {
// Log debug trace
logger.debug("RR - Cancelling request that maxed out overflow limit or cannot be queued: " + request);
// Cancel the request if it has overflowed 'n' times
request.cancel(Request.CancelType.AGENT_NOT_FOUND);
}
else {
// Overflow if request timed out and was not dispatched and max number of overflows
// has not been reached yet
overflow(offer);
// If there is no other queue to overflow then cancel the request
if (!offer.isAccepted() && !offer.isCancelled()) {
// Log debug trace
logger.debug("RR - Cancelling request that didn't overflow: " + request);
request.cancel(Request.CancelType.AGENT_NOT_FOUND);
}
}
}
}
/**
* <p>Overflow the current request into another queue if possible.</p>
* <p/>
* <p>Future versions of the dispatcher may wish to overflow in
* more sophisticated ways. Currently we do it according to overflow
* rules: none (no overflow), backup (to a backup if it exists and is
* available, or randomly.</p>
*
* @param offer the offer to place in the overflow queue.
*/
private void overflow(Offer offer) {
RequestQueue backup = null;
if (RequestQueue.OverflowType.OVERFLOW_BACKUP.equals(queue.getOverflowType())) {
backup = queue.getBackupQueue();
// Check that the backup queue has agents available otherwise discard it
if (backup != null && !backup.getAgentSessionList().containsAvailableAgents()) {
backup = null;
}
}
else if (RequestQueue.OverflowType.OVERFLOW_RANDOM.equals(queue.getOverflowType())) {
backup = getRandomQueue();
}
// If a backup queue was found then cancel this offer, remove the request from the queue
// and add the request in the backup queue
if (backup != null) {
offer.cancel();
UserRequest request = (UserRequest) offer.getRequest();
// Remove the request from the queue since it is going to be added to another
// queue
queue.removeRequest(request);
// Log debug trace
logger.debug("RR - Overflowing request: " + request + " to queue: " +
backup.getAddress());
backup.addRequest(request);
}
}
/**
* Returns a queue that was randomly selected.
*
* @return a queue that was randomly selected.
*/
private RequestQueue getRandomQueue() {
int qCount = queue.getWorkgroup().getRequestQueueCount();
if (qCount > 1) {
// Build a list of all queues eligible for overflow
LinkedList<RequestQueue> overflowQueueList = new LinkedList<RequestQueue>();
for (RequestQueue overflowQueue : queue.getWorkgroup().getRequestQueues()) {
if (!queue.equals(overflowQueue) && overflowQueue.getAgentSessionList().containsAvailableAgents()) {
overflowQueueList.addLast(overflowQueue);
}
}
// If there are any eligible queues
if (overflowQueueList.size() > 0) {
// choose the random index of the overflow queue to use
int targetIndex = (int) Math.floor(((float) (overflowQueueList.size())) * Math.random());
if (targetIndex < overflowQueueList.size()) {
return overflowQueueList.get(targetIndex);
}
}
}
return null;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -