📄 http.cc
字号:
int newEntry = 1; Tcl_HashEntry *he = Tcl_CreateHashEntry(&sstate_, (const char *)sid, &newEntry); if (he == NULL) return; if (newEntry) Tcl_SetHashValue(he, (ClientData)sst);}HttpMInvalCache::SState* HttpMInvalCache::lookup_sstate(int sid){ Tcl_HashEntry *he = Tcl_FindHashEntry(&sstate_, (const char *)sid); if (he == NULL) return NULL; return (SState *)Tcl_GetHashValue(he);}NeighborCache* HttpMInvalCache::lookup_nbr(int id){ Tcl_HashEntry *he = Tcl_FindHashEntry(&nbr_, (const char *)id); if (he == NULL) return NULL; return (NeighborCache *)Tcl_GetHashValue(he);}// Add a new neighbor cachevoid HttpMInvalCache::add_nbr(HttpMInvalCache *cache){ int newEntry = 1; Tcl_HashEntry *he = Tcl_CreateHashEntry(&nbr_, (const char *)cache->id(), &newEntry); if (he == NULL) return; // If this cache already exists, don't do anything if (!newEntry) return; // Start a timer for the neighbor LivenessTimer *timer = new LivenessTimer(this,HTTP_HBEXPIRE_COUNT*hb_interval_, cache->id()); double time = Scheduler::instance().clock(); NeighborCache *c = new NeighborCache(cache, time, timer); Tcl_SetHashValue(he, (ClientData)c);}// Two ways to receive a heartbeat: (1) via HttpInvalAgent; (2) via TCP // connection between a server and a primary cache. (See "server-hb" handling// in command().void HttpMInvalCache::recv_heartbeat(int id){ // Receive time of the heartbeat double time = Scheduler::instance().clock(); NeighborCache *c = lookup_nbr(id); if (c == NULL) { // XXX // The only possible place for this to happen is in the TLC // group, where no JOIN could ever reach. Moreover, // we don't even have an entry for that cache yet, so here // we add that cache into our entry, and later on we'll add // corresponding servers there. if (id == id_) return; add_nbr(map_cache(id));#ifdef WEBCACHE_DEBUG fprintf(stderr, "TLC %d discovered TLC %d\n", id_, id);#endif return; } else if (c->is_down()) { // Neighbor cache recovers. Don't do anything special and // let invalid entries recover themselves c->up();#ifdef WEBCACHE_DEBUG fprintf(stderr, "[%g] Cache %d reconnected to cache %d\n", Scheduler::instance().clock(), id_, id);#endif Tcl::instance().evalf("%s mark-rejoin", name_); } else // Update heartbeat time c->reset_timer(time);}void HttpMInvalCache::invalidate_server(int sid){ SState *sst = lookup_sstate(sid); if (sst->is_down()) // If this server is already marked down, return return; sst->down(); pool_->invalidate_server(sid);}void HttpMInvalCache::handle_node_failure(int cid){#ifdef WEBCACHE_DEBUG fprintf(stderr, "[%g] Cache %d disconnected from cache %d\n", Scheduler::instance().clock(), id_, cid);#endif Tcl::instance().evalf("%s mark-leave", name_); NeighborCache *c = lookup_nbr(cid); if (c == NULL) { fprintf(stderr, "%s: An unknown neighbor cache %d failed.\n", name_, cid); } // Mark the cache down c->down(); // Invalidate entries of all servers related to that cache // XXX We don't have an iterator for all servers in NeighborCache! c->invalidate(this); // Send leave message to all children HttpLeaveData* data = new HttpLeaveData(id_, c->num()); c->pack_leave(*data); send_leave(data);}void HttpMInvalCache::recv_leave(HttpLeaveData *d){#ifdef WEBCACHE_DEBUG fprintf(stderr, "[%g] Cache %d gets a LEAVE from cache %d\n", Scheduler::instance().clock(), id_, d->id());#endif if (d->num() == 0) { fprintf(stderr, "%s (%g) gets a leave from cache without server!\n", name_, Scheduler::instance().clock()); return; } SState *sst; HttpLeaveData* data = new HttpLeaveData(id_, d->num()); NeighborCache *c = lookup_nbr(d->id()); int i, j; for (i = 0, j = 0; i < d->num(); i++) { sst = lookup_sstate(d->rec_id(i)); // If we haven't heard of that server, which means we don't // have any page of that server, ignore the leave message. if (sst == NULL) continue; // If it's already marked down, don't bother again. if (sst->is_down()) continue; // If we hear a LEAVE about a server from one of // our child in the virtual distribution tree // of the server, ignore it. if (c != sst->cache()) continue; // We have the page, and we hold inval contract. Invalidate // the page and inform our children of it. sst->down(); data->add(j++, d->rec_id(i)); pool_->invalidate_server(d->rec_id(i)); Tcl::instance().evalf("%s mark-leave", name_); } // Delete it if it's not sent out if (j > 0) send_leave(data); delete data;}void HttpMInvalCache::send_leave(HttpLeaveData *d){ send_hb_helper(d->cost(), d);}void HttpMInvalCache::timeout(int reason){ switch (reason) { case HTTP_INVALIDATION: // Send an invalidation message send_heartbeat(); break; case HTTP_UPDATE: // XXX do nothing. May put client selective joining update // group here. break; default: fprintf(stderr, "%s: Unknown reason %d", name_, reason); break; }}void HttpMInvalCache::process_data(int size, AppData* data){ if (data == NULL) return; switch (data->type()) { case HTTP_INVALIDATION: { // Update timer for the source of the heartbeat HttpHbData *inv = (HttpHbData*)data; recv_heartbeat(inv->id()); recv_inv(inv); break; } case HTTP_UPDATE: { // Replace all updated pages HttpUpdateData *pg = (HttpUpdateData*)data; recv_upd(pg); break; } // JOIN messages are sent via TCP and direct TCL callback. case HTTP_LEAVE: { HttpLeaveData *l = (HttpLeaveData*)data; recv_leave(l); break; } default: HttpApp::process_data(size, data); return; }}void HttpMInvalCache::add_inv(const char *name, double mtime){ InvalidationRec *p = get_invrec(name); if ((p != NULL) && (p->mtime() < mtime)) { p->detach(); delete p; p = NULL; num_inv_--; } if (p == NULL) { p = new InvalidationRec(name, mtime); p->insert(&invlist_); num_inv_++; }}InvalidationRec* HttpMInvalCache::get_invrec(const char *name){ // XXX What should we do if we already have an // invalidation record of this page in our // invlist_? --> We should replace it with the new one InvalidationRec *r = invlist_; for (r = invlist_; r != NULL; r = r->next()) if (strcmp(name, r->pg()) == 0) return r; return NULL;}HttpHbData* HttpMInvalCache::pack_heartbeat(){ HttpHbData *data = new HttpHbData(id_, num_inv_); InvalidationRec *p = invlist_, *q; int i = 0; while (p != NULL) { data->add(i++, p); // Clearing up invalidation sending list if (!p->dec_scount()) { // Each invalidation is sent to its children // for at most HTTP_HBEXPIRE times. After that // the invalidation record is removed from // the list q = p; p = p->next(); q->detach(); delete q; num_inv_--; } else p = p->next(); } return data;}int HttpMInvalCache::recv_inv(HttpHbData *data){ if (data->num_inv() == 0) return 0; InvalidationRec *head; data->extract(head); int old_inv = num_inv_; process_inv(data->num_inv(), head, data->id()); //log("E GINV z %d\n", data->size()); if (old_inv < num_inv_) // This invalidation is valid return 1; else return 0;}// Get an invalidation, check invalidation modtimes, then setup // invalidation forwarding entries// The input invalidation record list is destroyed.void HttpMInvalCache::process_inv(int, InvalidationRec *ivlist, int cache){ InvalidationRec *p = ivlist, *q, *r; //int upd = 0; while (p != NULL) { ClientPage* pg = (ClientPage *)pool_->get_page(p->pg()); // XXX Establish server states. Server states only gets // established when we have a page (no matter if we have its // content), and we have got an invalidation for the page. // Then we know we've got an invalidation contract for the // page. if (pg != NULL) { check_sstate(pg->server()->id(), cache); // Count this invalidation no matter whether we're // going to drop it. But if we doesn't get it // from our virtual parent, don't count it SState *sst = lookup_sstate(pg->server()->id()); if (sst == NULL) { // How come we doesn't know the server??? fprintf(stderr, "%s %d: couldn't find the server.\n", __FILE__, __LINE__); abort(); } if ((sst->cache()->cache()->id() == cache) && (pg->mtime() > p->mtime())) { // Don't count repeated invalidations. pg->count_inval(Ca_, push_low_bound_); log("E NTF p %s v %d\n",p->pg(),pg->counter()); } } // Hook for filters of derived classes if (recv_inv_filter(pg, p) == HTTP_INVALCACHE_FILTERED) { // If we do not have the page, or we have (or know // about) a newer page, ignore this invalidation // record and keep going. // // If we have this version of the page, and it's // already invalid, ignore this extra invalidation q = p; p = p->next(); q->detach(); delete q; } else { // Otherwise we invalidate our page and setup a // invalidation sending record for the page pg->invalidate(p->mtime()); // Delete existing record for that page if any q = get_invrec(p->pg()); if ((q != NULL) && (q->mtime() < p->mtime())) { q->detach(); delete q; q = NULL; num_inv_--; } r = p; p = p->next(); r->detach(); // Insert it if necessary if (q == NULL) { r->insert(&invlist_); num_inv_++; // XXX Tcl::instance().evalf("%s mark-invalid",name_); log("E GINV p %s m %.17g\n", r->pg(), r->mtime()); } else delete r; } }}void HttpMInvalCache::send_hb_helper(int size, AppData *data){ if (inv_parent_ != NULL) inv_parent_->send(size, data->copy()); for (int i = 0; i < num_sender_; i++) inv_sender_[i]->send(size, data->copy());}void HttpMInvalCache::send_heartbeat(){ if ((num_sender_ == 0) && (inv_parent_ == NULL)) return; HttpHbData* d = pack_heartbeat(); send_hb_helper(d->cost(), d); delete d;}int HttpMInvalCache::recv_upd(HttpUpdateData *d){ if (d->num() != 1) { fprintf(stderr, "%d gets an update which contain !=1 pages.\n", id_); abort(); } ClientPage *pg = pool_->get_page(d->rec_page(0)); if (pg != NULL) if (pg->mtime() >= d->rec_mtime(0)) { // If we've already had this version, or a newer // version, ignore this old push// fprintf(stderr, "[%g] %d gets an old push\n", // Scheduler::instance().clock(), id_);// log("E OLD m %g p %g\n", d->rec_mtime(0), pg->mtime()); return 0; } else { // Our old page is invalidated by this new push, // set up invalidation records for our children add_inv(d->rec_page(0), d->rec_mtime(0)); pg->count_inval(Ca_, push_low_bound_); log("E NTF p %s v %d\n", d->rec_page(0),pg->counter()); } // Add the new page into our pool ClientPage *q = pool_->enter_page(d->rec_page(0), d->rec_size(0), d->rec_mtime(0), Scheduler::instance().clock(), d->rec_age(0)); // By default the page is valid and read. Set it as unread q->set_unread(); log("E GUPD m %.17g z %d\n", d->rec_mtime(0), d->pgsize()); Tcl::instance().evalf("%s mark-valid", name_); // XXX If the page was previously marked as MandatoryPush, then // we need to check if it's timed out if (q->is_mpush() && (Scheduler::instance().clock() - q->mpush_time() > HTTP_HBEXPIRE_COUNT*hb_interval_)) { // If mandatory push timer expires, stop push q->clear_mpush(); Tcl::instance().evalf("%s cancel-mpush-refresh %s", name_, d->rec_page(0)); } if (enable_upd_ && (q->counter() >= push_thresh_) || q->is_mpush()) // XXX Continue pushing if we either select to push, or // were instructed to do so. return 1; else return 0;}HttpUpdateData* HttpMInvalCache::pack_upd(ClientPage* page){ HttpUpdateData *data = new HttpUpdateData(id_, 1); data->add(0, page); return data;}void HttpMInvalCache::send_upd_helper(int pgsize, AppData* data){ for (int i = 0; i < num_updater_; i++) upd_sender_[i]->send(pgsize, data->copy());}void HttpMInvalCache::send_upd(ClientPage *page){ if ((num_updater_ == 0) || !enable_upd_) return; HttpUpdateData* d = pack_upd(page); send_upd_helper(d->pgsize(), d); delete d;}//----------------------------------------------------------------------// Multicast invalidation + two way liveness messages + // invalidation filtering. //----------------------------------------------------------------------static class HttpPercInvalCacheClass : public TclClass {public: HttpPercInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast/Perc") {} TclObject* create(int, const char*const*) { return (new HttpPercInvalCache()); }} class_HttpPercInvalCache_app;HttpPercInvalCache::HttpPercInvalCache() { bind("direct_request_", &direct_request_);}int HttpPercInvalCache::command(int argc, const char*const* argv){ Tcl& tcl = Tcl::instance(); if (strcmp(argv[1], "is-header-valid") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } tcl.resultf("%d", pg->is_header_valid()); return TCL_OK; } else if (strcmp(argv[1], "enter-metadata") == 0) { /* * <cache> enter-metadata <args...> * The same arguments as enter-page, but set the page status * as HTTP_VALID_HEADER, i.e., if we get a request, we need * to fetch the actual valid page content */ ClientPage *pg = pool_->enter_metadata(argc, argv); if (pg == NULL) return TCL_ERROR; else return TCL_OK; } return HttpMInvalCache::command(argc, argv);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -