📄 http.cc
字号:
} tcl.resultf("%d", (enable_upd_ && (pg->counter() >= push_thresh_) || pg->is_mpush())); return TCL_OK; } break; case 'r': if ((strcmp(argv[1], "request-mpush") == 0) || (strcmp(argv[1], "refresh-mpush") == 0)) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } pg->set_mpush(Scheduler::instance().clock()); return TCL_OK; } break; case 's': if (strcmp(argv[1], "send-hb") == 0) { send_heartbeat(); return TCL_OK; } else if (strcmp(argv[1], "stop-mpush") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } pg->clear_mpush(); fprintf(stderr, "server %d stopped mpush\n", id_); return TCL_OK; } break; } return HttpApp::command(argc, argv);}void HttpYucInvalServer::add_inv(const char *name, double mtime){ InvalidationRec *p = get_invrec(name); if ((p != NULL) && (p->mtime() < mtime)) { p->detach(); delete p; p = NULL; num_inv_--; } if (p == NULL) { p = new InvalidationRec(name, mtime); p->insert(&invlist_); num_inv_++; }}InvalidationRec* HttpYucInvalServer::get_invrec(const char *name){ // XXX What should we do if we already have an // invalidation record of this page in our // invlist_? --> We should replace it with the new one InvalidationRec *r = invlist_; for (r = invlist_; r != NULL; r = r->next()) if (strcmp(name, r->pg()) == 0) return r; return NULL;}HttpHbData* HttpYucInvalServer::pack_heartbeat(){ HttpHbData *data = new HttpHbData(id_, num_inv_); InvalidationRec *p = invlist_, *q; int i = 0; while (p != NULL) { data->add(i++, p); // Clearing up invalidation sending list if (!p->dec_scount()) { // Each invalidation is sent to its children // for at most HTTP_HBEXPIRE times. After that // the invalidation record is removed from // the list q = p; p = p->next(); q->detach(); delete q; num_inv_--; } else p = p->next(); } return data;}void HttpYucInvalServer::send_hb_helper(int size, AppData *data){ inv_sender_->send(size, data);}void HttpYucInvalServer::send_heartbeat(){ if (inv_sender_ == NULL) return; HttpHbData* d = pack_heartbeat(); send_hb_helper(d->cost(), d);}//----------------------------------------------------------------------// Http cache with invalidation protocols. Http/Cache and Http/Cache/Inval// are used as base classes and provide common TCL methods. Http/Cache // derives Http/Cache/TTL and Http/Cache/TTL/Old. Http/Cache/Inval derives// unicast invalidation and multicast invalidation.//----------------------------------------------------------------------static class HttpCacheClass : public TclClass {public: HttpCacheClass() : TclClass("Http/Cache") {} TclObject* create(int, const char*const*) { return (new HttpCache()); }} class_httpcache_app;static class HttpInvalCacheClass : public TclClass {public: HttpInvalCacheClass() : TclClass("Http/Cache/Inval") {} TclObject* create(int, const char*const*) { return (new HttpInvalCache()); }} class_httpinvalcache_app;static class HttpMInvalCacheClass : public TclClass {public: HttpMInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast") {} TclObject* create(int, const char*const*) { return (new HttpMInvalCache()); }} class_HttpMInvalCache_app;// Static members and functionsHttpMInvalCache** HttpMInvalCache::CacheRepository_ = NULL;int HttpMInvalCache::NumCache_ = 0;void HttpMInvalCache::add_cache(HttpMInvalCache *c){ if (CacheRepository_ == NULL) { CacheRepository_ = new HttpMInvalCache* [c->id() + 1]; CacheRepository_[c->id()] = c; NumCache_ = c->id(); } else if (NumCache_ < c->id()) { HttpMInvalCache** p = new HttpMInvalCache* [c->id()+1]; memcpy(p, CacheRepository_, (c->id()+1)*sizeof(HttpMInvalCache*)); delete[]CacheRepository_; CacheRepository_ = p; NumCache_ = c->id(); p[c->id()] = c; } else CacheRepository_[c->id()] = c;}HttpMInvalCache::HttpMInvalCache() : hb_timer_(this, HTTP_HBINTERVAL), inv_sender_(0), num_sender_(0), size_sender_(0), invlist_(0), num_inv_(0), inv_parent_(NULL), upd_sender_(NULL), num_updater_(0), size_updater_(0){ bind("hb_interval_", &hb_interval_); bind("enable_upd_", &enable_upd_); // If we allow push bind("Ca_", &Ca_); bind("Cb_", &Cb_); bind("push_thresh_", &push_thresh_); bind("push_high_bound_", &push_high_bound_); bind("push_low_bound_", &push_low_bound_); hb_timer_.set_interval(hb_interval_); Tcl_InitHashTable(&sstate_, TCL_ONE_WORD_KEYS); Tcl_InitHashTable(&nbr_, TCL_ONE_WORD_KEYS);}HttpMInvalCache::~HttpMInvalCache() { if (num_sender_ > 0) delete []inv_sender_; Tcl_DeleteHashTable(&sstate_); Tcl_DeleteHashTable(&nbr_);}int HttpMInvalCache::command(int argc, const char*const* argv){ Tcl& tcl = Tcl::instance(); if (argc < 2) return HttpInvalCache::command(argc, argv); switch (argv[1][0]) { case 'a': if ((strcmp(argv[1], "add-inval-listener") == 0) || (strcmp(argv[1], "add-upd-listener") == 0)) { HttpInvalAgent *tmp = (HttpInvalAgent *)TclObject::lookup(argv[2]); tmp->attachApp((Application *)this); return TCL_OK; } else if (strcmp(argv[1], "add-inval-sender") == 0) { HttpInvalAgent *tmp = (HttpInvalAgent *)TclObject::lookup(argv[2]); if (tmp == NULL) { tcl.resultf("Non-existent agent %s", argv[2]); return TCL_ERROR; } if (num_sender_ == size_sender_) { HttpInvalAgent **tt = new HttpInvalAgent*[size_sender_+5]; memcpy(tt, inv_sender_, sizeof(HttpInvalAgent*)*size_sender_); delete []inv_sender_; size_sender_ += 5; inv_sender_ = tt; } inv_sender_[num_sender_++] = tmp; return TCL_OK; } else if (strcmp(argv[1], "add-to-map") == 0) { add_cache(this); return TCL_OK; } else if (strcmp(argv[1], "add-upd-sender") == 0) { HttpInvalAgent *tmp = (HttpInvalAgent *)TclObject::lookup(argv[2]); if (tmp == NULL) { tcl.resultf("Non-existent agent %s", argv[2]); return TCL_ERROR; } if (num_updater_ == size_updater_) { HttpInvalAgent **tt = new HttpInvalAgent*[size_updater_+5]; memcpy(tt, upd_sender_, sizeof(HttpInvalAgent*)*size_updater_); delete []upd_sender_; size_updater_ += 5; upd_sender_ = tt; } upd_sender_[num_updater_++] = tmp; return TCL_OK; } break; case 'c': if (strcmp(argv[1], "count-request") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d count-request: No page %s", id_, argv[2]); return TCL_ERROR; } pg->count_request(Cb_, push_high_bound_); log("E NTF p %s v %d\n", argv[2], pg->counter()); return TCL_OK; } else if (strcmp(argv[1], "check-sstate") == 0) { /* * <cache> check-sstate <sid> <cid> * If server is re-connected, reinstate it */ int sid = atoi(argv[2]); int cid = atoi(argv[3]); check_sstate(sid, cid); return TCL_OK; } break; case 'i': // XXX We don't need a "is-pushable" for cache! if (strcmp(argv[1], "is-unread") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-unread: No page %s", id_, argv[2]); return TCL_ERROR; } tcl.resultf("%d", pg->is_unread()); return TCL_OK; } break; case 'j': if (strcmp(argv[1], "join") == 0) { /* * <cache> join <server_id> <cache> * * <server> join via <cache>. If they are the same, * it means we are the primary cache for <server>. */ int sid = atoi(argv[2]); HttpMInvalCache *cache = (HttpMInvalCache*)TclObject::lookup(argv[3]); if (cache == NULL) { tcl.add_errorf("Non-existent cache %s", argv[3]); return TCL_ERROR; } // Add neighbor cache if necessary NeighborCache *c = lookup_nbr(cache->id()); if (c == NULL) add_nbr(cache); // Establish server invalidation contract check_sstate(sid, cache->id()); return TCL_OK; } break; case 'p': if (strcmp(argv[1], "parent-cache") == 0) { /* * <cache> parent-cache <web_server_id> * Return the parent cache of <web_server_id> in the * virtual distribution tree. */ int sid = atoi(argv[2]); SState *sst = lookup_sstate(sid); if (sst == NULL) tcl.result(""); else { // Bad hack... :( NeighborCache *c = lookup_nbr(sst->cache()->cache()->id()); tcl.resultf("%s", c->cache()->name()); } return TCL_OK; } else if (strcmp(argv[1], "push-children") == 0) { // Multicast the pushed page to all children ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } send_upd(pg); return TCL_OK; } break; case 'r': if (strcmp(argv[1], "recv-inv") == 0) { /* * <cache> recv-inv <pageid> <modtime> * This should be called only by a web server, * therefore we do not check the validity of the * invalidation */ // Pack it into a HttpHbData, and process it HttpHbData *d = new HttpHbData(id_, 1); strcpy(d->rec_pg(0), argv[2]); d->rec_mtime(0) = strtod(argv[3], NULL); //int old_inv = num_inv_; tcl.resultf("%d", recv_inv(d)); delete d; return TCL_OK; } else if (strcmp(argv[1], "recv-push") == 0) { /* * <cache> recv-push <pageid> args */ HttpUpdateData *d = new HttpUpdateData(id_, 1); strcpy(d->rec_page(0), argv[2]); for (int i = 3; i < argc; i+=2) { if (strcmp(argv[i], "modtime") == 0) d->rec_mtime(0) = strtod(argv[i+1], NULL); else if (strcmp(argv[i], "size") == 0) { d->rec_size(0) = atoi(argv[i+1]); // XXX need to set total update page size d->set_pgsize(d->rec_size(0)); } else if (strcmp(argv[i], "age") == 0) d->rec_age(0) = strtod(argv[i+1], NULL); } tcl.resultf("%d", recv_upd(d)); delete d; return TCL_OK; } else if (strcmp(argv[1], "register-server") == 0) { /* * <self> register-server <cache_id> <server_id> * We get a GET response about a page from <server>, * which we hear from <cache> */ int cid = atoi(argv[2]); int sid = atoi(argv[3]); // Assuming we've already known the cache check_sstate(sid, cid); return TCL_OK; } break; case 's': if (strcmp(argv[1], "start-hbtimer") == 0) { if (hb_timer_.status() == TIMER_IDLE) hb_timer_.sched(); return TCL_OK; } else if (strcmp(argv[1], "server-hb") == 0) { int id = atoi(argv[2]); recv_heartbeat(id); return TCL_OK; } else if (strcmp(argv[1], "set-pinv-agent") == 0) { inv_parent_ = (HttpUInvalAgent*)TclObject::lookup(argv[2]); return TCL_OK; } else if (strcmp(argv[1], "set-parent") == 0) { HttpMInvalCache *c = (HttpMInvalCache*)TclObject::lookup(argv[2]); if (c == NULL) { tcl.add_errorf("Non-existent cache %s", argv[2]); return TCL_ERROR; } // Add parent cache into known cache list add_nbr(c); return TCL_OK; } else if (strcmp(argv[1], "set-unread") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } pg->set_unread(); return TCL_OK; } else if (strcmp(argv[1], "set-read") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } pg->set_read(); return TCL_OK; } else if (strcmp(argv[1], "set-mandatory-push") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } pg->set_mpush(Scheduler::instance().clock()); return TCL_OK; } else if (strcmp(argv[1], "stop-mpush") == 0) { ClientPage *pg = (ClientPage *)pool_->get_page(argv[2]); if (pg == NULL) { tcl.resultf("%d is-valid: No page %s", id_, argv[2]); return TCL_ERROR; } pg->clear_mpush(); return TCL_OK; } break; default: break; } return HttpInvalCache::command(argc, argv);}void HttpMInvalCache::check_sstate(int sid, int cid){ if ((sid == cid) && (cid == id_)) // How come? return; SState *sst = lookup_sstate(sid); NeighborCache *c = lookup_nbr(cid); if (sst == NULL) { if (c == NULL) { fprintf(stderr, "%g: cache %d: No neighbor cache for received invalidation from %d via %d\n", Scheduler::instance().clock(), id_, sid, cid); abort(); }#ifdef WEBCACHE_DEBUG fprintf(stderr, "%g: cache %d: registered server %d via cache %d\n", Scheduler::instance().clock(), id_, sid, cid);#endif sst = new SState(c); add_sstate(sid, sst); c->add_server(sid); } else if (sst->is_down()) { sst->up(); if (cid != id_) { if (c == NULL) { fprintf(stderr, "[%g]: Cache %d has an invalid neighbor cache %d\n", Scheduler::instance().clock(), id_, cid); abort(); } c->server_up(sid); }#ifdef WEBCACHE_DEBUG fprintf(stderr, "[%g] Cache %d reconnected to server %d via cache %d\n", Scheduler::instance().clock(), id_, sid, cid);#endif Tcl::instance().evalf("%s mark-rejoin", name_); }}void HttpMInvalCache::add_sstate(int sid, SState *sst){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -