⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 roundrobindispatcher.java

📁 openfire 服务器源码下载
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 * $RCSfile$
 * $Revision: 32833 $
 * $Date: 2006-08-02 15:52:36 -0700 (Wed, 02 Aug 2006) $
 *
 * Copyright (C) 2004-2008 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution, or a commercial license
 * agreement with Jive.
 */

package org.jivesoftware.xmpp.workgroup.dispatcher;

import org.jivesoftware.openfire.fastpath.util.TaskEngine;
import org.jivesoftware.openfire.fastpath.util.WorkgroupUtils;
import org.jivesoftware.xmpp.workgroup.*;
import org.jivesoftware.xmpp.workgroup.request.Request;
import org.jivesoftware.xmpp.workgroup.request.UserRequest;
import org.jivesoftware.xmpp.workgroup.spi.JiveLiveProperties;
import org.jivesoftware.xmpp.workgroup.spi.dispatcher.DbDispatcherInfoProvider;
import org.jivesoftware.util.*;
import org.xmpp.component.ComponentManagerFactory;
import org.xmpp.component.Log;

import java.util.*;
import java.util.LinkedList;

/**
 * <p>Implements simple round robin dispatching of offers to agents.</p>
 * <p>Agents are offered requests one at a time with no agent being offered
 * the same request twice (unless their current-chats status changes).</p>
 *
 * @author Derek DeMoro
 * @author Iain Shigeoka
 */
public class RoundRobinDispatcher implements Dispatcher, AgentSessionListener {
    /**
     * <p>The circular list of agents in the pool.</p>
     */
    private List<AgentSession> agentList;

    /**
     * The queue served by this dispatcher; its pending requests are polled and offered to agents.
     */
    private RequestQueue queue;

    /**
     * <p>Prop manager for the dispatcher.</p>
     */
    private JiveLiveProperties properties;
    /**
     * Dispatcher settings (e.g. offer and request timeouts) loaded in the constructor through
     * {@link #infoProvider}. Remains <tt>null</tt> if the provider could not find the info.
     */
    private DispatcherInfo info;
    /**
     * Source used to load the {@link DispatcherInfo} of the queue.
     */
    private DispatcherInfoProvider infoProvider = new DbDispatcherInfoProvider();
    /**
     * Selector used to pick the agent that receives an offer. Initialized with the first of the
     * available selectors; the constructor then calls loadAgentSelector() to recreate it.
     */
    private AgentSelector agentSelector = WorkgroupUtils.getAvailableAgentSelectors().get(0);
    /**
     * A set of all outstanding offers in the workgroup<p>
     *
     * Lets the server route offer responses to the correct offer.
     */
    private ConcurrentHashSet<Offer> offers = new ConcurrentHashSet<Offer>();

    /**
     * Log obtained from the component manager; used for debug traces and errors.
     */
    private Log logger = ComponentManagerFactory.getComponentManager().getLog();

    /**
     * Creates a new dispatcher for the queue. The dispatcher will have a Timer with a unique task
     * that will get the requests from the queue and will try to send an offer to the agents.
     *
     * @param queue the queue that contains the requests and the agents that may attend the
     *        requests.
     */
    public RoundRobinDispatcher(RequestQueue queue) {
        this.queue = queue;
        this.agentList = new LinkedList<AgentSession>();
        this.properties = new JiveLiveProperties("fpDispatcherProp", queue.getID());
        try {
            this.info = infoProvider.getDispatcherInfo(queue.getWorkgroup(), queue.getID());
        }
        catch (NotFoundException notFound) {
            // The dispatcher is still created, but 'info' remains null in this case
            logger.error("Queue ID " + queue.getID(), notFound);
        }

        // Rebuild the selector that picks the best agent to receive each offer
        loadAgentSelector();

        // Seed the list with the AgentSessions that are already active in the queue. From now
        // on this dispatcher gets notified whenever an AgentSession joins or leaves the queue
        fillAgentsList();

        // Poll the queue for requests to dispatch: first run after 2 seconds, then every
        // 2 seconds
        final int pollIntervalMillis = 2000;
        TimerTask requestPoller = new TimerTask() {
            @Override
            public void run() {
                checkForNewRequests();
            }
        };
        TaskEngine.getInstance().scheduleAtFixedRate(requestPoller, pollIntervalMillis,
                pollIntervalMillis);
    }

    /**
     * Walks the requests currently held by the queue and injects every request that does not
     * already have an offer being processed by this dispatcher.
     */
    private void checkForNewRequests() {
        for (Request pending : queue.getRequests()) {
            // A request needs a new offer unless its current offer is still registered in the
            // set of active offers
            boolean needsOffer =
                    pending.getOffer() == null || !offers.contains(pending.getOffer());
            if (needsOffer) {
                injectRequest(pending);
            }
        }
    }

    /**
     * Creates a new offer for the request, registers it as an active offer and dispatches it
     * in a dedicated thread.<p>
     *
     * The offer is always removed from the set of active offers once dispatching ends, even
     * when {@link #dispatch(Offer)} terminates with an unexpected runtime exception. Without
     * that guarantee a failed dispatch would leave a stale entry in the set, and a request
     * that still references that offer would be skipped by checkForNewRequests() from then on.
     *
     * @param request the request to offer to the agents of this queue.
     */
    public void injectRequest(Request request) {
        // Create a new Offer for the request and add it to the list of active offers
        final Offer offer = new Offer(request, queue, getAgentRejectionTimeout());
        offer.setTimeout(info.getOfferTimeout());
        offers.add(offer);
        // Process this offer in another thread
        Thread offerThread = new Thread("Dispatch offer - queue: " + queue.getName()) {
            @Override
            public void run() {
                try {
                    dispatch(offer);
                }
                finally {
                    // Remove this offer from the list of active offers no matter how the
                    // dispatch ended
                    offers.remove(offer);
                }
            }
        };
        offerThread.start();
    }

    /**
     * Dispatch the given request to one or more agents in the agent pool.<p>
     *
     * If this method returns, it is assumed that the request was properly
     * dispatched. The only exception is when no agent accepts the offer within the
     * request timeout: the request is then overflowed to another queue or cancelled
     * with {@link Request.CancelType#AGENT_NOT_FOUND}.
     *
     * @param offer the offer to send to the best agent available.
     */
    public void dispatch(Offer offer) {
        // The time when the request should timeout
        long timeoutTime = System.currentTimeMillis() + info.getRequestTimeout();
        final Request request = offer.getRequest();
        // Only UserRequests are removed from this queue on acceptance or considered for
        // overflow below; any other kind of request is cancelled when no agent accepts it
        boolean canBeInQueue = request instanceof UserRequest;
        // Optional routing hints taken from the request metadata: the first value of "agent"
        // and the first value of "ignore" are handed to getBestNextAgent() below
        Map<String,List<String>> map = request.getMetaData();
        String initialAgent = map.get("agent") == null || map.get("agent").isEmpty() ? null : map.get("agent").get(0);
        String ignoreAgent = map.get("ignore") == null || map.get("ignore").isEmpty() ? null : map.get("ignore").get(0);
        // Log debug trace
        logger.debug("RR - Dispatching request: " + request + " in queue: " + queue.getAddress());

        // Send the offer to the best agent. While the offer has not been accepted send it to the
        // next best agent. If there aren't any agent available then skip this section and proceed
        // to overflow the current request
        if (!agentList.isEmpty()) {
            // Keep trying until the offer is accepted or cancelled, or the request timeout
            // expires. The remaining time is recomputed on every iteration
            for (long timeRemaining = timeoutTime - System.currentTimeMillis();
                 !offer.isAccepted() && timeRemaining > 0 && !offer.isCancelled();
                 timeRemaining = timeoutTime - System.currentTimeMillis()) {

                try {
                    AgentSession session = getBestNextAgent(initialAgent, ignoreAgent, offer);
                    if (session == null && agentList.isEmpty()) {
                        // Stop looking for an agent since there are no more agent available
                         break;
                    }
                    else if (session == null || offer.isRejector(session)) {
                        // No usable agent for now (none was selected, or the offer reports the
                        // selected session as a rejector). Drop the initial agent hint and wait
                        // one second before asking for the next best agent
                        initialAgent = null;
                        Thread.sleep(1000);
                    }
                    else {
                        // Recheck for changed maxchat setting
                        Workgroup workgroup = request.getWorkgroup();
                        if (session.getCurrentChats(workgroup) < session.getMaxChats(workgroup)) {
                            // Set the timeout of the offer based on the remaining time of the
                            // initial request and the default offer timeout
                            timeRemaining = timeoutTime - System.currentTimeMillis();
                            offer.setTimeout(timeRemaining < info.getOfferTimeout() ?
                                    timeRemaining : info.getOfferTimeout());

                            // Make the offer and wait for a resolution to the offer
                            if (!request.sendOffer(session, queue)) {
                                // Log debug trace
                                logger.debug("RR - Offer for request: " + offer.getRequest() +
                                        " FAILED TO BE SENT to agent: " +
                                        session.getJID());
                                // NOTE(review): the next iteration starts immediately; there is
                                // no sleep on this path
                                continue;
                            }
                            // Log debug trace
                            logger.debug("RR - Offer for request: " + offer.getRequest() + " SENT to agent: " +
                                    session.getJID());

                            offer.waitForResolution();
                            // If the offer was accepted, we send out the invites
                            // and reset the offer
                            if (offer.isAccepted()) {
                                // Get the first agent that accepted the offer
                                AgentSession selectedAgent = offer.getAcceptedSessions().get(0);
                                // Log debug trace
                                logger.debug("RR - Agent: " + selectedAgent.getJID() +
                                        " ACCEPTED request: " +
                                        request);
                                // Create the room and send the invitations
                                offer.invite(selectedAgent);
                                // Notify the agents that accepted the offer that the offer process
                                // has finished
                                for (AgentSession agent : offer.getAcceptedSessions()) {
                                    agent.removeOffer(offer);
                                }
                                if (canBeInQueue) {
                                    // Remove the user from the queue since his request has
                                    // been accepted
                                    queue.removeRequest((UserRequest) request);
                                }
                            }
                        }
                        else {
                            // Log debug trace
                            logger.debug("RR - Selected agent: " + session.getJID() +
                                    " has reached max number of chats");
                            // NOTE(review): no sleep on this path either; the loop asks for the
                            // next best agent right away
                        }
                    }
                }
                catch (Exception e) {
                    // Any failure is logged and the loop carries on until the timeout.
                    // NOTE(review): this also catches the InterruptedException that
                    // Thread.sleep() can throw; the interrupt status is not restored here
                    logger.error(e);
                }
            }
        }
        // Nobody accepted the offer and it was not cancelled: overflow or cancel the request
        if (!offer.isAccepted() && !offer.isCancelled()) {
            // Calculate the maximum time limit for an unattended request before cancelling it
            // (creation time + request timeout * (overflow times + 1))
            long limit = request.getCreationTime().getTime() +
                    (info.getRequestTimeout() * (getOverflowTimes() + 1));
            if (limit - System.currentTimeMillis() <= 0 || !canBeInQueue) {
                // Log debug trace
                logger.debug("RR - Cancelling request that maxed out overflow limit or cannot be queued: " + request);
                // Cancel the request if it has overflowed 'n' times
                request.cancel(Request.CancelType.AGENT_NOT_FOUND);
            }
            else {
                // Overflow if request timed out and was not dispatched and max number of overflows
                // has not been reached yet
                overflow(offer);
                // If there is no other queue to overflow then cancel the request. overflow()
                // cancels the offer when it moves the request to another queue, so an offer that
                // is still not cancelled here means that no target queue was found
                if (!offer.isAccepted() && !offer.isCancelled()) {
                    // Log debug trace
                    logger.debug("RR - Cancelling request that didn't overflow: " + request);
                    request.cancel(Request.CancelType.AGENT_NOT_FOUND);
                }
            }
        }
    }

    /**
     * <p>Overflow the current request into another queue if possible.</p>
     * <p/>
     * <p>Future versions of the dispatcher may wish to overflow in
     * more sophisticated ways. Currently we do it according to the queue's
     * overflow rule: none (no overflow), backup (to the backup queue if it
     * exists and has available agents), or random (to a randomly selected
     * queue that has available agents).</p>
     *
     * @param offer the offer to place in the overflow queue.
     */
    private void overflow(Offer offer) {
        RequestQueue target = null;
        if (RequestQueue.OverflowType.OVERFLOW_BACKUP.equals(queue.getOverflowType())) {
            RequestQueue candidate = queue.getBackupQueue();
            // The backup queue is only usable when it exists and has agents available
            if (candidate != null && candidate.getAgentSessionList().containsAvailableAgents()) {
                target = candidate;
            }
        }
        else if (RequestQueue.OverflowType.OVERFLOW_RANDOM.equals(queue.getOverflowType())) {
            target = getRandomQueue();
        }

        // Without a target queue the offer and the request are left untouched
        if (target == null) {
            return;
        }

        // Cancel this offer and move the request from this queue to the target queue
        offer.cancel();
        UserRequest movedRequest = (UserRequest) offer.getRequest();
        // The request leaves this queue before it is added to the other one
        queue.removeRequest(movedRequest);
        // Log debug trace
        logger.debug("RR - Overflowing request: " + movedRequest + " to queue: " +
                target.getAddress());
        target.addRequest(movedRequest);
    }

    /**
     * Returns a queue that was randomly selected.
     *
     * @return a queue that was randomly selected.
     */
    private RequestQueue getRandomQueue() {
        // With at most one queue in the workgroup there is no other queue to pick
        if (queue.getWorkgroup().getRequestQueueCount() <= 1) {
            return null;
        }

        // Collect every other queue of the workgroup that has agents available
        List<RequestQueue> candidates = new ArrayList<RequestQueue>();
        for (RequestQueue other : queue.getWorkgroup().getRequestQueues()) {
            boolean eligible = !queue.equals(other)
                    && other.getAgentSessionList().containsAvailableAgents();
            if (eligible) {
                candidates.add(other);
            }
        }

        int candidateCount = candidates.size();
        if (candidateCount == 0) {
            return null;
        }

        // Scale a random value to an index of the candidate list; the bounds check guards
        // against an index that ends up equal to the list size
        int pick = (int) Math.floor(((float) candidateCount) * Math.random());
        return pick < candidateCount ? candidates.get(pick) : null;
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -