Commit 5dde0685 authored by maniscalco's avatar maniscalco Committed by Commit bot

Replace AttachmentStore's StoreAttachments with UploadAttachments.

This change is lays groundwork for making attachment upload operations
persistent.

Make GenericChangeProcessor responsible for writing attachments to the
AttachmentStore and calling UploadAttachments.  In a future CL, datatype
code will be responsible for writing attachments to the store.

Queue up attachments for upload inside of AttachmentService.  In a
future CL we'll add rate limiting, retry, and back-off logic.

Expose AttachmentService's AttachmentStore via GetStore method.

BUG=

Review URL: https://codereview.chromium.org/512413003

Cr-Commit-Position: refs/heads/master@{#293536}
parent 17267f48
......@@ -85,6 +85,11 @@ syncer::SyncData BuildRemoteSyncData(
attachment_service_proxy);
}
const syncer::AttachmentId& AttachmentToAttachmentId(
const syncer::Attachment& attachment) {
return attachment.GetId();
}
} // namespace
GenericChangeProcessor::GenericChangeProcessor(
......@@ -102,7 +107,8 @@ GenericChangeProcessor::GenericChangeProcessor(
attachment_service_weak_ptr_factory_(attachment_service_.get()),
attachment_service_proxy_(
base::MessageLoopProxy::current(),
attachment_service_weak_ptr_factory_.GetWeakPtr()) {
attachment_service_weak_ptr_factory_.GetWeakPtr()),
weak_ptr_factory_(this) {
DCHECK(CalledOnValidThread());
DCHECK(attachment_service_);
}
......@@ -387,25 +393,6 @@ syncer::SyncError AttemptDelete(const syncer::SyncChange& change,
return syncer::SyncError();
}
// A callback invoked on completion of AttachmentService::StoreAttachment.
void IgnoreStoreResult(const syncer::AttachmentService::StoreResult&) {
// TODO(maniscalco): Here is where we're going to update the in-directory
// entry to indicate that the attachments have been successfully stored on
// disk. Why do we care? Because we might crash after persisting the
// directory to disk, but before we have persisted its attachments, leaving us
// with danging attachment ids. Having a flag that indicates we've stored the
// entry will allow us to detect and filter entries with dangling attachment
// ids (bug 368353).
}
void StoreAttachments(syncer::AttachmentService* attachment_service,
const syncer::AttachmentList& attachments) {
DCHECK(attachment_service);
syncer::AttachmentService::StoreCallback ignore_store_result =
base::Bind(&IgnoreStoreResult);
attachment_service->StoreAttachments(attachments, ignore_store_result);
}
} // namespace
syncer::SyncError GenericChangeProcessor::ProcessSyncChanges(
......@@ -465,7 +452,7 @@ syncer::SyncError GenericChangeProcessor::ProcessSyncChanges(
}
if (!new_attachments.empty()) {
StoreAttachments(attachment_service_.get(), new_attachments);
StoreAndUploadAttachments(new_attachments);
}
return syncer::SyncError();
......@@ -711,4 +698,40 @@ syncer::UserShare* GenericChangeProcessor::share_handle() const {
return share_handle_;
}
void GenericChangeProcessor::StoreAndUploadAttachments(
const syncer::AttachmentList& attachments) {
DCHECK(CalledOnValidThread());
attachment_service_->GetStore()->Write(
attachments,
base::Bind(&GenericChangeProcessor::WriteAttachmentsDone,
weak_ptr_factory_.GetWeakPtr(),
attachments));
}
void GenericChangeProcessor::WriteAttachmentsDone(
const syncer::AttachmentList& attachments,
const syncer::AttachmentStore::Result& result) {
DCHECK(CalledOnValidThread());
if (result != syncer::AttachmentStore::SUCCESS) {
// TODO(maniscalco): Deal with case where an error occurred (bug 361251).
return;
}
// TODO(maniscalco): Here is where we're going to update the in-directory
// entry to indicate that the attachments have been successfully stored on
// disk. Why do we care? Because we might crash after persisting the
// directory to disk, but before we have persisted its attachments, leaving us
// with danging attachment ids. Having a flag that indicates we've stored the
// entry will allow us to detect and filter entries with dangling attachment
// ids (bug 368353).
// Begin uploading the attachments now that they are safe on disk.
syncer::AttachmentIdSet attachment_ids;
std::transform(attachments.begin(),
attachments.end(),
std::inserter(attachment_ids, attachment_ids.end()),
AttachmentToAttachmentId);
attachment_service_->UploadAttachments(attachment_ids);
}
} // namespace sync_driver
......@@ -13,6 +13,7 @@
#include "components/sync_driver/change_processor.h"
#include "components/sync_driver/data_type_controller.h"
#include "components/sync_driver/data_type_error_handler.h"
#include "sync/api/attachments/attachment_store.h"
#include "sync/api/sync_change_processor.h"
#include "sync/api/sync_merge_result.h"
#include "sync/internal_api/public/attachments/attachment_service.h"
......@@ -126,6 +127,18 @@ class GenericChangeProcessor : public ChangeProcessor,
syncer::WriteNode* sync_node,
syncer::AttachmentList* new_attachments);
// Store |attachments| locally then upload them to the sync server.
//
// Store and uploading are asynchronous operations. |WriteAttachmentsDone|
// will be invoked once the attachments have been stored on the local device.
void StoreAndUploadAttachments(const syncer::AttachmentList& attachments);
// Invoked once attachments have been stored locally.
//
// See also AttachmentStore::WriteCallback.
void WriteAttachmentsDone(const syncer::AttachmentList& attachments,
const syncer::AttachmentStore::Result& result);
// The SyncableService this change processor will forward changes on to.
const base::WeakPtr<syncer::SyncableService> local_service_;
......@@ -155,6 +168,8 @@ class GenericChangeProcessor : public ChangeProcessor,
attachment_service_weak_ptr_factory_;
syncer::AttachmentServiceProxy attachment_service_proxy_;
base::WeakPtrFactory<GenericChangeProcessor> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(GenericChangeProcessor);
};
......
......@@ -7,6 +7,7 @@
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/strings/stringprintf.h"
#include "components/sync_driver/data_type_error_handler_mock.h"
#include "components/sync_driver/sync_api_component_factory.h"
......@@ -25,6 +26,7 @@
#include "sync/internal_api/public/user_share.h"
#include "sync/internal_api/public/write_node.h"
#include "sync/internal_api/public/write_transaction.h"
#include "testing/gmock/include/gmock/gmock-matchers.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace sync_driver {
......@@ -33,17 +35,17 @@ namespace {
const char kTestData[] = "some data";
// A mock that keeps track of attachments passed to StoreAttachments.
// A mock that keeps track of attachments passed to UploadAttachments.
class MockAttachmentService : public syncer::AttachmentServiceImpl {
public:
MockAttachmentService();
virtual ~MockAttachmentService();
virtual void StoreAttachments(const syncer::AttachmentList& attachments,
const StoreCallback& callback) OVERRIDE;
std::vector<syncer::AttachmentList>* attachment_lists();
virtual void UploadAttachments(
const syncer::AttachmentIdSet& attachment_ids) OVERRIDE;
std::vector<syncer::AttachmentIdSet>* attachment_id_sets();
private:
std::vector<syncer::AttachmentList> attachment_lists_;
std::vector<syncer::AttachmentIdSet> attachment_id_sets_;
};
MockAttachmentService::MockAttachmentService()
......@@ -60,15 +62,15 @@ MockAttachmentService::MockAttachmentService()
MockAttachmentService::~MockAttachmentService() {
}
void MockAttachmentService::StoreAttachments(
const syncer::AttachmentList& attachments,
const StoreCallback& callback) {
attachment_lists_.push_back(attachments);
AttachmentServiceImpl::StoreAttachments(attachments, callback);
void MockAttachmentService::UploadAttachments(
const syncer::AttachmentIdSet& attachment_ids) {
attachment_id_sets_.push_back(attachment_ids);
AttachmentServiceImpl::UploadAttachments(attachment_ids);
}
std::vector<syncer::AttachmentList>* MockAttachmentService::attachment_lists() {
return &attachment_lists_;
std::vector<syncer::AttachmentIdSet>*
MockAttachmentService::attachment_id_sets() {
return &attachment_id_sets_;
}
// MockSyncApiComponentFactory needed to initialize GenericChangeProcessor and
......@@ -161,6 +163,11 @@ class SyncGenericChangeProcessorTest : public testing::Test {
return mock_attachment_service_;
}
void RunLoop() {
base::RunLoop run_loop;
run_loop.RunUntilIdle();
}
private:
base::MessageLoopForUI loop_;
......@@ -349,19 +356,20 @@ TEST_F(SyncGenericChangeProcessorTest,
tag, title, specifics, attachments)));
ASSERT_FALSE(
change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet());
RunLoop();
// Check that the AttachmentService received the new attachments.
ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U);
const syncer::AttachmentList& attachments_added =
mock_attachment_service()->attachment_lists()->front();
ASSERT_EQ(attachments_added.size(), 2U);
ASSERT_EQ(attachments_added[0].GetId(), attachments[0].GetId());
ASSERT_EQ(attachments_added[1].GetId(), attachments[1].GetId());
ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U);
const syncer::AttachmentIdSet& attachments_added =
mock_attachment_service()->attachment_id_sets()->front();
ASSERT_THAT(attachments_added,
testing::UnorderedElementsAre(attachments[0].GetId(),
attachments[1].GetId()));
// Update the SyncData, replacing its two attachments with one new attachment.
syncer::AttachmentList new_attachments;
new_attachments.push_back(syncer::Attachment::Create(attachment_data));
mock_attachment_service()->attachment_lists()->clear();
mock_attachment_service()->attachment_id_sets()->clear();
change_list.clear();
change_list.push_back(
syncer::SyncChange(FROM_HERE,
......@@ -370,13 +378,14 @@ TEST_F(SyncGenericChangeProcessorTest,
tag, title, specifics, new_attachments)));
ASSERT_FALSE(
change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet());
RunLoop();
// Check that the AttachmentService received it.
ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U);
const syncer::AttachmentList& new_attachments_added =
mock_attachment_service()->attachment_lists()->front();
ASSERT_EQ(new_attachments_added.size(), 1U);
ASSERT_EQ(new_attachments_added[0].GetId(), new_attachments[0].GetId());
ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U);
const syncer::AttachmentIdSet& new_attachments_added =
mock_attachment_service()->attachment_id_sets()->front();
ASSERT_THAT(new_attachments_added,
testing::UnorderedElementsAre(new_attachments[0].GetId()));
}
// Verify that after attachment is uploaded GenericChangeProcessor updates
......
......@@ -4,6 +4,8 @@
#include "sync/internal_api/public/attachments/attachment_service_impl.h"
#include <iterator>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "base/thread_task_runner_handle.h"
......@@ -141,6 +143,10 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
return attachment_service.Pass();
}
AttachmentStore* AttachmentServiceImpl::GetStore() {
return attachment_store_.get();
}
void AttachmentServiceImpl::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) {
......@@ -163,25 +169,6 @@ void AttachmentServiceImpl::DropAttachments(
callback));
}
void AttachmentServiceImpl::StoreAttachments(const AttachmentList& attachments,
const StoreCallback& callback) {
DCHECK(CalledOnValidThread());
attachment_store_->Write(attachments,
base::Bind(&AttachmentServiceImpl::WriteDone,
weak_ptr_factory_.GetWeakPtr(),
callback));
if (attachment_uploader_.get()) {
for (AttachmentList::const_iterator iter = attachments.begin();
iter != attachments.end();
++iter) {
attachment_uploader_->UploadAttachment(
*iter,
base::Bind(&AttachmentServiceImpl::UploadDone,
weak_ptr_factory_.GetWeakPtr()));
}
}
}
void AttachmentServiceImpl::ReadDone(
const scoped_refptr<GetOrDownloadState>& state,
const AttachmentStore::Result& result,
......@@ -226,21 +213,10 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback,
base::Bind(callback, drop_result));
}
void AttachmentServiceImpl::WriteDone(const StoreCallback& callback,
const AttachmentStore::Result& result) {
AttachmentService::StoreResult store_result =
AttachmentService::STORE_UNSPECIFIED_ERROR;
if (result == AttachmentStore::SUCCESS) {
store_result = AttachmentService::STORE_SUCCESS;
}
// TODO(maniscalco): Deal with case where an error occurred (bug 361251).
base::MessageLoop::current()->PostTask(FROM_HERE,
base::Bind(callback, store_result));
}
void AttachmentServiceImpl::UploadDone(
const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id) {
ids_in_queue_.erase(attachment_id);
// TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
if (result != AttachmentUploader::UPLOAD_SUCCESS)
return;
......@@ -261,4 +237,60 @@ void AttachmentServiceImpl::DownloadDone(
}
}
void AttachmentServiceImpl::UploadAttachments(
const AttachmentIdSet& attachment_ids) {
DCHECK(CalledOnValidThread());
if (!attachment_uploader_.get()) {
return;
}
// Enqueue the attachment ids that aren't already in the queue.
AttachmentIdSet::const_iterator iter = attachment_ids.begin();
AttachmentIdSet::const_iterator end = attachment_ids.end();
for (; iter != end; ++iter) {
if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) {
queue_.push_back(*iter);
ids_in_queue_.insert(*iter);
}
}
ProcessQueuedUploads();
}
void AttachmentServiceImpl::ProcessQueuedUploads() {
DCHECK(CalledOnValidThread());
// TODO(maniscalco): Don't dequeue them all. Instead, limit the number of
// concurrent uploads and apply backoff on failure.
while (!queue_.empty()) {
const AttachmentId id = queue_.front();
queue_.pop_front();
AttachmentIdList attachment_ids;
attachment_ids.push_back(id);
attachment_store_->Read(
attachment_ids,
base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
weak_ptr_factory_.GetWeakPtr()));
}
}
void AttachmentServiceImpl::ReadDoneNowUpload(
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
DCHECK(CalledOnValidThread());
if (!unavailable_attachment_ids->empty()) {
// TODO(maniscalco): We failed to read some attachments. What should we do
// now?
}
AttachmentMap::const_iterator iter = attachments->begin();
AttachmentMap::const_iterator end = attachments->end();
for (; iter != end; ++iter) {
attachment_uploader_->UploadAttachment(
iter->second,
base::Bind(&AttachmentServiceImpl::UploadDone,
weak_ptr_factory_.GetWeakPtr()));
}
}
} // namespace syncer
......@@ -35,15 +35,6 @@ void ProxyDropCallback(
task_runner->PostTask(FROM_HERE, base::Bind(callback, result));
}
// Invokes |callback| with |result| and |attachments| in the |task_runner|
// thread.
void ProxyStoreCallback(
const scoped_refptr<base::SequencedTaskRunner>& task_runner,
const AttachmentService::StoreCallback& callback,
const AttachmentService::StoreResult& result) {
task_runner->PostTask(FROM_HERE, base::Bind(callback, result));
}
} // namespace
AttachmentServiceProxy::AttachmentServiceProxy() {
......@@ -67,6 +58,10 @@ AttachmentServiceProxy::AttachmentServiceProxy(
AttachmentServiceProxy::~AttachmentServiceProxy() {
}
AttachmentStore* AttachmentServiceProxy::GetStore() {
return NULL;
}
void AttachmentServiceProxy::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) {
......@@ -96,17 +91,12 @@ void AttachmentServiceProxy::DropAttachments(
proxy_callback));
}
void AttachmentServiceProxy::StoreAttachments(const AttachmentList& attachments,
const StoreCallback& callback) {
DCHECK(wrapped_task_runner_.get());
StoreCallback proxy_callback = base::Bind(
&ProxyStoreCallback, base::ThreadTaskRunnerHandle::Get(), callback);
void AttachmentServiceProxy::UploadAttachments(
const AttachmentIdSet& attachment_ids) {
DCHECK(wrapped_task_runner_);
wrapped_task_runner_->PostTask(
FROM_HERE,
base::Bind(&AttachmentService::StoreAttachments,
core_,
attachments,
proxy_callback));
base::Bind(&AttachmentService::UploadAttachments, core_, attachment_ids));
}
AttachmentServiceProxy::Core::Core(
......@@ -117,6 +107,10 @@ AttachmentServiceProxy::Core::Core(
AttachmentServiceProxy::Core::~Core() {
}
AttachmentStore* AttachmentServiceProxy::Core::GetStore() {
return NULL;
}
void AttachmentServiceProxy::Core::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) {
......@@ -135,13 +129,12 @@ void AttachmentServiceProxy::Core::DropAttachments(
wrapped_->DropAttachments(attachment_ids, callback);
}
void AttachmentServiceProxy::Core::StoreAttachments(
const AttachmentList& attachments,
const StoreCallback& callback) {
void AttachmentServiceProxy::Core::UploadAttachments(
const AttachmentIdSet& attachment_ids) {
if (!wrapped_) {
return;
}
wrapped_->StoreAttachments(attachments, callback);
wrapped_->UploadAttachments(attachment_ids);
}
} // namespace syncer
......@@ -34,6 +34,8 @@ class StubAttachmentService : public AttachmentService,
virtual ~StubAttachmentService() {}
virtual AttachmentStore* GetStore() OVERRIDE { return NULL; }
virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback)
OVERRIDE {
......@@ -55,12 +57,10 @@ class StubAttachmentService : public AttachmentService,
FROM_HERE, base::Bind(callback, AttachmentService::DROP_SUCCESS));
}
virtual void StoreAttachments(const AttachmentList& attachments,
const StoreCallback& callback) OVERRIDE {
virtual void UploadAttachments(
const AttachmentIdSet& attachments_ids) OVERRIDE {
CalledOnValidThread();
Increment();
base::MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(callback, AttachmentService::STORE_SUCCESS));
}
virtual base::WeakPtr<AttachmentService> AsWeakPtr() {
......@@ -105,11 +105,8 @@ class AttachmentServiceProxyTest : public testing::Test,
base::Unretained(this));
callback_drop = base::Bind(&AttachmentServiceProxyTest::IncrementDrop,
base::Unretained(this));
callback_store = base::Bind(&AttachmentServiceProxyTest::IncrementStore,
base::Unretained(this));
count_callback_get_or_download = 0;
count_callback_drop = 0;
count_callback_store = 0;
}
virtual void TearDown()
......@@ -136,12 +133,6 @@ class AttachmentServiceProxyTest : public testing::Test,
++count_callback_drop;
}
// a StoreCallback
void IncrementStore(const AttachmentService::StoreResult&) {
CalledOnValidThread();
++count_callback_store;
}
void WaitForStubThread() {
base::WaitableEvent done(false, false);
stub_thread->message_loop()->PostTask(
......@@ -157,23 +148,24 @@ class AttachmentServiceProxyTest : public testing::Test,
AttachmentService::GetOrDownloadCallback callback_get_or_download;
AttachmentService::DropCallback callback_drop;
AttachmentService::StoreCallback callback_store;
// number of times callback_get_or_download was invoked
int count_callback_get_or_download;
// number of times callback_drop was invoked
int count_callback_drop;
// number of times callback_store was invoked
int count_callback_store;
};
// Verify that each of AttachmentServiceProxy's callback methods (those that
// take callbacks) are invoked on the stub and that the passed callbacks are
// invoked in this thread.
TEST_F(AttachmentServiceProxyTest, MethodsWithCallbacksAreProxied) {
TEST_F(AttachmentServiceProxyTest, GetStore) {
EXPECT_EQ(NULL, proxy->GetStore());
}
// Verify that each of AttachmentServiceProxy's methods are invoked on the stub.
// Verify that the methods that take callbacks invoke passed callbacks on this
// thread.
TEST_F(AttachmentServiceProxyTest, MethodsAreProxied) {
proxy->GetOrDownloadAttachments(AttachmentIdList(), callback_get_or_download);
proxy->DropAttachments(AttachmentIdList(), callback_drop);
proxy->StoreAttachments(AttachmentList(), callback_store);
proxy->UploadAttachments(AttachmentIdSet());
// Wait for the posted calls to execute in the stub thread.
WaitForStubThread();
EXPECT_EQ(3, stub->GetCallCount());
......@@ -185,7 +177,6 @@ TEST_F(AttachmentServiceProxyTest, MethodsWithCallbacksAreProxied) {
loop.RunUntilIdle();
EXPECT_EQ(1, count_callback_get_or_download);
EXPECT_EQ(1, count_callback_drop);
EXPECT_EQ(1, count_callback_store);
}
// Verify that it's safe to use an AttachmentServiceProxy even after its wrapped
......
......@@ -14,6 +14,7 @@
namespace syncer {
class AttachmentStore;
class SyncData;
// AttachmentService is responsible for managing a model type's attachments.
......@@ -44,16 +45,6 @@ class SYNC_EXPORT AttachmentService {
typedef base::Callback<void(const DropResult&)> DropCallback;
// The result of a StoreAttachments operation.
enum StoreResult {
STORE_SUCCESS, // No error, all attachments stored (at least
// locally).
STORE_UNSPECIFIED_ERROR, // An unspecified error occurred. Some or all
// attachments may not have been stored.
};
typedef base::Callback<void(const StoreResult&)> StoreCallback;
// An interface that embedder code implements to be notified about different
// events that originate from AttachmentService.
// This interface will be called from the same thread AttachmentService was
......@@ -70,6 +61,11 @@ class SYNC_EXPORT AttachmentService {
AttachmentService();
virtual ~AttachmentService();
// Return a pointer to the AttachmentStore owned by this object.
//
// May return NULL.
virtual AttachmentStore* GetStore() = 0;
// See SyncData::GetOrDownloadAttachments.
virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
......@@ -79,12 +75,18 @@ class SYNC_EXPORT AttachmentService {
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) = 0;
// Store |attachments| on device and (eventually) upload them to the server.
// Schedules the attachments identified by |attachment_ids| to be uploaded to
// the server.
//
// Assumes the attachments are already in the attachment store.
//
// A request to upload attachments is persistent in that uploads will be
// automatically retried if transient errors occur.
//
// A request to upload attachments does not persist across restarts of Chrome.
//
// Invokes |callback| once the attachments have been written to device
// storage.
virtual void StoreAttachments(const AttachmentList& attachments,
const StoreCallback& callback) = 0;
// Invokes OnAttachmentUploaded on the Delegate (if provided).
virtual void UploadAttachments(const AttachmentIdSet& attachment_ids) = 0;
};
} // namespace syncer
......
......@@ -5,6 +5,8 @@
#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_
#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_
#include <deque>
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/threading/non_thread_safe.h"
......@@ -43,13 +45,14 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
static scoped_ptr<syncer::AttachmentService> CreateForTest();
// AttachmentService implementation.
virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback)
OVERRIDE;
virtual AttachmentStore* GetStore() OVERRIDE;
virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE;
virtual void StoreAttachments(const AttachmentList& attachments,
const StoreCallback& callback) OVERRIDE;
virtual void UploadAttachments(
const AttachmentIdSet& attachment_ids) OVERRIDE;
private:
class GetOrDownloadState;
......@@ -60,14 +63,17 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids);
void DropDone(const DropCallback& callback,
const AttachmentStore::Result& result);
void WriteDone(const StoreCallback& callback,
const AttachmentStore::Result& result);
void UploadDone(const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id);
void DownloadDone(const scoped_refptr<GetOrDownloadState>& state,
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment);
void ProcessQueuedUploads();
void ReadDoneNowUpload(
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids);
const scoped_ptr<AttachmentStore> attachment_store_;
......@@ -80,6 +86,13 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// May be null.
Delegate* delegate_;
// Queue of attachment ids to be uploaded. Every entry in this queue should
// also exist in ids_in_queue_.
std::deque<AttachmentId> queue_;
// Ids of attachments currently being uploaded or queued for upload.
AttachmentIdSet ids_in_queue_;
// Must be last data member.
base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_;
......
......@@ -51,13 +51,16 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService {
virtual ~AttachmentServiceProxy();
// AttachmentService implementation.
//
// GetStore always returns NULL.
virtual AttachmentStore* GetStore() OVERRIDE;
virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE;
virtual void StoreAttachments(const AttachmentList& attachment,
const StoreCallback& callback) OVERRIDE;
virtual void UploadAttachments(
const AttachmentIdSet& attachment_ids) OVERRIDE;
protected:
// Core does the work of proxying calls to AttachmentService methods from one
......@@ -80,13 +83,14 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService {
Core(const base::WeakPtr<syncer::AttachmentService>& wrapped);
// AttachmentService implementation.
virtual AttachmentStore* GetStore() OVERRIDE;
virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE;
virtual void StoreAttachments(const AttachmentList& attachment,
const StoreCallback& callback) OVERRIDE;
virtual void UploadAttachments(
const AttachmentIdSet& attachment_ids) OVERRIDE;
protected:
friend class base::RefCountedThreadSafe<Core>;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment