Commit 8ff66c4b authored by Dan Harrington's avatar Dan Harrington Committed by Commit Bot

Add StoreObserver and Update() to StreamModel

* Added StreamModel::Update() to accept data from storage or
  the network.
* Added StoreObserver to StreamModel for observing changes
  that should be made to persistent storage. Currently this
  only includes DataOperations, but might include more data
  later (for network fetches).

Bug: 1044139
Change-Id: I8179f6f0d5b2c4dbd2347d0d8667a41bc97d09e4
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2106554
Commit-Queue: Dan H <harringtond@chromium.org>
Reviewed-by: default avatarIan Wells <iwells@chromium.org>
Cr-Commit-Position: refs/heads/master@{#751107}
parent 4d9fefc8
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
#include "components/feed/core/proto/v2/store.pb.h" #include "components/feed/core/proto/v2/store.pb.h"
#include "components/feed/core/proto/v2/wire/content_id.pb.h" #include "components/feed/core/proto/v2/wire/content_id.pb.h"
#include "components/feed/core/v2/stream_model_update_request.h"
namespace feed { namespace feed {
...@@ -19,10 +20,24 @@ StreamModel::UiUpdate::~UiUpdate() = default; ...@@ -19,10 +20,24 @@ StreamModel::UiUpdate::~UiUpdate() = default;
StreamModel::UiUpdate::UiUpdate(const UiUpdate&) = default; StreamModel::UiUpdate::UiUpdate(const UiUpdate&) = default;
StreamModel::UiUpdate& StreamModel::UiUpdate::operator=(const UiUpdate&) = StreamModel::UiUpdate& StreamModel::UiUpdate::operator=(const UiUpdate&) =
default; default;
StreamModel::StoreUpdate::StoreUpdate() = default;
StreamModel::StoreUpdate::~StoreUpdate() = default;
StreamModel::StoreUpdate::StoreUpdate(const StoreUpdate&) = default;
StreamModel::StoreUpdate& StreamModel::StoreUpdate::operator=(
const StoreUpdate&) = default;
StreamModel::StoreUpdate::StoreUpdate(StoreUpdate&&) = default;
StreamModel::StoreUpdate& StreamModel::StoreUpdate::operator=(StoreUpdate&&) =
default;
StreamModel::StreamModel() = default; StreamModel::StreamModel() = default;
StreamModel::~StreamModel() = default; StreamModel::~StreamModel() = default;
void StreamModel::SetStoreObserver(StoreObserver* store_observer) {
DCHECK(!store_observer || !store_observer_)
<< "Attempting to set store_observer multiple times";
store_observer_ = store_observer;
}
void StreamModel::SetObserver(Observer* observer) { void StreamModel::SetObserver(Observer* observer) {
DCHECK(!observer || !observer_) DCHECK(!observer || !observer_)
<< "Attempting to set the observer multiple times"; << "Attempting to set the observer multiple times";
...@@ -34,6 +49,36 @@ const feedstore::Content* StreamModel::FindContent( ...@@ -34,6 +49,36 @@ const feedstore::Content* StreamModel::FindContent(
return GetFinalFeatureTree()->FindContent(revision); return GetFinalFeatureTree()->FindContent(revision);
} }
void StreamModel::Update(
std::unique_ptr<StreamModelUpdateRequest> update_request) {
feedstore::StreamData& stream_data = update_request->stream_data;
for (const feedstore::StreamStructure& structure : stream_data.structures()) {
base_feature_tree_.ApplyStreamStructure(structure);
}
for (feedstore::Content& content : update_request->content) {
base_feature_tree_.AddContent(std::move(content));
}
next_page_token_ = stream_data.next_page_token();
last_added_time_ =
base::Time::UnixEpoch() +
base::TimeDelta::FromMilliseconds(stream_data.last_added_time_millis());
consistency_token_ = stream_data.consistency_token();
// TODO(harringtond): consume shared state.
// TODO(harringtond): Some StreamData fields not yet used.
// next_action_id - do we need to load the model before uploading
// actions? If not, we probably will want to move this out of
// StreamData.
// content_id - probably just ignore for now
// TODO(harringtond): Figure out how to forward network updates to storage.
// We can either update through |observer_|, or as part of the load stream
// task.
UpdateFlattenedTree();
}
EphemeralChangeId StreamModel::CreateEphemeralChange( EphemeralChangeId StreamModel::CreateEphemeralChange(
std::vector<feedstore::DataOperation> operations) { std::vector<feedstore::DataOperation> operations) {
const EphemeralChangeId id = const EphemeralChangeId id =
...@@ -46,14 +91,21 @@ EphemeralChangeId StreamModel::CreateEphemeralChange( ...@@ -46,14 +91,21 @@ EphemeralChangeId StreamModel::CreateEphemeralChange(
void StreamModel::ExecuteOperations( void StreamModel::ExecuteOperations(
std::vector<feedstore::DataOperation> operations) { std::vector<feedstore::DataOperation> operations) {
for (feedstore::DataOperation& operation : operations) { for (const feedstore::DataOperation& operation : operations) {
if (operation.has_structure()) { if (operation.has_structure()) {
base_feature_tree_.ApplyStreamStructure(operation.structure()); base_feature_tree_.ApplyStreamStructure(operation.structure());
} }
if (operation.has_content()) { if (operation.has_content()) {
base_feature_tree_.AddContent(std::move(*operation.mutable_content())); base_feature_tree_.AddContent(operation.content());
} }
} }
if (store_observer_) {
StoreUpdate store_update;
store_update.operations = std::move(operations);
store_observer_->OnStoreChange(std::move(store_update));
}
UpdateFlattenedTree(); UpdateFlattenedTree();
} }
......
...@@ -20,6 +20,7 @@ class DataOperation; ...@@ -20,6 +20,7 @@ class DataOperation;
} // namespace feedwire } // namespace feedwire
namespace feed { namespace feed {
struct StreamModelUpdateRequest;
// An in-memory stream model. // An in-memory stream model.
class StreamModel { class StreamModel {
...@@ -43,12 +44,29 @@ class StreamModel { ...@@ -43,12 +44,29 @@ class StreamModel {
std::vector<SharedStateInfo> shared_states; std::vector<SharedStateInfo> shared_states;
}; };
struct StoreUpdate {
StoreUpdate();
~StoreUpdate();
StoreUpdate(const StoreUpdate&);
StoreUpdate(StoreUpdate&&);
StoreUpdate& operator=(const StoreUpdate&);
StoreUpdate& operator=(StoreUpdate&&);
std::vector<feedstore::DataOperation> operations;
};
class Observer { class Observer {
public: public:
virtual ~Observer() = default; virtual ~Observer() = default;
// Called when the UI model changes. // Called when the UI model changes.
virtual void OnUiUpdate(const UiUpdate& update) = 0; virtual void OnUiUpdate(const UiUpdate& update) = 0;
}; };
class StoreObserver {
public:
// Called when the peristent store should be modified to reflect a model
// change.
virtual void OnStoreChange(const StoreUpdate& update) = 0;
};
explicit StreamModel(); explicit StreamModel();
~StreamModel(); ~StreamModel();
...@@ -57,6 +75,7 @@ class StreamModel { ...@@ -57,6 +75,7 @@ class StreamModel {
StreamModel& operator=(const StreamModel&) = delete; StreamModel& operator=(const StreamModel&) = delete;
void SetObserver(Observer* observer); void SetObserver(Observer* observer);
void SetStoreObserver(StoreObserver* store_observer);
// Data access. // Data access.
...@@ -66,6 +85,10 @@ class StreamModel { ...@@ -66,6 +85,10 @@ class StreamModel {
} }
// Returns the content identified by |ContentRevision|. // Returns the content identified by |ContentRevision|.
const feedstore::Content* FindContent(ContentRevision revision) const; const feedstore::Content* FindContent(ContentRevision revision) const;
// Apply an update from the network or storage.
void Update(std::unique_ptr<StreamModelUpdateRequest> update_request);
// Apply |operations| to the model. // Apply |operations| to the model.
void ExecuteOperations(std::vector<feedstore::DataOperation> operations); void ExecuteOperations(std::vector<feedstore::DataOperation> operations);
...@@ -86,7 +109,7 @@ class StreamModel { ...@@ -86,7 +109,7 @@ class StreamModel {
void UpdateFlattenedTree(); void UpdateFlattenedTree();
Observer* observer_ = nullptr; // Unowned. Observer* observer_ = nullptr; // Unowned.
StoreObserver* store_observer_ = nullptr; // Unowned.
stream_model::ContentIdMap id_map_; stream_model::ContentIdMap id_map_;
stream_model::FeatureTree base_feature_tree_{&id_map_}; stream_model::FeatureTree base_feature_tree_{&id_map_};
// |base_feature_tree_| with |ephemeral_changes_| applied. // |base_feature_tree_| with |ephemeral_changes_| applied.
...@@ -94,6 +117,13 @@ class StreamModel { ...@@ -94,6 +117,13 @@ class StreamModel {
std::unique_ptr<stream_model::FeatureTree> feature_tree_after_changes_; std::unique_ptr<stream_model::FeatureTree> feature_tree_after_changes_;
stream_model::EphemeralChangeList ephemeral_changes_; stream_model::EphemeralChangeList ephemeral_changes_;
// The following data is associated with the stream, but lives outside of the
// tree.
std::string next_page_token_; // TODO(harringtond): use this value.
std::string consistency_token_; // TODO(harringtond): use this value.
base::Time last_added_time_; // TODO(harringtond): use this value.
// Current state of the flattened tree. // Current state of the flattened tree.
// Updated after each tree change. // Updated after each tree change.
std::vector<ContentRevision> content_list_; std::vector<ContentRevision> content_list_;
......
...@@ -133,7 +133,6 @@ void FeatureTree::AddContent(ContentRevision revision_id, ...@@ -133,7 +133,6 @@ void FeatureTree::AddContent(ContentRevision revision_id,
// these copies. // these copies.
const ContentTag tag = GetContentTag(content.content_id()); const ContentTag tag = GetContentTag(content.content_id());
DCHECK(!content_.count(revision_id)); DCHECK(!content_.count(revision_id));
GetOrMakeNode(tag)->content_revision = revision_id; GetOrMakeNode(tag)->content_revision = revision_id;
content_[revision_id] = std::move(content); content_[revision_id] = std::move(content);
} }
......
...@@ -7,11 +7,14 @@ ...@@ -7,11 +7,14 @@
#include "base/optional.h" #include "base/optional.h"
#include "components/feed/core/proto/v2/store.pb.h" #include "components/feed/core/proto/v2/store.pb.h"
#include "components/feed/core/proto/v2/wire/content_id.pb.h" #include "components/feed/core/proto/v2/wire/content_id.pb.h"
#include "components/feed/core/v2/stream_model_update_request.h"
#include "components/feed/core/v2/test/stream_builder.h" #include "components/feed/core/v2/test/stream_builder.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gtest/include/gtest/gtest.h"
namespace feed { namespace feed {
namespace { namespace {
using StoreUpdate = StreamModel::StoreUpdate;
using UiUpdate = StreamModel::UiUpdate;
std::vector<std::string> GetContentFrames(const StreamModel& model) { std::vector<std::string> GetContentFrames(const StreamModel& model) {
std::vector<std::string> frames; std::vector<std::string> frames;
...@@ -29,20 +32,35 @@ std::vector<std::string> GetContentFrames(const StreamModel& model) { ...@@ -29,20 +32,35 @@ std::vector<std::string> GetContentFrames(const StreamModel& model) {
class TestObserver : public StreamModel::Observer { class TestObserver : public StreamModel::Observer {
public: public:
explicit TestObserver(StreamModel* model) { model->SetObserver(this); } explicit TestObserver(StreamModel* model) { model->SetObserver(this); }
void OnUiUpdate(const StreamModel::UiUpdate& update) override {
update_ = update;
}
const base::Optional<StreamModel::UiUpdate>& GetUiUpdate() const { // StreamModel::Observer.
return update_; void OnUiUpdate(const UiUpdate& update) override { update_ = update; }
} const base::Optional<UiUpdate>& GetUiUpdate() const { return update_; }
bool ContentListChanged() const { bool ContentListChanged() const {
return update_ && update_->content_list_changed; return update_ && update_->content_list_changed;
} }
void Clear() { update_ = base::nullopt; } void Clear() { update_ = base::nullopt; }
private: private:
base::Optional<StreamModel::UiUpdate> update_; base::Optional<UiUpdate> update_;
};
class TestStoreObserver : public StreamModel::StoreObserver {
public:
explicit TestStoreObserver(StreamModel* model) {
model->SetStoreObserver(this);
}
// StreamModel::StoreObserver.
void OnStoreChange(const StoreUpdate& records) override { update_ = records; }
const base::Optional<StoreUpdate>& GetUpdate() const { return update_; }
void Clear() { update_ = base::nullopt; }
private:
base::Optional<StoreUpdate> update_;
}; };
TEST(StreamModelTest, ConstructEmptyModel) { TEST(StreamModelTest, ConstructEmptyModel) {
...@@ -52,15 +70,17 @@ TEST(StreamModelTest, ConstructEmptyModel) { ...@@ -52,15 +70,17 @@ TEST(StreamModelTest, ConstructEmptyModel) {
EXPECT_EQ(0UL, model.GetContentList().size()); EXPECT_EQ(0UL, model.GetContentList().size());
} }
// Typical stream (Stream -> Cluster -> Content). TEST(StreamModelTest, ExecuteOperationsTypicalStream) {
TEST(StreamModelTest, AddStreamClusterContent) {
StreamModel model; StreamModel model;
TestObserver observer(&model); TestObserver observer(&model);
TestStoreObserver store_observer(&model);
model.ExecuteOperations(MakeTypicalStreamOperations()); model.ExecuteOperations(MakeTypicalStreamOperations());
EXPECT_TRUE(observer.ContentListChanged()); EXPECT_TRUE(observer.ContentListChanged());
EXPECT_EQ(std::vector<std::string>({"f:0", "f:1"}), GetContentFrames(model)); EXPECT_EQ(std::vector<std::string>({"f:0", "f:1"}), GetContentFrames(model));
ASSERT_TRUE(store_observer.GetUpdate());
ASSERT_EQ(MakeTypicalStreamOperations().size(),
store_observer.GetUpdate()->operations.size());
} }
TEST(StreamModelTest, AddContentWithoutRoot) { TEST(StreamModelTest, AddContentWithoutRoot) {
...@@ -264,14 +284,26 @@ TEST(StreamModelTest, CommitEphemeralChange) { ...@@ -264,14 +284,26 @@ TEST(StreamModelTest, CommitEphemeralChange) {
TestObserver observer(&model); TestObserver observer(&model);
model.ExecuteOperations(MakeTypicalStreamOperations()); model.ExecuteOperations(MakeTypicalStreamOperations());
EphemeralChangeId change_id = model.CreateEphemeralChange({ EphemeralChangeId change_id = model.CreateEphemeralChange({
MakeOperation(MakeCluster(2, MakeRootId())), MakeOperation(MakeCluster(2, MakeRootId())),
MakeOperation(MakeContentNode(2, MakeClusterId(2))), MakeOperation(MakeContentNode(2, MakeClusterId(2))),
MakeOperation(MakeContent(2)), MakeOperation(MakeContent(2)),
}); });
observer.Clear();
TestStoreObserver store_observer(&model);
EXPECT_TRUE(model.CommitEphemeralChange(change_id)); EXPECT_TRUE(model.CommitEphemeralChange(change_id));
// Check that the observer's |OnStoreChange()| was called.
ASSERT_TRUE(store_observer.GetUpdate());
StoreUpdate store_update = *store_observer.GetUpdate();
ASSERT_EQ(3UL, store_update.operations.size());
EXPECT_EQ(feedstore::StreamStructure::CLUSTER,
store_update.operations[0].structure().type());
EXPECT_EQ(feedstore::StreamStructure::CONTENT,
store_update.operations[1].structure().type());
// Can't reject after commit. // Can't reject after commit.
EXPECT_FALSE(model.RejectEphemeralChange(change_id)); EXPECT_FALSE(model.RejectEphemeralChange(change_id));
...@@ -326,5 +358,26 @@ TEST(StreamModelTest, RejectFirstEphemeralChange) { ...@@ -326,5 +358,26 @@ TEST(StreamModelTest, RejectFirstEphemeralChange) {
GetContentFrames(model)); GetContentFrames(model));
} }
TEST(StreamModelTest, InitialLoad) {
StreamModel model;
TestObserver observer(&model);
TestStoreObserver store_observer(&model);
auto initial_update = std::make_unique<StreamModelUpdateRequest>();
initial_update->source =
StreamModelUpdateRequest::Source::kInitialLoadFromStore;
initial_update->content.push_back(MakeContent(0));
*initial_update->stream_data.add_structures() = MakeStream();
*initial_update->stream_data.add_structures() = MakeCluster(0, MakeRootId());
*initial_update->stream_data.add_structures() =
MakeContentNode(0, MakeClusterId(0));
model.Update(std::move(initial_update));
// Check that content was added and the store doesn't receive its own update.
EXPECT_TRUE(observer.ContentListChanged());
EXPECT_EQ(std::vector<std::string>({"f:0"}), GetContentFrames(model));
EXPECT_FALSE(store_observer.GetUpdate());
}
} // namespace } // namespace
} // namespace feed } // namespace feed
...@@ -46,10 +46,12 @@ struct StreamModelUpdateRequest { ...@@ -46,10 +46,12 @@ struct StreamModelUpdateRequest {
// If this data originates from the network, this is the server-reported time // If this data originates from the network, this is the server-reported time
// at which the request was fulfilled. // at which the request was fulfilled.
// TODO(harringtond): Use this or remove it.
int64_t server_response_time; int64_t server_response_time;
// If this data originates from the network, this is the time taken by the // If this data originates from the network, this is the time taken by the
// server to produce the response. // server to produce the response.
// TODO(harringtond): Use this or remove it.
base::TimeDelta response_time; base::TimeDelta response_time;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment