Commit 67f4a1d4 authored by Mohamed Amir Yosef's avatar Mohamed Amir Yosef Committed by Commit Bot

[Sync USS] Make ModelTypeWorker::entities_ keyed by sever ids

Currently, entities are tracked in a map inside the ModelTypeWorker.
This is a left over from the old arch. before introducing the poll model
between the worker and the processor.

The main motivation for this refactoring is the map is currently keyed
by the client tag hash which bookmarks lack. After this CL, the map
will be keyed on server ids to support all types of entities.
And since the map is now used only when receiving remote updates,
all entities should have sever ids.

This is also in prepartion for migrating bookmarks.

Bug: 516866
Change-Id: I1e8c8c7073a53e0c67b0da2551a20e2d4db93cb1
Reviewed-on: https://chromium-review.googlesource.com/982620
Commit-Queue: Mohamed Amir Yosef <mamir@chromium.org>
Reviewed-by: default avatarPavel Yatsuk <pavely@chromium.org>
Reviewed-by: default avatarMikel Astiz <mastiz@chromium.org>
Cr-Commit-Position: refs/heads/master@{#549805}
parent a7104a08
......@@ -864,7 +864,6 @@ source_set("unit_tests") {
"engine_impl/syncer_unittest.cc",
"engine_impl/syncer_util_unittest.cc",
"engine_impl/uss_migrator_unittest.cc",
"engine_impl/worker_entity_tracker_unittest.cc",
"js/js_event_details_unittest.cc",
"js/sync_js_controller_unittest.cc",
"model/entity_data_unittest.cc",
......
......@@ -130,20 +130,6 @@ SyncerError ModelTypeWorker::ProcessGetUpdatesResponse(
if (!update_entity->server_defined_unique_tag().empty())
continue;
// Normal updates are handled here.
const std::string& client_tag_hash =
update_entity->client_defined_unique_tag();
// TODO(crbug.com/516866): this wouldn't be true for bookmarks.
DCHECK(!client_tag_hash.empty());
WorkerEntityTracker* entity = GetOrCreateEntityTracker(client_tag_hash);
if (!entity->UpdateContainsNewVersion(update_entity->version())) {
status->increment_num_reflected_updates_downloaded_by(1);
++counters->num_reflected_updates_received;
}
if (update_entity->deleted()) {
status->increment_num_tombstone_updates_downloaded_by(1);
++counters->num_tombstone_updates_received;
......@@ -153,17 +139,18 @@ SyncerError ModelTypeWorker::ProcessGetUpdatesResponse(
switch (PopulateUpdateResponseData(cryptographer_.get(), *update_entity,
&response_data)) {
case SUCCESS:
entity->ReceiveUpdate(response_data);
pending_updates_.push_back(response_data);
break;
case DECRYPTION_PENDING:
case DECRYPTION_PENDING: {
auto entity = std::make_unique<WorkerEntityTracker>();
entity->ReceiveEncryptedUpdate(response_data);
entries_pending_decryption_[update_entity->id_string()] =
std::move(entity);
has_encrypted_updates_ = true;
break;
}
case FAILED_TO_DECRYPT:
// Failed to decrypt the entity. Likely it is corrupt. Drop the entity
// and move on.
entities_.erase(client_tag_hash);
// Failed to decrypt the entity. Likely it is corrupt. Move on.
break;
}
}
......@@ -273,9 +260,7 @@ void ModelTypeWorker::ApplyPendingUpdates() {
debug_info_emitter_->EmitUpdateCountersUpdate();
debug_info_emitter_->EmitStatusCountersUpdate();
DCHECK_EQ(pending_updates_.size(), entities_.size());
pending_updates_.clear();
entities_.clear();
}
void ModelTypeWorker::NudgeForCommit() {
......@@ -298,7 +283,10 @@ std::unique_ptr<CommitContribution> ModelTypeWorker::GetContribution(
// cryptographer has pending keys).
if (!CanCommitItems())
return std::unique_ptr<CommitContribution>();
DCHECK(entities_.empty());
// Client shouldn't be committing data to server when it hasn't processed all
// updates it received.
DCHECK(entries_pending_decryption_.empty());
// Request model type for local changes.
scoped_refptr<GetLocalChangesRequest> request =
......@@ -330,16 +318,10 @@ void ModelTypeWorker::OnCommitResponse(CommitResponseDataList* response_list) {
model_type_processor_->OnCommitCompleted(model_type_state_, *response_list);
}
void ModelTypeWorker::CleanupAfterCommit() {
// Clear all tracked entities. The ones that didn't get committed will be
// retried next time by the processor.
entities_.clear();
}
void ModelTypeWorker::AbortMigration() {
DCHECK(!model_type_state_.initial_sync_done());
model_type_state_ = sync_pb::ModelTypeState();
entities_.clear();
entries_pending_decryption_.clear();
pending_updates_.clear();
has_encrypted_updates_ = false;
nudge_handler_->NudgeForInitialDownload(type_);
......@@ -349,7 +331,7 @@ size_t ModelTypeWorker::EstimateMemoryUsage() const {
using base::trace_event::EstimateMemoryUsage;
size_t memory_usage = 0;
memory_usage += EstimateMemoryUsage(model_type_state_);
memory_usage += EstimateMemoryUsage(entities_);
memory_usage += EstimateMemoryUsage(entries_pending_decryption_);
memory_usage += EstimateMemoryUsage(pending_updates_);
return memory_usage;
}
......@@ -388,32 +370,34 @@ bool ModelTypeWorker::UpdateEncryptionKeyName() {
void ModelTypeWorker::DecryptStoredEntities() {
has_encrypted_updates_ = false;
for (const auto& kv : entities_) {
for (const auto& kv : entries_pending_decryption_) {
WorkerEntityTracker* entity = kv.second.get();
if (entity->HasEncryptedUpdate()) {
const UpdateResponseData& encrypted_update = entity->GetEncryptedUpdate();
EntityDataPtr data = encrypted_update.entity;
DCHECK(data->specifics.has_encrypted());
if (cryptographer_->CanDecrypt(data->specifics.encrypted())) {
sync_pb::EntitySpecifics specifics;
if (DecryptSpecifics(*cryptographer_, data->specifics, &specifics)) {
UpdateResponseData decrypted_update;
decrypted_update.response_version = encrypted_update.response_version;
// Copy the encryption_key_name from data->specifics before it gets
// overriden in data->UpdateSpecifics().
decrypted_update.encryption_key_name =
data->specifics.encrypted().key_name();
decrypted_update.entity = data->UpdateSpecifics(specifics);
pending_updates_.push_back(decrypted_update);
entity->ClearEncryptedUpdate();
}
} else {
has_encrypted_updates_ = true;
DCHECK(entity->HasEncryptedUpdate());
const UpdateResponseData& encrypted_update = entity->GetEncryptedUpdate();
EntityDataPtr data = encrypted_update.entity;
DCHECK(data->specifics.has_encrypted());
if (cryptographer_->CanDecrypt(data->specifics.encrypted())) {
sync_pb::EntitySpecifics specifics;
if (DecryptSpecifics(*cryptographer_, data->specifics, &specifics)) {
UpdateResponseData decrypted_update;
decrypted_update.response_version = encrypted_update.response_version;
// Copy the encryption_key_name from data->specifics before it gets
// overriden in data->UpdateSpecifics().
decrypted_update.encryption_key_name =
data->specifics.encrypted().key_name();
decrypted_update.entity = data->UpdateSpecifics(specifics);
pending_updates_.push_back(decrypted_update);
entity->ClearEncryptedUpdate();
}
} else {
has_encrypted_updates_ = true;
}
}
if (!has_encrypted_updates_) {
entries_pending_decryption_.clear();
}
}
// static
......@@ -435,28 +419,6 @@ bool ModelTypeWorker::DecryptSpecifics(const Cryptographer& cryptographer,
return true;
}
WorkerEntityTracker* ModelTypeWorker::GetEntityTracker(
const std::string& tag_hash) {
auto it = entities_.find(tag_hash);
return it != entities_.end() ? it->second.get() : nullptr;
}
WorkerEntityTracker* ModelTypeWorker::CreateEntityTracker(
const std::string& tag_hash) {
DCHECK(entities_.find(tag_hash) == entities_.end());
std::unique_ptr<WorkerEntityTracker> entity =
std::make_unique<WorkerEntityTracker>(tag_hash);
WorkerEntityTracker* entity_ptr = entity.get();
entities_[tag_hash] = std::move(entity);
return entity_ptr;
}
WorkerEntityTracker* ModelTypeWorker::GetOrCreateEntityTracker(
const std::string& tag_hash) {
WorkerEntityTracker* entity = GetEntityTracker(tag_hash);
return entity ? entity : CreateEntityTracker(tag_hash);
}
GetLocalChangesRequest::GetLocalChangesRequest(
CancelationSignal* cancelation_signal)
: cancelation_signal_(cancelation_signal),
......
......@@ -111,9 +111,6 @@ class ModelTypeWorker : public UpdateHandler,
// Callback for when our contribution gets a response.
void OnCommitResponse(CommitResponseDataList* response_list);
// Called at the end of commit regardless of commit success.
void CleanupAfterCommit();
// If migration the directory encounters an error partway through, we need to
// clear the update data that has been added so far.
void AbortMigration();
......@@ -161,9 +158,10 @@ class ModelTypeWorker : public UpdateHandler,
// an update occurred.
bool UpdateEncryptionKeyName();
// Iterates through all elements in |entities_| and tries to decrypt anything
// that has encrypted data. Also updates |has_encrypted_updates_| to reflect
// whether anything in |entities_| was not decryptable by |cryptographer_|.
// Iterates through all elements in |entries_pending_decryption_| and tries to
// decrypt anything that has encrypted data. Also updates
// |has_encrypted_updates_| to reflect whether anything in
// |entries_pending_decryption_| was not decryptable by |cryptographer_|.
// Should only be called during a GetUpdates cycle.
void DecryptStoredEntities();
......@@ -199,22 +197,16 @@ class ModelTypeWorker : public UpdateHandler,
// Interface used to access and send nudges to the sync scheduler. Not owned.
NudgeHandler* nudge_handler_;
// A map of per-entity information, keyed by client_tag_hash.
//
// When commits are pending, their information is stored here. This
// information is dropped from memory when the commit succeeds or gets
// canceled.
//
// This also stores some information related to received server state in
// order to implement reflection blocking and conflict detection. This
// information is kept in memory indefinitely.
EntityMap entities_;
// A map of per-entity information, keyed by server_id.
// Holds updates encrypted with pending keys.
EntityMap entries_pending_decryption_;
// Accumulates all the updates from a single GetUpdates cycle in memory so
// they can all be sent to the processor at once.
UpdateResponseDataList pending_updates_;
// Whether there are outstanding encrypted updates in |entities_|.
// Whether there are outstanding encrypted updates in
// |entries_pending_decryption_|.
bool has_encrypted_updates_ = false;
// Indicates if processor has local changes. Processor only nudges worker once
......
......@@ -141,8 +141,6 @@ void NonBlockingTypeCommitContribution::CleanUp() {
debug_info_emitter_->EmitCommitCountersUpdate();
debug_info_emitter_->EmitStatusCountersUpdate();
worker_->CleanupAfterCommit();
}
size_t NonBlockingTypeCommitContribution::GetNumEntries() const {
......
......@@ -14,38 +14,13 @@
namespace syncer {
WorkerEntityTracker::WorkerEntityTracker(const std::string& client_tag_hash)
: client_tag_hash_(client_tag_hash) {
DCHECK(!client_tag_hash_.empty());
}
WorkerEntityTracker::WorkerEntityTracker() {}
WorkerEntityTracker::~WorkerEntityTracker() {}
void WorkerEntityTracker::ReceiveUpdate(const UpdateResponseData& update) {
if (!UpdateContainsNewVersion(update.response_version))
return;
highest_gu_response_version_ = update.response_version;
DCHECK(!update.entity->id.empty());
id_ = update.entity->id;
// Got an applicable update newer than any pending updates. It must be safe
// to discard the old encrypted update, if there was one.
ClearEncryptedUpdate();
}
bool WorkerEntityTracker::UpdateContainsNewVersion(int64_t update_version) {
return update_version > highest_gu_response_version_;
}
bool WorkerEntityTracker::ReceiveEncryptedUpdate(
void WorkerEntityTracker::ReceiveEncryptedUpdate(
const UpdateResponseData& data) {
if (data.response_version < highest_gu_response_version_)
return false;
highest_gu_response_version_ = data.response_version;
encrypted_update_ = std::make_unique<UpdateResponseData>(data);
return true;
}
bool WorkerEntityTracker::HasEncryptedUpdate() const {
......@@ -63,8 +38,6 @@ void WorkerEntityTracker::ClearEncryptedUpdate() {
size_t WorkerEntityTracker::EstimateMemoryUsage() const {
using base::trace_event::EstimateMemoryUsage;
size_t memory_usage = 0;
memory_usage += EstimateMemoryUsage(client_tag_hash_);
memory_usage += EstimateMemoryUsage(id_);
memory_usage += EstimateMemoryUsage(encrypted_update_);
return memory_usage;
}
......
......@@ -5,15 +5,9 @@
#ifndef COMPONENTS_SYNC_ENGINE_IMPL_WORKER_ENTITY_TRACKER_H_
#define COMPONENTS_SYNC_ENGINE_IMPL_WORKER_ENTITY_TRACKER_H_
#include <stdint.h>
#include <memory>
#include <string>
#include "base/macros.h"
#include "base/time/time.h"
#include "components/sync/engine/non_blocking_sync_common.h"
#include "components/sync/protocol/sync.pb.h"
namespace syncer {
......@@ -31,24 +25,11 @@ namespace syncer {
// update, or both.
class WorkerEntityTracker {
public:
// Initializes the entity tracker's main fields. Does not initialize state
// related to a pending commit.
explicit WorkerEntityTracker(const std::string& client_tag_hash);
WorkerEntityTracker();
~WorkerEntityTracker();
// Handles receipt of an update from the server.
void ReceiveUpdate(const UpdateResponseData& update);
// Check if |update_version| is newer than local.
bool UpdateContainsNewVersion(int64_t update_version);
// Handles the receipt of an encrypted update from the server.
//
// Returns true if the tracker decides this item is worth keeping. Returns
// false if the item is discarded, which could happen if the version number
// is out of date.
bool ReceiveEncryptedUpdate(const UpdateResponseData& data);
void ReceiveEncryptedUpdate(const UpdateResponseData& data);
// Functions to fetch the latest encrypted update.
bool HasEncryptedUpdate() const;
......@@ -60,19 +41,7 @@ class WorkerEntityTracker {
// Returns the estimate of dynamically allocated memory in bytes.
size_t EstimateMemoryUsage() const;
const std::string& id() const { return id_; }
const std::string& client_tag_hash() const { return client_tag_hash_; }
private:
// The hashed client tag for this entry.
const std::string client_tag_hash_;
// The ID for this entry. May be empty if the entry has never been committed.
std::string id_;
// The highest version seen in a GU response for this entry.
int64_t highest_gu_response_version_ = kUncommittedVersion;
// An update for this entity which can't be applied right now. The presence
// of an pending update prevents commits. As of this writing, the only
// source of pending updates is updates that can't currently be decrypted.
......
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/engine_impl/worker_entity_tracker.h"
#include "components/sync/base/hash_util.h"
#include "components/sync/base/model_type.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace syncer {
// Some simple tests for the WorkerEntityTracker.
//
// The WorkerEntityTracker is an implementation detail of the ModelTypeWorker.
// As such, it doesn't make much sense to test it exhaustively, since it
// already gets a lot of test coverage from the ModelTypeWorker unit tests.
//
// These tests are intended as a basic sanity check. Anything more complicated
// would be redundant.
class WorkerEntityTrackerTest : public ::testing::Test {
public:
WorkerEntityTrackerTest()
: kServerId("ServerID"),
kClientTag("some.sample.tag"),
kClientTagHash(GenerateSyncableHash(PREFERENCES, kClientTag)),
entity_(new WorkerEntityTracker(kClientTagHash)) {
specifics.mutable_preference()->set_name(kClientTag);
specifics.mutable_preference()->set_value("pref.value");
}
~WorkerEntityTrackerTest() override {}
UpdateResponseData MakeUpdateResponseData(int64_t response_version) {
EntityData data;
data.id = kServerId;
data.client_tag_hash = kClientTagHash;
UpdateResponseData response_data;
response_data.entity = data.PassToPtr();
response_data.response_version = response_version;
return response_data;
}
const std::string kServerId;
const std::string kClientTag;
const std::string kClientTagHash;
sync_pb::EntitySpecifics specifics;
std::unique_ptr<WorkerEntityTracker> entity_;
};
// Construct a new entity from a server update. Then receive another update.
TEST_F(WorkerEntityTrackerTest, FromUpdateResponse) {
EXPECT_EQ("", entity_->id());
entity_->ReceiveUpdate(MakeUpdateResponseData(20));
EXPECT_EQ(kServerId, entity_->id());
}
} // namespace syncer
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment