Commit 99d5d72e authored by Sam McNally's avatar Sam McNally Committed by Commit Bot

Pref service: provide eventual consistency with multiple writers.

Currently, if two writers both write to a single pref or a single
subpref without seeing each other's writes, the two writers will each
converge to the other's value rather than a shared value. This CL
corrects this by eventually converging all clients to the last value
observed by the service, without any values moving backwards in
serialized write order.

To provide eventual consistency, each client tracks the prefs for which
a write has been sent to the pref service, but which have not been
acknowledged. The acks are sent on the same interface as updates
triggered by other clients so any remote writes received before an ack
are writes that have been serialized to after the in-flight write.
Thus, the client can ignore such writes.

Since prefs are hierarchical and writes can be applied to individual
sub-components of a single pref, writes to parent and child values can
race. If the parent write is applied first, then the client writing the
child value does not have a simple strategy to resolve the conflicting
changes with the information it has. Instead, in these cases, it
requests a value from the service for the parent value. Such values are
returned as pref value updates, just like notifications of a write by
another client and conflicts are handled similarly.

Bug: 654988
Change-Id: I1959f1b100134d9dc3af2b1c94d6ec08b139b8ce
Reviewed-on: https://chromium-review.googlesource.com/522303Reviewed-by: default avatarJohan Tibell <tibell@chromium.org>
Reviewed-by: default avatarRobert Sesek <rsesek@chromium.org>
Commit-Queue: Sam McNally <sammc@chromium.org>
Cr-Commit-Position: refs/heads/master@{#477850}
parent 41b846b3
......@@ -55,6 +55,7 @@ source_set("tests") {
]
sources = [
"persistent_pref_store_impl_unittest.cc",
"pref_store_consistency_unittest.cc",
]
if (!is_ios) {
sources += [ "pref_service_factory_unittest.cc" ]
......
......@@ -16,6 +16,65 @@
#include "services/preferences/public/cpp/lib/util.h"
namespace prefs {
namespace {
// Creates a PrefUpdateValuePtr representing |value| at |path|.
mojom::PrefUpdateValuePtr CreatePrefUpdate(const std::vector<std::string>& path,
const base::Value* value) {
if (path.empty()) {
return mojom::PrefUpdateValue::NewAtomicUpdate(
value ? value->CreateDeepCopy() : nullptr);
}
std::vector<mojom::SubPrefUpdatePtr> pref_updates;
pref_updates.emplace_back(base::in_place, path,
value ? value->CreateDeepCopy() : nullptr);
return mojom::PrefUpdateValue::NewSplitUpdates(std::move(pref_updates));
}
// Returns a mojom::PrefUpdateValuePtr for |path| relative to |value|. If the
// full path does not exist, a PrefUpdateValue containing the closest value is
// returned.
//
// For example, for a |path| of {"foo", "bar"}:
// - with a |value| of
// {
// "foo": 1
// }
// returns a path {"foo"} and value 1.
//
// - with a |value| of
// {}
// returns a path {"foo"} and null value.
//
// - with a |value| of
// {
// "foo": {}
// }
// returns a path {"foo", "bar"} and null value.
//
// - with a |value| of
// {
// "foo": {
// "bar": "baz"
// }
// }
// returns a path {"foo", "bar"} and value "baz".
mojom::PrefUpdateValuePtr LookupPrefUpdate(const std::vector<std::string>& path,
const base::Value* value) {
if (!value)
return CreatePrefUpdate(std::vector<std::string>(), value);
for (size_t i = 0; i < path.size(); ++i) {
const base::DictionaryValue* dictionary_value = nullptr;
if (!value->GetAsDictionary(&dictionary_value) ||
!dictionary_value->Get(path[i], &value)) {
return CreatePrefUpdate({path.begin(), path.begin() + i}, value);
}
}
return CreatePrefUpdate(path, value);
}
} // namespace
class PersistentPrefStoreImpl::Connection : public mojom::PersistentPrefStore {
public:
......@@ -55,6 +114,19 @@ class PersistentPrefStoreImpl::Connection : public mojom::PersistentPrefStore {
void SetValues(std::vector<mojom::PrefUpdatePtr> updates) override {
base::AutoReset<bool> scoped_call_in_progress(&write_in_progress_, true);
pref_store_->SetValues(std::move(updates));
observer_->OnPrefChangeAck();
}
void RequestValue(const std::string& key,
const std::vector<std::string>& path) override {
if (!base::ContainsKey(observed_keys_, key))
return;
const base::Value* value = nullptr;
pref_store_->GetValue(key, &value);
std::vector<mojom::PrefUpdatePtr> updates;
updates.emplace_back(base::in_place, key, LookupPrefUpdate(path, value), 0);
observer_->OnPrefsChanged(std::move(updates));
}
void CommitPendingWrite() override { pref_store_->CommitPendingWrite(); }
......@@ -171,6 +243,11 @@ void PersistentPrefStoreImpl::SetValues(
}
}
bool PersistentPrefStoreImpl::GetValue(const std::string& key,
const base::Value** value) const {
return backing_pref_store_->GetValue(key, value);
}
void PersistentPrefStoreImpl::CommitPendingWrite() {
backing_pref_store_->CommitPendingWrite();
}
......
......@@ -37,6 +37,7 @@ class PersistentPrefStoreImpl : public PrefStore::Observer {
class Connection;
void SetValues(std::vector<mojom::PrefUpdatePtr> updates);
bool GetValue(const std::string& key, const base::Value** value) const;
void CommitPendingWrite();
void SchedulePendingLossyWrites();
......
......@@ -200,7 +200,7 @@ TEST_F(PersistentPrefStoreImplTest, WriteObservedByOtherClient) {
EXPECT_TRUE(other_pref_store->IsInitializationComplete());
const base::Value value("value");
pref_store()->SetValueSilently(kKey, value.CreateDeepCopy(), 0);
pref_store()->SetValue(kKey, value.CreateDeepCopy(), 0);
ExpectPrefChange(other_pref_store.get(), kKey);
const base::Value* output = nullptr;
......
......@@ -496,7 +496,12 @@ TEST_F(PrefServiceFactoryTest,
{
ScopedDictionaryPrefUpdate update(pref_service.get(), kDictionaryKey);
update.Get();
update->SetInteger(kKey, kInitialValue);
}
WaitForPrefChange(pref_service2.get(), kDictionaryKey);
{
ScopedDictionaryPrefUpdate update(pref_service.get(), kDictionaryKey);
update->Remove(kKey, nullptr);
}
WaitForPrefChange(pref_service2.get(), kDictionaryKey);
EXPECT_TRUE(pref_service2->GetDictionary(kDictionaryKey)->empty());
......
This diff is collapsed.
......@@ -59,6 +59,92 @@ void RemoveRedundantPaths(std::set<std::vector<std::string>>* updated_paths) {
} // namespace
// A trie of writes that have been applied locally and sent to the service
// backend, but have not been acked.
class PersistentPrefStoreClient::InFlightWriteTrie {
public:
// Decision on what to do with writes incoming from the service.
enum class Decision {
// The write should be allowed.
kAllow,
// The write has already been superceded locally and should be ignored.
kIgnore,
// The write may have been partially superceded locally and should be
// ignored but an updated value is needed from the service.
kResolve
};
InFlightWriteTrie() = default;
void Add() {
std::vector<std::string> v;
Add(v.begin(), v.end());
}
template <typename It, typename Jt>
void Add(It path_start, Jt path_end) {
if (path_start == path_end) {
++writes_in_flight_;
return;
}
children_[*path_start].Add(path_start + 1, path_end);
}
bool Remove() {
std::vector<std::string> v;
return Remove(v.begin(), v.end());
}
template <typename It, typename Jt>
bool Remove(It path_start, Jt path_end) {
if (path_start == path_end) {
DCHECK_GT(writes_in_flight_, 0);
return --writes_in_flight_ == 0 && children_.empty();
}
auto it = children_.find(*path_start);
DCHECK(it != children_.end()) << *path_start;
auto removed = it->second.Remove(path_start + 1, path_end);
if (removed)
children_.erase(*path_start);
return children_.empty() && writes_in_flight_ == 0;
}
template <typename It, typename Jt>
Decision Lookup(It path_start, Jt path_end) {
if (path_start == path_end) {
if (children_.empty()) {
DCHECK_GT(writes_in_flight_, 0);
return Decision::kIgnore;
}
return Decision::kResolve;
}
if (writes_in_flight_ != 0) {
return Decision::kIgnore;
}
auto it = children_.find(*path_start);
if (it == children_.end()) {
return Decision::kAllow;
}
return it->second.Lookup(path_start + 1, path_end);
}
private:
std::map<std::string, InFlightWriteTrie> children_;
int writes_in_flight_ = 0;
DISALLOW_COPY_AND_ASSIGN(InFlightWriteTrie);
};
struct PersistentPrefStoreClient::InFlightWrite {
std::string key;
std::vector<std::vector<std::string>> sub_pref_paths;
};
PersistentPrefStoreClient::PersistentPrefStoreClient(
mojom::PrefStoreConnectorPtr connector,
scoped_refptr<PrefRegistry> pref_registry,
......@@ -123,9 +209,6 @@ void PersistentPrefStoreClient::SetValueSilently(
uint32_t flags) {
DCHECK(pref_store_);
GetMutableValues().Set(key, std::move(value));
QueueWrite(key,
std::set<std::vector<std::string>>{std::vector<std::string>{}},
flags);
}
bool PersistentPrefStoreClient::ReadOnly() const {
......@@ -232,16 +315,24 @@ void PersistentPrefStoreClient::QueueWrite(
}
void PersistentPrefStoreClient::FlushPendingWrites() {
weak_factory_.InvalidateWeakPtrs();
if (pending_writes_.empty())
return;
std::vector<mojom::PrefUpdatePtr> updates;
std::vector<InFlightWrite> writes;
for (auto& pref : pending_writes_) {
auto update_value = mojom::PrefUpdateValue::New();
const base::Value* value = nullptr;
if (GetValue(pref.first, &value)) {
std::vector<mojom::SubPrefUpdatePtr> pref_updates;
std::vector<std::vector<std::string>> sub_pref_writes;
RemoveRedundantPaths(&pref.second.first);
for (const auto& path : pref.second.first) {
if (path.empty()) {
pref_updates.clear();
sub_pref_writes.clear();
break;
}
const base::Value* nested_value = LookupPath(value, path);
......@@ -251,20 +342,78 @@ void PersistentPrefStoreClient::FlushPendingWrites() {
} else {
pref_updates.emplace_back(base::in_place, path, nullptr);
}
sub_pref_writes.push_back(path);
}
if (pref_updates.empty()) {
update_value->set_atomic_update(value->CreateDeepCopy());
writes.push_back({pref.first});
} else {
update_value->set_split_updates(std::move(pref_updates));
writes.push_back({pref.first, std::move(sub_pref_writes)});
}
} else {
update_value->set_atomic_update(nullptr);
writes.push_back({pref.first});
}
updates.emplace_back(base::in_place, pref.first, std::move(update_value),
pref.second.second);
}
pref_store_->SetValues(std::move(updates));
pending_writes_.clear();
for (const auto& write : writes) {
auto& trie = in_flight_writes_tries_[write.key];
if (write.sub_pref_paths.empty()) {
trie.Add();
} else {
for (const auto& subpref_update : write.sub_pref_paths) {
trie.Add(subpref_update.begin(), subpref_update.end());
}
}
}
in_flight_writes_queue_.push(std::move(writes));
}
void PersistentPrefStoreClient::OnPrefChangeAck() {
const auto& writes = in_flight_writes_queue_.front();
for (const auto& write : writes) {
auto it = in_flight_writes_tries_.find(write.key);
DCHECK(it != in_flight_writes_tries_.end()) << write.key;
bool remove = false;
if (write.sub_pref_paths.empty()) {
remove = it->second.Remove();
} else {
for (const auto& subpref_update : write.sub_pref_paths) {
remove =
it->second.Remove(subpref_update.begin(), subpref_update.end());
}
}
if (remove) {
in_flight_writes_tries_.erase(it);
}
}
in_flight_writes_queue_.pop();
}
bool PersistentPrefStoreClient::ShouldSkipWrite(
const std::string& key,
const std::vector<std::string>& path,
const base::Value* new_value) {
if (!pending_writes_.empty()) {
FlushPendingWrites();
}
auto it = in_flight_writes_tries_.find(key);
if (it == in_flight_writes_tries_.end()) {
return false;
}
auto decision = it->second.Lookup(path.begin(), path.end());
if (decision == InFlightWriteTrie::Decision::kAllow) {
return false;
}
if (decision == InFlightWriteTrie::Decision::kResolve)
pref_store_->RequestValue(key, path);
return true;
}
} // namespace prefs
......@@ -69,6 +69,9 @@ class PersistentPrefStoreClient
~PersistentPrefStoreClient() override;
private:
class InFlightWriteTrie;
struct InFlightWrite;
void OnConnect(mojom::PersistentPrefStoreConnectionPtr connection,
mojom::PersistentPrefStoreConnectionPtr incognito_connection,
std::unordered_map<PrefValueStore::PrefStoreType,
......@@ -80,6 +83,13 @@ class PersistentPrefStoreClient
uint32_t flags);
void FlushPendingWrites();
// prefs::mojom::PreferenceObserver:
void OnPrefChangeAck() override;
bool ShouldSkipWrite(const std::string& key,
const std::vector<std::string>& path,
const base::Value* new_value) override;
mojom::PrefStoreConnectorPtr connector_;
scoped_refptr<PrefRegistry> pref_registry_;
bool read_only_ = false;
......@@ -91,6 +101,9 @@ class PersistentPrefStoreClient
std::unique_ptr<ReadErrorDelegate> error_delegate_;
std::vector<PrefValueStore::PrefStoreType> already_connected_types_;
std::queue<std::vector<InFlightWrite>> in_flight_writes_queue_;
std::map<std::string, InFlightWriteTrie> in_flight_writes_tries_;
base::WeakPtrFactory<PersistentPrefStoreClient> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(PersistentPrefStoreClient);
......
......@@ -100,6 +100,9 @@ void PrefStoreClientMixin<BasePrefStore>::OnInitializationCompleted(
}
}
template <typename BasePrefStore>
void PrefStoreClientMixin<BasePrefStore>::OnPrefChangeAck() {}
template <typename BasePrefStore>
void PrefStoreClientMixin<BasePrefStore>::OnPrefChanged(
const std::string& key,
......@@ -107,6 +110,10 @@ void PrefStoreClientMixin<BasePrefStore>::OnPrefChanged(
DCHECK(cached_prefs_);
bool changed = false;
if (update_value->is_atomic_update()) {
if (ShouldSkipWrite(key, std::vector<std::string>(),
update_value->get_atomic_update().get())) {
return;
}
auto& value = update_value->get_atomic_update();
if (!value) { // Delete
if (cached_prefs_->RemovePath(key, nullptr))
......@@ -129,9 +136,10 @@ void PrefStoreClientMixin<BasePrefStore>::OnPrefChanged(
changed = true;
for (auto& update : updates) {
// Clients shouldn't send empty paths.
if (update->path.empty())
if (update->path.empty() ||
ShouldSkipWrite(key, update->path, update->value.get())) {
continue;
}
std::vector<base::StringPiece> full_path = base::SplitStringPiece(
key, ".", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY);
full_path.insert(full_path.end(), update->path.begin(),
......@@ -143,6 +151,14 @@ void PrefStoreClientMixin<BasePrefStore>::OnPrefChanged(
ReportPrefValueChanged(key);
}
template <typename BasePrefStore>
bool PrefStoreClientMixin<BasePrefStore>::ShouldSkipWrite(
const std::string& key,
const std::vector<std::string>& path,
const base::Value* new_value) {
return false;
}
template class PrefStoreClientMixin<::PrefStore>;
template class PrefStoreClientMixin<::PersistentPrefStore>;
......
......@@ -57,10 +57,17 @@ class PrefStoreClientMixin : public BasePrefStore,
// prefs::mojom::PreferenceObserver:
void OnPrefsChanged(std::vector<mojom::PrefUpdatePtr> updates) override;
void OnInitializationCompleted(bool succeeded) override;
void OnPrefChangeAck() override;
void OnPrefChanged(const std::string& key,
mojom::PrefUpdateValuePtr update_value);
// Should this client ignore a write received from the service? The default
// implementation never skips writes.
virtual bool ShouldSkipWrite(const std::string& key,
const std::vector<std::string>& path,
const base::Value* new_value);
// Cached preferences.
// If null, indicates that initialization failed.
std::unique_ptr<base::DictionaryValue> cached_prefs_;
......
......@@ -54,11 +54,6 @@ class PersistentPrefStoreClientTest : public testing::Test,
nullptr, pref_notifier),
persistent_pref_store_client.get(), pref_registry.get(),
base::Bind(&DoNothingWithReadError), false);
// The first update to a pref will write the entire dictionary as it would
// previously be missing. Do this here to avoid individual tests needing to
// deal with those updates.
ScopedDictionaryPrefUpdate(pref_service(), kDictionaryKey).Get();
auto update = WaitForUpdate();
}
void TearDown() override {
......@@ -81,6 +76,7 @@ class PersistentPrefStoreClientTest : public testing::Test,
}
void ExpectNoUpdate() {
pref_service()->CommitPendingWrite();
binding_.FlushForTesting();
EXPECT_TRUE(last_updates_.empty());
}
......@@ -92,6 +88,8 @@ class PersistentPrefStoreClientTest : public testing::Test,
std::move(on_update_).Run();
}
void RequestValue(const std::string& key,
const std::vector<std::string>& path) override {}
void CommitPendingWrite() override {}
void SchedulePendingLossyWrites() override {}
void ClearMutableValues() override {}
......@@ -487,18 +485,5 @@ TEST_F(PersistentPrefStoreClientTest, SubPrefUpdates_ReplaceDictionary) {
EXPECT_EQ((std::vector<std::string>{"path"}), split_updates[0]->path);
}
TEST_F(PersistentPrefStoreClientTest, SubPrefUpdates_Uninitialized) {
{
ScopedDictionaryPrefUpdate update(pref_service(),
kUninitializedDictionaryKey);
update->SetInteger("path.to.integer", 1);
}
auto update = WaitForUpdate();
ASSERT_TRUE(update->is_atomic_update());
base::DictionaryValue expected_value;
expected_value.SetInteger("path.to.integer", 1);
EXPECT_EQ(expected_value, *update->get_atomic_update());
}
} // namespace
} // namespace prefs
......@@ -32,6 +32,13 @@ interface PrefStoreObserver {
// The PrefStore has been initialized (asynchronously).
OnInitializationCompleted(bool succeeded);
// A preference write by this client has been applied. If this
// PrefStoreObserver is associated with a PersistentPrefStore, one
// OnPrefChangeAck() message is sent in response to each SetValues() message.
// This exists to ensure acks are ordered with respect to OnPrefsChanged
// messages.
OnPrefChangeAck();
};
// Captures the connections to a PrefStore by supplying the initial state of the
......@@ -145,6 +152,12 @@ interface PersistentPrefStore {
// Sets the values for prefs.
SetValues(array<PrefUpdate> updates);
// Requests that the pref service transmits its value for a pref (or sub-pref
// if |sub_pref_path| is non-empty). The value will be transmitted over the
// corresponding PrefStoreObserver interface previous returned by
// PrefStoreConnector.Connect().
RequestValue(string key, array<string> sub_pref_path);
// These mirror the C++ PersistentPrefStore methods.
CommitPendingWrite();
SchedulePendingLossyWrites();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment