Commit c51cb78e authored by maniscalco's avatar maniscalco Committed by Commit bot

Update AttachmentServiceImpl to retry attachment uploads.

Add TaskQueue, a class that provides retry and backoff semantics for
tasks.  Used by AttachmentServiceImpl.

AttachmentUploaderImpl and AttachmentDownloaderImpl now differentiate
between transient and non-transient errors.  Almost all errors are
assumed to be transient.  "403 Forbidden" is the exception and is
returned by the sync server if attachment are disabled for the user.

Transient errors encountered during attachment upload will be retried
with exponential backoff (using TaskQueue and BackoffEntry).  The idea
is to consolidate retry logic at the AttachmentServiceImpl level.

In a future CL, changes in network connectivity will affect retry and
backoff.

BUG=372622, 380437

Review URL: https://codereview.chromium.org/554743004

Cr-Commit-Position: refs/heads/master@{#294195}
parent 6b339cc6
......@@ -630,12 +630,20 @@ ProfileSyncComponentsFactoryImpl::CreateAttachmentService(
token_service_provider);
}
// It is important that the initial backoff delay is relatively large. For
// whatever reason, the server may fail all requests for a short period of
// time. When this happens we don't want to overwhelm the server with
// requests so we use a large initial backoff.
const base::TimeDelta initial_backoff_delay =
base::TimeDelta::FromMinutes(30);
const base::TimeDelta max_backoff_delay = base::TimeDelta::FromHours(4);
scoped_ptr<syncer::AttachmentService> attachment_service(
new syncer::AttachmentServiceImpl(attachment_store,
attachment_uploader.Pass(),
attachment_downloader.Pass(),
delegate));
delegate,
initial_backoff_delay,
max_backoff_delay));
return attachment_service.Pass();
}
......
......@@ -56,7 +56,9 @@ MockAttachmentService::MockAttachmentService(
new syncer::FakeAttachmentUploader),
scoped_ptr<syncer::AttachmentDownloader>(
new syncer::FakeAttachmentDownloader),
NULL) {
NULL,
base::TimeDelta(),
base::TimeDelta()) {
}
MockAttachmentService::~MockAttachmentService() {
......
......@@ -115,7 +115,7 @@ void AttachmentDownloaderImpl::OnGetTokenFailure(
DownloadState* download_state = *iter;
scoped_refptr<base::RefCountedString> null_attachment_data;
ReportResult(
*download_state, DOWNLOAD_UNSPECIFIED_ERROR, null_attachment_data);
*download_state, DOWNLOAD_TRANSIENT_ERROR, null_attachment_data);
DCHECK(state_map_.find(download_state->attachment_url) != state_map_.end());
state_map_.erase(download_state->attachment_url);
}
......@@ -133,22 +133,28 @@ void AttachmentDownloaderImpl::OnURLFetchComplete(
const DownloadState& download_state = *iter->second;
DCHECK(source == download_state.url_fetcher.get());
DownloadResult result = DOWNLOAD_UNSPECIFIED_ERROR;
DownloadResult result = DOWNLOAD_TRANSIENT_ERROR;
scoped_refptr<base::RefCountedString> attachment_data;
if (source->GetResponseCode() == net::HTTP_OK) {
const int response_code = source->GetResponseCode();
if (response_code == net::HTTP_OK) {
result = DOWNLOAD_SUCCESS;
std::string data_as_string;
source->GetResponseAsString(&data_as_string);
attachment_data = base::RefCountedString::TakeString(&data_as_string);
} else if (source->GetResponseCode() == net::HTTP_UNAUTHORIZED) {
} else if (response_code == net::HTTP_UNAUTHORIZED) {
// Server tells us we've got a bad token so invalidate it.
OAuth2TokenServiceRequest::InvalidateToken(token_service_provider_.get(),
account_id_,
oauth2_scopes_,
download_state.access_token);
// TODO(pavely): crbug/380437. This is transient error. Request new access
// token for this DownloadState. The only trick is to do it with exponential
// backoff.
// Fail the request, but indicate that it may be successful if retried.
result = DOWNLOAD_TRANSIENT_ERROR;
} else if (response_code == net::HTTP_FORBIDDEN) {
// User is not allowed to use attachments. Retrying won't help.
result = DOWNLOAD_UNSPECIFIED_ERROR;
} else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) {
result = DOWNLOAD_TRANSIENT_ERROR;
}
ReportResult(download_state, result, attachment_data);
state_map_.erase(iter);
......@@ -159,6 +165,7 @@ scoped_ptr<net::URLFetcher> AttachmentDownloaderImpl::CreateFetcher(
const std::string& access_token) {
scoped_ptr<net::URLFetcher> url_fetcher(
net::URLFetcher::Create(GURL(url), net::URLFetcher::GET, this));
url_fetcher->SetAutomaticallyRetryOn5xx(false);
const std::string auth_header("Authorization: Bearer " + access_token);
url_fetcher->AddExtraRequestHeader(auth_header);
url_fetcher->SetRequestContext(url_request_context_getter_.get());
......
......@@ -319,7 +319,7 @@ TEST_F(AttachmentDownloaderImplTest, RequestAccessTokenFails) {
GoogleServiceAuthError(GoogleServiceAuthError::INVALID_GAIA_CREDENTIALS));
RunMessageLoop();
// Only id2 should fail.
VerifyDownloadResult(id2, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR);
VerifyDownloadResult(id2, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR);
// Complete request for id1.
CompleteDownload(net::HTTP_OK);
VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_SUCCESS);
......@@ -337,7 +337,7 @@ TEST_F(AttachmentDownloaderImplTest, URLFetcher_BadToken) {
// invalidation.
CompleteDownload(net::HTTP_UNAUTHORIZED);
EXPECT_EQ(1, token_service()->num_invalidate_token());
VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR);
VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR);
}
TEST_F(AttachmentDownloaderImplTest, URLFetcher_ServiceUnavailable) {
......@@ -352,7 +352,7 @@ TEST_F(AttachmentDownloaderImplTest, URLFetcher_ServiceUnavailable) {
// shouldn't be invalidated.
CompleteDownload(net::HTTP_SERVICE_UNAVAILABLE);
EXPECT_EQ(0, token_service()->num_invalidate_token());
VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR);
VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR);
}
} // namespace syncer
......@@ -9,6 +9,7 @@
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "base/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "sync/api/attachments/attachment.h"
#include "sync/api/attachments/fake_attachment_store.h"
#include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
......@@ -113,7 +114,9 @@ AttachmentServiceImpl::AttachmentServiceImpl(
scoped_refptr<AttachmentStore> attachment_store,
scoped_ptr<AttachmentUploader> attachment_uploader,
scoped_ptr<AttachmentDownloader> attachment_downloader,
Delegate* delegate)
Delegate* delegate,
const base::TimeDelta& initial_backoff_delay,
const base::TimeDelta& max_backoff_delay)
: attachment_store_(attachment_store),
attachment_uploader_(attachment_uploader.Pass()),
attachment_downloader_(attachment_downloader.Pass()),
......@@ -121,6 +124,16 @@ AttachmentServiceImpl::AttachmentServiceImpl(
weak_ptr_factory_(this) {
DCHECK(CalledOnValidThread());
DCHECK(attachment_store_.get());
// TODO(maniscalco): Observe network connectivity change events. When the
// network becomes disconnected, consider suspending queue dispatch. When
// connectivity is restored, consider clearing any dispatch backoff (bug
// 411981).
upload_task_queue_.reset(new TaskQueue<AttachmentId>(
base::Bind(&AttachmentServiceImpl::BeginUpload,
weak_ptr_factory_.GetWeakPtr()),
initial_backoff_delay,
max_backoff_delay));
}
AttachmentServiceImpl::~AttachmentServiceImpl() {
......@@ -139,7 +152,9 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
new syncer::AttachmentServiceImpl(attachment_store,
attachment_uploader.Pass(),
attachment_downloader.Pass(),
NULL));
NULL,
base::TimeDelta(),
base::TimeDelta()));
return attachment_service.Pass();
}
......@@ -216,12 +231,22 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback,
void AttachmentServiceImpl::UploadDone(
const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id) {
ids_in_queue_.erase(attachment_id);
// TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
if (result != AttachmentUploader::UPLOAD_SUCCESS)
return;
if (delegate_) {
delegate_->OnAttachmentUploaded(attachment_id);
DCHECK(CalledOnValidThread());
switch (result) {
case AttachmentUploader::UPLOAD_SUCCESS:
upload_task_queue_->MarkAsSucceeded(attachment_id);
if (delegate_) {
delegate_->OnAttachmentUploaded(attachment_id);
}
break;
case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
upload_task_queue_->MarkAsFailed(attachment_id);
upload_task_queue_->AddToQueue(attachment_id);
break;
case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
// TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
upload_task_queue_->MarkAsFailed(attachment_id);
break;
}
}
......@@ -230,46 +255,36 @@ void AttachmentServiceImpl::DownloadDone(
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment) {
if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) {
state->AddAttachment(*attachment.get());
} else {
state->AddUnavailableAttachmentId(attachment_id);
switch (result) {
case AttachmentDownloader::DOWNLOAD_SUCCESS:
state->AddAttachment(*attachment.get());
break;
case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
state->AddUnavailableAttachmentId(attachment_id);
break;
}
}
void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
DCHECK(CalledOnValidThread());
AttachmentIdList attachment_ids;
attachment_ids.push_back(attachment_id);
attachment_store_->Read(attachment_ids,
base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
weak_ptr_factory_.GetWeakPtr()));
}
void AttachmentServiceImpl::UploadAttachments(
const AttachmentIdSet& attachment_ids) {
DCHECK(CalledOnValidThread());
if (!attachment_uploader_.get()) {
return;
}
// Enqueue the attachment ids that aren't already in the queue.
AttachmentIdSet::const_iterator iter = attachment_ids.begin();
AttachmentIdSet::const_iterator end = attachment_ids.end();
for (; iter != end; ++iter) {
if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) {
queue_.push_back(*iter);
ids_in_queue_.insert(*iter);
}
}
ProcessQueuedUploads();
}
void AttachmentServiceImpl::ProcessQueuedUploads() {
DCHECK(CalledOnValidThread());
// TODO(maniscalco): Don't dequeue them all. Instead, limit the number of
// concurrent uploads and apply backoff on failure.
while (!queue_.empty()) {
const AttachmentId id = queue_.front();
queue_.pop_front();
AttachmentIdList attachment_ids;
attachment_ids.push_back(id);
attachment_store_->Read(
attachment_ids,
base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
weak_ptr_factory_.GetWeakPtr()));
upload_task_queue_->AddToQueue(*iter);
}
}
......@@ -281,6 +296,11 @@ void AttachmentServiceImpl::ReadDoneNowUpload(
if (!unavailable_attachment_ids->empty()) {
// TODO(maniscalco): We failed to read some attachments. What should we do
// now?
AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
for (; iter != end; ++iter) {
upload_task_queue_->Cancel(*iter);
}
}
AttachmentMap::const_iterator iter = attachments->begin();
......
......@@ -191,7 +191,9 @@ class AttachmentServiceImplTest : public testing::Test,
new AttachmentServiceImpl(attachment_store,
uploader.PassAs<AttachmentUploader>(),
downloader.PassAs<AttachmentDownloader>(),
delegate));
delegate,
base::TimeDelta(),
base::TimeDelta()));
}
AttachmentService* attachment_service() { return attachment_service_.get(); }
......@@ -344,32 +346,31 @@ TEST_F(AttachmentServiceImplTest, GetOrDownload_NoDownloader) {
TEST_F(AttachmentServiceImplTest, UploadAttachments_Success) {
AttachmentIdSet attachment_ids;
const size_t num_attachments = 3;
const unsigned num_attachments = 3;
for (unsigned i = 0; i < num_attachments; ++i) {
attachment_ids.insert(AttachmentId::Create());
}
attachment_service()->UploadAttachments(attachment_ids);
RunLoop();
// See that the service has issued reads for the attachments, but not yet
// uploaded anything.
EXPECT_EQ(num_attachments, store()->read_ids.size());
EXPECT_EQ(0U, uploader()->upload_requests.size());
for (unsigned i = 0; i < num_attachments; ++i) {
RunLoop();
// See that the service has issued a read for at least one of the
// attachments.
ASSERT_GE(1U, store()->read_ids.size());
store()->RespondToRead(attachment_ids);
}
RunLoop();
EXPECT_EQ(0U, store()->read_ids.size());
EXPECT_EQ(num_attachments, uploader()->upload_requests.size());
AttachmentIdSet::const_iterator iter = attachment_ids.begin();
const AttachmentIdSet::const_iterator end = attachment_ids.end();
for (; iter != end; ++iter) {
uploader()->RespondToUpload(*iter, AttachmentUploader::UPLOAD_SUCCESS);
RunLoop();
ASSERT_GE(1U, uploader()->upload_requests.size());
uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
AttachmentUploader::UPLOAD_SUCCESS);
}
RunLoop();
ASSERT_EQ(0U, store()->read_ids.size());
ASSERT_EQ(0U, uploader()->upload_requests.size());
// See that all the attachments were uploaded.
ASSERT_EQ(attachment_ids.size(), on_attachment_uploaded_list().size());
AttachmentIdSet::const_iterator iter = attachment_ids.begin();
const AttachmentIdSet::const_iterator end = attachment_ids.end();
for (iter = attachment_ids.begin(); iter != end; ++iter) {
EXPECT_THAT(on_attachment_uploaded_list(), testing::Contains(*iter));
}
......@@ -400,47 +401,45 @@ TEST_F(AttachmentServiceImplTest, UploadAttachments_SomeMissingFromStore) {
AttachmentIdSet attachment_ids;
attachment_ids.insert(AttachmentId::Create());
attachment_ids.insert(AttachmentId::Create());
attachment_service()->UploadAttachments(attachment_ids);
RunLoop();
EXPECT_EQ(2U, store()->read_ids.size());
EXPECT_EQ(0U, uploader()->upload_requests.size());
ASSERT_GE(1U, store()->read_ids.size());
ASSERT_EQ(0U, uploader()->upload_requests.size());
store()->RespondToRead(attachment_ids);
EXPECT_EQ(1U, store()->read_ids.size());
// Not found!
store()->RespondToRead(AttachmentIdSet());
EXPECT_EQ(0U, store()->read_ids.size());
RunLoop();
ASSERT_EQ(1U, uploader()->upload_requests.size());
// One attachment went missing so we should see only one upload request.
EXPECT_EQ(1U, uploader()->upload_requests.size());
uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
AttachmentUploader::UPLOAD_SUCCESS);
RunLoop();
// See that the delegate was called for only one.
ASSERT_EQ(1U, on_attachment_uploaded_list().size());
ASSERT_GE(1U, store()->read_ids.size());
// Not found!
store()->RespondToRead(AttachmentIdSet());
RunLoop();
// No upload requests since the read failed.
ASSERT_EQ(0U, uploader()->upload_requests.size());
}
TEST_F(AttachmentServiceImplTest, UploadAttachments_AllMissingFromStore) {
AttachmentIdSet attachment_ids;
attachment_ids.insert(AttachmentId::Create());
attachment_ids.insert(AttachmentId::Create());
const unsigned num_attachments = 2;
for (unsigned i = 0; i < num_attachments; ++i) {
attachment_ids.insert(AttachmentId::Create());
}
attachment_service()->UploadAttachments(attachment_ids);
RunLoop();
EXPECT_EQ(2U, store()->read_ids.size());
EXPECT_EQ(0U, uploader()->upload_requests.size());
// None found!
store()->RespondToRead(AttachmentIdSet());
store()->RespondToRead(AttachmentIdSet());
EXPECT_EQ(0U, store()->read_ids.size());
for (unsigned i = 0; i < num_attachments; ++i) {
RunLoop();
ASSERT_GE(1U, store()->read_ids.size());
// None found!
store()->RespondToRead(AttachmentIdSet());
}
RunLoop();
// Nothing uploaded.
EXPECT_EQ(0U, uploader()->upload_requests.size());
RunLoop();
// See that the delegate was never called.
ASSERT_EQ(0U, on_attachment_uploaded_list().size());
}
......@@ -461,32 +460,31 @@ TEST_F(AttachmentServiceImplTest, UploadAttachments_NoUploader) {
// Upload three attachments. For one of them, server responds with error.
TEST_F(AttachmentServiceImplTest, UploadAttachments_OneUploadFails) {
AttachmentIdSet attachment_ids;
attachment_ids.insert(AttachmentId::Create());
attachment_ids.insert(AttachmentId::Create());
attachment_ids.insert(AttachmentId::Create());
const unsigned num_attachments = 3;
for (unsigned i = 0; i < num_attachments; ++i) {
attachment_ids.insert(AttachmentId::Create());
}
attachment_service()->UploadAttachments(attachment_ids);
RunLoop();
EXPECT_EQ(3U, store()->read_ids.size());
EXPECT_EQ(0U, uploader()->upload_requests.size());
// All attachments found.
store()->RespondToRead(attachment_ids);
store()->RespondToRead(attachment_ids);
store()->RespondToRead(attachment_ids);
RunLoop();
EXPECT_EQ(3U, uploader()->upload_requests.size());
uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
AttachmentUploader::UPLOAD_SUCCESS);
uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR);
uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
AttachmentUploader::UPLOAD_SUCCESS);
EXPECT_EQ(0U, uploader()->upload_requests.size());
for (unsigned i = 0; i < num_attachments; ++i) {
RunLoop();
ASSERT_GE(1U, store()->read_ids.size());
store()->RespondToRead(attachment_ids);
RunLoop();
ASSERT_EQ(1U, uploader()->upload_requests.size());
AttachmentUploader::UploadResult result =
AttachmentUploader::UPLOAD_SUCCESS;
// Fail the 2nd one.
if (i == 2U) {
result = AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR;
} else {
result = AttachmentUploader::UPLOAD_SUCCESS;
}
uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
result);
}
RunLoop();
EXPECT_EQ(2U, on_attachment_uploaded_list().size());
ASSERT_EQ(2U, on_attachment_uploaded_list().size());
}
} // namespace syncer
......@@ -133,20 +133,22 @@ const Attachment& AttachmentUploaderImpl::UploadState::GetAttachment() {
void AttachmentUploaderImpl::UploadState::OnURLFetchComplete(
const net::URLFetcher* source) {
DCHECK(CalledOnValidThread());
UploadResult result = UPLOAD_UNSPECIFIED_ERROR;
UploadResult result = UPLOAD_TRANSIENT_ERROR;
AttachmentId attachment_id = attachment_.GetId();
if (source->GetResponseCode() == net::HTTP_OK) {
const int response_code = source->GetResponseCode();
if (response_code == net::HTTP_OK) {
result = UPLOAD_SUCCESS;
} else if (source->GetResponseCode() == net::HTTP_UNAUTHORIZED) {
// TODO(maniscalco): One possibility is that we received a 401 because our
// access token has expired. We should probably fetch a new access token
// and retry this upload before giving up and reporting failure to our
// caller (bug 380437).
} else if (response_code == net::HTTP_UNAUTHORIZED) {
// Server tells us we've got a bad token so invalidate it.
OAuth2TokenServiceRequest::InvalidateToken(
token_service_provider_, account_id_, scopes_, access_token_);
} else {
// TODO(maniscalco): Once the protocol is better defined, deal with the
// various HTTP response codes we may encounter.
// Fail the request, but indicate that it may be successful if retried.
result = UPLOAD_TRANSIENT_ERROR;
} else if (response_code == net::HTTP_FORBIDDEN) {
// User is not allowed to use attachments. Retrying won't help.
result = UPLOAD_UNSPECIFIED_ERROR;
} else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) {
result = UPLOAD_TRANSIENT_ERROR;
}
ReportResult(result, attachment_id);
}
......@@ -160,6 +162,7 @@ void AttachmentUploaderImpl::UploadState::OnGetTokenSuccess(
access_token_ = access_token;
fetcher_.reset(
net::URLFetcher::Create(upload_url_, net::URLFetcher::POST, this));
fetcher_->SetAutomaticallyRetryOn5xx(false);
fetcher_->SetRequestContext(url_request_context_getter_.get());
// TODO(maniscalco): Is there a better way? Copying the attachment data into
// a string feels wrong given how large attachments may be (several MBs). If
......@@ -184,7 +187,11 @@ void AttachmentUploaderImpl::UploadState::OnGetTokenFailure(
const GoogleServiceAuthError& error) {
DCHECK_EQ(access_token_request_.get(), request);
access_token_request_.reset();
ReportResult(UPLOAD_UNSPECIFIED_ERROR, attachment_.GetId());
// TODO(maniscalco): We treat this as a transient error, but it may in fact be
// a very long lived error and require user action. Consider differentiating
// between the causes of GetToken failure and act accordingly. Think about
// the causes of GetToken failure. Are there (bug 412802).
ReportResult(UPLOAD_TRANSIENT_ERROR, attachment_.GetId());
}
void AttachmentUploaderImpl::UploadState::GetToken() {
......
......@@ -509,7 +509,7 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_FailToGetToken) {
// See that the done callback was invoked.
ASSERT_EQ(1U, upload_results().size());
EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]);
EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]);
ASSERT_EQ(1U, attachment_ids().size());
EXPECT_EQ(attachment.GetId(), attachment_ids()[0]);
......@@ -529,6 +529,43 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_ServiceUnavilable) {
RunAndWaitFor(1);
// See that the done callback was invoked.
ASSERT_EQ(1U, upload_results().size());
EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]);
ASSERT_EQ(1U, attachment_ids().size());
EXPECT_EQ(attachment.GetId(), attachment_ids()[0]);
// See that the HTTP server received one request.
ASSERT_EQ(1U, http_requests_received().size());
const HttpRequest& http_request = http_requests_received().front();
EXPECT_EQ(net::test_server::METHOD_POST, http_request.method);
std::string expected_relative_url(kAttachments +
attachment.GetId().GetProto().unique_id());
EXPECT_EQ(expected_relative_url, http_request.relative_url);
EXPECT_TRUE(http_request.has_content);
EXPECT_EQ(kAttachmentData, http_request.content);
std::string expected_header(kAuthorization);
const std::string header_name(kAuthorization);
const std::string header_value(std::string("Bearer ") + kAccessToken);
EXPECT_THAT(http_request.headers,
testing::Contains(testing::Pair(header_name, header_value)));
// See that we did not invalidate the token.
ASSERT_EQ(0, token_service().num_invalidate_token());
}
// Verify that we "403 Forbidden" as a non-transient error.
TEST_F(AttachmentUploaderImplTest, UploadAttachment_Forbidden) {
token_service().AddAccount(kAccountId);
request_handler().SetStatusCode(net::HTTP_FORBIDDEN);
scoped_refptr<base::RefCountedString> some_data(new base::RefCountedString);
some_data->data() = kAttachmentData;
Attachment attachment = Attachment::Create(some_data);
uploader()->UploadAttachment(attachment, upload_callback());
RunAndWaitFor(1);
// See that the done callback was invoked.
ASSERT_EQ(1U, upload_results().size());
EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]);
......@@ -569,7 +606,7 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_BadToken) {
// See that the done callback was invoked.
ASSERT_EQ(1U, upload_results().size());
EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]);
EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]);
ASSERT_EQ(1U, attachment_ids().size());
EXPECT_EQ(attachment.GetId(), attachment_ids()[0]);
......
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "sync/internal_api/public/attachments/task_queue.h"
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "sync/internal_api/public/attachments/task_queue.h"
#include <vector>
#include "base/bind.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/timer/mock_timer.h"
#include "testing/gtest/include/gtest/gtest.h"
using base::TimeDelta;
namespace syncer {
namespace {
const TimeDelta kZero;
} // namespace
class TaskQueueTest : public testing::Test {
protected:
TaskQueueTest() : weak_ptr_factory_(this) {
queue_.reset(new TaskQueue<int>(
base::Bind(&TaskQueueTest::Process, weak_ptr_factory_.GetWeakPtr()),
TimeDelta::FromMinutes(1),
TimeDelta::FromMinutes(8)));
}
void RunLoop() {
base::RunLoop run_loop;
run_loop.RunUntilIdle();
}
void Process(const int& task) { dispatched_.push_back(task); }
base::MessageLoop message_loop_;
scoped_ptr<TaskQueue<int> > queue_;
std::vector<int> dispatched_;
base::WeakPtrFactory<TaskQueueTest> weak_ptr_factory_;
};
// See that at most one task is dispatched at a time.
TEST_F(TaskQueueTest, AddToQueue_NoConcurrentTasks) {
queue_->AddToQueue(1);
queue_->AddToQueue(2);
RunLoop();
// Only one has been dispatched.
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(1, dispatched_.front());
RunLoop();
// Still only one.
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(1, dispatched_.front());
dispatched_.clear();
queue_->MarkAsSucceeded(1);
RunLoop();
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(2, dispatched_.front());
dispatched_.clear();
queue_->MarkAsSucceeded(2);
RunLoop();
ASSERT_TRUE(dispatched_.empty());
}
// See that that the queue ignores duplicate adds.
TEST_F(TaskQueueTest, AddToQueue_NoDuplicates) {
queue_->AddToQueue(1);
queue_->AddToQueue(1);
queue_->AddToQueue(2);
queue_->AddToQueue(1);
ASSERT_TRUE(dispatched_.empty());
RunLoop();
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(1, dispatched_.front());
dispatched_.clear();
queue_->MarkAsSucceeded(1);
RunLoop();
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(2, dispatched_.front());
dispatched_.clear();
queue_->MarkAsSucceeded(2);
RunLoop();
ASSERT_TRUE(dispatched_.empty());
}
// See that Retry works as expected.
TEST_F(TaskQueueTest, Retry) {
scoped_ptr<base::MockTimer> timer_to_pass(new base::MockTimer(false, false));
base::MockTimer* mock_timer = timer_to_pass.get();
queue_->SetTimerForTest(timer_to_pass.PassAs<base::Timer>());
// 1st attempt.
queue_->AddToQueue(1);
ASSERT_TRUE(mock_timer->IsRunning());
ASSERT_EQ(TimeDelta(), mock_timer->GetCurrentDelay());
TimeDelta last_delay = mock_timer->GetCurrentDelay();
mock_timer->Fire();
RunLoop();
// 2nd attempt.
ASSERT_FALSE(mock_timer->IsRunning());
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(1, dispatched_.front());
dispatched_.clear();
queue_->MarkAsFailed(1);
queue_->AddToQueue(1);
ASSERT_TRUE(mock_timer->IsRunning());
EXPECT_GT(mock_timer->GetCurrentDelay(), last_delay);
EXPECT_LE(mock_timer->GetCurrentDelay(), TimeDelta::FromMinutes(1));
last_delay = mock_timer->GetCurrentDelay();
mock_timer->Fire();
RunLoop();
// 3rd attempt.
ASSERT_FALSE(mock_timer->IsRunning());
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(1, dispatched_.front());
dispatched_.clear();
queue_->MarkAsFailed(1);
queue_->AddToQueue(1);
ASSERT_TRUE(mock_timer->IsRunning());
EXPECT_GT(mock_timer->GetCurrentDelay(), last_delay);
last_delay = mock_timer->GetCurrentDelay();
mock_timer->Fire();
RunLoop();
// Give up.
ASSERT_FALSE(mock_timer->IsRunning());
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(1, dispatched_.front());
dispatched_.clear();
queue_->Cancel(1);
ASSERT_FALSE(mock_timer->IsRunning());
// Try a different task. See the timer remains unchanged because the previous
// task was cancelled.
ASSERT_TRUE(dispatched_.empty());
queue_->AddToQueue(2);
ASSERT_TRUE(mock_timer->IsRunning());
EXPECT_GE(last_delay, mock_timer->GetCurrentDelay());
last_delay = mock_timer->GetCurrentDelay();
mock_timer->Fire();
RunLoop();
// Mark this one as succeeding, which will clear the backoff delay.
ASSERT_FALSE(mock_timer->IsRunning());
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(2, dispatched_.front());
dispatched_.clear();
queue_->MarkAsSucceeded(2);
ASSERT_FALSE(mock_timer->IsRunning());
// Add one last task and see that it's dispatched without delay because the
// previous one succeeded.
ASSERT_TRUE(dispatched_.empty());
queue_->AddToQueue(3);
ASSERT_TRUE(mock_timer->IsRunning());
EXPECT_LT(mock_timer->GetCurrentDelay(), last_delay);
last_delay = mock_timer->GetCurrentDelay();
mock_timer->Fire();
RunLoop();
// Clean up.
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(3, dispatched_.front());
dispatched_.clear();
queue_->MarkAsSucceeded(3);
ASSERT_FALSE(mock_timer->IsRunning());
}
TEST_F(TaskQueueTest, Cancel) {
queue_->AddToQueue(1);
RunLoop();
ASSERT_EQ(1U, dispatched_.size());
EXPECT_EQ(1, dispatched_.front());
dispatched_.clear();
queue_->Cancel(1);
RunLoop();
ASSERT_TRUE(dispatched_.empty());
}
} // namespace syncer
......@@ -24,6 +24,7 @@ class SYNC_EXPORT AttachmentDownloader {
enum DownloadResult {
DOWNLOAD_SUCCESS, // No error, attachment was downloaded
// successfully.
DOWNLOAD_TRANSIENT_ERROR, // A transient error occurred, try again later.
DOWNLOAD_UNSPECIFIED_ERROR, // An unspecified error occurred.
};
......
......@@ -15,6 +15,7 @@
#include "sync/internal_api/public/attachments/attachment_service.h"
#include "sync/internal_api/public/attachments/attachment_service_proxy.h"
#include "sync/internal_api/public/attachments/attachment_uploader.h"
#include "sync/internal_api/public/attachments/task_queue.h"
namespace syncer {
......@@ -22,11 +23,6 @@ namespace syncer {
class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
public base::NonThreadSafe {
public:
// |delegate| is optional delegate for AttachmentService to notify about
// asynchronous events (AttachmentUploaded). Pass NULL if delegate is not
// provided. AttachmentService doesn't take ownership of delegate, the pointer
// must be valid throughout AttachmentService lifetime.
//
// |attachment_uploader| is optional. If null, attachments will never be
// uploaded to the sync server and |delegate|'s OnAttachmentUploaded will
// never be invoked.
......@@ -34,11 +30,26 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// |attachment_downloader| is optional. If null, attachments will never be
// downloaded. Only attachments in |attachment_store| will be returned from
// GetOrDownloadAttachments.
//
// |delegate| is optional delegate for AttachmentService to notify about
// asynchronous events (AttachmentUploaded). Pass NULL if delegate is not
// provided. AttachmentService doesn't take ownership of delegate, the pointer
// must be valid throughout AttachmentService lifetime.
//
// |initial_backoff_delay| the initial delay between upload attempts. This
// class automatically retries failed uploads. After the first failure, it
// will wait this amount of time until it tries again. After each failure,
// the delay is doubled until the |max_backoff_delay| is reached. A
// successful upload clears the delay.
//
// |max_backoff_delay| the maxmium delay between upload attempts when backed
// off.
AttachmentServiceImpl(scoped_refptr<AttachmentStore> attachment_store,
scoped_ptr<AttachmentUploader> attachment_uploader,
scoped_ptr<AttachmentDownloader> attachment_downloader,
Delegate* delegate);
Delegate* delegate,
const base::TimeDelta& initial_backoff_delay,
const base::TimeDelta& max_backoff_delay);
virtual ~AttachmentServiceImpl();
// Create an AttachmentServiceImpl suitable for use in tests.
......@@ -69,7 +80,7 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment);
void ProcessQueuedUploads();
void BeginUpload(const AttachmentId& attachment_id);
void ReadDoneNowUpload(
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
......@@ -86,12 +97,7 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// May be null.
Delegate* delegate_;
// Queue of attachment ids to be uploaded. Every entry in this queue should
// also exist in ids_in_queue_.
std::deque<AttachmentId> queue_;
// Ids of attachments currently being uploaded or queued for upload.
AttachmentIdSet ids_in_queue_;
scoped_ptr<TaskQueue<AttachmentId> > upload_task_queue_;
// Must be last data member.
base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_;
......
......@@ -20,6 +20,7 @@ class SYNC_EXPORT AttachmentUploader {
enum UploadResult {
UPLOAD_SUCCESS, // No error, attachment was uploaded
// successfully.
UPLOAD_TRANSIENT_ERROR, // A transient error occurred, try again later.
UPLOAD_UNSPECIFIED_ERROR, // An unspecified error occurred.
};
......
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
#include <deque>
#include <set>
#include "base/bind.h"
#include "base/callback.h"
#include "base/gtest_prod_util.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/threading/non_thread_safe.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "net/base/backoff_entry.h"
namespace syncer {
// A queue that dispatches tasks, ignores duplicates, and provides backoff
// semantics.
//
// |T| is the task type.
//
// For each task added to the queue, the HandleTaskCallback will eventually be
// invoked. For each invocation, the user of TaskQueue must call exactly one of
// |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
//
// To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
//
// Example usage:
//
// void Handle(const Foo& foo);
// ...
// TaskQueue<Foo> queue(base::Bind(&Handle),
// base::TimeDelta::FromSeconds(1),
// base::TimeDelta::FromMinutes(1));
// ...
// {
// Foo foo;
// // Add foo to the queue. At some point, Handle will be invoked in this
// // message loop.
// queue.AddToQueue(foo);
// }
// ...
// void Handle(const Foo& foo) {
// DoSomethingWith(foo);
// // We must call one of the three methods to tell the queue how we're
// // dealing with foo. Of course, we are free to call in the the context of
// // this HandleTaskCallback or outside the context if we so choose.
// if (SuccessfullyHandled(foo)) {
// queue.MarkAsSucceeded(foo);
// } else if (Failed(foo)) {
// queue.MarkAsFailed(foo);
// if (ShouldRetry(foo)) {
// queue.AddToQueue(foo);
// }
// } else {
// Cancel(foo);
// }
// }
//
template <typename T>
class TaskQueue : base::NonThreadSafe {
public:
// A callback provided by users of the TaskQueue to handle tasks.
//
// This callback is invoked by the queue with a task to be handled. The
// callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
// or |Cancel| to signify completion of the task.
typedef base::Callback<void(const T&)> HandleTaskCallback;
// Construct a TaskQueue.
//
// |callback| the callback to be invoked for handling tasks.
//
// |initial_backoff_delay| the initial amount of time the queue will wait
// before dispatching tasks after a failed task (see |MarkAsFailed|). May be
// zero. Subsequent failures will increase the delay up to
// |max_backoff_delay|.
//
// |max_backoff_delay| the maximum amount of time the queue will wait before
// dispatching tasks. May be zero. Must be greater than or equal to
// |initial_backoff_delay|.
TaskQueue(const HandleTaskCallback& callback,
const base::TimeDelta& initial_backoff_delay,
const base::TimeDelta& max_backoff_delay);
// Add |task| to the end of the queue.
//
// If |task| is already present (as determined by operator==) it is not added.
void AddToQueue(const T& task);
// Mark |task| as completing successfully.
//
// Marking a task as completing successfully will reduce or eliminate any
// backoff delay in effect.
//
// May only be called after the HandleTaskCallback has been invoked with
// |task|.
void MarkAsSucceeded(const T& task);
// Mark |task| as failed.
//
// Marking a task as failed will cause a backoff, i.e. a delay in dispatching
// of subsequent tasks. Repeated failures will increase the delay.
//
// May only be called after the HandleTaskCallback has been invoked with
// |task|.
void MarkAsFailed(const T& task);
// Cancel |task|.
//
// |task| is removed from the queue and will not be retried. Does not affect
// the backoff delay.
//
// May only be called after the HandleTaskCallback has been invoked with
// |task|.
void Cancel(const T& task);
private:
FRIEND_TEST_ALL_PREFIXES(TaskQueueTest, Retry);
// Use |timer| for scheduled events.
//
// Used in tests. See also MockTimer.
void SetTimerForTest(scoped_ptr<base::Timer> timer);
void FinishTask(const T& task);
void ScheduleDispatch();
void Dispatch();
// Return true if we should dispatch tasks.
bool ShouldDispatch();
const HandleTaskCallback process_callback_;
net::BackoffEntry::Policy backoff_policy_;
scoped_ptr<net::BackoffEntry> backoff_entry_;
// The number of tasks currently being handled.
int num_in_progress_;
std::deque<T> queue_;
// The set of tasks in queue_ or currently being handled.
std::set<T> tasks_;
base::Closure dispatch_closure_;
scoped_ptr<base::Timer> backoff_timer_;
base::TimeDelta delay_;
// Must be last data member.
base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};
// The maximum number of tasks that may be concurrently executed. Think
// carefully before changing this value. The desired behavior of backoff may
// not be obvious when there is more than one concurrent task
const int kMaxConcurrentTasks = 1;
template <typename T>
TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
const base::TimeDelta& initial_backoff_delay,
const base::TimeDelta& max_backoff_delay)
: process_callback_(callback),
backoff_policy_({}),
num_in_progress_(0),
weak_ptr_factory_(this) {
DCHECK_LE(initial_backoff_delay.InMicroseconds(),
max_backoff_delay.InMicroseconds());
backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
backoff_policy_.multiply_factor = 2.0;
backoff_policy_.jitter_factor = 0.1;
backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
backoff_policy_.entry_lifetime_ms = -1;
backoff_policy_.always_use_initial_delay = false;
backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
dispatch_closure_ =
base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
backoff_timer_.reset(new base::Timer(false, false));
}
template <typename T>
void TaskQueue<T>::AddToQueue(const T& task) {
DCHECK(CalledOnValidThread());
// Ignore duplicates.
if (tasks_.find(task) == tasks_.end()) {
queue_.push_back(task);
tasks_.insert(task);
}
ScheduleDispatch();
}
template <typename T>
void TaskQueue<T>::MarkAsSucceeded(const T& task) {
DCHECK(CalledOnValidThread());
FinishTask(task);
// The task succeeded. Stop any pending timer, reset (clear) the backoff, and
// reschedule a dispatch.
backoff_timer_->Stop();
backoff_entry_->Reset();
ScheduleDispatch();
}
template <typename T>
void TaskQueue<T>::MarkAsFailed(const T& task) {
DCHECK(CalledOnValidThread());
FinishTask(task);
backoff_entry_->InformOfRequest(false);
ScheduleDispatch();
}
template <typename T>
void TaskQueue<T>::Cancel(const T& task) {
DCHECK(CalledOnValidThread());
FinishTask(task);
ScheduleDispatch();
}
template <typename T>
void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) {
DCHECK(CalledOnValidThread());
DCHECK(timer.get());
backoff_timer_ = timer.Pass();
}
template <typename T>
void TaskQueue<T>::FinishTask(const T& task) {
DCHECK(CalledOnValidThread());
DCHECK_GE(num_in_progress_, 1);
--num_in_progress_;
const size_t num_erased = tasks_.erase(task);
DCHECK_EQ(1U, num_erased);
}
template <typename T>
void TaskQueue<T>::ScheduleDispatch() {
DCHECK(CalledOnValidThread());
if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
return;
}
backoff_timer_->Start(
FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
}
template <typename T>
void TaskQueue<T>::Dispatch() {
DCHECK(CalledOnValidThread());
if (!ShouldDispatch()) {
return;
}
DCHECK(!queue_.empty());
const T& task = queue_.front();
++num_in_progress_;
DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
base::MessageLoop::current()->PostTask(FROM_HERE,
base::Bind(process_callback_, task));
queue_.pop_front();
}
template <typename T>
bool TaskQueue<T>::ShouldDispatch() {
return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
}
} // namespace syncer
#endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
......@@ -169,6 +169,7 @@
'internal_api/attachments/attachment_uploader_impl.cc',
'internal_api/attachments/fake_attachment_downloader.cc',
'internal_api/attachments/fake_attachment_uploader.cc',
'internal_api/attachments/task_queue.cc',
'internal_api/base_node.cc',
'internal_api/base_transaction.cc',
'internal_api/change_record.cc',
......@@ -205,6 +206,7 @@
'internal_api/public/attachments/attachment_uploader_impl.h',
'internal_api/public/attachments/fake_attachment_downloader.h',
'internal_api/public/attachments/fake_attachment_uploader.h',
'internal_api/public/attachments/task_queue.h',
'internal_api/public/base/attachment_id_proto.cc',
'internal_api/public/base/attachment_id_proto.h',
'internal_api/public/base/cancelation_observer.cc',
......
......@@ -284,6 +284,7 @@
'internal_api/attachments/attachment_uploader_impl_unittest.cc',
'internal_api/attachments/fake_attachment_downloader_unittest.cc',
'internal_api/attachments/fake_attachment_uploader_unittest.cc',
'internal_api/attachments/task_queue_unittest.cc',
'internal_api/debug_info_event_listener_unittest.cc',
'internal_api/http_bridge_unittest.cc',
'internal_api/js_mutation_event_observer_unittest.cc',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment