Commit 2c8259e6 authored by Mikel Astiz's avatar Mikel Astiz Committed by Commit Bot

Introduce generic bridge for SyncableService-s

The class allows legacy datatypes (non-USS), which implement
SyncableService, to be integrated within the USS architecture, making
it possible to reuse central objects like
ClientTagBasedModelTypeProcessor and ModelTypeWorker.

Design Doc (Googlers only):
https://docs.google.com/document/d/14ScYZ0sop921gjBwXuReIEuQJlwftqkuSM1jMK_A_x4

In this first patch, we introduce the bridge itself. Follow-up patches
will introduce the necessary plumbing to exercise it.

Expected main benefits:
- Unify logic across datatypes by avoiding subtle behavioral differences
  across architectures.
- Remove lots of code (~25K LoC), hence:
  a) reduce Chrome binary size.
  b) reduce maintenance cost.
  c) eng ramp-up time.
- Improve resource footprint (less RAM, ~50% savings).
- Reduce the gap for datatypes to actually migrate to USS.
- Unblock multiple cleanup work, including a massive simplification of
  DataTypeManager and related classes, including controllers.

Bug: 870624
Change-Id: I1bd7f553bf22886c5136e7e12f13b37a3dc77a39
Reviewed-on: https://chromium-review.googlesource.com/1164742Reviewed-by: default avatarMarc Treib <treib@chromium.org>
Commit-Queue: Mikel Astiz <mastiz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#595017}
parent aac52ed0
...@@ -450,6 +450,8 @@ jumbo_static_library("sync") { ...@@ -450,6 +450,8 @@ jumbo_static_library("sync") {
"model_impl/proxy_model_type_controller_delegate.h", "model_impl/proxy_model_type_controller_delegate.h",
"model_impl/sync_metadata_store_change_list.cc", "model_impl/sync_metadata_store_change_list.cc",
"model_impl/sync_metadata_store_change_list.h", "model_impl/sync_metadata_store_change_list.h",
"model_impl/syncable_service_based_bridge.cc",
"model_impl/syncable_service_based_bridge.h",
"protocol/proto_enum_conversions.cc", "protocol/proto_enum_conversions.cc",
"protocol/proto_enum_conversions.h", "protocol/proto_enum_conversions.h",
"protocol/proto_memory_estimations.cc", "protocol/proto_memory_estimations.cc",
...@@ -902,6 +904,7 @@ source_set("unit_tests") { ...@@ -902,6 +904,7 @@ source_set("unit_tests") {
"model_impl/model_type_store_backend_unittest.cc", "model_impl/model_type_store_backend_unittest.cc",
"model_impl/model_type_store_impl_unittest.cc", "model_impl/model_type_store_impl_unittest.cc",
"model_impl/processor_entity_tracker_unittest.cc", "model_impl/processor_entity_tracker_unittest.cc",
"model_impl/syncable_service_based_bridge_unittest.cc",
"protocol/proto_enum_conversions_unittest.cc", "protocol/proto_enum_conversions_unittest.cc",
"protocol/proto_value_conversions_unittest.cc", "protocol/proto_value_conversions_unittest.cc",
"syncable/change_record_unittest.cc", "syncable/change_record_unittest.cc",
......
...@@ -108,10 +108,10 @@ class ModelTypeSyncBridge { ...@@ -108,10 +108,10 @@ class ModelTypeSyncBridge {
// Get or generate a client tag for |entity_data|. This must be the same tag // Get or generate a client tag for |entity_data|. This must be the same tag
// that was/would have been generated in the SyncableService/Directory world // that was/would have been generated in the SyncableService/Directory world
// for backward compatibility with pre-USS clients. The only time this // for backward compatibility with pre-USS clients. The only time this
// theoretically needs to be called is on the creation of local data, however // theoretically needs to be called is on the creation of local data.
// it is also used to verify the hash of remote data. If a model type was //
// never launched pre-USS, then method does not need to be different from // If a model type was never launched pre-USS, then method does not need to be
// GetStorageKey(). Only the hash of this value is kept. // different from GetStorageKey(). Only the hash of this value is kept.
virtual std::string GetClientTag(const EntityData& entity_data) = 0; virtual std::string GetClientTag(const EntityData& entity_data) = 0;
// Must not be called unless SupportsGetStorageKey() returns true. // Must not be called unless SupportsGetStorageKey() returns true.
......
...@@ -1169,6 +1169,8 @@ size_t ClientTagBasedModelTypeProcessor::EstimateMemoryUsage() const { ...@@ -1169,6 +1169,8 @@ size_t ClientTagBasedModelTypeProcessor::EstimateMemoryUsage() const {
memory_usage += EstimateMemoryUsage(model_type_state_); memory_usage += EstimateMemoryUsage(model_type_state_);
memory_usage += EstimateMemoryUsage(entities_); memory_usage += EstimateMemoryUsage(entities_);
memory_usage += EstimateMemoryUsage(storage_key_to_tag_hash_); memory_usage += EstimateMemoryUsage(storage_key_to_tag_hash_);
// TODO(crbug.com/870624): Let bridges provide custom additional memory
// overhead, which is important for SyncableServiceBasedBridge.
return memory_usage; return memory_usage;
} }
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/model_impl/syncable_service_based_bridge.h"
#include <stdint.h>
#include <utility>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "components/sync/base/hash_util.h"
#include "components/sync/model/mutable_data_batch.h"
#include "components/sync/model/sync_change.h"
#include "components/sync/model/sync_error_factory.h"
#include "components/sync/model/syncable_service.h"
#include "components/sync/model_impl/client_tag_based_model_type_processor.h"
#include "components/sync/protocol/persisted_entity_data.pb.h"
namespace syncer {
namespace {
// Same as kInvalidId in syncable/base_node.h.
constexpr int64_t kInvalidId = 0;
std::unique_ptr<EntityData> ConvertPersistedToEntityData(
const std::string& client_tag_hash,
std::unique_ptr<sync_pb::PersistedEntityData> data) {
DCHECK(data);
DCHECK(!client_tag_hash.empty());
DCHECK(!data->non_unique_name().empty());
auto entity_data = std::make_unique<EntityData>();
entity_data->non_unique_name = std::move(*data->mutable_non_unique_name());
entity_data->specifics.Swap(data->mutable_specifics());
entity_data->client_tag_hash = client_tag_hash;
return entity_data;
}
sync_pb::PersistedEntityData CreatePersistedFromEntityData(
const EntityData& entity_data) {
DCHECK(!entity_data.non_unique_name.empty());
sync_pb::PersistedEntityData persisted;
persisted.set_non_unique_name(entity_data.non_unique_name);
*persisted.mutable_specifics() = entity_data.specifics;
return persisted;
}
std::unique_ptr<sync_pb::PersistedEntityData> CreatePersistedFromSyncData(
const SyncDataLocal& sync_data) {
DCHECK(!sync_data.GetTitle().empty());
auto persisted = std::make_unique<sync_pb::PersistedEntityData>();
persisted->set_non_unique_name(sync_data.GetTitle());
*persisted->mutable_specifics() = sync_data.GetSpecifics();
return persisted;
}
SyncChange::SyncChangeType ConvertToSyncChangeType(
EntityChange::ChangeType type) {
switch (type) {
case EntityChange::ACTION_DELETE:
return SyncChange::ACTION_DELETE;
case EntityChange::ACTION_ADD:
return SyncChange::ACTION_ADD;
case EntityChange::ACTION_UPDATE:
return SyncChange::ACTION_UPDATE;
}
NOTREACHED();
return SyncChange::ACTION_INVALID;
}
base::Optional<ModelError> ConvertToModelError(const SyncError& sync_error) {
if (sync_error.IsSet()) {
return ModelError(sync_error.location(), sync_error.message());
}
return base::nullopt;
}
// Object to propagate local changes to the bridge, which will ultimately
// propagate them to the server.
class ChangeProcessorImpl : public SyncChangeProcessor {
public:
ChangeProcessorImpl(
ModelType type,
const base::RepeatingCallback<void(const base::Optional<ModelError>&)>&
error_callback,
ModelTypeStore* store,
std::map<std::string, sync_pb::EntitySpecifics>* in_memory_store,
ModelTypeChangeProcessor* other)
: type_(type),
error_callback_(error_callback),
store_(store),
in_memory_store_(in_memory_store),
other_(other) {
DCHECK(store);
DCHECK(other);
}
~ChangeProcessorImpl() override {}
SyncError ProcessSyncChanges(const base::Location& from_here,
const SyncChangeList& change_list) override {
std::unique_ptr<ModelTypeStore::WriteBatch> batch =
store_->CreateWriteBatch();
for (const SyncChange& change : change_list) {
DCHECK(change.sync_data().IsLocal())
<< " from " << change.location().ToString();
SyncDataLocal sync_data(change.sync_data());
DCHECK(sync_data.IsValid()) << " from " << change.location().ToString();
const std::string storage_key =
GenerateSyncableHash(type_, sync_data.GetTag());
DCHECK(!storage_key.empty());
switch (change.change_type()) {
case SyncChange::ACTION_INVALID:
NOTREACHED() << " from " << change.location().ToString();
break;
case SyncChange::ACTION_ADD:
case SyncChange::ACTION_UPDATE: {
(*in_memory_store_)[storage_key] = sync_data.GetSpecifics();
std::unique_ptr<sync_pb::PersistedEntityData> persisted_entity =
CreatePersistedFromSyncData(sync_data);
batch->WriteData(storage_key, persisted_entity->SerializeAsString());
other_->Put(
storage_key,
ConvertPersistedToEntityData(
/*client_tag_hash=*/storage_key, std::move(persisted_entity)),
batch->GetMetadataChangeList());
break;
}
case SyncChange::ACTION_DELETE:
in_memory_store_->erase(storage_key);
other_->Delete(storage_key, batch->GetMetadataChangeList());
batch->DeleteData(storage_key);
break;
}
}
store_->CommitWriteBatch(std::move(batch), error_callback_);
return SyncError();
}
SyncDataList GetAllSyncData(ModelType type) const override {
// This function is not supported and not exercised by the relevant
// datatypes (that are integrated with this bridge).
NOTREACHED();
return SyncDataList();
}
SyncError UpdateDataTypeContext(ModelType type,
ContextRefreshStatus refresh_status,
const std::string& context) override {
// This function is not supported and not exercised by anyone, since
// the USS flow doesn't use SharedChangeProcessor.
// TODO(crbug.com/870624): Remove this function altogether when the
// directory codebase is removed.
NOTREACHED();
return SyncError();
}
void AddLocalChangeObserver(LocalChangeObserver* observer) override {
// This function is not supported and not exercised by the relevant
// datatypes (that are integrated with this bridge).
NOTREACHED();
}
void RemoveLocalChangeObserver(LocalChangeObserver* observer) override {
// This function is not supported and not exercised by the relevant
// datatypes (that are integrated with this bridge).
NOTREACHED();
}
private:
const ModelType type_;
const base::RepeatingCallback<void(const base::Optional<ModelError>&)>
error_callback_;
ModelTypeStore* const store_;
std::map<std::string, sync_pb::EntitySpecifics>* const in_memory_store_;
ModelTypeChangeProcessor* const other_;
DISALLOW_COPY_AND_ASSIGN(ChangeProcessorImpl);
};
class SyncErrorFactoryImpl : public SyncErrorFactory {
public:
explicit SyncErrorFactoryImpl(ModelType type) : type_(type) {}
~SyncErrorFactoryImpl() override = default;
SyncError CreateAndUploadError(const base::Location& location,
const std::string& message) override {
// Uploading is not supported, we simply return the error.
return SyncError(location, SyncError::DATATYPE_ERROR, message, type_);
}
private:
const ModelType type_;
DISALLOW_COPY_AND_ASSIGN(SyncErrorFactoryImpl);
};
} // namespace
SyncableServiceBasedBridge::SyncableServiceBasedBridge(
ModelType type,
OnceModelTypeStoreFactory store_factory,
std::unique_ptr<ModelTypeChangeProcessor> change_processor,
SyncableService* syncable_service)
: ModelTypeSyncBridge(std::move(change_processor)),
type_(type),
syncable_service_(syncable_service),
store_factory_(std::move(store_factory)),
syncable_service_started_(false),
weak_ptr_factory_(this) {
DCHECK(store_factory_);
DCHECK(syncable_service_);
}
SyncableServiceBasedBridge::~SyncableServiceBasedBridge() {
// Stop the syncable service to make sure instances of ChangeProcessorImpl are
// not continued to be used.
if (syncable_service_started_) {
syncable_service_->StopSyncing(type_);
}
}
std::unique_ptr<MetadataChangeList>
SyncableServiceBasedBridge::CreateMetadataChangeList() {
return ModelTypeStore::WriteBatch::CreateMetadataChangeList();
}
void SyncableServiceBasedBridge::OnSyncStarting(
const DataTypeActivationRequest& request) {
DCHECK(!syncable_service_started_);
if (!store_factory_) {
// Sync was have been started earlier, and |store_| is guaranteed to be
// initialized because stopping of the datatype cannot be completed before
// ModelReadyToSync().
DCHECK(store_);
MaybeStartSyncableService();
return;
}
std::move(store_factory_)
.Run(type_, base::BindOnce(&SyncableServiceBasedBridge::OnStoreCreated,
weak_ptr_factory_.GetWeakPtr()));
DCHECK(!store_factory_);
}
base::Optional<ModelError> SyncableServiceBasedBridge::MergeSyncData(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list) {
DCHECK(store_);
DCHECK(change_processor()->IsTrackingMetadata());
DCHECK(!syncable_service_started_);
DCHECK(in_memory_store_.empty());
const SyncChangeList sync_change_list = StoreAndConvertRemoteChanges(
std::move(metadata_change_list), std::move(entity_change_list));
SyncDataList initial_sync_data;
initial_sync_data.reserve(sync_change_list.size());
for (const SyncChange& change : sync_change_list) {
initial_sync_data.push_back(change.sync_data());
}
auto error_callback =
base::BindRepeating(&SyncableServiceBasedBridge::ReportErrorIfSet,
weak_ptr_factory_.GetWeakPtr());
auto processor_impl = std::make_unique<ChangeProcessorImpl>(
type_, error_callback, store_.get(), &in_memory_store_,
change_processor());
const base::Optional<ModelError> merge_error = ConvertToModelError(
syncable_service_
->MergeDataAndStartSyncing(
type_, initial_sync_data, std::move(processor_impl),
std::make_unique<SyncErrorFactoryImpl>(type_))
.error());
if (!merge_error) {
syncable_service_started_ = true;
}
return merge_error;
}
base::Optional<ModelError> SyncableServiceBasedBridge::ApplySyncChanges(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list) {
DCHECK(store_);
DCHECK(change_processor()->IsTrackingMetadata());
DCHECK(syncable_service_started_);
const SyncChangeList sync_change_list = StoreAndConvertRemoteChanges(
std::move(metadata_change_list), std::move(entity_change_list));
if (sync_change_list.empty()) {
return base::nullopt;
}
return ConvertToModelError(
syncable_service_->ProcessSyncChanges(FROM_HERE, sync_change_list));
}
void SyncableServiceBasedBridge::GetData(StorageKeyList storage_keys,
DataCallback callback) {
DCHECK(store_);
store_->ReadData(
storage_keys,
base::BindOnce(&SyncableServiceBasedBridge::OnReadDataForProcessor,
weak_ptr_factory_.GetWeakPtr(), std::move(callback)));
}
void SyncableServiceBasedBridge::GetAllDataForDebugging(DataCallback callback) {
DCHECK(store_);
store_->ReadAllData(
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllDataForProcessor,
weak_ptr_factory_.GetWeakPtr(), std::move(callback)));
}
std::string SyncableServiceBasedBridge::GetClientTag(
const EntityData& entity_data) {
NOTREACHED();
return std::string();
}
std::string SyncableServiceBasedBridge::GetStorageKey(
const EntityData& entity_data) {
// Not supported as per SupportsGetStorageKey().
NOTREACHED();
return std::string();
}
bool SyncableServiceBasedBridge::SupportsGetClientTag() const {
return false;
}
bool SyncableServiceBasedBridge::SupportsGetStorageKey() const {
return false;
}
ConflictResolution SyncableServiceBasedBridge::ResolveConflict(
const EntityData& local_data,
const EntityData& remote_data) const {
if (!remote_data.is_deleted()) {
return ConflictResolution::UseRemote();
}
DCHECK(!local_data.is_deleted());
// Ignore local changes for extensions/apps when server had a delete, to
// avoid unwanted reinstall of an uninstalled extension.
if (type_ == EXTENSIONS || type_ == APPS) {
DVLOG(1) << "Resolving conflict, ignoring local changes for extension/app";
return ConflictResolution::UseRemote();
}
return ConflictResolution::UseLocal();
}
ModelTypeSyncBridge::StopSyncResponse
SyncableServiceBasedBridge::ApplyStopSyncChanges(
std::unique_ptr<MetadataChangeList> delete_metadata_change_list) {
DCHECK(store_);
if (delete_metadata_change_list) {
in_memory_store_.clear();
store_->DeleteAllDataAndMetadata(base::DoNothing());
}
if (syncable_service_started_) {
syncable_service_->StopSyncing(type_);
syncable_service_started_ = false;
}
return StopSyncResponse::kModelStillReadyToSync;
}
void SyncableServiceBasedBridge::OnStoreCreated(
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore> store) {
if (error) {
change_processor()->ReportError(*error);
return;
}
DCHECK(store);
store_ = std::move(store);
store_->ReadAllData(
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllDataForInit,
weak_ptr_factory_.GetWeakPtr()));
}
void SyncableServiceBasedBridge::OnReadAllDataForInit(
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list) {
DCHECK(in_memory_store_.empty());
if (error) {
change_processor()->ReportError(*error);
return;
}
for (const ModelTypeStore::Record& record : *record_list) {
sync_pb::PersistedEntityData persisted_entity;
if (!persisted_entity.ParseFromString(record.value)) {
change_processor()->ReportError(
{FROM_HERE, "Failed deserializing data."});
return;
}
in_memory_store_[record.id] = persisted_entity.specifics();
}
store_->ReadAllMetadata(
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllMetadataForInit,
weak_ptr_factory_.GetWeakPtr()));
}
void SyncableServiceBasedBridge::OnReadAllMetadataForInit(
const base::Optional<ModelError>& error,
std::unique_ptr<MetadataBatch> metadata_batch) {
DCHECK(!syncable_service_started_);
if (error) {
change_processor()->ReportError(*error);
return;
}
change_processor()->ModelReadyToSync(std::move(metadata_batch));
MaybeStartSyncableService();
}
void SyncableServiceBasedBridge::MaybeStartSyncableService() {
DCHECK(!syncable_service_started_);
DCHECK(store_);
// If sync wasn't enabled according to the loaded metadata, let's wait until
// MergeSyncData() is called before starting the SyncableService.
if (!change_processor()->IsTrackingMetadata()) {
return;
}
// Sync enabled, so exercise MergeDataAndStartSyncing() immediately, since
// this function is reached only if sync is starting already.
SyncDataList initial_sync_data;
initial_sync_data.reserve(in_memory_store_.size());
for (const std::pair<std::string, sync_pb::EntitySpecifics>& record :
in_memory_store_) {
initial_sync_data.push_back(SyncData::CreateRemoteData(
/*id=*/kInvalidId, record.second,
/*last_modified_time=*/base::Time(), // Used by legacy sessions only.
/*client_tag_hash=*/record.first));
}
auto error_callback =
base::BindRepeating(&SyncableServiceBasedBridge::ReportErrorIfSet,
weak_ptr_factory_.GetWeakPtr());
auto processor_impl = std::make_unique<ChangeProcessorImpl>(
type_, error_callback, store_.get(), &in_memory_store_,
change_processor());
const base::Optional<ModelError> merge_error = ConvertToModelError(
syncable_service_
->MergeDataAndStartSyncing(
type_, initial_sync_data, std::move(processor_impl),
std::make_unique<SyncErrorFactoryImpl>(type_))
.error());
if (merge_error) {
change_processor()->ReportError(*merge_error);
} else {
syncable_service_started_ = true;
}
}
SyncChangeList SyncableServiceBasedBridge::StoreAndConvertRemoteChanges(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list) {
std::unique_ptr<ModelTypeStore::WriteBatch> batch =
store_->CreateWriteBatch();
SyncChangeList output_change_list;
output_change_list.reserve(entity_change_list.size());
for (const EntityChange& change : entity_change_list) {
switch (change.type()) {
case EntityChange::ACTION_DELETE: {
const std::string& storage_key = change.storage_key();
DCHECK_NE(0U, in_memory_store_.count(storage_key));
DVLOG(1) << ModelTypeToString(type_)
<< ": Processing deletion with storage key: " << storage_key;
output_change_list.emplace_back(
FROM_HERE, SyncChange::ACTION_DELETE,
SyncData::CreateRemoteData(
/*id=*/kInvalidId, in_memory_store_[storage_key],
change.data().modification_time,
change.data().client_tag_hash));
// For tombstones, there is no actual data, which means no client tag
// hash either, but the processor provides the storage key.
DCHECK(!storage_key.empty());
batch->DeleteData(storage_key);
in_memory_store_.erase(storage_key);
break;
}
case EntityChange::ACTION_ADD:
// Because we use the client tag hash as storage key, let the processor
// know.
change_processor()->UpdateStorageKey(
change.data(), /*storage_key=*/change.data().client_tag_hash,
metadata_change_list.get());
FALLTHROUGH;
case EntityChange::ACTION_UPDATE: {
const std::string& storage_key = change.data().client_tag_hash;
DVLOG(1) << ModelTypeToString(type_)
<< ": Processing add/update with key: " << storage_key;
output_change_list.emplace_back(
FROM_HERE, ConvertToSyncChangeType(change.type()),
SyncData::CreateRemoteData(
/*id=*/kInvalidId, change.data().specifics,
change.data().modification_time,
change.data().client_tag_hash));
batch->WriteData(
storage_key,
CreatePersistedFromEntityData(change.data()).SerializeAsString());
in_memory_store_[storage_key] = change.data().specifics;
break;
}
}
}
store_->CommitWriteBatch(
std::move(batch),
base::BindOnce(&SyncableServiceBasedBridge::ReportErrorIfSet,
weak_ptr_factory_.GetWeakPtr()));
return output_change_list;
}
void SyncableServiceBasedBridge::OnReadDataForProcessor(
DataCallback callback,
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list,
std::unique_ptr<ModelTypeStore::IdList> missing_id_list) {
OnReadAllDataForProcessor(std::move(callback), error, std::move(record_list));
}
void SyncableServiceBasedBridge::OnReadAllDataForProcessor(
DataCallback callback,
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list) {
if (error) {
change_processor()->ReportError(*error);
return;
}
auto batch = std::make_unique<MutableDataBatch>();
for (const ModelTypeStore::Record& record : *record_list) {
auto persisted_entity = std::make_unique<sync_pb::PersistedEntityData>();
if (record.id.empty() || !persisted_entity->ParseFromString(record.value)) {
change_processor()->ReportError(
{FROM_HERE, "Failed deserializing data."});
return;
}
// Note that client tag hash is used as storage key too.
batch->Put(record.id,
ConvertPersistedToEntityData(
/*client_tag_hash=*/record.id, std::move(persisted_entity)));
}
std::move(callback).Run(std::move(batch));
}
void SyncableServiceBasedBridge::ReportErrorIfSet(
const base::Optional<ModelError>& error) {
if (error) {
change_processor()->ReportError(*error);
}
}
} // namespace syncer
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_SYNC_MODEL_IMPL_SYNCABLE_SERVICE_BASED_BRIDGE_H_
#define COMPONENTS_SYNC_MODEL_IMPL_SYNCABLE_SERVICE_BASED_BRIDGE_H_
#include <map>
#include <memory>
#include <string>
#include "base/callback_forward.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "components/sync/model/model_error.h"
#include "components/sync/model/model_type_store.h"
#include "components/sync/model/model_type_sync_bridge.h"
#include "components/sync/model/sync_change_processor.h"
namespace syncer {
class MetadataBatch;
class ModelTypeChangeProcessor;
class SyncableService;
// Implementation of ModelTypeSyncBridge that allows integrating legacy
// datatypes that implement SyncableService. Internally, it uses a database to
// persist and mimic the legacy directory's behavior, but as opposed to the
// legacy directory, it's not exposed anywhere outside this bridge, and is
// considered an implementation detail.
class SyncableServiceBasedBridge : public ModelTypeSyncBridge {
public:
// Pointers must not be null and |syncable_service| must outlive this object.
SyncableServiceBasedBridge(
ModelType type,
OnceModelTypeStoreFactory store_factory,
std::unique_ptr<ModelTypeChangeProcessor> change_processor,
SyncableService* syncable_service);
~SyncableServiceBasedBridge() override;
// ModelTypeSyncBridge implementation.
void OnSyncStarting(const DataTypeActivationRequest& request) override;
std::unique_ptr<MetadataChangeList> CreateMetadataChangeList() override;
base::Optional<ModelError> MergeSyncData(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list) override;
base::Optional<ModelError> ApplySyncChanges(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list) override;
void GetData(StorageKeyList storage_keys, DataCallback callback) override;
void GetAllDataForDebugging(DataCallback callback) override;
std::string GetClientTag(const EntityData& entity_data) override;
std::string GetStorageKey(const EntityData& entity_data) override;
bool SupportsGetClientTag() const override;
bool SupportsGetStorageKey() const override;
ConflictResolution ResolveConflict(
const EntityData& local_data,
const EntityData& remote_data) const override;
StopSyncResponse ApplyStopSyncChanges(
std::unique_ptr<MetadataChangeList> delete_metadata_change_list) override;
private:
void OnStoreCreated(const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore> store);
void OnReadAllDataForInit(
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list);
void OnReadAllMetadataForInit(const base::Optional<ModelError>& error,
std::unique_ptr<MetadataBatch> metadata_batch);
void MaybeStartSyncableService();
SyncChangeList StoreAndConvertRemoteChanges(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list);
void OnReadDataForProcessor(
DataCallback callback,
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list,
std::unique_ptr<ModelTypeStore::IdList> missing_id_list);
void OnReadAllDataForProcessor(
DataCallback callback,
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list);
void ReportErrorIfSet(const base::Optional<ModelError>& error);
const ModelType type_;
SyncableService* const syncable_service_;
OnceModelTypeStoreFactory store_factory_;
std::unique_ptr<ModelTypeStore> store_;
bool syncable_service_started_;
// In-memory copy of |store_|, needed for remote deletions, because we need to
// provide specifics of the deleted entity to the SyncableService.
std::map<std::string, sync_pb::EntitySpecifics> in_memory_store_;
base::WeakPtrFactory<SyncableServiceBasedBridge> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(SyncableServiceBasedBridge);
};
} // namespace syncer
#endif // COMPONENTS_SYNC_MODEL_IMPL_SYNCABLE_SERVICE_BASED_BRIDGE_H_
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/model_impl/syncable_service_based_bridge.h"
#include <utility>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/test/bind_test_util.h"
#include "base/test/mock_callback.h"
#include "components/sync/base/hash_util.h"
#include "components/sync/model/mock_model_type_change_processor.h"
#include "components/sync/model/model_error.h"
#include "components/sync/model/model_type_store_test_util.h"
#include "components/sync/model/sync_change.h"
#include "components/sync/model/sync_error_factory.h"
#include "components/sync/model/sync_merge_result.h"
#include "components/sync/model/syncable_service.h"
#include "components/sync/model_impl/client_tag_based_model_type_processor.h"
#include "components/sync/protocol/sync.pb.h"
#include "components/sync/test/engine/mock_model_type_worker.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace syncer {
namespace {
using testing::DoAll;
using testing::ElementsAre;
using testing::IsEmpty;
using testing::NotNull;
using testing::Pair;
using testing::Return;
using testing::SaveArg;
using testing::_;
const ModelType kModelType = PREFERENCES;
sync_pb::EntitySpecifics GetTestSpecifics(const std::string& name = "name") {
sync_pb::EntitySpecifics specifics;
// Make specifics non empty, to avoid it being interpreted as a tombstone.
specifics.mutable_preference()->set_name(name);
return specifics;
}
MATCHER_P(SyncDataRemoteMatches, name, "") {
return arg.IsValid() && !arg.IsLocal() && arg.GetDataType() == kModelType &&
arg.GetSpecifics().preference().name() == name;
}
MATCHER_P2(SyncChangeMatches, change_type, name, "") {
return arg.IsValid() && change_type == arg.change_type() &&
arg.sync_data().GetDataType() == kModelType &&
arg.sync_data().GetSpecifics().preference().name() == name;
}
MATCHER_P(HasName, name, "") {
return arg && arg->specifics.preference().name() == name;
}
class MockSyncableService : public SyncableService {
public:
MOCK_METHOD4(
MergeDataAndStartSyncing,
SyncMergeResult(ModelType type,
const SyncDataList& initial_sync_data,
std::unique_ptr<SyncChangeProcessor> sync_processor,
std::unique_ptr<SyncErrorFactory> sync_error_factory));
MOCK_METHOD1(StopSyncing, void(ModelType type));
MOCK_METHOD2(ProcessSyncChanges,
SyncError(const base::Location& from_here,
const SyncChangeList& change_list));
MOCK_CONST_METHOD1(GetAllSyncData, SyncDataList(ModelType type));
};
class SyncableServiceBasedBridgeTest : public ::testing::Test {
protected:
SyncableServiceBasedBridgeTest() {
ON_CALL(syncable_service_, MergeDataAndStartSyncing(_, _, _, _))
.WillByDefault(
[&](ModelType type, const SyncDataList& initial_sync_data,
std::unique_ptr<SyncChangeProcessor> sync_processor,
std::unique_ptr<SyncErrorFactory> sync_error_factory) {
start_syncing_sync_processor_ = std::move(sync_processor);
return SyncMergeResult(kModelType);
});
}
~SyncableServiceBasedBridgeTest() override {}
void InitializeBridge() {
real_processor_ =
std::make_unique<syncer::ClientTagBasedModelTypeProcessor>(
kModelType, /*dump_stack=*/base::DoNothing(),
/*commit_only=*/false);
mock_processor_.DelegateCallsByDefaultTo(real_processor_.get());
bridge_ = std::make_unique<SyncableServiceBasedBridge>(
kModelType, ModelTypeStoreTestUtil::FactoryForInMemoryStoreForTest(),
mock_processor_.CreateForwardingProcessor(), &syncable_service_);
}
void ShutdownBridge() {
bridge_.reset();
// The mock is still delegating to |real_processor_|, so we reset it too.
ASSERT_TRUE(testing::Mock::VerifyAndClear(&mock_processor_));
real_processor_.reset();
}
void StartSyncing() {
syncer::DataTypeActivationRequest request;
request.error_handler = mock_error_handler_.Get();
request.cache_guid = "TestCacheGuid";
request.authenticated_account_id = "SomeAccountId";
base::RunLoop loop;
real_processor_->OnSyncStarting(
request,
base::BindLambdaForTesting(
[&](std::unique_ptr<syncer::DataTypeActivationResponse> response) {
worker_ = std::make_unique<MockModelTypeWorker>(
response->model_type_state, real_processor_.get());
loop.Quit();
}));
loop.Run();
}
std::map<std::string, std::unique_ptr<EntityData>> GetAllData() {
base::RunLoop loop;
std::unique_ptr<DataBatch> batch;
bridge_->GetAllDataForDebugging(base::BindLambdaForTesting(
[&loop, &batch](std::unique_ptr<DataBatch> input_batch) {
batch = std::move(input_batch);
loop.Quit();
}));
loop.Run();
EXPECT_NE(nullptr, batch);
std::map<std::string, std::unique_ptr<EntityData>> storage_key_to_data;
while (batch && batch->HasNext()) {
storage_key_to_data.insert(batch->Next());
}
return storage_key_to_data;
}
const std::string kClientTag = "clienttag";
const std::string kClientTagHash =
GenerateSyncableHash(kModelType, kClientTag);
base::MessageLoop message_loop_;
testing::NiceMock<MockSyncableService> syncable_service_;
testing::NiceMock<MockModelTypeChangeProcessor> mock_processor_;
base::MockCallback<ModelErrorHandler> mock_error_handler_;
std::unique_ptr<syncer::ClientTagBasedModelTypeProcessor> real_processor_;
std::unique_ptr<SyncableServiceBasedBridge> bridge_;
std::unique_ptr<MockModelTypeWorker> worker_;
// SyncChangeProcessor received via MergeDataAndStartSyncing(), or null if it
// hasn't been called.
std::unique_ptr<SyncChangeProcessor> start_syncing_sync_processor_;
private:
DISALLOW_COPY_AND_ASSIGN(SyncableServiceBasedBridgeTest);
};
TEST_F(SyncableServiceBasedBridgeTest,
ShouldStartSyncingWithEmptyInitialRemoteData) {
// Bridge initialization alone, without sync itself starting, should not
// issue calls to the syncable service.
EXPECT_CALL(syncable_service_, MergeDataAndStartSyncing(_, _, _, _)).Times(0);
InitializeBridge();
// Starting sync itself is also not sufficient, until initial remote data is
// received.
StartSyncing();
// Once the initial data is fetched from the server,
// MergeDataAndStartSyncing() should be exercised.
EXPECT_CALL(
syncable_service_,
MergeDataAndStartSyncing(kModelType, IsEmpty(), NotNull(), NotNull()));
worker_->UpdateFromServer();
EXPECT_THAT(GetAllData(), IsEmpty());
}
TEST_F(SyncableServiceBasedBridgeTest,
ShouldStartSyncingWithNonEmptyInitialRemoteData) {
InitializeBridge();
StartSyncing();
// Once the initial data is fetched from the server,
// MergeDataAndStartSyncing() should be exercised.
EXPECT_CALL(syncable_service_,
MergeDataAndStartSyncing(
kModelType, ElementsAre(SyncDataRemoteMatches("name1")),
NotNull(), NotNull()));
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name1"));
EXPECT_THAT(GetAllData(), ElementsAre(Pair(kClientTagHash, _)));
}
TEST_F(SyncableServiceBasedBridgeTest,
ShouldStopSyncableServiceIfPreviouslyStarted) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer();
EXPECT_CALL(syncable_service_, StopSyncing(kModelType));
real_processor_->OnSyncStopping(KEEP_METADATA);
EXPECT_CALL(syncable_service_, StopSyncing(_)).Times(0);
ShutdownBridge();
}
TEST_F(SyncableServiceBasedBridgeTest,
ShouldStopSyncableServiceDuringShutdownIfPreviouslyStarted) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer();
EXPECT_CALL(syncable_service_, StopSyncing(kModelType));
ShutdownBridge();
}
TEST_F(SyncableServiceBasedBridgeTest,
ShouldNotStopSyncableServiceIfNotPreviouslyStarted) {
EXPECT_CALL(syncable_service_, StopSyncing(_)).Times(0);
InitializeBridge();
StartSyncing();
real_processor_->OnSyncStopping(KEEP_METADATA);
}
TEST_F(SyncableServiceBasedBridgeTest,
ShouldNotStopSyncableServiceDuringShutdownIfNotPreviouslyStarted) {
EXPECT_CALL(syncable_service_, StopSyncing(_)).Times(0);
InitializeBridge();
StartSyncing();
ShutdownBridge();
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldPropagateErrorDuringStart) {
// Instrument MergeDataAndStartSyncing() to return an error.
SyncMergeResult merge_result(kModelType);
merge_result.set_error(SyncError(FROM_HERE, SyncError::PERSISTENCE_ERROR,
"Test error", kModelType));
ON_CALL(syncable_service_, MergeDataAndStartSyncing(_, _, _, _))
.WillByDefault(Return(merge_result));
EXPECT_CALL(mock_error_handler_, Run(_));
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer();
// Since the syncable service failed to start, it shouldn't be stopped.
EXPECT_CALL(syncable_service_, StopSyncing(_)).Times(0);
ShutdownBridge();
}
TEST_F(SyncableServiceBasedBridgeTest,
ShouldStartSyncingWithPreviousDirectoryData) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name1"));
real_processor_->OnSyncStopping(KEEP_METADATA);
EXPECT_THAT(GetAllData(), ElementsAre(Pair(kClientTagHash, _)));
EXPECT_CALL(syncable_service_,
MergeDataAndStartSyncing(
kModelType, ElementsAre(SyncDataRemoteMatches("name1")),
NotNull(), NotNull()));
StartSyncing();
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldSupportDisableReenableSequence) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics());
real_processor_->OnSyncStopping(CLEAR_METADATA);
EXPECT_THAT(GetAllData(), IsEmpty());
EXPECT_CALL(syncable_service_, MergeDataAndStartSyncing(_, _, _, _)).Times(0);
StartSyncing();
EXPECT_CALL(
syncable_service_,
MergeDataAndStartSyncing(kModelType, IsEmpty(), NotNull(), NotNull()));
worker_->UpdateFromServer();
}
TEST_F(SyncableServiceBasedBridgeTest,
ShouldPropagateLocalEntitiesDuringMerge) {
ON_CALL(syncable_service_, MergeDataAndStartSyncing(_, _, _, _))
.WillByDefault([&](ModelType type, const SyncDataList& initial_sync_data,
std::unique_ptr<SyncChangeProcessor> sync_processor,
std::unique_ptr<SyncErrorFactory> sync_error_factory) {
SyncChangeList change_list;
change_list.emplace_back(
FROM_HERE, SyncChange::ACTION_ADD,
SyncData::CreateLocalData(kClientTag, "title", GetTestSpecifics()));
const SyncError error =
sync_processor->ProcessSyncChanges(FROM_HERE, change_list);
EXPECT_FALSE(error.IsSet());
return SyncMergeResult(kModelType);
});
InitializeBridge();
StartSyncing();
EXPECT_CALL(mock_processor_, Put(kClientTagHash, NotNull(), NotNull()));
worker_->UpdateFromServer();
EXPECT_THAT(GetAllData(), ElementsAre(Pair(kClientTagHash, _)));
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldPropagateLocalCreation) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer();
ASSERT_THAT(start_syncing_sync_processor_, NotNull());
ASSERT_THAT(GetAllData(), IsEmpty());
EXPECT_CALL(mock_processor_, Put(kClientTagHash, NotNull(), NotNull()));
SyncChangeList change_list;
change_list.emplace_back(
FROM_HERE, SyncChange::ACTION_ADD,
SyncData::CreateLocalData(kClientTag, "title", GetTestSpecifics()));
const SyncError error =
start_syncing_sync_processor_->ProcessSyncChanges(FROM_HERE, change_list);
EXPECT_FALSE(error.IsSet());
EXPECT_THAT(GetAllData(), ElementsAre(Pair(kClientTagHash, _)));
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldPropagateLocalUpdate) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name1"));
ASSERT_THAT(start_syncing_sync_processor_, NotNull());
ASSERT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name1"))));
EXPECT_CALL(mock_processor_, Put(GenerateSyncableHash(kModelType, kClientTag),
NotNull(), NotNull()));
SyncChangeList change_list;
change_list.emplace_back(FROM_HERE, SyncChange::ACTION_UPDATE,
SyncData::CreateLocalData(
kClientTag, "title", GetTestSpecifics("name2")));
const SyncError error =
start_syncing_sync_processor_->ProcessSyncChanges(FROM_HERE, change_list);
EXPECT_FALSE(error.IsSet());
EXPECT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name2"))));
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldPropagateLocalDeletion) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name1"));
ASSERT_THAT(start_syncing_sync_processor_, NotNull());
ASSERT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name1"))));
EXPECT_CALL(mock_processor_,
Delete(GenerateSyncableHash(kModelType, kClientTag), NotNull()));
SyncChangeList change_list;
change_list.emplace_back(FROM_HERE, SyncChange::ACTION_DELETE,
SyncData::CreateLocalDelete(kClientTag, kModelType));
const SyncError error =
start_syncing_sync_processor_->ProcessSyncChanges(FROM_HERE, change_list);
EXPECT_FALSE(error.IsSet());
EXPECT_THAT(GetAllData(), IsEmpty());
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldPropagateRemoteCreation) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer();
ASSERT_THAT(start_syncing_sync_processor_, NotNull());
ASSERT_THAT(GetAllData(), IsEmpty());
EXPECT_CALL(syncable_service_,
ProcessSyncChanges(_, ElementsAre(SyncChangeMatches(
SyncChange::ACTION_ADD, "name1"))));
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name1"));
EXPECT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name1"))));
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldPropagateRemoteUpdates) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name1"));
ASSERT_THAT(start_syncing_sync_processor_, NotNull());
ASSERT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name1"))));
EXPECT_CALL(syncable_service_,
ProcessSyncChanges(_, ElementsAre(SyncChangeMatches(
SyncChange::ACTION_UPDATE, "name2"))));
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name2"));
EXPECT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name2"))));
// A second update for the same entity.
EXPECT_CALL(syncable_service_,
ProcessSyncChanges(_, ElementsAre(SyncChangeMatches(
SyncChange::ACTION_UPDATE, "name3"))));
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name3"));
EXPECT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name3"))));
}
TEST_F(SyncableServiceBasedBridgeTest, ShouldPropagateRemoteDeletion) {
InitializeBridge();
StartSyncing();
worker_->UpdateFromServer(kClientTagHash, GetTestSpecifics("name1"));
ASSERT_THAT(start_syncing_sync_processor_, NotNull());
ASSERT_THAT(GetAllData(),
ElementsAre(Pair(kClientTagHash, HasName("name1"))));
EXPECT_CALL(syncable_service_,
ProcessSyncChanges(_, ElementsAre(SyncChangeMatches(
SyncChange::ACTION_DELETE, "name1"))));
worker_->TombstoneFromServer(kClientTagHash);
EXPECT_THAT(GetAllData(), IsEmpty());
}
} // namespace
} // namespace syncer
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
syntax = "proto2";
option optimize_for = LITE_RUNTIME;
package sync_pb;
import "sync.proto";
// Sync proto to store entity data similar to what directory stores, used to
// persist data locally and never sent through the wire.
//
// Because it's conceptually similar to SyncEntity (actual protocol) and it's
// unclear how big this'll grow, we've kept compatibility with SyncEntity by
// using the same field numbers.
message PersistedEntityData {
// See corresponding fields in SyncEntity for details.
optional string non_unique_name = 8;
optional EntitySpecifics specifics = 21;
}
...@@ -37,6 +37,7 @@ sync_protocol_bases = [ ...@@ -37,6 +37,7 @@ sync_protocol_bases = [
"mountain_share_specifics", "mountain_share_specifics",
"nigori_specifics", "nigori_specifics",
"password_specifics", "password_specifics",
"persisted_entity_data",
"preference_specifics", "preference_specifics",
"printer_specifics", "printer_specifics",
"priority_preference_specifics", "priority_preference_specifics",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment