⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 basicprovider.java

📁 High performance DB query
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                    logger.error(new LogMessage(new Object[]{                        "Hash collision on item ",                        theItem.ns, ".",                        theItem.rid, ".",                        String.valueOf(theItem.iid),                        ", item was NOT added to LSCAN ",                        prefix}));                }            }        }        // Pass back an iterator to the array        return items;    }    private void cleanupStorage() {        // lscan automatically cleans up old entries in search range, so a complete lscan would clean up all old enties        scanStorage("");        LocalNode.myTimer.schedule((cleanupPeriod / 1000), SIGNAL_CLEANUP,                                   this);    }    private void checkForNewDataCallbacks(StoredItem theItem, boolean renew) {        // Retrieve part of the call back tree        ArrayList clientArrays = new ArrayList();        String commonID = theItem.ns + delimiter + theItem.rid;        Iterator entries =            newDataCallbacks.headMap(commonID).entrySet().iterator();        while (entries.hasNext()) {            // Check if the client's request actually matches the new item            Map.Entry theEntry = (Map.Entry) entries.next();            if (commonID.startsWith((String) theEntry.getKey())) {                clientArrays.add(theEntry.getValue());            }        }        for (int i = 0; i < clientArrays.size(); i++) {            // Call the client            ArrayList clients = (ArrayList) clientArrays.get(i);            for (int j = 0; j < clients.size(); j++) {                ((ProviderClient) clients.get(j)).newDataResult(theItem.ns,                                                                theItem.rid,                                                                theItem.object,                                                                renew);            }        }    }    private String addItemToStorage(String ns, String rid, int iid,                                    Payload item, long lifetime,                                    BitID locationID) {        // Create storage container and insert into storage        double expiration = LocalNode.myTimer.getCurrentTime() + lifetime;        BitID storageID = computeStorageID(ns, rid, iid);        StoredItem theItem = StoredItem.allocate(ns, rid, iid, item,                                                 expiration, locationID,                                                 storageID);        storageManager.store(storageID, theItem);        // Add entry to the item directory        String commonID = ns + delimiter + rid + delimiter + iid;        storageDirectory.put(commonID, storageID);        // Check for newData requests        checkForNewDataCallbacks(theItem, false);        return commonID;    }    private void sendMessage(InetSocketAddress responsibleNode,                             ProviderMessage theMessage, int statgroup) {        StatCollector.addSample(            StatVars.NETWORK_OUT, StatVars.PROVIDER, statgroup,            SerializationManager.getPayloadSize(theMessage));        LocalNode.myUDPMessenger.send(listeningSocketAddress, responsibleNode,                                      theMessage);    }    private void processGetMessage(InetSocketAddress remoteAddr,                                   GetMessage theMessage) {        GetResponseMessage resultMessage;        String requestItemID = theMessage.getNS() + delimiter                               + theMessage.getRID() + delimiter;        StoredItemCollection results = scanStorage(requestItemID);        if (Output.debuggingEnabled) {            logger.debug(new LogMessage(new Object[]{"Item located ",                                                     requestItemID,                                                     " from node ",                                                     remoteAddr}));        }        resultMessage = GetResponseMessage.allocate(theMessage.getID(),                                                    theMessage.getNS(),                                                    theMessage.getRID(),                                                    theMessage.getLocationID(),                                                    true, results);        if (remoteAddr != null) {            sendMessage(remoteAddr, resultMessage,                        StatVars.PROVIDER_GETRESPONSE);        } else {            processGetResponseMessage(resultMessage);        }    }    private void processGetResponseMessage(GetResponseMessage theMessage) {        GetMessage theRequest =            (GetMessage) waitingRequests.remove(theMessage.getID());        if (theRequest != null) {            ProviderClient client =                (ProviderClient) requestClients.remove(theMessage.getID());            if (theMessage.getResult()) {                // Get was processed, return results to client                StoredItemCollection collection =                    (StoredItemCollection) theMessage.getItem();                Payload items[] = new Payload[collection.size()];                for (int i = 0; i < collection.size(); i++) {                    items[i] = collection.getItem(i).object;                }                client.getResult(theMessage.getNS(), theMessage.getRID(),                                 items);            } else {                // Error, resubmit                get(theMessage.getNS(), theMessage.getRID(), client);            }            GetMessage.free(theRequest);        }        GetResponseMessage.free(theMessage);    }    private void processPutMessage(InetSocketAddress remoteAddr,                                   PutMessage theMessage) {        // Check if the message is really for this node else send failure response        BitID locationID =            theMessage.getLocationID();    // should be computed on demand, but for efficiency stored        // Send success response        if (Output.debuggingEnabled) {            logger.debug(new StructuredLogMessage(theMessage, "Processed Put",                                                  new Object[] {                "n", theMessage.getNS(), "r", theMessage.getRID(), "i",                String.valueOf(theMessage.getIID()), "p",                Integer.toHexString(theMessage.getItem().hashCode()), "l",                String.valueOf(theMessage.getLifetime()), "d", locationID, "s",                remoteAddr, "m", theMessage.getID(),            }, new Object[]{"c", theMessage.getItem()}));        }        addItemToStorage(theMessage.getNS(), theMessage.getRID(),                         theMessage.getIID(), theMessage.getItem(),                         theMessage.getLifetime(), locationID);        PutResponseMessage resultMessage =            PutResponseMessage.allocate(theMessage.getID(), theMessage.getNS(),                                        theMessage.getRID(), locationID,                                        theMessage.getIID(), true);        if (remoteAddr != null) {            sendMessage(remoteAddr, resultMessage,                        StatVars.PROVIDER_PUTRESPONSE);        } else {            processPutResponseMessage(resultMessage);        }    }    private void processPutResponseMessage(PutResponseMessage theMessage) {        PutMessage theRequest =            (PutMessage) waitingRequests.remove(theMessage.getID());        if (theRequest != null) {            if (Output.debuggingEnabled) {                logger.debug(new LogMessage(new Object[]{                    "Recevied PutResponse:",                    theMessage.getID(),                    " NS: ",                    theRequest.getNS(),                    " RID: ",                    theRequest.getRID()}));            }            ProviderClient client =                (ProviderClient) requestClients.remove(theMessage.getID());            if ( !(theMessage.getResult())) {                // Error, resubmit                put(theRequest.getNS(), theRequest.getRID(),                    theRequest.getIID(), theRequest.getItem(),                    theRequest.getLifetime(), client);            } else {                // May be null if due to locationMapChange                if (client != null) {                    client.putResult(theRequest.getNS(), theRequest.getRID(),                                     theRequest.getIID(), true);                }                PutMessage.free(theRequest);            }        }        PutResponseMessage.free(theMessage);    }    private void processRenewMessage(InetSocketAddress remoteAddr,                                     RenewMessage theMessage) {        // Check if the message is really for this node else send failure response        BitID locationID =            theMessage.getLocationID();    // should be computed on demand, but for efficiency stored        // Find existing item        BitID storageID = computeStorageID(theMessage.getNS(),                                           theMessage.getRID(),                                           theMessage.getIID());        StoredItem theItem = (StoredItem) storageManager.retrieve(storageID);        RenewResponseMessage resultMessage;        if (theItem != null) {            // Update expiration            theItem.expiration = LocalNode.myTimer.getCurrentTime()                                 + theMessage.getLifetime();            // Check for newData requests            checkForNewDataCallbacks(theItem, true);            // Send success response            resultMessage = RenewResponseMessage.allocate(theMessage.getID(),                                                          theMessage.getNS(),                                                          theMessage.getRID(),                                                          locationID,                                                          theMessage.getIID(),                                                          true);        } else {            // Item was not found, send failure response            resultMessage = RenewResponseMessage.allocate(theMessage.getID(),                                                          theMessage.getNS(),                                                          theMessage.getRID(),                                                          locationID,                                                          theMessage.getIID(),                                                          false);        }        // Send the message or process locally if appropriate        if (remoteAddr != null) {            sendMessage(remoteAddr, resultMessage,                        StatVars.PROVIDER_RENEWRESPONSE);        } else {            processRenewResponseMessage(resultMessage);        }    }    private void processRenewResponseMessage(RenewResponseMessage theMessage) {        RenewMessage theRequest =            (RenewMessage) waitingRequests.remove(theMessage.getID());        if (theRequest != null) {            ProviderClient client =                (ProviderClient) requestClients.remove(theMessage.getID());            if (theMessage.getResult()) {                client.renewalResult(theMessage.getNS(), theMessage.getRID(),                                     theMessage.getIID(), true);            } else {                client.renewalResult(theMessage.getNS(), theMessage.getRID(),                                     theMessage.getIID(), false);            }            RenewMessage.free(theRequest);        }        RenewResponseMessage.free(theMessage);    }    /** Removes all data and state */    public void reset() {        // reinitialize data structures        requestClients = new HashMap();        waitingRequests = new HashMap();        newDataCallbacks = new TreeMap();        messageCallbacks = new TreeMap();        messageUpCallCallbacks = new TreeMap();        storageDirectory = new TreeMap();        // reregister for location map changes        locationService.registerClient(this, applicationID,                                       listeningSocketAddress);    }    /**     * Method guid_to_string     *     * @param i     * @return     */    public static String guid_to_string(BigInteger i) {        // Print only the high-order 8 hexedecimal digits.        String result = i.toString(16);        while (result.length() < 40) {            result = "0" + result;        }        result = result.substring(0, 8);        return result;    }    /**     * Method handleClock     *     * @param clockData     */    public void handleClock(Object clockData) {        if (clockData instanceof DataRequest) {            DataRequest request = (DataRequest) clockData;            if (request.action == DataRequest.ACTION_LSCAN) {                processLscanRequest((DataRequest) clockData);            }            if (request.action == DataRequest.ACTION_NEWDATA) {                processNewDataRequest((DataRequest) clockData);            }            if (request.action == DataRequest.ACTION_UPCALLS) {                processUpcallsRequest((DataRequest) clockData);            }            if (request.action == DataRequest.ACTION_MESSAGES) {                processMessagesRequest((DataRequest) clockData);            }        }        if (clockData == SIGNAL_CLEANUP) {            cleanupStorage();        }    }    private class DataRequest {        private static final int ACTION_LSCAN = 0;        private static final int ACTION_NEWDATA = 1;        private static final int ACTION_UPCALLS = 2;        private static final int ACTION_MESSAGES = 3;        public String ns;        public ProviderClient client;        public int action;        /**         * Constructor DataRequest         *         * @param ns         * @param client         * @param action         */        public DataRequest(String ns, ProviderClient client, int action) {            this.ns = ns;            this.client = client;            this.action = action;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -