Commit e4c008c9 authored by Erik Arvidsson's avatar Erik Arvidsson

Revert "Replace AttachmentStore's StoreAttachments with UploadAttachments."

This reverts commit 5dde0685.

Broke a bunch of builds:

FAILED: /mnt/data/b/build/goma/gomacc ../../third_party/llvm-build/Release+Asserts/bin/clang++ -MMD -MF obj/sync/internal_api/attachments/sync_core.attachment_service_proxy.o.d -DSYNC_IMPLEMENTATION -DCHROMIUM_BUILD -DENABLE_NOTIFICATIONS -DENABLE_EGLIMAGE=1 -DENABLE_BACKGROUND=1 -DV8_DEPRECATION_WARNINGS -DCLD_VERSION=2 -DENABLE_MDNS=1 -DENABLE_PEPPER_CDMS -DENABLE_PLUGINS=1 -DENABLE_PRINTING=1 -DENABLE_FULL_PRINTING=1 -DENABLE_SPELLCHECK=1 -DUSE_UDEV -DTOOLKIT_VIEWS=1 -DUI_COMPOSITOR_IMAGE_TRANSPORT -DUSE_ASH=1 -DUSE_AURA=1 -DUSE_CAIRO=1 -DUSE_CLIPBOARD_AURAX11=1 -DUSE_DEFAULT_RENDER_THEME=1 -DUSE_GLIB=1 -DUSE_NSS=1 -DUSE_X11=1 -DUSE_XI2_MT=2 -DDISABLE_NACL -DENABLE_EXTENSIONS=1 -DENABLE_CONFIGURATION_POLICY -DENABLE_TASK_MANAGER=1 -DENABLE_THEMES=1 -DENABLE_CAPTIVE_PORTAL_DETECTION=1 -DENABLE_SESSION_SERVICE=1 -DENABLE_APP_LIST=1 -DENABLE_SETTINGS_APP=1 -DENABLE_MANAGED_USERS=1 -DENABLE_SERVICE_DISCOVERY=1 -DENABLE_AUTOFILL_DIALOG=1 -DENABLE_REMOTING=1 -DENABLE_GOOGLE_NOW=1 -DENABLE_ONE_CLICK_SIGNIN -D_FILE_OFFSET_BITS=64 -D__STDC_CONSTANT_MACROS -D__STDC_FORMAT_MACROS -DPROTOBUF_USE_DLLS -DGOOGLE_PROTOBUF_NO_RTTI -DGOOGLE_PROTOBUF_NO_STATIC_INITIALIZER -I../.. -Igen -I/usr/include/glib-2.0 -I/usr/lib/x86_64-linux-gnu/glib-2.0/include -I../../net/third_party/nss/ssl -I/usr/include/nss -I/usr/include/nspr -I../../third_party/zlib -Igen/protoc_out -I../../third_party/protobuf/src -I../../third_party/protobuf -fno-strict-aliasing -fstack-protector --param=ssp-buffer-size=4 -m64 -funwind-tables -fPIC -pipe -pthread -fcolor-diagnostics -Wall -Wsign-compare -Wendif-labels -Wno-missing-field-initializers -Wno-unused-parameter -Wno-c++11-narrowing -Wno-char-subscripts -Wno-covered-switch-default -Wno-deprecated-register -Wno-unused-function -fvisibility=hidden -Xclang -load -Xclang ../../third_party/llvm-build/Release+Asserts/lib/libFindBadConstructs.so -Xclang -add-plugin -Xclang find-bad-constructs -Wheader-hygiene -Wstring-conversion -fno-ident -fdata-sections -ffunction-sections -O2 -g2 -Wexit-time-destructors -fno-threadsafe-statics -fvisibility-inlines-hidden -std=gnu++11 -Wno-reserved-user-defined-literal -fno-rtti -fno-exceptions -c ../../sync/internal_api/attachments/attachment_service_proxy.cc -o obj/sync/internal_api/attachments/sync_core.attachment_service_proxy.o
../../sync/internal_api/attachments/attachment_service_proxy.cc:96:3: error: invalid argument type 'scoped_refptr<base::SequencedTaskRunner>' to unary expression
  DCHECK(wrapped_task_runner_);
  ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
../../base/logging.h:622:51: note: expanded from macro 'DCHECK'
  LAZY_STREAM(LOG_STREAM(DCHECK), DCHECK_IS_ON && !(condition))   \
  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~
../../base/logging.h:355:5: note: expanded from macro 'LAZY_STREAM'
  !(condition) ? (void) 0 : ::logging::LogMessageVoidify() & (stream)
    ^
1 error generated.
ninja: build stopped: subcommand failed.

BUG=none
TBR=maniscalco

Review URL: https://codereview.chromium.org/546873003

Cr-Commit-Position: refs/heads/master@{#293542}
parent 8816ae54
...@@ -85,11 +85,6 @@ syncer::SyncData BuildRemoteSyncData( ...@@ -85,11 +85,6 @@ syncer::SyncData BuildRemoteSyncData(
attachment_service_proxy); attachment_service_proxy);
} }
const syncer::AttachmentId& AttachmentToAttachmentId(
const syncer::Attachment& attachment) {
return attachment.GetId();
}
} // namespace } // namespace
GenericChangeProcessor::GenericChangeProcessor( GenericChangeProcessor::GenericChangeProcessor(
...@@ -107,8 +102,7 @@ GenericChangeProcessor::GenericChangeProcessor( ...@@ -107,8 +102,7 @@ GenericChangeProcessor::GenericChangeProcessor(
attachment_service_weak_ptr_factory_(attachment_service_.get()), attachment_service_weak_ptr_factory_(attachment_service_.get()),
attachment_service_proxy_( attachment_service_proxy_(
base::MessageLoopProxy::current(), base::MessageLoopProxy::current(),
attachment_service_weak_ptr_factory_.GetWeakPtr()), attachment_service_weak_ptr_factory_.GetWeakPtr()) {
weak_ptr_factory_(this) {
DCHECK(CalledOnValidThread()); DCHECK(CalledOnValidThread());
DCHECK(attachment_service_); DCHECK(attachment_service_);
} }
...@@ -393,6 +387,25 @@ syncer::SyncError AttemptDelete(const syncer::SyncChange& change, ...@@ -393,6 +387,25 @@ syncer::SyncError AttemptDelete(const syncer::SyncChange& change,
return syncer::SyncError(); return syncer::SyncError();
} }
// A callback invoked on completion of AttachmentService::StoreAttachment.
void IgnoreStoreResult(const syncer::AttachmentService::StoreResult&) {
// TODO(maniscalco): Here is where we're going to update the in-directory
// entry to indicate that the attachments have been successfully stored on
// disk. Why do we care? Because we might crash after persisting the
// directory to disk, but before we have persisted its attachments, leaving us
// with danging attachment ids. Having a flag that indicates we've stored the
// entry will allow us to detect and filter entries with dangling attachment
// ids (bug 368353).
}
void StoreAttachments(syncer::AttachmentService* attachment_service,
const syncer::AttachmentList& attachments) {
DCHECK(attachment_service);
syncer::AttachmentService::StoreCallback ignore_store_result =
base::Bind(&IgnoreStoreResult);
attachment_service->StoreAttachments(attachments, ignore_store_result);
}
} // namespace } // namespace
syncer::SyncError GenericChangeProcessor::ProcessSyncChanges( syncer::SyncError GenericChangeProcessor::ProcessSyncChanges(
...@@ -452,7 +465,7 @@ syncer::SyncError GenericChangeProcessor::ProcessSyncChanges( ...@@ -452,7 +465,7 @@ syncer::SyncError GenericChangeProcessor::ProcessSyncChanges(
} }
if (!new_attachments.empty()) { if (!new_attachments.empty()) {
StoreAndUploadAttachments(new_attachments); StoreAttachments(attachment_service_.get(), new_attachments);
} }
return syncer::SyncError(); return syncer::SyncError();
...@@ -698,40 +711,4 @@ syncer::UserShare* GenericChangeProcessor::share_handle() const { ...@@ -698,40 +711,4 @@ syncer::UserShare* GenericChangeProcessor::share_handle() const {
return share_handle_; return share_handle_;
} }
void GenericChangeProcessor::StoreAndUploadAttachments(
const syncer::AttachmentList& attachments) {
DCHECK(CalledOnValidThread());
attachment_service_->GetStore()->Write(
attachments,
base::Bind(&GenericChangeProcessor::WriteAttachmentsDone,
weak_ptr_factory_.GetWeakPtr(),
attachments));
}
void GenericChangeProcessor::WriteAttachmentsDone(
const syncer::AttachmentList& attachments,
const syncer::AttachmentStore::Result& result) {
DCHECK(CalledOnValidThread());
if (result != syncer::AttachmentStore::SUCCESS) {
// TODO(maniscalco): Deal with case where an error occurred (bug 361251).
return;
}
// TODO(maniscalco): Here is where we're going to update the in-directory
// entry to indicate that the attachments have been successfully stored on
// disk. Why do we care? Because we might crash after persisting the
// directory to disk, but before we have persisted its attachments, leaving us
// with danging attachment ids. Having a flag that indicates we've stored the
// entry will allow us to detect and filter entries with dangling attachment
// ids (bug 368353).
// Begin uploading the attachments now that they are safe on disk.
syncer::AttachmentIdSet attachment_ids;
std::transform(attachments.begin(),
attachments.end(),
std::inserter(attachment_ids, attachment_ids.end()),
AttachmentToAttachmentId);
attachment_service_->UploadAttachments(attachment_ids);
}
} // namespace sync_driver } // namespace sync_driver
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
#include "components/sync_driver/change_processor.h" #include "components/sync_driver/change_processor.h"
#include "components/sync_driver/data_type_controller.h" #include "components/sync_driver/data_type_controller.h"
#include "components/sync_driver/data_type_error_handler.h" #include "components/sync_driver/data_type_error_handler.h"
#include "sync/api/attachments/attachment_store.h"
#include "sync/api/sync_change_processor.h" #include "sync/api/sync_change_processor.h"
#include "sync/api/sync_merge_result.h" #include "sync/api/sync_merge_result.h"
#include "sync/internal_api/public/attachments/attachment_service.h" #include "sync/internal_api/public/attachments/attachment_service.h"
...@@ -127,18 +126,6 @@ class GenericChangeProcessor : public ChangeProcessor, ...@@ -127,18 +126,6 @@ class GenericChangeProcessor : public ChangeProcessor,
syncer::WriteNode* sync_node, syncer::WriteNode* sync_node,
syncer::AttachmentList* new_attachments); syncer::AttachmentList* new_attachments);
// Store |attachments| locally then upload them to the sync server.
//
// Store and uploading are asynchronous operations. |WriteAttachmentsDone|
// will be invoked once the attachments have been stored on the local device.
void StoreAndUploadAttachments(const syncer::AttachmentList& attachments);
// Invoked once attachments have been stored locally.
//
// See also AttachmentStore::WriteCallback.
void WriteAttachmentsDone(const syncer::AttachmentList& attachments,
const syncer::AttachmentStore::Result& result);
// The SyncableService this change processor will forward changes on to. // The SyncableService this change processor will forward changes on to.
const base::WeakPtr<syncer::SyncableService> local_service_; const base::WeakPtr<syncer::SyncableService> local_service_;
...@@ -168,8 +155,6 @@ class GenericChangeProcessor : public ChangeProcessor, ...@@ -168,8 +155,6 @@ class GenericChangeProcessor : public ChangeProcessor,
attachment_service_weak_ptr_factory_; attachment_service_weak_ptr_factory_;
syncer::AttachmentServiceProxy attachment_service_proxy_; syncer::AttachmentServiceProxy attachment_service_proxy_;
base::WeakPtrFactory<GenericChangeProcessor> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(GenericChangeProcessor); DISALLOW_COPY_AND_ASSIGN(GenericChangeProcessor);
}; };
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "components/sync_driver/data_type_error_handler_mock.h" #include "components/sync_driver/data_type_error_handler_mock.h"
#include "components/sync_driver/sync_api_component_factory.h" #include "components/sync_driver/sync_api_component_factory.h"
...@@ -26,7 +25,6 @@ ...@@ -26,7 +25,6 @@
#include "sync/internal_api/public/user_share.h" #include "sync/internal_api/public/user_share.h"
#include "sync/internal_api/public/write_node.h" #include "sync/internal_api/public/write_node.h"
#include "sync/internal_api/public/write_transaction.h" #include "sync/internal_api/public/write_transaction.h"
#include "testing/gmock/include/gmock/gmock-matchers.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gtest/include/gtest/gtest.h"
namespace sync_driver { namespace sync_driver {
...@@ -35,17 +33,17 @@ namespace { ...@@ -35,17 +33,17 @@ namespace {
const char kTestData[] = "some data"; const char kTestData[] = "some data";
// A mock that keeps track of attachments passed to UploadAttachments. // A mock that keeps track of attachments passed to StoreAttachments.
class MockAttachmentService : public syncer::AttachmentServiceImpl { class MockAttachmentService : public syncer::AttachmentServiceImpl {
public: public:
MockAttachmentService(); MockAttachmentService();
virtual ~MockAttachmentService(); virtual ~MockAttachmentService();
virtual void UploadAttachments( virtual void StoreAttachments(const syncer::AttachmentList& attachments,
const syncer::AttachmentIdSet& attachment_ids) OVERRIDE; const StoreCallback& callback) OVERRIDE;
std::vector<syncer::AttachmentIdSet>* attachment_id_sets(); std::vector<syncer::AttachmentList>* attachment_lists();
private: private:
std::vector<syncer::AttachmentIdSet> attachment_id_sets_; std::vector<syncer::AttachmentList> attachment_lists_;
}; };
MockAttachmentService::MockAttachmentService() MockAttachmentService::MockAttachmentService()
...@@ -62,15 +60,15 @@ MockAttachmentService::MockAttachmentService() ...@@ -62,15 +60,15 @@ MockAttachmentService::MockAttachmentService()
MockAttachmentService::~MockAttachmentService() { MockAttachmentService::~MockAttachmentService() {
} }
void MockAttachmentService::UploadAttachments( void MockAttachmentService::StoreAttachments(
const syncer::AttachmentIdSet& attachment_ids) { const syncer::AttachmentList& attachments,
attachment_id_sets_.push_back(attachment_ids); const StoreCallback& callback) {
AttachmentServiceImpl::UploadAttachments(attachment_ids); attachment_lists_.push_back(attachments);
AttachmentServiceImpl::StoreAttachments(attachments, callback);
} }
std::vector<syncer::AttachmentIdSet>* std::vector<syncer::AttachmentList>* MockAttachmentService::attachment_lists() {
MockAttachmentService::attachment_id_sets() { return &attachment_lists_;
return &attachment_id_sets_;
} }
// MockSyncApiComponentFactory needed to initialize GenericChangeProcessor and // MockSyncApiComponentFactory needed to initialize GenericChangeProcessor and
...@@ -163,11 +161,6 @@ class SyncGenericChangeProcessorTest : public testing::Test { ...@@ -163,11 +161,6 @@ class SyncGenericChangeProcessorTest : public testing::Test {
return mock_attachment_service_; return mock_attachment_service_;
} }
void RunLoop() {
base::RunLoop run_loop;
run_loop.RunUntilIdle();
}
private: private:
base::MessageLoopForUI loop_; base::MessageLoopForUI loop_;
...@@ -356,20 +349,19 @@ TEST_F(SyncGenericChangeProcessorTest, ...@@ -356,20 +349,19 @@ TEST_F(SyncGenericChangeProcessorTest,
tag, title, specifics, attachments))); tag, title, specifics, attachments)));
ASSERT_FALSE( ASSERT_FALSE(
change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet()); change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet());
RunLoop();
// Check that the AttachmentService received the new attachments. // Check that the AttachmentService received the new attachments.
ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U); ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U);
const syncer::AttachmentIdSet& attachments_added = const syncer::AttachmentList& attachments_added =
mock_attachment_service()->attachment_id_sets()->front(); mock_attachment_service()->attachment_lists()->front();
ASSERT_THAT(attachments_added, ASSERT_EQ(attachments_added.size(), 2U);
testing::UnorderedElementsAre(attachments[0].GetId(), ASSERT_EQ(attachments_added[0].GetId(), attachments[0].GetId());
attachments[1].GetId())); ASSERT_EQ(attachments_added[1].GetId(), attachments[1].GetId());
// Update the SyncData, replacing its two attachments with one new attachment. // Update the SyncData, replacing its two attachments with one new attachment.
syncer::AttachmentList new_attachments; syncer::AttachmentList new_attachments;
new_attachments.push_back(syncer::Attachment::Create(attachment_data)); new_attachments.push_back(syncer::Attachment::Create(attachment_data));
mock_attachment_service()->attachment_id_sets()->clear(); mock_attachment_service()->attachment_lists()->clear();
change_list.clear(); change_list.clear();
change_list.push_back( change_list.push_back(
syncer::SyncChange(FROM_HERE, syncer::SyncChange(FROM_HERE,
...@@ -378,14 +370,13 @@ TEST_F(SyncGenericChangeProcessorTest, ...@@ -378,14 +370,13 @@ TEST_F(SyncGenericChangeProcessorTest,
tag, title, specifics, new_attachments))); tag, title, specifics, new_attachments)));
ASSERT_FALSE( ASSERT_FALSE(
change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet()); change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet());
RunLoop();
// Check that the AttachmentService received it. // Check that the AttachmentService received it.
ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U); ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U);
const syncer::AttachmentIdSet& new_attachments_added = const syncer::AttachmentList& new_attachments_added =
mock_attachment_service()->attachment_id_sets()->front(); mock_attachment_service()->attachment_lists()->front();
ASSERT_THAT(new_attachments_added, ASSERT_EQ(new_attachments_added.size(), 1U);
testing::UnorderedElementsAre(new_attachments[0].GetId())); ASSERT_EQ(new_attachments_added[0].GetId(), new_attachments[0].GetId());
} }
// Verify that after attachment is uploaded GenericChangeProcessor updates // Verify that after attachment is uploaded GenericChangeProcessor updates
......
...@@ -4,8 +4,6 @@ ...@@ -4,8 +4,6 @@
#include "sync/internal_api/public/attachments/attachment_service_impl.h" #include "sync/internal_api/public/attachments/attachment_service_impl.h"
#include <iterator>
#include "base/bind.h" #include "base/bind.h"
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "base/thread_task_runner_handle.h" #include "base/thread_task_runner_handle.h"
...@@ -143,10 +141,6 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { ...@@ -143,10 +141,6 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
return attachment_service.Pass(); return attachment_service.Pass();
} }
AttachmentStore* AttachmentServiceImpl::GetStore() {
return attachment_store_.get();
}
void AttachmentServiceImpl::GetOrDownloadAttachments( void AttachmentServiceImpl::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids, const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) { const GetOrDownloadCallback& callback) {
...@@ -169,6 +163,25 @@ void AttachmentServiceImpl::DropAttachments( ...@@ -169,6 +163,25 @@ void AttachmentServiceImpl::DropAttachments(
callback)); callback));
} }
void AttachmentServiceImpl::StoreAttachments(const AttachmentList& attachments,
const StoreCallback& callback) {
DCHECK(CalledOnValidThread());
attachment_store_->Write(attachments,
base::Bind(&AttachmentServiceImpl::WriteDone,
weak_ptr_factory_.GetWeakPtr(),
callback));
if (attachment_uploader_.get()) {
for (AttachmentList::const_iterator iter = attachments.begin();
iter != attachments.end();
++iter) {
attachment_uploader_->UploadAttachment(
*iter,
base::Bind(&AttachmentServiceImpl::UploadDone,
weak_ptr_factory_.GetWeakPtr()));
}
}
}
void AttachmentServiceImpl::ReadDone( void AttachmentServiceImpl::ReadDone(
const scoped_refptr<GetOrDownloadState>& state, const scoped_refptr<GetOrDownloadState>& state,
const AttachmentStore::Result& result, const AttachmentStore::Result& result,
...@@ -213,10 +226,21 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback, ...@@ -213,10 +226,21 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback,
base::Bind(callback, drop_result)); base::Bind(callback, drop_result));
} }
void AttachmentServiceImpl::WriteDone(const StoreCallback& callback,
const AttachmentStore::Result& result) {
AttachmentService::StoreResult store_result =
AttachmentService::STORE_UNSPECIFIED_ERROR;
if (result == AttachmentStore::SUCCESS) {
store_result = AttachmentService::STORE_SUCCESS;
}
// TODO(maniscalco): Deal with case where an error occurred (bug 361251).
base::MessageLoop::current()->PostTask(FROM_HERE,
base::Bind(callback, store_result));
}
void AttachmentServiceImpl::UploadDone( void AttachmentServiceImpl::UploadDone(
const AttachmentUploader::UploadResult& result, const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id) { const AttachmentId& attachment_id) {
ids_in_queue_.erase(attachment_id);
// TODO(pavely): crbug/372622: Deal with UploadAttachment failures. // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
if (result != AttachmentUploader::UPLOAD_SUCCESS) if (result != AttachmentUploader::UPLOAD_SUCCESS)
return; return;
...@@ -237,60 +261,4 @@ void AttachmentServiceImpl::DownloadDone( ...@@ -237,60 +261,4 @@ void AttachmentServiceImpl::DownloadDone(
} }
} }
void AttachmentServiceImpl::UploadAttachments(
const AttachmentIdSet& attachment_ids) {
DCHECK(CalledOnValidThread());
if (!attachment_uploader_.get()) {
return;
}
// Enqueue the attachment ids that aren't already in the queue.
AttachmentIdSet::const_iterator iter = attachment_ids.begin();
AttachmentIdSet::const_iterator end = attachment_ids.end();
for (; iter != end; ++iter) {
if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) {
queue_.push_back(*iter);
ids_in_queue_.insert(*iter);
}
}
ProcessQueuedUploads();
}
void AttachmentServiceImpl::ProcessQueuedUploads() {
DCHECK(CalledOnValidThread());
// TODO(maniscalco): Don't dequeue them all. Instead, limit the number of
// concurrent uploads and apply backoff on failure.
while (!queue_.empty()) {
const AttachmentId id = queue_.front();
queue_.pop_front();
AttachmentIdList attachment_ids;
attachment_ids.push_back(id);
attachment_store_->Read(
attachment_ids,
base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
weak_ptr_factory_.GetWeakPtr()));
}
}
void AttachmentServiceImpl::ReadDoneNowUpload(
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
DCHECK(CalledOnValidThread());
if (!unavailable_attachment_ids->empty()) {
// TODO(maniscalco): We failed to read some attachments. What should we do
// now?
}
AttachmentMap::const_iterator iter = attachments->begin();
AttachmentMap::const_iterator end = attachments->end();
for (; iter != end; ++iter) {
attachment_uploader_->UploadAttachment(
iter->second,
base::Bind(&AttachmentServiceImpl::UploadDone,
weak_ptr_factory_.GetWeakPtr()));
}
}
} // namespace syncer } // namespace syncer
...@@ -35,6 +35,15 @@ void ProxyDropCallback( ...@@ -35,6 +35,15 @@ void ProxyDropCallback(
task_runner->PostTask(FROM_HERE, base::Bind(callback, result)); task_runner->PostTask(FROM_HERE, base::Bind(callback, result));
} }
// Invokes |callback| with |result| and |attachments| in the |task_runner|
// thread.
void ProxyStoreCallback(
const scoped_refptr<base::SequencedTaskRunner>& task_runner,
const AttachmentService::StoreCallback& callback,
const AttachmentService::StoreResult& result) {
task_runner->PostTask(FROM_HERE, base::Bind(callback, result));
}
} // namespace } // namespace
AttachmentServiceProxy::AttachmentServiceProxy() { AttachmentServiceProxy::AttachmentServiceProxy() {
...@@ -58,10 +67,6 @@ AttachmentServiceProxy::AttachmentServiceProxy( ...@@ -58,10 +67,6 @@ AttachmentServiceProxy::AttachmentServiceProxy(
AttachmentServiceProxy::~AttachmentServiceProxy() { AttachmentServiceProxy::~AttachmentServiceProxy() {
} }
AttachmentStore* AttachmentServiceProxy::GetStore() {
return NULL;
}
void AttachmentServiceProxy::GetOrDownloadAttachments( void AttachmentServiceProxy::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids, const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) { const GetOrDownloadCallback& callback) {
...@@ -91,12 +96,17 @@ void AttachmentServiceProxy::DropAttachments( ...@@ -91,12 +96,17 @@ void AttachmentServiceProxy::DropAttachments(
proxy_callback)); proxy_callback));
} }
void AttachmentServiceProxy::UploadAttachments( void AttachmentServiceProxy::StoreAttachments(const AttachmentList& attachments,
const AttachmentIdSet& attachment_ids) { const StoreCallback& callback) {
DCHECK(wrapped_task_runner_); DCHECK(wrapped_task_runner_.get());
StoreCallback proxy_callback = base::Bind(
&ProxyStoreCallback, base::ThreadTaskRunnerHandle::Get(), callback);
wrapped_task_runner_->PostTask( wrapped_task_runner_->PostTask(
FROM_HERE, FROM_HERE,
base::Bind(&AttachmentService::UploadAttachments, core_, attachment_ids)); base::Bind(&AttachmentService::StoreAttachments,
core_,
attachments,
proxy_callback));
} }
AttachmentServiceProxy::Core::Core( AttachmentServiceProxy::Core::Core(
...@@ -107,10 +117,6 @@ AttachmentServiceProxy::Core::Core( ...@@ -107,10 +117,6 @@ AttachmentServiceProxy::Core::Core(
AttachmentServiceProxy::Core::~Core() { AttachmentServiceProxy::Core::~Core() {
} }
AttachmentStore* AttachmentServiceProxy::Core::GetStore() {
return NULL;
}
void AttachmentServiceProxy::Core::GetOrDownloadAttachments( void AttachmentServiceProxy::Core::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids, const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) { const GetOrDownloadCallback& callback) {
...@@ -129,12 +135,13 @@ void AttachmentServiceProxy::Core::DropAttachments( ...@@ -129,12 +135,13 @@ void AttachmentServiceProxy::Core::DropAttachments(
wrapped_->DropAttachments(attachment_ids, callback); wrapped_->DropAttachments(attachment_ids, callback);
} }
void AttachmentServiceProxy::Core::UploadAttachments( void AttachmentServiceProxy::Core::StoreAttachments(
const AttachmentIdSet& attachment_ids) { const AttachmentList& attachments,
const StoreCallback& callback) {
if (!wrapped_) { if (!wrapped_) {
return; return;
} }
wrapped_->UploadAttachments(attachment_ids); wrapped_->StoreAttachments(attachments, callback);
} }
} // namespace syncer } // namespace syncer
...@@ -34,8 +34,6 @@ class StubAttachmentService : public AttachmentService, ...@@ -34,8 +34,6 @@ class StubAttachmentService : public AttachmentService,
virtual ~StubAttachmentService() {} virtual ~StubAttachmentService() {}
virtual AttachmentStore* GetStore() OVERRIDE { return NULL; }
virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids, virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) const GetOrDownloadCallback& callback)
OVERRIDE { OVERRIDE {
...@@ -57,10 +55,12 @@ class StubAttachmentService : public AttachmentService, ...@@ -57,10 +55,12 @@ class StubAttachmentService : public AttachmentService,
FROM_HERE, base::Bind(callback, AttachmentService::DROP_SUCCESS)); FROM_HERE, base::Bind(callback, AttachmentService::DROP_SUCCESS));
} }
virtual void UploadAttachments( virtual void StoreAttachments(const AttachmentList& attachments,
const AttachmentIdSet& attachments_ids) OVERRIDE { const StoreCallback& callback) OVERRIDE {
CalledOnValidThread(); CalledOnValidThread();
Increment(); Increment();
base::MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(callback, AttachmentService::STORE_SUCCESS));
} }
virtual base::WeakPtr<AttachmentService> AsWeakPtr() { virtual base::WeakPtr<AttachmentService> AsWeakPtr() {
...@@ -105,8 +105,11 @@ class AttachmentServiceProxyTest : public testing::Test, ...@@ -105,8 +105,11 @@ class AttachmentServiceProxyTest : public testing::Test,
base::Unretained(this)); base::Unretained(this));
callback_drop = base::Bind(&AttachmentServiceProxyTest::IncrementDrop, callback_drop = base::Bind(&AttachmentServiceProxyTest::IncrementDrop,
base::Unretained(this)); base::Unretained(this));
callback_store = base::Bind(&AttachmentServiceProxyTest::IncrementStore,
base::Unretained(this));
count_callback_get_or_download = 0; count_callback_get_or_download = 0;
count_callback_drop = 0; count_callback_drop = 0;
count_callback_store = 0;
} }
virtual void TearDown() virtual void TearDown()
...@@ -133,6 +136,12 @@ class AttachmentServiceProxyTest : public testing::Test, ...@@ -133,6 +136,12 @@ class AttachmentServiceProxyTest : public testing::Test,
++count_callback_drop; ++count_callback_drop;
} }
// a StoreCallback
void IncrementStore(const AttachmentService::StoreResult&) {
CalledOnValidThread();
++count_callback_store;
}
void WaitForStubThread() { void WaitForStubThread() {
base::WaitableEvent done(false, false); base::WaitableEvent done(false, false);
stub_thread->message_loop()->PostTask( stub_thread->message_loop()->PostTask(
...@@ -148,24 +157,23 @@ class AttachmentServiceProxyTest : public testing::Test, ...@@ -148,24 +157,23 @@ class AttachmentServiceProxyTest : public testing::Test,
AttachmentService::GetOrDownloadCallback callback_get_or_download; AttachmentService::GetOrDownloadCallback callback_get_or_download;
AttachmentService::DropCallback callback_drop; AttachmentService::DropCallback callback_drop;
AttachmentService::StoreCallback callback_store;
// number of times callback_get_or_download was invoked // number of times callback_get_or_download was invoked
int count_callback_get_or_download; int count_callback_get_or_download;
// number of times callback_drop was invoked // number of times callback_drop was invoked
int count_callback_drop; int count_callback_drop;
// number of times callback_store was invoked
int count_callback_store;
}; };
TEST_F(AttachmentServiceProxyTest, GetStore) { // Verify that each of AttachmentServiceProxy's callback methods (those that
EXPECT_EQ(NULL, proxy->GetStore()); // take callbacks) are invoked on the stub and that the passed callbacks are
} // invoked in this thread.
TEST_F(AttachmentServiceProxyTest, MethodsWithCallbacksAreProxied) {
// Verify that each of AttachmentServiceProxy's methods are invoked on the stub.
// Verify that the methods that take callbacks invoke passed callbacks on this
// thread.
TEST_F(AttachmentServiceProxyTest, MethodsAreProxied) {
proxy->GetOrDownloadAttachments(AttachmentIdList(), callback_get_or_download); proxy->GetOrDownloadAttachments(AttachmentIdList(), callback_get_or_download);
proxy->DropAttachments(AttachmentIdList(), callback_drop); proxy->DropAttachments(AttachmentIdList(), callback_drop);
proxy->UploadAttachments(AttachmentIdSet()); proxy->StoreAttachments(AttachmentList(), callback_store);
// Wait for the posted calls to execute in the stub thread. // Wait for the posted calls to execute in the stub thread.
WaitForStubThread(); WaitForStubThread();
EXPECT_EQ(3, stub->GetCallCount()); EXPECT_EQ(3, stub->GetCallCount());
...@@ -177,6 +185,7 @@ TEST_F(AttachmentServiceProxyTest, MethodsAreProxied) { ...@@ -177,6 +185,7 @@ TEST_F(AttachmentServiceProxyTest, MethodsAreProxied) {
loop.RunUntilIdle(); loop.RunUntilIdle();
EXPECT_EQ(1, count_callback_get_or_download); EXPECT_EQ(1, count_callback_get_or_download);
EXPECT_EQ(1, count_callback_drop); EXPECT_EQ(1, count_callback_drop);
EXPECT_EQ(1, count_callback_store);
} }
// Verify that it's safe to use an AttachmentServiceProxy even after its wrapped // Verify that it's safe to use an AttachmentServiceProxy even after its wrapped
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
namespace syncer { namespace syncer {
class AttachmentStore;
class SyncData; class SyncData;
// AttachmentService is responsible for managing a model type's attachments. // AttachmentService is responsible for managing a model type's attachments.
...@@ -45,6 +44,16 @@ class SYNC_EXPORT AttachmentService { ...@@ -45,6 +44,16 @@ class SYNC_EXPORT AttachmentService {
typedef base::Callback<void(const DropResult&)> DropCallback; typedef base::Callback<void(const DropResult&)> DropCallback;
// The result of a StoreAttachments operation.
enum StoreResult {
STORE_SUCCESS, // No error, all attachments stored (at least
// locally).
STORE_UNSPECIFIED_ERROR, // An unspecified error occurred. Some or all
// attachments may not have been stored.
};
typedef base::Callback<void(const StoreResult&)> StoreCallback;
// An interface that embedder code implements to be notified about different // An interface that embedder code implements to be notified about different
// events that originate from AttachmentService. // events that originate from AttachmentService.
// This interface will be called from the same thread AttachmentService was // This interface will be called from the same thread AttachmentService was
...@@ -61,11 +70,6 @@ class SYNC_EXPORT AttachmentService { ...@@ -61,11 +70,6 @@ class SYNC_EXPORT AttachmentService {
AttachmentService(); AttachmentService();
virtual ~AttachmentService(); virtual ~AttachmentService();
// Return a pointer to the AttachmentStore owned by this object.
//
// May return NULL.
virtual AttachmentStore* GetStore() = 0;
// See SyncData::GetOrDownloadAttachments. // See SyncData::GetOrDownloadAttachments.
virtual void GetOrDownloadAttachments( virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids, const AttachmentIdList& attachment_ids,
...@@ -75,18 +79,12 @@ class SYNC_EXPORT AttachmentService { ...@@ -75,18 +79,12 @@ class SYNC_EXPORT AttachmentService {
virtual void DropAttachments(const AttachmentIdList& attachment_ids, virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) = 0; const DropCallback& callback) = 0;
// Schedules the attachments identified by |attachment_ids| to be uploaded to // Store |attachments| on device and (eventually) upload them to the server.
// the server.
//
// Assumes the attachments are already in the attachment store.
//
// A request to upload attachments is persistent in that uploads will be
// automatically retried if transient errors occur.
//
// A request to upload attachments does not persist across restarts of Chrome.
// //
// Invokes OnAttachmentUploaded on the Delegate (if provided). // Invokes |callback| once the attachments have been written to device
virtual void UploadAttachments(const AttachmentIdSet& attachment_ids) = 0; // storage.
virtual void StoreAttachments(const AttachmentList& attachments,
const StoreCallback& callback) = 0;
}; };
} // namespace syncer } // namespace syncer
......
...@@ -5,8 +5,6 @@ ...@@ -5,8 +5,6 @@
#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_ #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_
#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_ #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_
#include <deque>
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/threading/non_thread_safe.h" #include "base/threading/non_thread_safe.h"
...@@ -45,14 +43,13 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, ...@@ -45,14 +43,13 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
static scoped_ptr<syncer::AttachmentService> CreateForTest(); static scoped_ptr<syncer::AttachmentService> CreateForTest();
// AttachmentService implementation. // AttachmentService implementation.
virtual AttachmentStore* GetStore() OVERRIDE; virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids,
virtual void GetOrDownloadAttachments( const GetOrDownloadCallback& callback)
const AttachmentIdList& attachment_ids, OVERRIDE;
const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids, virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE; const DropCallback& callback) OVERRIDE;
virtual void UploadAttachments( virtual void StoreAttachments(const AttachmentList& attachments,
const AttachmentIdSet& attachment_ids) OVERRIDE; const StoreCallback& callback) OVERRIDE;
private: private:
class GetOrDownloadState; class GetOrDownloadState;
...@@ -63,17 +60,14 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, ...@@ -63,17 +60,14 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids); scoped_ptr<AttachmentIdList> unavailable_attachment_ids);
void DropDone(const DropCallback& callback, void DropDone(const DropCallback& callback,
const AttachmentStore::Result& result); const AttachmentStore::Result& result);
void WriteDone(const StoreCallback& callback,
const AttachmentStore::Result& result);
void UploadDone(const AttachmentUploader::UploadResult& result, void UploadDone(const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id); const AttachmentId& attachment_id);
void DownloadDone(const scoped_refptr<GetOrDownloadState>& state, void DownloadDone(const scoped_refptr<GetOrDownloadState>& state,
const AttachmentId& attachment_id, const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result, const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment); scoped_ptr<Attachment> attachment);
void ProcessQueuedUploads();
void ReadDoneNowUpload(
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids);
const scoped_ptr<AttachmentStore> attachment_store_; const scoped_ptr<AttachmentStore> attachment_store_;
...@@ -86,13 +80,6 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, ...@@ -86,13 +80,6 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// May be null. // May be null.
Delegate* delegate_; Delegate* delegate_;
// Queue of attachment ids to be uploaded. Every entry in this queue should
// also exist in ids_in_queue_.
std::deque<AttachmentId> queue_;
// Ids of attachments currently being uploaded or queued for upload.
AttachmentIdSet ids_in_queue_;
// Must be last data member. // Must be last data member.
base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_; base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_;
......
...@@ -51,16 +51,13 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService { ...@@ -51,16 +51,13 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService {
virtual ~AttachmentServiceProxy(); virtual ~AttachmentServiceProxy();
// AttachmentService implementation. // AttachmentService implementation.
//
// GetStore always returns NULL.
virtual AttachmentStore* GetStore() OVERRIDE;
virtual void GetOrDownloadAttachments( virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids, const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) OVERRIDE; const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids, virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE; const DropCallback& callback) OVERRIDE;
virtual void UploadAttachments( virtual void StoreAttachments(const AttachmentList& attachment,
const AttachmentIdSet& attachment_ids) OVERRIDE; const StoreCallback& callback) OVERRIDE;
protected: protected:
// Core does the work of proxying calls to AttachmentService methods from one // Core does the work of proxying calls to AttachmentService methods from one
...@@ -83,14 +80,13 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService { ...@@ -83,14 +80,13 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService {
Core(const base::WeakPtr<syncer::AttachmentService>& wrapped); Core(const base::WeakPtr<syncer::AttachmentService>& wrapped);
// AttachmentService implementation. // AttachmentService implementation.
virtual AttachmentStore* GetStore() OVERRIDE;
virtual void GetOrDownloadAttachments( virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids, const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) OVERRIDE; const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids, virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE; const DropCallback& callback) OVERRIDE;
virtual void UploadAttachments( virtual void StoreAttachments(const AttachmentList& attachment,
const AttachmentIdSet& attachment_ids) OVERRIDE; const StoreCallback& callback) OVERRIDE;
protected: protected:
friend class base::RefCountedThreadSafe<Core>; friend class base::RefCountedThreadSafe<Core>;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment