⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 acknowledgedmessagesclientextension.java

📁 是离开的肌肤了卡机是离开的就富利卡及是了的开发及拉考试及的福利科技阿斯利康的肌肤莱卡及时的离开福建阿斯顿发
💻 JAVA
字号:
package org.mortbay.cometd.ext;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Queue;import javax.servlet.UnavailableException;import org.cometd.Bayeux;import org.cometd.Client;import org.cometd.Extension;import org.cometd.Message;import org.mortbay.cometd.MessageImpl;import org.mortbay.util.ArrayQueue;/** * Acknowledged Message Client extension. *  * Tracks the batch id of messages sent to a client. * */public class AcknowledgedMessagesClientExtension implements Extension{    private final Client _client;    private final ArrayIdQueue<Message> _unackedQueue;        public AcknowledgedMessagesClientExtension(Client client)    {        _client=client;        _unackedQueue=new ArrayIdQueue<Message>(8,16,client);        _unackedQueue.setCurrentId(1);    }        public Message rcv(Client from, Message message)    {        return message;    }    /**     * Handle received meta messages.     * Looks for meta/connect messages with ext/ack fields.     * If present, delete all messages that have been acked and requeue messages that     * have not been acked.     */    public Message rcvMeta(Client from, Message message)    {              if (message.getChannel().equals(Bayeux.META_CONNECT))        {              synchronized (_client)            {                Map<String, Object> ext=message.getExt(false);                if (ext != null)                {                    Long acked=(Long) ext.get("ack");                    if (acked != null)                    {                        // We have received an ack ID, so delete the acked messages.                        final int s=_unackedQueue.size();                        if (s>0)                        {                            if (_unackedQueue.getAssociatedIdUnsafe(s-1)<=acked)                            {                                // we can just clear the queue                                for (int i=0;i<s;i++)                                {                                    final Message q = _unackedQueue.getUnsafe(i);                                    if (q instanceof MessageImpl)                                        ((MessageImpl)q).decRef();                                }                                _unackedQueue.clear();                            }                            else                            {                                // we need to remove elements until we see unacked                                for (int i=0;i<s;i++)                                {                                    if (_unackedQueue.getAssociatedIdUnsafe(0)<=acked)                                    {                                        final Message q = _unackedQueue.remove();                                        if (q instanceof MessageImpl)                                            ((MessageImpl)q).decRef();                                        continue;                                    }                                    break;                                }                            }                        }                    }                }                // requeue all unacked messages.                final ArrayQueue<Message> messages=(ArrayQueue)from.getQueue();                final int cid=_unackedQueue.getCurrentId();                final int s=_unackedQueue.size();                for (int i=0;i<s;i++)                {                    if (_unackedQueue.getAssociatedIdUnsafe(0)<cid)                        messages.add(i,_unackedQueue.remove());                    else                        break;                }            }        }        return message;    }    public Message send(Client from, Message message)    {        synchronized (_client)        {            _unackedQueue.add(message);            // prevent the message from being erased            ((MessageImpl) message).incRef();        }        return message;    }    public Message sendMeta(Client from, Message message)    {        if ( message.getChannel().equals(Bayeux.META_CONNECT) )        {            synchronized (_client)            {                Map<String, Object> ext= message.getExt(true);                ext.put("ack",_unackedQueue.getCurrentId());                _unackedQueue.incrementCurrentId();            }        }        return message;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -