sync: Add non-blocking type encryption support

Introduces the framework for dealing with sync encryption in
non-blocking types.  Unlike directory sync types, non-blocking type
encryption only encrypts data before it is sent to the server.
Encrypting the data on-disk is a separate problem.

Adds code to the ModelTypeSyncWorker so it can access the directory's
cryptographer (through a CryptographerProvider interface) and use it to
encrypt entities before it sends them to the server.  If the
cryptographer is unable to encrypt with the desired key, the worker will
not commit until the cryptographer returns to a good state.

Adds the concept of a "desired encryption key" to the data type state.
When the cryptographer key to be used to encrypt a type changes, this
will be reflected in the data type state.  The ModelTypeSyncProxy is
responsible for ensuring that all items which have not yet been
encrypted with this desired key are enqueued for commit.

Makes the ModelTypeSyncWorker, EntityTracker, and ModelTypeSyncProxy
collaborate on the management of undecryptable (inapplicable) updates.
The EntityTracker keeps track of their version numbers and content, and
prevents the committing of new items to the server until the
inapplicable update has been dealt with.  The ModelTypeSyncProxy is
responsible for saving inapplicable updates across restarts.

This CL alone is not enough to enable encryption support for
non-blocking types.  It requires additional code to hook up the
ModelTypeSyncWorkers to receive cryptographer events.  This will be
added in a future commit.  In the meantime, this CL includes plenty
of unit tests to verify the functionality that's being added.

BUG=351005

Review URL: https://codereview.chromium.org/423193002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@287428 0039d316-1c4b-4281-b951-d872f2087c98
parent 87a0a999
......@@ -85,6 +85,7 @@ class MockSyncContextProxy : public syncer::SyncContextProxy {
virtual void ConnectTypeToSync(
syncer::ModelType type,
const syncer::DataTypeState& data_type_state,
const syncer::UpdateResponseDataList& saved_pending_updates,
const base::WeakPtr<syncer::ModelTypeSyncProxyImpl>& type_proxy)
OVERRIDE {
// Normally we'd use MessageLoopProxy::current() as the TaskRunner argument
......
......@@ -193,8 +193,14 @@ void EntityTracker::ReceiveCommitResponse(const std::string& response_id,
}
void EntityTracker::ReceiveUpdate(int64 version) {
highest_gu_response_version_ =
std::max(highest_gu_response_version_, version);
if (version <= highest_gu_response_version_)
return;
highest_gu_response_version_ = version;
// Got an applicable update newer than any pending updates. It must be safe
// to discard the old pending update, if there was one.
ClearPendingUpdate();
if (IsInConflict()) {
// Incoming update clobbers the pending commit on the sync thread.
......@@ -203,10 +209,35 @@ void EntityTracker::ReceiveUpdate(int64 version) {
}
}
bool EntityTracker::ReceivePendingUpdate(const UpdateResponseData& data) {
if (data.response_version < highest_gu_response_version_)
return false;
highest_gu_response_version_ = data.response_version;
pending_update_.reset(new UpdateResponseData(data));
ClearPendingCommit();
return true;
}
bool EntityTracker::HasPendingUpdate() const {
return !!pending_update_;
}
UpdateResponseData EntityTracker::GetPendingUpdate() const {
return *pending_update_;
}
void EntityTracker::ClearPendingUpdate() {
pending_update_.reset();
}
bool EntityTracker::IsInConflict() const {
if (!is_commit_pending_)
return false;
if (HasPendingUpdate())
return true;
if (highest_gu_response_version_ <= highest_commit_response_version_) {
// The most recent server state was created in a commit made by this
// client. We're fully up to date, and therefore not in conflict.
......
......@@ -8,8 +8,10 @@
#include <string>
#include "base/basictypes.h"
#include "base/memory/scoped_ptr.h"
#include "base/time/time.h"
#include "sync/base/sync_export.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/protocol/sync.pb.h"
namespace syncer {
......@@ -81,6 +83,20 @@ class SYNC_EXPORT EntityTracker {
// Handles receipt of an update from the server.
void ReceiveUpdate(int64 version);
// Handles the receipt of an pending update from the server.
//
// Returns true if the tracker decides this item is worth keeping. Returns
// false if the item is discarded, which could happen if the version number
// is out of date.
bool ReceivePendingUpdate(const UpdateResponseData& data);
// Functions to fetch the latest pending update.
bool HasPendingUpdate() const;
UpdateResponseData GetPendingUpdate() const;
// Clears the pending update. Allows us to resume regular commit behavior.
void ClearPendingUpdate();
private:
// Initializes received update state. Does not initialize state related to
// pending commits and sets |is_commit_pending_| to false.
......@@ -146,6 +162,11 @@ class SYNC_EXPORT EntityTracker {
bool deleted_;
sync_pb::EntitySpecifics specifics_;
// An update for this item which can't be applied right now. The presence of
// an pending update prevents commits. As of this writing, the only source
// of pending updates is updates we can't decrypt right now.
scoped_ptr<UpdateResponseData> pending_update_;
DISALLOW_COPY_AND_ASSIGN(EntityTracker);
};
......
......@@ -24,7 +24,8 @@ scoped_ptr<ModelTypeEntity> ModelTypeEntity::NewLocalItem(
specifics,
false,
now,
now));
now,
std::string()));
}
scoped_ptr<ModelTypeEntity> ModelTypeEntity::FromServerUpdate(
......@@ -35,7 +36,8 @@ scoped_ptr<ModelTypeEntity> ModelTypeEntity::FromServerUpdate(
const sync_pb::EntitySpecifics& specifics,
bool deleted,
base::Time ctime,
base::Time mtime) {
base::Time mtime,
const std::string& encryption_key_name) {
return scoped_ptr<ModelTypeEntity>(new ModelTypeEntity(0,
0,
0,
......@@ -47,7 +49,8 @@ scoped_ptr<ModelTypeEntity> ModelTypeEntity::FromServerUpdate(
specifics,
deleted,
ctime,
mtime));
mtime,
encryption_key_name));
}
ModelTypeEntity::ModelTypeEntity(int64 sequence_number,
......@@ -61,7 +64,8 @@ ModelTypeEntity::ModelTypeEntity(int64 sequence_number,
const sync_pb::EntitySpecifics& specifics,
bool deleted,
base::Time ctime,
base::Time mtime)
base::Time mtime,
const std::string& encryption_key_name)
: sequence_number_(sequence_number),
commit_requested_sequence_number_(commit_requested_sequence_number),
acked_sequence_number_(acked_sequence_number),
......@@ -73,7 +77,8 @@ ModelTypeEntity::ModelTypeEntity(int64 sequence_number,
specifics_(specifics),
deleted_(deleted),
ctime_(ctime),
mtime_(mtime) {
mtime_(mtime),
encryption_key_name_(encryption_key_name) {
}
ModelTypeEntity::~ModelTypeEntity() {
......@@ -103,7 +108,8 @@ void ModelTypeEntity::ApplyUpdateFromServer(
int64 update_version,
bool deleted,
const sync_pb::EntitySpecifics& specifics,
base::Time mtime) {
base::Time mtime,
const std::string& encryption_key_name) {
// There was a conflict and the server just won it.
// This implicitly acks all outstanding commits because a received update
// will clobber any pending commits on the sync thread.
......@@ -121,6 +127,15 @@ void ModelTypeEntity::MakeLocalChange(
specifics_ = specifics;
}
void ModelTypeEntity::UpdateDesiredEncryptionKey(const std::string& name) {
if (encryption_key_name_ == name)
return;
// Schedule commit with the expectation that the worker will re-encrypt with
// the latest encryption key as it does.
sequence_number_++;
}
void ModelTypeEntity::Delete() {
sequence_number_++;
specifics_.Clear();
......@@ -144,12 +159,15 @@ void ModelTypeEntity::SetCommitRequestInProgress() {
commit_requested_sequence_number_ = sequence_number_;
}
void ModelTypeEntity::ReceiveCommitResponse(const std::string& id,
int64 sequence_number,
int64 response_version) {
void ModelTypeEntity::ReceiveCommitResponse(
const std::string& id,
int64 sequence_number,
int64 response_version,
const std::string& encryption_key_name) {
id_ = id; // The server can assign us a new ID in a commit response.
acked_sequence_number_ = sequence_number;
base_version_ = response_version;
encryption_key_name_ = encryption_key_name;
}
void ModelTypeEntity::ClearTransientSyncState() {
......
......@@ -46,7 +46,8 @@ class SYNC_EXPORT_PRIVATE ModelTypeEntity {
const sync_pb::EntitySpecifics& specifics,
bool deleted,
base::Time ctime,
base::Time mtime);
base::Time mtime,
const std::string& encryption_key_name);
// TODO(rlarocque): Implement FromDisk constructor when we implement storage.
......@@ -79,11 +80,17 @@ class SYNC_EXPORT_PRIVATE ModelTypeEntity {
void ApplyUpdateFromServer(int64 update_version,
bool deleted,
const sync_pb::EntitySpecifics& specifics,
base::Time mtime);
base::Time mtime,
const std::string& encryption_key_name);
// Applies a local change to this item.
void MakeLocalChange(const sync_pb::EntitySpecifics& specifics);
// Schedule a commit if the |name| does not match this item's last known
// encryption key. The worker that performs the commit is expected to
// encrypt the item using the latest available key.
void UpdateDesiredEncryptionKey(const std::string& name);
// Applies a local deletion to this item.
void Delete();
......@@ -104,7 +111,8 @@ class SYNC_EXPORT_PRIVATE ModelTypeEntity {
// reached the server.
void ReceiveCommitResponse(const std::string& id,
int64 sequence_number,
int64 response_version);
int64 response_version,
const std::string& encryption_key_name);
// Clears any in-memory sync state associated with outstanding commits.
void ClearTransientSyncState();
......@@ -124,7 +132,8 @@ class SYNC_EXPORT_PRIVATE ModelTypeEntity {
const sync_pb::EntitySpecifics& specifics,
bool deleted,
base::Time ctime,
base::Time mtime);
base::Time mtime,
const std::string& encryption_key_name);
// A sequence number used to track in-progress commits. Each local change
// increments this number.
......@@ -185,6 +194,10 @@ class SYNC_EXPORT_PRIVATE ModelTypeEntity {
// doesn't bother to inspect their values.
base::Time ctime_;
base::Time mtime_;
// The name of the encryption key used to encrypt this item on the server.
// Empty when no encryption is in use.
std::string encryption_key_name_;
};
} // namespace syncer
......
......@@ -64,7 +64,8 @@ TEST_F(ModelTypeEntityTest, FromServerUpdate) {
specifics,
false,
kCtime,
kMtime));
kMtime,
std::string()));
EXPECT_TRUE(entity->IsWriteRequired());
EXPECT_FALSE(entity->IsUnsynced());
......@@ -87,7 +88,8 @@ TEST_F(ModelTypeEntityTest, TombstoneUpdate) {
sync_pb::EntitySpecifics(),
true,
kCtime,
kMtime));
kMtime,
std::string()));
EXPECT_TRUE(entity->IsWriteRequired());
EXPECT_FALSE(entity->IsUnsynced());
......@@ -107,13 +109,15 @@ TEST_F(ModelTypeEntityTest, ApplyUpdate) {
specifics,
false,
kCtime,
kMtime));
kMtime,
std::string()));
// A deletion update one version later.
entity->ApplyUpdateFromServer(11,
true,
sync_pb::EntitySpecifics(),
kMtime + base::TimeDelta::FromSeconds(10));
kMtime + base::TimeDelta::FromSeconds(10),
std::string());
EXPECT_TRUE(entity->IsWriteRequired());
EXPECT_FALSE(entity->IsUnsynced());
......@@ -130,7 +134,8 @@ TEST_F(ModelTypeEntityTest, LocalChange) {
specifics,
false,
kCtime,
kMtime));
kMtime,
std::string()));
sync_pb::EntitySpecifics specifics2;
specifics2.CopyFrom(specifics);
......@@ -156,7 +161,8 @@ TEST_F(ModelTypeEntityTest, LocalDeletion) {
specifics,
false,
kCtime,
kMtime));
kMtime,
std::string()));
entity->Delete();
......
......@@ -21,7 +21,8 @@ class SYNC_EXPORT_PRIVATE ModelTypeSyncProxy {
const CommitResponseDataList& response_list) = 0;
virtual void OnUpdateReceived(
const DataTypeState& type_state,
const UpdateResponseDataList& response_list) = 0;
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates) = 0;
};
} // namespace syncer
......
......@@ -18,6 +18,7 @@ ModelTypeSyncProxyImpl::ModelTypeSyncProxyImpl(ModelType type)
is_preferred_(false),
is_connected_(false),
entities_deleter_(&entities_),
pending_updates_map_deleter_(&pending_updates_map_),
weak_ptr_factory_for_ui_(this),
weak_ptr_factory_for_sync_(this) {
}
......@@ -51,10 +52,12 @@ void ModelTypeSyncProxyImpl::Enable(
data_type_state_.progress_marker.set_data_type_id(
GetSpecificsFieldNumberFromModelType(type_));
UpdateResponseDataList saved_pending_updates = GetPendingUpdates();
sync_context_proxy_ = sync_context_proxy.Pass();
sync_context_proxy_->ConnectTypeToSync(
GetModelType(),
data_type_state_,
saved_pending_updates,
weak_ptr_factory_for_sync_.GetWeakPtr());
}
......@@ -180,16 +183,18 @@ void ModelTypeSyncProxyImpl::OnCommitCompleted(
} else {
it->second->ReceiveCommitResponse(response_data.id,
response_data.sequence_number,
response_data.response_version);
response_data.response_version,
data_type_state_.encryption_key_name);
}
}
}
void ModelTypeSyncProxyImpl::OnUpdateReceived(
const DataTypeState& data_type_state,
const UpdateResponseDataList& response_list) {
bool initial_sync_just_finished =
!data_type_state_.initial_sync_done && data_type_state.initial_sync_done;
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates) {
bool got_new_encryption_requirements = data_type_state_.encryption_key_name !=
data_type_state.encryption_key_name;
data_type_state_ = data_type_state;
......@@ -199,6 +204,14 @@ void ModelTypeSyncProxyImpl::OnUpdateReceived(
const UpdateResponseData& response_data = *list_it;
const std::string& client_tag_hash = response_data.client_tag_hash;
UpdateMap::iterator old_it = pending_updates_map_.find(client_tag_hash);
if (old_it != pending_updates_map_.end()) {
// If we're being asked to apply an update to this entity, this overrides
// the previous pending updates.
delete old_it->second;
pending_updates_map_.erase(old_it);
}
EntityMap::iterator it = entities_.find(client_tag_hash);
if (it == entities_.end()) {
scoped_ptr<ModelTypeEntity> entity =
......@@ -209,22 +222,74 @@ void ModelTypeSyncProxyImpl::OnUpdateReceived(
response_data.specifics,
response_data.deleted,
response_data.ctime,
response_data.mtime);
response_data.mtime,
response_data.encryption_key_name);
entities_.insert(std::make_pair(client_tag_hash, entity.release()));
} else {
ModelTypeEntity* entity = it->second;
entity->ApplyUpdateFromServer(response_data.response_version,
response_data.deleted,
response_data.specifics,
response_data.mtime);
response_data.mtime,
response_data.encryption_key_name);
// TODO: Do something special when conflicts are detected.
}
// If the received entity has out of date encryption, we schedule another
// commit to fix it.
if (data_type_state_.encryption_key_name !=
response_data.encryption_key_name) {
EntityMap::iterator it2 = entities_.find(client_tag_hash);
it2->second->UpdateDesiredEncryptionKey(
data_type_state_.encryption_key_name);
}
}
if (initial_sync_just_finished)
FlushPendingCommitRequests();
// Save pending updates in the appropriate data structure.
for (UpdateResponseDataList::const_iterator list_it = pending_updates.begin();
list_it != pending_updates.end();
++list_it) {
const UpdateResponseData& update = *list_it;
const std::string& client_tag_hash = update.client_tag_hash;
UpdateMap::iterator lookup_it = pending_updates_map_.find(client_tag_hash);
if (lookup_it == pending_updates_map_.end()) {
pending_updates_map_.insert(
std::make_pair(client_tag_hash, new UpdateResponseData(update)));
} else if (lookup_it->second->response_version <= update.response_version) {
delete lookup_it->second;
pending_updates_map_.erase(lookup_it);
pending_updates_map_.insert(
std::make_pair(client_tag_hash, new UpdateResponseData(update)));
} else {
// Received update is stale, do not overwrite existing.
}
}
if (got_new_encryption_requirements) {
for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
++it) {
it->second->UpdateDesiredEncryptionKey(
data_type_state_.encryption_key_name);
}
}
// We may have new reasons to commit by the time this function is done.
FlushPendingCommitRequests();
// TODO: Inform the model of the new or updated data.
// TODO: Persist the new data on disk.
}
UpdateResponseDataList ModelTypeSyncProxyImpl::GetPendingUpdates() {
UpdateResponseDataList pending_updates_list;
for (UpdateMap::const_iterator it = pending_updates_map_.begin();
it != pending_updates_map_.end();
++it) {
pending_updates_list.push_back(*it->second);
}
return pending_updates_list;
}
void ModelTypeSyncProxyImpl::ClearTransientSyncState() {
......@@ -239,7 +304,7 @@ void ModelTypeSyncProxyImpl::ClearSyncState() {
++it) {
it->second->ClearSyncState();
}
STLDeleteValues(&pending_updates_map_);
data_type_state_ = DataTypeState();
}
......
......@@ -73,7 +73,16 @@ class SYNC_EXPORT_PRIVATE ModelTypeSyncProxyImpl : base::NonThreadSafe {
// Informs this object that there are some incoming updates is should
// handle.
void OnUpdateReceived(const DataTypeState& type_state,
const UpdateResponseDataList& response_list);
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates);
// Returns the list of pending updates.
//
// This is used as a helper function, but it's public mainly for testing.
// The current test harness setup doesn't allow us to test the data that the
// proxy sends to the worker during initialization, so we use this to inspect
// its state instead.
UpdateResponseDataList GetPendingUpdates();
// Returns the long-lived WeakPtr that is intended to be registered with the
// ProfileSyncService.
......@@ -81,6 +90,7 @@ class SYNC_EXPORT_PRIVATE ModelTypeSyncProxyImpl : base::NonThreadSafe {
private:
typedef std::map<std::string, ModelTypeEntity*> EntityMap;
typedef std::map<std::string, UpdateResponseData*> UpdateMap;
// Sends all commit requests that are due to be sent to the sync thread.
void FlushPendingCommitRequests();
......@@ -123,6 +133,12 @@ class SYNC_EXPORT_PRIVATE ModelTypeSyncProxyImpl : base::NonThreadSafe {
EntityMap entities_;
STLValueDeleter<EntityMap> entities_deleter_;
// A set of updates that can not be applied at this time. These are never
// used by the model. They are kept here only so we can save and restore
// them across restarts, and keep them in sync with our progress markers.
UpdateMap pending_updates_map_;
STLValueDeleter<UpdateMap> pending_updates_map_deleter_;
// We use two different WeakPtrFactories because we want the pointers they
// issue to have different lifetimes. When asked to disconnect from the sync
// thread, we want to make sure that no tasks generated as part of the
......
This diff is collapsed.
......@@ -10,11 +10,13 @@
#include "base/threading/non_thread_safe.h"
#include "sync/base/sync_export.h"
#include "sync/engine/commit_contributor.h"
#include "sync/engine/cryptographer_provider.h"
#include "sync/engine/model_type_sync_worker.h"
#include "sync/engine/nudge_handler.h"
#include "sync/engine/update_handler.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/internal_api/public/sync_encryption_handler.h"
#include "sync/protocol/sync.pb.h"
namespace base {
......@@ -53,12 +55,19 @@ class SYNC_EXPORT ModelTypeSyncWorkerImpl : public UpdateHandler,
public:
ModelTypeSyncWorkerImpl(ModelType type,
const DataTypeState& initial_state,
const UpdateResponseDataList& saved_pending_updates,
CryptographerProvider* cryptographer_provider,
NudgeHandler* nudge_handler,
scoped_ptr<ModelTypeSyncProxy> type_sync_proxy);
virtual ~ModelTypeSyncWorkerImpl();
ModelType GetModelType() const;
bool IsEncryptionRequired() const;
void SetEncryptionKeyName(const std::string& name);
void OnCryptographerStateChanged();
// UpdateHandler implementation.
virtual void GetDownloadProgress(
sync_pb::DataTypeProgressMarker* progress_marker) const OVERRIDE;
......@@ -87,20 +96,45 @@ class SYNC_EXPORT ModelTypeSyncWorkerImpl : public UpdateHandler,
private:
typedef std::map<std::string, EntityTracker*> EntityMap;
typedef std::map<std::string, UpdateResponseData*> UpdateMap;
// Stores a single commit request in this object's internal state.
void StorePendingCommit(const CommitRequestData& request);
// Returns true if all data type state required for commits is available. In
// practice, this means that it returns true from the time this object first
// receives notice of a successful update fetch from the server.
bool CanCommitItems() const;
// Returns true if this type has successfully fetched all available updates
// from the server at least once. Our state may or may not be stale, but at
// least we know that it was valid at some point in the past.
bool IsTypeInitialized() const;
// Returns true if this type is prepared to commit items. Currently, this
// depends on having downloaded the initial data and having the encryption
// settings in a good state.
bool CanCommitItems(Cryptographer* cryptographer) const;
// Initializes the parts of a commit entity that are the responsibility of
// this class, and not the EntityTracker. Some fields, like the
// client-assigned ID, can only be set by an entity with knowledge of the
// entire data type's state.
void HelpInitializeCommitEntity(sync_pb::SyncEntity* commit_entity);
void HelpInitializeCommitEntity(Cryptographer* cryptographer,
sync_pb::SyncEntity* commit_entity);
// Attempts to decrypt pending updates stored in the EntityMap. If
// successful, will remove the update from the its EntityTracker and forward
// it to the proxy thread for application.
void TryDecryptPendingUpdates();
// Attempts to decrypt the given specifics and return them in the |out|
// parameter. Assumes cryptographer->CanDecrypt(specifics) returned true.
//
// Returns false if the decryption failed. There are no guarantees about the
// contents of |out| when that happens.
//
// In theory, this should never fail. Only corrupt or invalid entries could
// cause this to fail, and no clients are known to create such entries. The
// failure case is an attempt to be defensive against bad input.
static bool DecryptSpecifics(Cryptographer* cryptographer,
const sync_pb::EntitySpecifics& in,
sync_pb::EntitySpecifics* out);
ModelType type_;
......@@ -111,6 +145,10 @@ class SYNC_EXPORT ModelTypeSyncWorkerImpl : public UpdateHandler,
// This is NULL when no proxy is connected..
scoped_ptr<ModelTypeSyncProxy> type_sync_proxy_;
// A helper to provide access to the syncable::Directory's cryptographer.
// Not owned.
CryptographerProvider* cryptographer_provider_;
// Interface used to access and send nudges to the sync scheduler. Not owned.
NudgeHandler* nudge_handler_;
......
......@@ -38,6 +38,11 @@ struct SYNC_EXPORT_PRIVATE DataTypeState {
// until the first download cycle has completed.
std::string type_root_id;
// This value is set if this type's data should be encrypted on the server.
// If this key changes, the client will need to re-commit all of its local
// data to the server using the new encryption key.
std::string encryption_key_name;
// A strictly increasing counter used to generate unique values for the
// client-assigned IDs. The incrementing and ID assignment happens on the
// sync thread, but we store the value here so we can pass it back to the
......@@ -51,7 +56,6 @@ struct SYNC_EXPORT_PRIVATE DataTypeState {
// flag is set.
bool initial_sync_done;
};
struct SYNC_EXPORT_PRIVATE CommitRequestData {
CommitRequestData();
~CommitRequestData();
......@@ -94,6 +98,7 @@ struct SYNC_EXPORT_PRIVATE UpdateResponseData {
std::string non_unique_name;
bool deleted;
sync_pb::EntitySpecifics specifics;
std::string encryption_key_name;
};
typedef std::vector<CommitRequestData> CommitRequestDataList;
......
......@@ -10,11 +10,11 @@
#include "base/sequenced_task_runner.h"
#include "sync/base/sync_export.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
namespace syncer {
class ModelTypeSyncProxyImpl;
struct DataTypeState;
// An interface of the core parts of sync.
//
......@@ -35,6 +35,7 @@ class SYNC_EXPORT_PRIVATE SyncContext {
virtual void ConnectSyncTypeToWorker(
syncer::ModelType type,
const DataTypeState& data_type_state,
const syncer::UpdateResponseDataList& saved_pending_updates,
const scoped_refptr<base::SequencedTaskRunner>& datatype_task_runner,
const base::WeakPtr<ModelTypeSyncProxyImpl>& type_sync_proxy) = 0;
......
......@@ -7,6 +7,7 @@
#include "base/memory/weak_ptr.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
namespace syncer {
......@@ -27,6 +28,7 @@ class SYNC_EXPORT_PRIVATE SyncContextProxy {
virtual void ConnectTypeToSync(
syncer::ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& saved_pending_updates,
const base::WeakPtr<ModelTypeSyncProxyImpl>& type_sync_proxy) = 0;
// Tells the syncer that we're no longer interested in syncing this type.
......
......@@ -6,6 +6,7 @@
#define SYNC_INTERNAL_API_PUBLIC_TEST_NULL_SYNC_CONTEXT_PROXY_H_
#include "base/memory/weak_ptr.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/internal_api/public/sync_context_proxy.h"
namespace syncer {
......@@ -23,6 +24,7 @@ class NullSyncContextProxy : public SyncContextProxy {
virtual void ConnectTypeToSync(
syncer::ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& saved_pending_updates,
const base::WeakPtr<ModelTypeSyncProxyImpl>& type_sync_proxy) OVERRIDE;
virtual void Disconnect(syncer::ModelType type) OVERRIDE;
virtual scoped_ptr<SyncContextProxy> Clone() const OVERRIDE;
......
......@@ -25,6 +25,7 @@ SyncContextProxyImpl::~SyncContextProxyImpl() {
void SyncContextProxyImpl::ConnectTypeToSync(
ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& saved_pending_updates,
const base::WeakPtr<ModelTypeSyncProxyImpl>& type_sync_proxy) {
VLOG(1) << "ConnectTypeToSync: " << ModelTypeToString(type);
sync_task_runner_->PostTask(FROM_HERE,
......@@ -32,6 +33,7 @@ void SyncContextProxyImpl::ConnectTypeToSync(
sync_context_,
type,
data_type_state,
saved_pending_updates,
base::ThreadTaskRunnerHandle::Get(),
type_sync_proxy));
}
......
......@@ -40,6 +40,7 @@ class SYNC_EXPORT_PRIVATE SyncContextProxyImpl : public SyncContextProxy {
virtual void ConnectTypeToSync(
syncer::ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& pending_updates,
const base::WeakPtr<ModelTypeSyncProxyImpl>& sync_proxy_impl) OVERRIDE;
// Disables syncing for the given type on the sync thread.
......
......@@ -1519,7 +1519,7 @@ bool SyncEncryptionHandlerImpl::GetKeystoreDecryptor(
DCHECK(!keystore_key.empty());
DCHECK(cryptographer.is_ready());
std::string serialized_nigori;
serialized_nigori = cryptographer.GetDefaultNigoriKey();
serialized_nigori = cryptographer.GetDefaultNigoriKeyData();
if (serialized_nigori.empty()) {
LOG(ERROR) << "Failed to get cryptographer bootstrap token.";
return false;
......
......@@ -15,6 +15,7 @@ NullSyncContextProxy::~NullSyncContextProxy() {
void NullSyncContextProxy::ConnectTypeToSync(
syncer::ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& saved_pending_updates,
const base::WeakPtr<ModelTypeSyncProxyImpl>& type_sync_proxy) {
NOTREACHED() << "NullSyncContextProxy is not meant to be used";
}
......
......@@ -15,6 +15,7 @@
#include "sync/engine/model_type_sync_worker_impl.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/sessions/directory_type_debug_info_emitter.h"
#include "sync/util/cryptographer.h"
namespace syncer {
......@@ -32,7 +33,8 @@ class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
const CommitResponseDataList& response_list) OVERRIDE;
virtual void OnUpdateReceived(
const DataTypeState& type_state,
const UpdateResponseDataList& response_list) OVERRIDE;
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates) OVERRIDE;
private:
base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
......@@ -61,13 +63,15 @@ void ModelTypeSyncProxyWrapper::OnCommitCompleted(
void ModelTypeSyncProxyWrapper::OnUpdateReceived(
const DataTypeState& type_state,
const UpdateResponseDataList& response_list) {
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates) {
processor_task_runner_->PostTask(
FROM_HERE,
base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
processor_,
type_state,
response_list));
response_list,
pending_updates));
}
class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
......@@ -107,6 +111,7 @@ ModelTypeRegistry::ModelTypeRegistry(
syncable::Directory* directory,
NudgeHandler* nudge_handler)
: directory_(directory),
cryptographer_provider_(directory_),
nudge_handler_(nudge_handler),
weak_ptr_factory_(this) {
for (size_t i = 0u; i < workers.size(); ++i) {
......@@ -185,6 +190,7 @@ void ModelTypeRegistry::SetEnabledDirectoryTypes(
void ModelTypeRegistry::ConnectSyncTypeToWorker(
ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& saved_pending_updates,
const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
......@@ -192,8 +198,13 @@ void ModelTypeRegistry::ConnectSyncTypeToWorker(
// Initialize Worker -> Proxy communication channel.
scoped_ptr<ModelTypeSyncProxy> proxy(
new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
scoped_ptr<ModelTypeSyncWorkerImpl> worker(new ModelTypeSyncWorkerImpl(
type, data_type_state, nudge_handler_, proxy.Pass()));
scoped_ptr<ModelTypeSyncWorkerImpl> worker(
new ModelTypeSyncWorkerImpl(type,
data_type_state,
saved_pending_updates,
&cryptographer_provider_,
nudge_handler_,
proxy.Pass()));
// Initialize Proxy -> Worker communication channel.
scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
......
......@@ -12,9 +12,11 @@
#include "base/memory/scoped_vector.h"
#include "base/memory/weak_ptr.h"
#include "sync/base/sync_export.h"
#include "sync/engine/directory_cryptographer_provider.h"
#include "sync/engine/nudge_handler.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/internal_api/public/engine/model_safe_worker.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/internal_api/public/sessions/type_debug_info_observer.h"
#include "sync/internal_api/public/sync_context.h"
......@@ -31,7 +33,6 @@ class DirectoryTypeDebugInfoEmitter;
class ModelTypeSyncWorkerImpl;
class ModelTypeSyncProxyImpl;
class UpdateHandler;
struct DataTypeState;
typedef std::map<ModelType, UpdateHandler*> UpdateHandlerMap;
typedef std::map<ModelType, CommitContributor*> CommitContributorMap;
......@@ -57,6 +58,7 @@ class SYNC_EXPORT_PRIVATE ModelTypeRegistry : public SyncContext {
virtual void ConnectSyncTypeToWorker(
syncer::ModelType type,
const DataTypeState& data_type_state,
const syncer::UpdateResponseDataList& saved_pending_updates,
const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy) OVERRIDE;
......@@ -112,6 +114,9 @@ class SYNC_EXPORT_PRIVATE ModelTypeRegistry : public SyncContext {
// The directory. Not owned.
syncable::Directory* directory_;
// Provides access to the Directory's cryptographer.
DirectoryCryptographerProvider cryptographer_provider_;
// The NudgeHandler. Not owned.
NudgeHandler* nudge_handler_;
......
......@@ -154,6 +154,7 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypes) {
registry()->ConnectSyncTypeToWorker(syncer::THEMES,
MakeInitialDataTypeState(THEMES),
UpdateResponseDataList(),
task_runner,
themes_sync_proxy.AsWeakPtrForUI());
EXPECT_TRUE(registry()->GetEnabledTypes().Equals(
......@@ -161,6 +162,7 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypes) {
registry()->ConnectSyncTypeToWorker(syncer::SESSIONS,
MakeInitialDataTypeState(SESSIONS),
UpdateResponseDataList(),
task_runner,
sessions_sync_proxy.AsWeakPtrForUI());
EXPECT_TRUE(registry()->GetEnabledTypes().Equals(
......@@ -192,6 +194,7 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) {
// Add the themes non-blocking type.
registry()->ConnectSyncTypeToWorker(syncer::THEMES,
MakeInitialDataTypeState(THEMES),
UpdateResponseDataList(),
task_runner,
themes_sync_proxy.AsWeakPtrForUI());
current_types.Put(syncer::THEMES);
......@@ -205,6 +208,7 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) {
// Add sessions non-blocking type.
registry()->ConnectSyncTypeToWorker(syncer::SESSIONS,
MakeInitialDataTypeState(SESSIONS),
UpdateResponseDataList(),
task_runner,
sessions_sync_proxy.AsWeakPtrForUI());
current_types.Put(syncer::SESSIONS);
......@@ -235,10 +239,12 @@ TEST_F(ModelTypeRegistryTest, DeletionOrdering) {
registry()->ConnectSyncTypeToWorker(syncer::THEMES,
MakeInitialDataTypeState(THEMES),
UpdateResponseDataList(),
task_runner,
themes_sync_proxy->AsWeakPtrForUI());
registry()->ConnectSyncTypeToWorker(syncer::SESSIONS,
MakeInitialDataTypeState(SESSIONS),
UpdateResponseDataList(),
task_runner,
sessions_sync_proxy->AsWeakPtrForUI());
EXPECT_TRUE(registry()->GetEnabledTypes().Equals(
......
......@@ -20,6 +20,7 @@ InjectableSyncContextProxy::~InjectableSyncContextProxy() {
void InjectableSyncContextProxy::ConnectTypeToSync(
syncer::ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& response_list,
const base::WeakPtr<syncer::ModelTypeSyncProxyImpl>& type_sync_proxy) {
// This class is allowed to participate in only one connection.
DCHECK(!is_worker_connected_);
......
......@@ -6,6 +6,7 @@
#define SYNC_TEST_ENGINE_INJECTABLE_SYNC_CONTEXT_PROXY_H_
#include "sync/internal_api/public/base/model_type.h"
#include "sync/internal_api/public/non_blocking_sync_common.h"
#include "sync/internal_api/public/sync_context_proxy.h"
namespace syncer {
......@@ -24,6 +25,7 @@ class InjectableSyncContextProxy : public syncer::SyncContextProxy {
virtual void ConnectTypeToSync(
syncer::ModelType type,
const DataTypeState& data_type_state,
const UpdateResponseDataList& pending_updates,
const base::WeakPtr<syncer::ModelTypeSyncProxyImpl>& type_sync_proxy)
OVERRIDE;
virtual void Disconnect(syncer::ModelType type) OVERRIDE;
......
......@@ -29,11 +29,13 @@ void MockModelTypeSyncProxy::OnCommitCompleted(
void MockModelTypeSyncProxy::OnUpdateReceived(
const DataTypeState& type_state,
const UpdateResponseDataList& response_list) {
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates) {
base::Closure task = base::Bind(&MockModelTypeSyncProxy::OnUpdateReceivedImpl,
base::Unretained(this),
type_state,
response_list);
response_list,
pending_updates);
pending_tasks_.push_back(task);
if (is_synchronous_)
RunQueuedTasks();
......@@ -111,6 +113,12 @@ UpdateResponseDataList MockModelTypeSyncProxy::GetNthUpdateResponse(
return received_update_responses_[n];
}
UpdateResponseDataList MockModelTypeSyncProxy::GetNthPendingUpdates(
size_t n) const {
DCHECK_LT(n, GetNumUpdateResponses());
return received_pending_updates_[n];
}
DataTypeState MockModelTypeSyncProxy::GetNthTypeStateReceivedInUpdateResponse(
size_t n) const {
DCHECK_LT(n, GetNumUpdateResponses());
......@@ -181,8 +189,10 @@ void MockModelTypeSyncProxy::OnCommitCompletedImpl(
void MockModelTypeSyncProxy::OnUpdateReceivedImpl(
const DataTypeState& type_state,
const UpdateResponseDataList& response_list) {
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates) {
received_update_responses_.push_back(response_list);
received_pending_updates_.push_back(pending_updates);
type_states_received_on_update_.push_back(type_state);
for (UpdateResponseDataList::const_iterator it = response_list.begin();
it != response_list.end();
......
......@@ -36,7 +36,8 @@ class MockModelTypeSyncProxy : public ModelTypeSyncProxy {
const CommitResponseDataList& response_list) OVERRIDE;
virtual void OnUpdateReceived(
const DataTypeState& type_state,
const UpdateResponseDataList& response_list) OVERRIDE;
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates) OVERRIDE;
// By default, this object behaves as if all messages are processed
// immediately. Sometimes it is useful to defer work until later, as might
......@@ -65,6 +66,7 @@ class MockModelTypeSyncProxy : public ModelTypeSyncProxy {
// Does not includes repsonses that are in pending tasks.
size_t GetNumUpdateResponses() const;
UpdateResponseDataList GetNthUpdateResponse(size_t n) const;
UpdateResponseDataList GetNthPendingUpdates(size_t n) const;
DataTypeState GetNthTypeStateReceivedInUpdateResponse(size_t n) const;
// Getters to access the log of received commit responses.
......@@ -93,7 +95,8 @@ class MockModelTypeSyncProxy : public ModelTypeSyncProxy {
//
// Implemented as an Impl method so we can defer its execution in some cases.
void OnUpdateReceivedImpl(const DataTypeState& type_state,
const UpdateResponseDataList& response_list);
const UpdateResponseDataList& response_list,
const UpdateResponseDataList& pending_updates);
// Getter and setter for per-item sequence number tracking.
int64 GetCurrentSequenceNumber(const std::string& tag_hash) const;
......@@ -116,6 +119,7 @@ class MockModelTypeSyncProxy : public ModelTypeSyncProxy {
// A log of messages received by this object.
std::vector<CommitResponseDataList> received_commit_responses_;
std::vector<UpdateResponseDataList> received_update_responses_;
std::vector<UpdateResponseDataList> received_pending_updates_;
std::vector<DataTypeState> type_states_received_on_update_;
std::vector<DataTypeState> type_states_received_on_commit_;
......
......@@ -94,6 +94,8 @@ UpdateResponseData MockModelTypeSyncWorker::UpdateFromServer(
data.mtime = data.ctime + base::TimeDelta::FromSeconds(version);
data.non_unique_name = specifics.preference().name();
data.encryption_key_name = server_encryption_key_name_;
return data;
}
......@@ -118,6 +120,8 @@ UpdateResponseData MockModelTypeSyncWorker::TombstoneFromServer(
data.mtime = data.ctime + base::TimeDelta::FromSeconds(version);
data.non_unique_name = "Name Non Unique";
data.encryption_key_name = server_encryption_key_name_;
return data;
}
......@@ -149,6 +153,11 @@ CommitResponseData MockModelTypeSyncWorker::SuccessfulCommitResponse(
return response_data;
}
void MockModelTypeSyncWorker::SetServerEncryptionKey(
const std::string& key_name) {
server_encryption_key_name_ = key_name;
}
std::string MockModelTypeSyncWorker::GenerateId(const std::string& tag_hash) {
return "FakeId:" + tag_hash;
}
......
......@@ -57,6 +57,11 @@ class MockModelTypeSyncWorker : public ModelTypeSyncWorker {
CommitResponseData SuccessfulCommitResponse(
const CommitRequestData& request_data);
// Sets the encryption key name used for updates from the server.
// (ie. the key other clients are using to encrypt their commits.)
// The default value is an empty string, which indicates no encryption.
void SetServerEncryptionKey(const std::string& key_name);
private:
// Generate an ID string.
static std::string GenerateId(const std::string& tag_hash);
......@@ -72,6 +77,9 @@ class MockModelTypeSyncWorker : public ModelTypeSyncWorker {
// This is an essential part of the mocked server state.
std::map<const std::string, int64> server_versions_;
// Name of the encryption key in use on other clients.
std::string server_encryption_key_name_;
DISALLOW_COPY_AND_ASSIGN(MockModelTypeSyncWorker);
};
......
......@@ -251,7 +251,7 @@ bool Cryptographer::DecryptPendingKeys(const KeyParams& params) {
bool Cryptographer::GetBootstrapToken(std::string* token) const {
DCHECK(token);
std::string unencrypted_token = GetDefaultNigoriKey();
std::string unencrypted_token = GetDefaultNigoriKeyData();
if (unencrypted_token.empty())
return false;
......@@ -324,7 +324,11 @@ bool Cryptographer::KeybagIsStale(
return false;
}
std::string Cryptographer::GetDefaultNigoriKey() const {
std::string Cryptographer::GetDefaultNigoriKeyName() const {
return default_nigori_name_;
}
std::string Cryptographer::GetDefaultNigoriKeyData() const {
if (!is_initialized())
return std::string();
NigoriMap::const_iterator iter = nigoris_.find(default_nigori_name_);
......
......@@ -176,9 +176,12 @@ class SYNC_EXPORT Cryptographer {
// and/or has a different default key.
bool KeybagIsStale(const sync_pb::EncryptedData& keybag) const;
// Returns the name of the Nigori key currently used for encryption.
std::string GetDefaultNigoriKeyName() const;
// Returns a serialized sync_pb::NigoriKey version of current default
// encryption key.
std::string GetDefaultNigoriKey() const;
std::string GetDefaultNigoriKeyData() const;
// Generates a new Nigori from |serialized_nigori_key|, and if successful
// installs the new nigori as the default key.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment