📄 datastore.c
字号:
/* * Copyright (c) 2003-2005 Frank Dabek (fdabek@mit.edu) * Massachusetts Institute of Technology * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */#include "datastore.h"#include "observers/datastoreobserver.h"#include "observers/chordobserver.h"#include "protocols/consistenthash.h"#include "p2psim/p2psim_hashmap.h"#include <iostream>#include <string.h>using namespace std;DataStore::DataStore (IPAddress i, Args& a, LocTable *l /* = NULL, see .h*/) : ChordFingerPNS(i, a, l){ _nreplicas = a.nget<uint>("replicas", 3, 10);}voidDataStore::initstate (){ ChordFingerPNS::initstate(); //find the range of keys we should be storing using global knowledge vector<IDMap> ids = ChordObserver::Instance(NULL)->get_sorted_nodes(); IDMap pred; int index = -1; assert (ids.size () > 0); for (int i = ids.size () - 1; i >= 0; i--) { //found ourselves, if (ids[i].id == me.id) { //count back _nreplicas index = i - _nreplicas; if (index < 0) index = ids.size () + index; pred = ids[index]; } } assert (index >= 0); cerr << me.id << ": storing items between " << pred.id << " and " << me.id << "\n"; vector<DataItem> vd = DataStoreObserver::Instance(NULL)->get_data(); for (unsigned int i = 0; i < vd.size (); i++) { DataItem d = vd[i]; if (ConsistentHash::between (pred.id, me.id, d.key)) store (d); }}void DataStore::join (Args* a){ //start stabilization for data int start = random () % 5000; delaycb (start, &DataStore::stabilize_data, (void *) 0); ChordFingerPNS::join (a);}voidDataStore::fetch_handler (fetch_args *args, fetch_res *ret){ if (data_present (args->key)) ret->present = true; else ret->present = false;}voidDataStore::lookup (Args *args){ lookup_args *a = New lookup_args; bzero (a, sizeof (lookup_args)); a->key = args->nget<CHID>("key"); a->start = now(); IDMap lasthop; vector<IDMap> v; if (_recurs) v = find_successors_recurs(a->key, _nreplicas, TYPE_USER_LOOKUP, &lasthop, a); else v = find_successors (a->key, _nreplicas, TYPE_USER_LOOKUP, &lasthop, a); if (!alive()) return; if (v.size() > 0) { bool done = false; int nsucc = 0; while (!done) { IDMap succ = v[nsucc]; //send an RPC to get the data // we should really have it returned directly fetch_args args; args.key = a->key; fetch_res res; res.present = false; doRPC (succ.ip, &DataStore::fetch_handler, &args, &res); if (!alive()) break; if (res.present) { cerr << ip () << " " << now () << " : data object " << a->key << " found successfully at node " << succ.id << " (" << nsucc << ")\n"; record_lookup_stat (me.ip, succ.ip, now () - a->start, true, true); done = true; } else { nsucc++; if (nsucc >= (uint) _nreplicas || nsucc >= (uint) v.size ()) { cerr << ip () << ": error looking up " << a->key << " " << nsucc << " " << _nreplicas << " " << v.size () << "\n"; record_lookup_stat (me.ip, succ.ip, now () - a->start, true, false); done = true; } } } } delete a;}void DataStore::stabilize_data (void *a){ if (!alive() || _nreplicas < 2) return; cerr << ip () << " " << now () << " starting stabilize\n"; //merkle-like scheme vector<IDMap> succs = loctable->succs(me.id+1,_nsucc,LOC_ONCHECK); //get a successor's database getdb_args args; getdb_res res; doRPC (succs[_curr_succ].ip, &DataStore::getdb_handler, &args, &res); if (!alive ()) return; senddata_args sd_arg; senddata_res sd_res; //make sure that he has all of the keys that we do hash_map<CHID, DataItem>::iterator it = db.begin (); for (; it != db.end (); ++it) { uint i = 0; for (; i < res.keys.size (); i++) { // cerr << "comparing " << (*it).first << " and " << res.keys[i].key << "\n"; if ((*it).first == res.keys[i].key) break; } if (i == res.keys.size ()) { //add it to the database cerr << "sending " << (*it).first << " to succ # " << _curr_succ << "\n"; sd_arg.keys.push_back ((*it).second); } } //send the message if (sd_arg.keys.size () > 0) { cerr << "sending " << sd_arg.keys.size () << " keys to succ # " << _curr_succ << " of " << db.size () << "\n"; doRPC (succs[_curr_succ].ip, &DataStore::senddata_handler, &sd_arg, &sd_res); if (!alive ()) return; } _curr_succ++; if (_curr_succ >= _nreplicas - 1) _curr_succ = 0; int start = random () % 5000; delaycb (start, &DataStore::stabilize_data, (void *) 0);}voidDataStore::senddata_handler (senddata_args *args, senddata_res *res){ for (uint i = 0; i < args->keys.size (); i++) { store (args->keys[i]); }}voidDataStore::getdb_handler (getdb_args *args, getdb_res *res){ hash_map<CHID, DataItem>::iterator i = db.begin (); for (; i != db.end (); ++i) { res->keys.push_back ((*i).second); } cerr << ip () << " getdb_handler: returned " << res->keys.size () << " keys of " << db.size () << "\n";}//----- database routinesvoidDataStore::store (DataItem d){ db[d.key] = d; db_size += d.size;}voidDataStore::remove (CHID k){ if (data_present (k)) { hash_map<CHID, DataItem>::iterator i = db.find (k); db.erase (i); }}boolDataStore::data_present (CHID k) { return (db.find (k) != db.end ());}intDataStore::data_size (CHID k){ if (data_present (k)) return db[k].size; else return -1;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -