Commit 9cf453b7 authored by hirono's avatar hirono Committed by Commit bot

Drive: Let DriveUploader use batch request API.

The CL lets DriveUploader use batch request API. DriveUploader does asynchronous
preparation before calling API on service interface. To hold batch request
during the asynchronous preparation, the CL introduce refcounted hepler class,
which manages life time of batch request configurator.

BUG=451917
TEST=DriveUploaderTest

Review URL: https://codereview.chromium.org/1134633003

Cr-Commit-Position: refs/heads/master@{#329599}
parent 8706f545
......@@ -41,22 +41,18 @@ void JobQueue::PopForRun(int accepted_priority, std::vector<JobID>* jobs) {
return;
// Looks up the queue in the order of priority upto |accepted_priority|.
bool processing_batch_request = false;
int64 total_size = 0;
uint64 total_size = 0;
bool batchable = true;
for (int priority = 0; priority <= accepted_priority; ++priority) {
auto it = queue_[priority].begin();
while (it != queue_[priority].end()) {
if (!processing_batch_request ||
(it->batchable && total_size + it->size <= max_batch_size_)) {
total_size += it->size;
processing_batch_request = it->batchable;
jobs->push_back(it->id);
running_.insert(it->id);
it = queue_[priority].erase(it);
if (processing_batch_request)
continue;
}
return;
while (!queue_[priority].empty()) {
const auto& item = queue_[priority].front();
total_size += item.size;
batchable = batchable && item.batchable && total_size <= max_batch_size_;
if (!(jobs->empty() || batchable))
return;
jobs->push_back(item.id);
running_.insert(item.id);
queue_[priority].pop_front();
}
}
}
......
......@@ -4,6 +4,8 @@
#include "chrome/browser/chromeos/drive/job_scheduler.h"
#include <algorithm>
#include "base/files/file_util.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram.h"
......@@ -742,7 +744,9 @@ void JobScheduler::QueueJob(JobID job_id) {
const JobInfo& job_info = job_entry->job_info;
const QueueType queue_type = GetJobQueueType(job_info.job_type);
queue_[queue_type]->Push(job_id, job_entry->context.type, false,
const bool batchable = job_info.job_type == TYPE_UPLOAD_EXISTING_FILE ||
job_info.job_type == TYPE_UPLOAD_NEW_FILE;
queue_[queue_type]->Push(job_id, job_entry->context.type, batchable,
job_info.num_total_bytes);
// Temporary histogram for crbug.com/229650.
......@@ -802,25 +806,28 @@ void JobScheduler::DoJobLoop(QueueType queue_type) {
if (job_ids.empty())
return;
// TODO(hirono): Currently all requests are not batchable. So the queue always
// return just 1 job.
DCHECK_EQ(1u, job_ids.size());
JobEntry* entry = job_map_.Lookup(job_ids.front());
DCHECK(entry);
if (job_ids.size() > 1)
uploader_->StartBatchProcessing();
JobInfo* job_info = &entry->job_info;
job_info->state = STATE_RUNNING;
job_info->start_time = now;
NotifyJobUpdated(*job_info);
for (JobID job_id : job_ids) {
JobEntry* entry = job_map_.Lookup(job_id);
DCHECK(entry);
entry->cancel_callback = entry->task.Run();
JobInfo* job_info = &entry->job_info;
job_info->state = STATE_RUNNING;
job_info->start_time = now;
NotifyJobUpdated(*job_info);
UpdateWait();
entry->cancel_callback = entry->task.Run();
logger_->Log(logging::LOG_INFO, "Job started: %s - %s",
job_info->ToString().c_str(),
GetQueueInfo(queue_type).c_str());
}
logger_->Log(logging::LOG_INFO,
"Job started: %s - %s",
job_info->ToString().c_str(),
GetQueueInfo(queue_type).c_str());
if (job_ids.size() > 1)
uploader_->StopBatchProcessing();
UpdateWait();
}
int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
......
......@@ -44,6 +44,30 @@ const int64 kUploadChunkSize = (1LL << 30); // 1GB
const int64 kMaxMultipartUploadSize = (1LL << 20); // 1MB
} // namespace
// Refcounted helper class to manage batch request. DriveUploader uses the class
// for keeping the BatchRequestConfigurator instance while it prepares upload
// file information asynchronously. DriveUploader discard the reference after
// getting file information and the instance will be destroyed after all
// preparations complete. At that time, the helper instance commits owned batch
// request at the destrutor.
class DriveUploader::RefCountedBatchRequest
: public base::RefCounted<RefCountedBatchRequest> {
public:
RefCountedBatchRequest(
scoped_ptr<BatchRequestConfiguratorInterface> configurator)
: configurator_(configurator.Pass()) {}
// Gets pointer of BatchRequestConfiguratorInterface owned by the instance.
BatchRequestConfiguratorInterface* configurator() const {
return configurator_.get();
}
private:
friend class base::RefCounted<RefCountedBatchRequest>;
~RefCountedBatchRequest() { configurator_->Commit(); }
scoped_ptr<BatchRequestConfiguratorInterface> configurator_;
};
// Structure containing current upload information of file, passed between
// DriveServiceInterface methods and callbacks.
struct DriveUploader::UploadFileInfo {
......@@ -155,7 +179,17 @@ CancelCallback DriveUploader::UploadNewFile(
local_file_path, content_type, callback, progress_callback)),
base::Bind(&DriveUploader::CallUploadServiceAPINewFile,
weak_ptr_factory_.GetWeakPtr(), parent_resource_id, title,
options));
options, current_batch_request_));
}
void DriveUploader::StartBatchProcessing() {
DCHECK(current_batch_request_ == nullptr);
current_batch_request_ =
new RefCountedBatchRequest(drive_service_->StartBatchRequest().Pass());
}
void DriveUploader::StopBatchProcessing() {
current_batch_request_ = nullptr;
}
CancelCallback DriveUploader::UploadExistingFile(
......@@ -175,7 +209,8 @@ CancelCallback DriveUploader::UploadExistingFile(
scoped_ptr<UploadFileInfo>(new UploadFileInfo(
local_file_path, content_type, callback, progress_callback)),
base::Bind(&DriveUploader::CallUploadServiceAPIExistingFile,
weak_ptr_factory_.GetWeakPtr(), resource_id, options));
weak_ptr_factory_.GetWeakPtr(), resource_id, options,
current_batch_request_));
}
CancelCallback DriveUploader::ResumeUploadFile(
......@@ -190,8 +225,7 @@ CancelCallback DriveUploader::ResumeUploadFile(
DCHECK(!callback.is_null());
scoped_ptr<UploadFileInfo> upload_file_info(new UploadFileInfo(
local_file_path, content_type,
callback, progress_callback));
local_file_path, content_type, callback, progress_callback));
upload_file_info->upload_location = upload_location;
return StartUploadFile(
......@@ -243,12 +277,17 @@ void DriveUploader::CallUploadServiceAPINewFile(
const std::string& parent_resource_id,
const std::string& title,
const UploadNewFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info) {
DCHECK(thread_checker_.CalledOnValidThread());
UploadFileInfo* const info_ptr = upload_file_info.get();
if (info_ptr->content_length <= kMaxMultipartUploadSize) {
info_ptr->cancel_callback = drive_service_->MultipartUploadNewFile(
DriveServiceBatchOperationsInterface* service = drive_service_;
// If this is a batched request, calls the API on the request instead.
if (batch_request.get())
service = batch_request->configurator();
info_ptr->cancel_callback = service->MultipartUploadNewFile(
info_ptr->content_type, info_ptr->content_length, parent_resource_id,
title, info_ptr->file_path, options,
base::Bind(&DriveUploader::OnMultipartUploadComplete,
......@@ -267,12 +306,17 @@ void DriveUploader::CallUploadServiceAPINewFile(
void DriveUploader::CallUploadServiceAPIExistingFile(
const std::string& resource_id,
const UploadExistingFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info) {
DCHECK(thread_checker_.CalledOnValidThread());
UploadFileInfo* const info_ptr = upload_file_info.get();
if (info_ptr->content_length <= kMaxMultipartUploadSize) {
info_ptr->cancel_callback = drive_service_->MultipartUploadExistingFile(
DriveServiceBatchOperationsInterface* service = drive_service_;
// If this is a batched request, calls the API on the request instead.
if (batch_request.get())
service = batch_request->configurator();
info_ptr->cancel_callback = service->MultipartUploadExistingFile(
info_ptr->content_type, info_ptr->content_length, resource_id,
info_ptr->file_path, options,
base::Bind(&DriveUploader::OnMultipartUploadComplete,
......
......@@ -43,6 +43,16 @@ class DriveUploaderInterface {
public:
virtual ~DriveUploaderInterface() {}
// Starts batch processing for upload requests. All requests which upload
// small files (less than kMaxMultipartUploadSize) between
// |StartBatchProcessing| and |StopBatchProcessing| are sent as a single batch
// request.
virtual void StartBatchProcessing() = 0;
// Stops batch processing. Must be called after calling |StartBatchProcessing|
// to commit requests.
virtual void StopBatchProcessing() = 0;
// Uploads a new file to a directory specified by |upload_location|.
// Returns a callback for cancelling the uploading job.
//
......@@ -114,6 +124,8 @@ class DriveUploader : public DriveUploaderInterface {
~DriveUploader() override;
// DriveUploaderInterface overrides.
void StartBatchProcessing() override;
void StopBatchProcessing() override;
google_apis::CancelCallback UploadNewFile(
const std::string& parent_resource_id,
const base::FilePath& local_file_path,
......@@ -137,6 +149,7 @@ class DriveUploader : public DriveUploaderInterface {
const google_apis::ProgressCallback& progress_callback) override;
private:
class RefCountedBatchRequest;
struct UploadFileInfo;
typedef base::Callback<void(scoped_ptr<UploadFileInfo> upload_file_info)>
StartInitiateUploadCallback;
......@@ -153,18 +166,25 @@ class DriveUploader : public DriveUploaderInterface {
// Checks file size and call InitiateUploadNewFile or MultipartUploadNewFile
// API. Upon completion, OnUploadLocationReceived (for InitiateUploadNewFile)
// or OnMultipartUploadComplete (for MultipartUploadNewFile) should be called.
void CallUploadServiceAPINewFile(const std::string& parent_resource_id,
const std::string& title,
const UploadNewFileOptions& options,
scoped_ptr<UploadFileInfo> upload_file_info);
// If |batch_request| is non-null, it calls the API function on the batch
// request.
void CallUploadServiceAPINewFile(
const std::string& parent_resource_id,
const std::string& title,
const UploadNewFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info);
// Checks file size and call InitiateUploadExistingFile or
// MultipartUploadExistingFile API. Upon completion, OnUploadLocationReceived
// (for InitiateUploadExistingFile) or OnMultipartUploadComplete (for
// MultipartUploadExistingFile) should be called.
// If |batch_request| is non-null, it calls the API function on the batch
// request.
void CallUploadServiceAPIExistingFile(
const std::string& resource_id,
const UploadExistingFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info);
// DriveService callback for InitiateUpload.
......@@ -207,6 +227,7 @@ class DriveUploader : public DriveUploaderInterface {
DriveServiceInterface* drive_service_; // Not owned by this class.
scoped_refptr<base::TaskRunner> blocking_task_runner_;
scoped_refptr<RefCountedBatchRequest> current_batch_request_;
// Note: This should remain the last member so it'll be destroyed and
// invalidate its weak pointers before any other members are destroyed.
......
......@@ -28,6 +28,11 @@ DriveUploaderOnWorker::DriveUploaderOnWorker(
DriveUploaderOnWorker::~DriveUploaderOnWorker() {}
void DriveUploaderOnWorker::StartBatchProcessing() {
}
void DriveUploaderOnWorker::StopBatchProcessing() {
}
google_apis::CancelCallback DriveUploaderOnWorker::UploadNewFile(
const std::string& parent_resource_id,
const base::FilePath& local_file_path,
......
......@@ -5,6 +5,8 @@
#ifndef CHROME_BROWSER_SYNC_FILE_SYSTEM_DRIVE_BACKEND_DRIVE_UPLOADER_ON_WORKER_H_
#define CHROME_BROWSER_SYNC_FILE_SYSTEM_DRIVE_BACKEND_DRIVE_UPLOADER_ON_WORKER_H_
#include <string>
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/sequence_checker.h"
......@@ -32,6 +34,8 @@ class DriveUploaderOnWorker : public drive::DriveUploaderInterface {
base::SequencedTaskRunner* worker_task_runner);
~DriveUploaderOnWorker() override;
void StartBatchProcessing() override;
void StopBatchProcessing() override;
google_apis::CancelCallback UploadNewFile(
const std::string& parent_resource_id,
const base::FilePath& local_file_path,
......
......@@ -87,6 +87,12 @@ FakeDriveUploader::FakeDriveUploader(
FakeDriveUploader::~FakeDriveUploader() {}
void FakeDriveUploader::StartBatchProcessing() {
}
void FakeDriveUploader::StopBatchProcessing() {
}
CancelCallback FakeDriveUploader::UploadNewFile(
const std::string& parent_resource_id,
const base::FilePath& local_file_path,
......
......@@ -50,6 +50,8 @@ class FakeDriveUploader : public drive::DriveUploaderInterface {
~FakeDriveUploader() override;
// DriveUploaderInterface overrides.
void StartBatchProcessing() override;
void StopBatchProcessing() override;
google_apis::CancelCallback UploadNewFile(
const std::string& parent_resource_id,
const base::FilePath& local_file_path,
......
......@@ -1211,9 +1211,12 @@ void BatchUploadRequest::OnChildRequestPrepared(RequestID request_id,
void BatchUploadRequest::Commit() {
DCHECK(CalledOnValidThread());
DCHECK(!committed_);
CHECK(!child_requests_.empty());
committed_ = true;
MayCompletePrepare();
if (child_requests_.empty()) {
Cancel();
} else {
committed_ = true;
MayCompletePrepare();
}
}
void BatchUploadRequest::Prepare(const PrepareCallback& callback) {
......
......@@ -2066,9 +2066,12 @@ TEST_F(DriveApiRequestsTest, BatchUploadRequest) {
}
TEST_F(DriveApiRequestsTest, EmptyBatchUploadRequest) {
scoped_ptr<drive::BatchUploadRequest> request(new drive::BatchUploadRequest(
request_sender_.get(), *url_generator_));
EXPECT_DEATH(request->Commit(), "");
drive::BatchUploadRequest* const request =
new drive::BatchUploadRequest(request_sender_.get(), *url_generator_);
base::WeakPtr<drive::BatchUploadRequest> weak_ptr =
request->GetWeakPtrAsBatchUploadRequest();
request->Commit();
ASSERT_FALSE(weak_ptr.get());
}
TEST_F(DriveApiRequestsTest, BatchUploadRequestWithBodyIncludingZero) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment