Commit 0d2e78fa authored by Mikel Astiz's avatar Mikel Astiz Committed by Commit Bot

Move parsing of protos away from UI thread for pseudo-USS

We've seen reports (in non-pseudo-USS datatypes, e.g.
https://bugs.chromium.org/p/chromium/issues/detail?id=902203) that
suggest proto-parsing can take noticeable time if executed on the UI
thread.

Since this will be a common situation for most sync datatypes, let's add
a convenience function in ModelTypeStore itself and adopt it for
pseudo-USS types (SyncableServiceBasedBridge).

Bug: 870624
Change-Id: Id9cee20c7f384ac51d4f5353b4aae831b5e74749
Reviewed-on: https://chromium-review.googlesource.com/c/1349313Reviewed-by: default avatarMarc Treib <treib@chromium.org>
Commit-Queue: Mikel Astiz <mastiz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#610838}
parent 93657d1d
...@@ -61,6 +61,9 @@ class ModelTypeStore : public ModelTypeStoreBase { ...@@ -61,6 +61,9 @@ class ModelTypeStore : public ModelTypeStoreBase {
using ReadMetadataCallback = using ReadMetadataCallback =
base::OnceCallback<void(const base::Optional<ModelError>& error, base::OnceCallback<void(const base::Optional<ModelError>& error,
std::unique_ptr<MetadataBatch> metadata_batch)>; std::unique_ptr<MetadataBatch> metadata_batch)>;
// Callback that runs on the backend sequence, see ReadAllDataAndPreprocess().
using PreprocessCallback = base::OnceCallback<base::Optional<ModelError>(
std::unique_ptr<RecordList> data_records)>;
// Read operations return records either for all entries or only for ones // Read operations return records either for all entries or only for ones
// identified in |id_list|. |error| is nullopt if all records were read // identified in |id_list|. |error| is nullopt if all records were read
...@@ -74,6 +77,15 @@ class ModelTypeStore : public ModelTypeStoreBase { ...@@ -74,6 +77,15 @@ class ModelTypeStore : public ModelTypeStoreBase {
// operation, list of metadata records and global metadata. // operation, list of metadata records and global metadata.
virtual void ReadAllMetadata(ReadMetadataCallback callback) = 0; virtual void ReadAllMetadata(ReadMetadataCallback callback) = 0;
// Similar to ReadAllData() but allows some custom processing in the
// background sequence (e.g. proto parsing). Note that |preprocess_callback|
// will not run if reading itself triggers an error.
// |completion_on_frontend_sequence_callback| is guaranteed to outlive
// |preprocess_on_backend_sequence_callback|.
virtual void ReadAllDataAndPreprocess(
PreprocessCallback preprocess_on_backend_sequence_callback,
CallbackWithResult completion_on_frontend_sequence_callback) = 0;
// Creates write batch for write operations. // Creates write batch for write operations.
virtual std::unique_ptr<WriteBatch> CreateWriteBatch() = 0; virtual std::unique_ptr<WriteBatch> CreateWriteBatch() = 0;
......
...@@ -38,6 +38,14 @@ class ForwardingModelTypeStore : public ModelTypeStore { ...@@ -38,6 +38,14 @@ class ForwardingModelTypeStore : public ModelTypeStore {
other_->ReadAllMetadata(std::move(callback)); other_->ReadAllMetadata(std::move(callback));
} }
void ReadAllDataAndPreprocess(
PreprocessCallback preprocess_on_backend_sequence_callback,
CallbackWithResult completion_on_frontend_sequence_callback) override {
other_->ReadAllDataAndPreprocess(
std::move(preprocess_on_backend_sequence_callback),
std::move(completion_on_frontend_sequence_callback));
}
std::unique_ptr<WriteBatch> CreateWriteBatch() override { std::unique_ptr<WriteBatch> CreateWriteBatch() override {
return other_->CreateWriteBatch(); return other_->CreateWriteBatch();
} }
......
...@@ -18,6 +18,27 @@ ...@@ -18,6 +18,27 @@
namespace syncer { namespace syncer {
namespace {
base::Optional<ModelError> ReadAllDataAndPreprocessOnBackendSequence(
BlockingModelTypeStoreImpl* blocking_store,
ModelTypeStore::PreprocessCallback
preprocess_on_backend_sequence_callback) {
DCHECK(blocking_store);
auto record_list = std::make_unique<ModelTypeStoreBase::RecordList>();
base::Optional<ModelError> error =
blocking_store->ReadAllData(record_list.get());
if (error) {
return error;
}
return std::move(preprocess_on_backend_sequence_callback)
.Run(std::move(record_list));
}
} // namespace
ModelTypeStoreImpl::ModelTypeStoreImpl( ModelTypeStoreImpl::ModelTypeStoreImpl(
ModelType type, ModelType type,
std::unique_ptr<BlockingModelTypeStoreImpl, base::OnTaskRunnerDeleter> std::unique_ptr<BlockingModelTypeStoreImpl, base::OnTaskRunnerDeleter>
...@@ -124,6 +145,33 @@ void ModelTypeStoreImpl::ReadAllMetadataDone( ...@@ -124,6 +145,33 @@ void ModelTypeStoreImpl::ReadAllMetadataDone(
std::move(callback).Run({}, std::move(metadata_batch)); std::move(callback).Run({}, std::move(metadata_batch));
} }
void ModelTypeStoreImpl::ReadAllDataAndPreprocess(
PreprocessCallback preprocess_on_backend_sequence_callback,
CallbackWithResult completion_on_frontend_sequence_callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!preprocess_on_backend_sequence_callback.is_null());
DCHECK(!completion_on_frontend_sequence_callback.is_null());
auto task =
base::BindOnce(&ReadAllDataAndPreprocessOnBackendSequence,
base::Unretained(backend_store_.get()),
std::move(preprocess_on_backend_sequence_callback));
// ReadAllDataAndPreprocessDone() is only needed to guarantee that callbacks
// get cancelled if |this| gets destroyed.
auto reply =
base::BindOnce(&ModelTypeStoreImpl::ReadAllDataAndPreprocessDone,
weak_ptr_factory_.GetWeakPtr(),
std::move(completion_on_frontend_sequence_callback));
base::PostTaskAndReplyWithResult(backend_task_runner_.get(), FROM_HERE,
std::move(task), std::move(reply));
}
void ModelTypeStoreImpl::ReadAllDataAndPreprocessDone(
CallbackWithResult callback,
const base::Optional<ModelError>& error) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::move(callback).Run(error);
}
void ModelTypeStoreImpl::DeleteAllDataAndMetadata(CallbackWithResult callback) { void ModelTypeStoreImpl::DeleteAllDataAndMetadata(CallbackWithResult callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!callback.is_null()); DCHECK(!callback.is_null());
......
...@@ -37,6 +37,9 @@ class ModelTypeStoreImpl : public ModelTypeStore { ...@@ -37,6 +37,9 @@ class ModelTypeStoreImpl : public ModelTypeStore {
void ReadData(const IdList& id_list, ReadDataCallback callback) override; void ReadData(const IdList& id_list, ReadDataCallback callback) override;
void ReadAllData(ReadAllDataCallback callback) override; void ReadAllData(ReadAllDataCallback callback) override;
void ReadAllMetadata(ReadMetadataCallback callback) override; void ReadAllMetadata(ReadMetadataCallback callback) override;
void ReadAllDataAndPreprocess(
PreprocessCallback preprocess_on_backend_sequence_callback,
CallbackWithResult completion_on_frontend_sequence_callback) override;
std::unique_ptr<WriteBatch> CreateWriteBatch() override; std::unique_ptr<WriteBatch> CreateWriteBatch() override;
void CommitWriteBatch(std::unique_ptr<WriteBatch> write_batch, void CommitWriteBatch(std::unique_ptr<WriteBatch> write_batch,
CallbackWithResult callback) override; CallbackWithResult callback) override;
...@@ -54,6 +57,8 @@ class ModelTypeStoreImpl : public ModelTypeStore { ...@@ -54,6 +57,8 @@ class ModelTypeStoreImpl : public ModelTypeStore {
void ReadAllMetadataDone(ReadMetadataCallback callback, void ReadAllMetadataDone(ReadMetadataCallback callback,
std::unique_ptr<MetadataBatch> metadata_batch, std::unique_ptr<MetadataBatch> metadata_batch,
const base::Optional<ModelError>& error); const base::Optional<ModelError>& error);
void ReadAllDataAndPreprocessDone(CallbackWithResult callback,
const base::Optional<ModelError>& error);
void WriteModificationsDone(CallbackWithResult callback, void WriteModificationsDone(CallbackWithResult callback,
const base::Optional<ModelError>& error); const base::Optional<ModelError>& error);
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "base/optional.h" #include "base/optional.h"
#include "base/run_loop.h" #include "base/run_loop.h"
#include "base/test/bind_test_util.h"
#include "components/sync/model/model_error.h" #include "components/sync/model/model_error.h"
#include "components/sync/model/model_type_store_test_util.h" #include "components/sync/model/model_type_store_test_util.h"
#include "components/sync/protocol/entity_metadata.pb.h" #include "components/sync/protocol/entity_metadata.pb.h"
...@@ -26,6 +27,7 @@ namespace { ...@@ -26,6 +27,7 @@ namespace {
using testing::IsEmpty; using testing::IsEmpty;
using testing::Not; using testing::Not;
using testing::Pair;
using testing::SizeIs; using testing::SizeIs;
sync_pb::ModelTypeState CreateModelTypeState(const std::string& value) { sync_pb::ModelTypeState CreateModelTypeState(const std::string& value) {
...@@ -202,6 +204,51 @@ TEST_F(ModelTypeStoreImplTest, WriteThenRead) { ...@@ -202,6 +204,51 @@ TEST_F(ModelTypeStoreImplTest, WriteThenRead) {
{{"id1", CreateEntityMetadata("metadata1")}}); {{"id1", CreateEntityMetadata("metadata1")}});
} }
TEST_F(ModelTypeStoreImplTest, WriteThenReadWithPreprocessing) {
WriteTestData();
base::RunLoop loop;
std::map<std::string, std::string> preprocessed;
store()->ReadAllDataAndPreprocess(
base::BindLambdaForTesting(
[&](std::unique_ptr<ModelTypeStore::RecordList> record_list)
-> base::Optional<ModelError> {
for (const auto& record : *record_list) {
preprocessed[std::string("key_") + record.id] =
std::string("value_") + record.value;
}
return base::nullopt;
}),
base::BindLambdaForTesting([&](const base::Optional<ModelError>& error) {
EXPECT_FALSE(error) << error->ToString();
loop.Quit();
}));
loop.Run();
// Preprocessing function above prefixes "key_" and "value_" to keys and
// values respectively.
EXPECT_THAT(preprocessed,
testing::ElementsAre(Pair("key_id1", "value_data1"),
Pair("key_id2", "value_data2")));
}
TEST_F(ModelTypeStoreImplTest, WriteThenReadWithPreprocessingError) {
WriteTestData();
base::RunLoop loop;
store()->ReadAllDataAndPreprocess(
base::BindLambdaForTesting(
[&](std::unique_ptr<ModelTypeStore::RecordList> record_list)
-> base::Optional<ModelError> {
return ModelError(FROM_HERE, "Preprocessing error");
}),
base::BindLambdaForTesting([&](const base::Optional<ModelError>& error) {
EXPECT_TRUE(error);
loop.Quit();
}));
loop.Run();
}
// Test that records that DeleteAllDataAndMetadata() deletes everything. // Test that records that DeleteAllDataAndMetadata() deletes everything.
TEST_F(ModelTypeStoreImplTest, WriteThenDeleteAll) { TEST_F(ModelTypeStoreImplTest, WriteThenDeleteAll) {
WriteTestData(); WriteTestData();
......
...@@ -85,6 +85,27 @@ base::Optional<ModelError> ConvertToModelError(const SyncError& sync_error) { ...@@ -85,6 +85,27 @@ base::Optional<ModelError> ConvertToModelError(const SyncError& sync_error) {
return base::nullopt; return base::nullopt;
} }
// Parses the content of |record_list| into |*in_memory_store|. The output
// parameter is first for binding purposes.
base::Optional<ModelError> ParseInMemoryStoreOnBackendSequence(
SyncableServiceBasedBridge::InMemoryStore* in_memory_store,
std::unique_ptr<ModelTypeStore::RecordList> record_list) {
DCHECK(in_memory_store);
DCHECK(in_memory_store->empty());
DCHECK(record_list);
for (const ModelTypeStore::Record& record : *record_list) {
sync_pb::PersistedEntityData persisted_entity;
if (!persisted_entity.ParseFromString(record.value)) {
return ModelError(FROM_HERE, "Failed deserializing data.");
}
in_memory_store->emplace(record.id, std::move(persisted_entity));
}
return base::nullopt;
}
// Object to propagate local changes to the bridge, which will ultimately // Object to propagate local changes to the bridge, which will ultimately
// propagate them to the server. // propagate them to the server.
class LocalChangeProcessor : public SyncChangeProcessor { class LocalChangeProcessor : public SyncChangeProcessor {
...@@ -94,7 +115,7 @@ class LocalChangeProcessor : public SyncChangeProcessor { ...@@ -94,7 +115,7 @@ class LocalChangeProcessor : public SyncChangeProcessor {
const base::RepeatingCallback<void(const base::Optional<ModelError>&)>& const base::RepeatingCallback<void(const base::Optional<ModelError>&)>&
error_callback, error_callback,
ModelTypeStore* store, ModelTypeStore* store,
std::map<std::string, sync_pb::PersistedEntityData>* in_memory_store, SyncableServiceBasedBridge::InMemoryStore* in_memory_store,
scoped_refptr<SyncableServiceBasedBridge::ModelCryptographer> scoped_refptr<SyncableServiceBasedBridge::ModelCryptographer>
cryptographer, cryptographer,
ModelTypeChangeProcessor* other) ModelTypeChangeProcessor* other)
...@@ -250,7 +271,7 @@ class LocalChangeProcessor : public SyncChangeProcessor { ...@@ -250,7 +271,7 @@ class LocalChangeProcessor : public SyncChangeProcessor {
const base::RepeatingCallback<void(const base::Optional<ModelError>&)> const base::RepeatingCallback<void(const base::Optional<ModelError>&)>
error_callback_; error_callback_;
ModelTypeStore* const store_; ModelTypeStore* const store_;
std::map<std::string, sync_pb::PersistedEntityData>* const in_memory_store_; SyncableServiceBasedBridge::InMemoryStore* const in_memory_store_;
const scoped_refptr<SyncableServiceBasedBridge::ModelCryptographer> const scoped_refptr<SyncableServiceBasedBridge::ModelCryptographer>
cryptographer_; cryptographer_;
ModelTypeChangeProcessor* const other_; ModelTypeChangeProcessor* const other_;
...@@ -500,7 +521,7 @@ std::unique_ptr<SyncChangeProcessor> ...@@ -500,7 +521,7 @@ std::unique_ptr<SyncChangeProcessor>
SyncableServiceBasedBridge::CreateLocalChangeProcessorForTesting( SyncableServiceBasedBridge::CreateLocalChangeProcessorForTesting(
ModelType type, ModelType type,
ModelTypeStore* store, ModelTypeStore* store,
std::map<std::string, sync_pb::PersistedEntityData>* in_memory_store, InMemoryStore* in_memory_store,
ModelTypeChangeProcessor* other) { ModelTypeChangeProcessor* other) {
return std::make_unique<LocalChangeProcessor>( return std::make_unique<LocalChangeProcessor>(
type, /*error_callback=*/base::DoNothing(), store, in_memory_store, type, /*error_callback=*/base::DoNothing(), store, in_memory_store,
...@@ -520,15 +541,21 @@ void SyncableServiceBasedBridge::OnStoreCreated( ...@@ -520,15 +541,21 @@ void SyncableServiceBasedBridge::OnStoreCreated(
DCHECK(store); DCHECK(store);
store_ = std::move(store); store_ = std::move(store);
store_->ReadAllData( auto in_memory_store = std::make_unique<InMemoryStore>();
InMemoryStore* raw_in_memory_store = in_memory_store.get();
store_->ReadAllDataAndPreprocess(
base::BindOnce(&ParseInMemoryStoreOnBackendSequence,
base::Unretained(raw_in_memory_store)),
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllDataForInit, base::BindOnce(&SyncableServiceBasedBridge::OnReadAllDataForInit,
weak_ptr_factory_.GetWeakPtr())); weak_ptr_factory_.GetWeakPtr(),
std::move(in_memory_store)));
} }
void SyncableServiceBasedBridge::OnReadAllDataForInit( void SyncableServiceBasedBridge::OnReadAllDataForInit(
const base::Optional<ModelError>& error, std::unique_ptr<InMemoryStore> in_memory_store,
std::unique_ptr<ModelTypeStore::RecordList> record_list) { const base::Optional<ModelError>& error) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(in_memory_store.get());
DCHECK(in_memory_store_.empty()); DCHECK(in_memory_store_.empty());
if (error) { if (error) {
...@@ -536,16 +563,7 @@ void SyncableServiceBasedBridge::OnReadAllDataForInit( ...@@ -536,16 +563,7 @@ void SyncableServiceBasedBridge::OnReadAllDataForInit(
return; return;
} }
for (const ModelTypeStore::Record& record : *record_list) { in_memory_store_ = std::move(*in_memory_store);
sync_pb::PersistedEntityData persisted_entity;
if (!persisted_entity.ParseFromString(record.value)) {
change_processor()->ReportError(
{FROM_HERE, "Failed deserializing data."});
return;
}
in_memory_store_[record.id] = persisted_entity;
}
store_->ReadAllMetadata( store_->ReadAllMetadata(
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllMetadataForInit, base::BindOnce(&SyncableServiceBasedBridge::OnReadAllMetadataForInit,
......
...@@ -38,6 +38,8 @@ class SyncableService; ...@@ -38,6 +38,8 @@ class SyncableService;
// considered an implementation detail. // considered an implementation detail.
class SyncableServiceBasedBridge : public ModelTypeSyncBridge { class SyncableServiceBasedBridge : public ModelTypeSyncBridge {
public: public:
using InMemoryStore = std::map<std::string, sync_pb::PersistedEntityData>;
// Used for passwords only. // Used for passwords only.
// TODO(crbug.com/856941): Remove when PASSWORDS are migrated to USS, which // TODO(crbug.com/856941): Remove when PASSWORDS are migrated to USS, which
// will likely make this API unnecessary. // will likely make this API unnecessary.
...@@ -96,18 +98,16 @@ class SyncableServiceBasedBridge : public ModelTypeSyncBridge { ...@@ -96,18 +98,16 @@ class SyncableServiceBasedBridge : public ModelTypeSyncBridge {
// For testing. // For testing.
static std::unique_ptr<SyncChangeProcessor> static std::unique_ptr<SyncChangeProcessor>
CreateLocalChangeProcessorForTesting( CreateLocalChangeProcessorForTesting(ModelType type,
ModelType type,
ModelTypeStore* store, ModelTypeStore* store,
std::map<std::string, sync_pb::PersistedEntityData>* in_memory_store, InMemoryStore* in_memory_store,
ModelTypeChangeProcessor* other); ModelTypeChangeProcessor* other);
private: private:
void OnStoreCreated(const base::Optional<ModelError>& error, void OnStoreCreated(const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore> store); std::unique_ptr<ModelTypeStore> store);
void OnReadAllDataForInit( void OnReadAllDataForInit(std::unique_ptr<InMemoryStore> in_memory_store,
const base::Optional<ModelError>& error, const base::Optional<ModelError>& error);
std::unique_ptr<ModelTypeStore::RecordList> record_list);
void OnReadAllMetadataForInit(const base::Optional<ModelError>& error, void OnReadAllMetadataForInit(const base::Optional<ModelError>& error,
std::unique_ptr<MetadataBatch> metadata_batch); std::unique_ptr<MetadataBatch> metadata_batch);
base::Optional<ModelError> MaybeStartSyncableService() WARN_UNUSED_RESULT; base::Optional<ModelError> MaybeStartSyncableService() WARN_UNUSED_RESULT;
...@@ -134,7 +134,7 @@ class SyncableServiceBasedBridge : public ModelTypeSyncBridge { ...@@ -134,7 +134,7 @@ class SyncableServiceBasedBridge : public ModelTypeSyncBridge {
// In-memory copy of |store_|, needed for remote deletions, because we need to // In-memory copy of |store_|, needed for remote deletions, because we need to
// provide specifics of the deleted entity to the SyncableService. // provide specifics of the deleted entity to the SyncableService.
std::map<std::string, sync_pb::PersistedEntityData> in_memory_store_; InMemoryStore in_memory_store_;
SEQUENCE_CHECKER(sequence_checker_); SEQUENCE_CHECKER(sequence_checker_);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment