📄 fileshare.cc
字号:
void FileShareSession::OnMessage(talk_base::Message* msg) { if (MSG_PROXY_WAIT == msg->message_id) { LOG_F(LS_INFO) << "MSG_PROXY_WAIT"; if (proxies_.empty() && IsComplete() && !IsClosed()) { DoClose(true); } }}// Session Signalsvoid FileShareSession::OnSessionState(cricket::Session* session, cricket::Session::State state) { // Once we are complete, state changes are meaningless. if (!IsComplete()) { switch (state) { case cricket::Session::STATE_SENTINITIATE: case cricket::Session::STATE_RECEIVEDINITIATE: OnInitiate(); break; case cricket::Session::STATE_SENTACCEPT: case cricket::Session::STATE_RECEIVEDACCEPT: case cricket::Session::STATE_INPROGRESS: SetState(FS_TRANSFER, false); break; case cricket::Session::STATE_SENTREJECT: case cricket::Session::STATE_SENTTERMINATE: case cricket::Session::STATE_DEINIT: if (local_cancel_) { SetState(FS_LOCAL_CANCEL, false); } else { SetState(FS_REMOTE_CANCEL, false); } break; case cricket::Session::STATE_RECEIVEDTERMINATE: if (is_sender()) { // If we are the sender, and the receiver downloaded the correct number // of bytes, then we assume the transfer was successful. We've // introduced support for explicit completion notification // (QN_SHARE_COMPLETE), but it's not mandatory at this point, so we need // this as a fallback. size_t total_bytes; GetTotalSize(total_bytes); if (bytes_transferred_ >= total_bytes) { SetState(FS_COMPLETE, false); break; } } // Fall through case cricket::Session::STATE_RECEIVEDREJECT: SetState(FS_REMOTE_CANCEL, false); break; case cricket::Session::STATE_INIT: case cricket::Session::STATE_SENTMODIFY: case cricket::Session::STATE_RECEIVEDMODIFY: case cricket::Session::STATE_SENTREDIRECT: default: // These states should not occur. ASSERT(false); break; } } if (state == cricket::Session::STATE_DEINIT) { if (!IsClosed()) { DoClose(false); } session_ = NULL; }}void FileShareSession::OnSessionInfoMessage(cricket::Session* session, const cricket::Session::XmlElements& els) { if (IsClosed()) return; ASSERT(NULL != session_); for (size_t i=0; i<els.size(); ++i) { if (is_sender() && (els[i]->Name() == QN_SHARE_CHANNEL)) { if (els[i]->HasAttr(buzz::QN_NAME)) { cricket::PseudoTcpChannel* channel = new cricket::PseudoTcpChannel(talk_base::Thread::Current(), session_); VERIFY(channel->Connect(els[i]->Attr(buzz::QN_NAME))); talk_base::StreamInterface* stream = channel->GetStream(); http_server_->HandleConnection(stream); } } else if (is_sender() && (els[i]->Name() == QN_SHARE_COMPLETE)) { // Normal file transfer has completed, but receiver may still be getting // previews. if (!IsComplete()) { SetState(FS_COMPLETE, true); } } else { LOG(LS_WARNING) << "Unknown FileShareSession info message: " << els[i]->Name().Merged(); } }}void FileShareSession::OnSessionChannelGone(cricket::Session* session, const std::string& name) { LOG_F(LS_WARNING) << "(" << name << ")"; ASSERT(session == session_); if (cricket::TransportChannel* channel = session->GetChannel(name)) { session->DestroyChannel(channel); }}// HttpClient Signalsvoid FileShareSession::OnHttpClientComplete(talk_base::HttpClient* http, int err) { LOG_F(LS_INFO) << "(" << err << ", " << http->response().scode << ")"; ASSERT(http == http_client_); ASSERT(NULL != session_); transfer_name_.clear(); counter_ = NULL; // counter_ is deleted by HttpClient http->response().document.reset(); bool success = (err == 0) && (http->response().scode == talk_base::HC_OK); const FileShareManifest::Item& item = manifest_->item(item_transferring_); talk_base::Pathname local_name; local_name.SetFilename(item.name); local_name.SetFolder(local_folder_); if (local_name.pathname() != transfer_path_) { const bool is_folder = (item.type == FileShareManifest::T_FOLDER); if (success && !talk_base::CreateUniqueFile(local_name, false)) { LOG(LS_ERROR) << "Couldn't rename downloaded file: " << local_name.pathname(); success = false; } talk_base::Pathname temp_name(transfer_path_); if (is_folder) { // The folder we want is a subdirectory of the transfer_path_. temp_name.AppendFolder(item.name); } if (!talk_base::Filesystem::MoveFile(temp_name.pathname(), local_name.pathname())) { success = false; LOG(LS_ERROR) << "Couldn't move downloaded file from '" << temp_name.pathname() << "' to '" << local_name.pathname(); } if (success && is_folder) { talk_base::Filesystem::DeleteFile(transfer_path_); } } if (!success) { if (!talk_base::Filesystem::DeleteFile(transfer_path_)) { LOG(LS_ERROR) << "Couldn't delete downloaded file: " << transfer_path_; } if (!IsComplete()) { SetState(FS_FAILURE, false); } return; } // We may have skipped over some items (if they are directories, or otherwise // failed. resize ensures that we populate the skipped entries with empty // strings. stored_location_.resize(item_transferring_ + 1); stored_location_[item_transferring_] = local_name.pathname(); // bytes_transferred_ represents the size of items which have completely // transferred, and is added to the progress of the currently transferring // items. if (item.size == FileShareManifest::SIZE_UNKNOWN) { bytes_transferred_ += 1; } else { bytes_transferred_ += item.size; } item_transferring_ += 1; NextDownload();}void FileShareSession::OnHttpClientClosed(talk_base::HttpClient* http, int err) { LOG_F(LS_INFO) << "(" << err << ")";}// HttpServer Signalsvoid FileShareSession::OnHttpRequest(talk_base::HttpServer* server, talk_base::HttpTransaction* transaction) { LOG_F(LS_INFO) << "(" << transaction->request()->path << ")"; ASSERT(server == http_server_); std::string path, query; size_t query_start = transaction->request()->path.find('?'); if (query_start != std::string::npos) { path = transaction->request()->path.substr(0, query_start); query = transaction->request()->path.substr(query_start + 1); } else { path = transaction->request()->path; } talk_base::Pathname remote_name(path); bool preview = (preview_path_ == remote_name.folder()); bool original = (source_path_ == remote_name.folder()); std::string requested_file(remote_name.filename()); talk_base::transform(requested_file, requested_file.size(), requested_file, talk_base::url_decode); size_t item_index; const FileShareManifest::Item* item = NULL; if (preview || original) { for (size_t i=0; i<manifest_->size(); ++i) { LOG(LS_INFO) << "++++ " << manifest_->item(i).name + " " << requested_file; if (manifest_->item(i).name == requested_file) { item_index = i; item = &manifest_->item(item_index); break; } } } talk_base::StreamInterface* stream = NULL; std::string mime_type(MIME_OCTET_STREAM); if (!item) { // Fall through } else if (preview) { // Only image previews allowed unsigned int width = 0, height = 0; if ((item->type == FileShareManifest::T_IMAGE) && !query.empty() && (sscanf(query.c_str(), "width=%u&height=%u", &width, &height) == 2)) { width = talk_base::_max<unsigned int>(1, talk_base::_min(width, kMaxPreviewSize)); height = talk_base::_max<unsigned int>(1, talk_base::_min(height, kMaxPreviewSize)); std::string pathname; if (is_sender_) { talk_base::Pathname local_path; local_path.SetFolder(local_folder_); local_path.SetFilename(item->name); pathname = local_path.pathname(); } else if ((item_index < stored_location_.size()) && !stored_location_[item_index].empty()) { pathname = stored_location_[item_index]; } if (!pathname.empty()) { transactions_.push_back(transaction); SignalResampleImage(pathname, width, height, transaction); } } } else if (item->type == FileShareManifest::T_FOLDER) { talk_base::Pathname local_path; local_path.SetFolder(local_folder_); local_path.AppendFolder(item->name); talk_base::TarStream* tar = new talk_base::TarStream; VERIFY(tar->AddFilter(local_path.folder_name())); if (tar->Open(local_path.parent_folder(), true)) { stream = tar; tar->SignalNextEntry.connect(this, &FileShareSession::OnNextEntry); mime_type = "application/x-tar"; } else { delete tar; } } else if ((item->type == FileShareManifest::T_FILE) || (item->type == FileShareManifest::T_IMAGE)) { talk_base::Pathname local_path; local_path.SetFolder(local_folder_); local_path.SetFilename(item->name); talk_base::FileStream* file = new talk_base::FileStream; LOG(LS_INFO) << "opening file " << local_path.pathname(); if (file->Open(local_path.pathname().c_str(), "rb")) { LOG(LS_INFO) << "File opened"; stream = file; } else { delete file; } } if (!stream) { transaction->response()->set_error(talk_base::HC_NOT_FOUND); } else if (original) { // We should never have more than one original request pending at a time ASSERT(NULL == counter_); StreamCounter* counter = new StreamCounter(stream); counter->SignalUpdateByteCount.connect(this, &FileShareSession::OnUpdateBytes); transaction->response()->set_success(mime_type.c_str(), counter); transfer_connection_id_ = transaction->connection_id(); item_transferring_ = item_index; counter_ = counter; } else { // Note: in the preview case, we don't set counter_, so the transferred // bytes won't be shown as progress, and won't trigger a state change. transaction->response()->set_success(mime_type.c_str(), stream); } LOG_F(LS_INFO) << "Result: " << transaction->response()->scode; http_server_->Respond(transaction);}void FileShareSession::OnHttpRequestComplete(talk_base::HttpServer* server, talk_base::HttpTransaction* transaction, int err) { LOG_F(LS_INFO) << "(" << transaction->request()->path << ", " << err << ")"; ASSERT(server == http_server_); // We only care about transferred originals if (transfer_connection_id_ != transaction->connection_id()) return; ASSERT(item_transferring_ < manifest_->size()); ASSERT(NULL != counter_); transfer_connection_id_ = talk_base::HTTP_INVALID_CONNECTION_ID; transfer_name_.clear(); counter_ = NULL; if (err == 0) { const FileShareManifest::Item& item = manifest_->item(item_transferring_); if (item.size == FileShareManifest::SIZE_UNKNOWN) { bytes_transferred_ += 1; } else { bytes_transferred_ += item.size; } }}void FileShareSession::OnHttpConnectionClosed(talk_base::HttpServer* server, int err, talk_base::StreamInterface* stream) { LOG_F(LS_INFO) << "(" << err << ")"; talk_base::Thread::Current()->Dispose(stream);}// TarStream Signalsvoid FileShareSession::OnNextEntry(const std::string& name, size_t size) { LOG_F(LS_VERBOSE) << "(" << name << ", " << size << ")"; transfer_name_ = name; SignalNextFile(this);}// Socket Signalsvoid FileShareSession::OnProxyAccept(talk_base::AsyncSocket* socket) { bool is_remote; if (socket == remote_listener_) { is_remote = true; ASSERT(NULL != session_); } else if (socket == local_listener_) { is_remote = false; } else { ASSERT(false); return; } while (talk_base::AsyncSocket* accepted = static_cast<talk_base::AsyncSocket*>(socket->Accept(NULL))) { // Check if connection is from localhost. if (accepted->GetRemoteAddress().ip() != 0x7F000001) { delete accepted; continue; } LOG_F(LS_VERBOSE) << (is_remote ? "[remote]" : "[local]"); if (is_remote) { char channel_name[64]; talk_base::sprintfn(channel_name, ARRAY_SIZE(channel_name), "proxy-%u", next_channel_id_++); talk_base::StreamInterface* remote = (NULL != session_) ? CreateChannel(channel_name) : NULL; if (!remote) { LOG_F(LS_WARNING) << "CreateChannel(" << channel_name << ") failed"; delete accepted; continue; } talk_base::StreamInterface* local = new talk_base::SocketStream(accepted); StreamRelay* proxy = new StreamRelay(local, remote, 64 * 1024); proxy->SignalClosed.connect(this, &FileShareSession::OnProxyClosed); proxies_.push_back(proxy); proxy->Circulate(); talk_base::Thread::Current()->Clear(this, MSG_PROXY_WAIT); } else { talk_base::StreamInterface* local = new talk_base::SocketStream(accepted); http_server_->HandleConnection(local); } }}void FileShareSession::OnProxyClosed(StreamRelay* proxy, int error) { ProxyList::iterator it = std::find(proxies_.begin(), proxies_.end(), proxy); if (it == proxies_.end()) { ASSERT(false); return; } LOG_F(LS_VERBOSE) << "(" << error << ")"; proxies_.erase(it); talk_base::Thread::Current()->Dispose(proxy); if (proxies_.empty() && IsComplete() && !IsClosed()) { talk_base::Thread::Current()->PostDelayed(kProxyWait, this, MSG_PROXY_WAIT); }}void FileShareSession::OnUpdateBytes(size_t count) { SignalUpdateProgress(this);}// Internal Helpersvoid FileShareSession::GenerateTemporaryPrefix(std::string* prefix) { std::string data = cricket::CreateRandomString(32); ASSERT(NULL != prefix); prefix->assign("/temporary/"); prefix->append(talk_base::MD5(data)); prefix->append("/");}void FileShareSession::GetItemNetworkPath(size_t index, bool preview, std::string* path) { ASSERT(index < manifest_->size()); ASSERT(NULL != path); // preview_path_ and source_path_ are url path segments, which are composed // with the address of the localhost p2p proxy to provide a url which IE can // use. std::string ue_name; const std::string& name = manifest_->item(index).name; talk_base::transform(ue_name, name.length() * 3, name, talk_base::url_encode); talk_base::Pathname pathname; pathname.SetFolder(preview ? preview_path_ : source_path_); pathname.SetFilename(ue_name);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -