Commit 6a1ae504 authored by Dan Harrington's avatar Dan Harrington Committed by Commit Bot

Add primitive stream loading to feedv2

- Just loads from network.
- To make testing easier, you can now inject the internal (translated)
  response.

Bug: 1044139
Change-Id: I52b231ebda49c944626f090b66af612e7979d436
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2116922Reviewed-by: default avatarIan Wells <iwells@chromium.org>
Commit-Queue: Dan H <harringtond@chromium.org>
Cr-Commit-Position: refs/heads/master@{#753393}
parent f65119b5
......@@ -39,6 +39,8 @@ source_set("feed_core_v2") {
"stream_model/feature_tree.h",
"stream_model_update_request.cc",
"stream_model_update_request.h",
"tasks/load_stream_task.cc",
"tasks/load_stream_task.h",
]
deps = [
"//components/feed/core:feed_core",
......
......@@ -20,6 +20,8 @@
#include "components/feed/core/v2/refresh_task_scheduler.h"
#include "components/feed/core/v2/scheduling.h"
#include "components/feed/core/v2/stream_model.h"
#include "components/feed/core/v2/stream_model_update_request.h"
#include "components/feed/core/v2/tasks/load_stream_task.h"
#include "components/prefs/pref_service.h"
namespace feed {
......@@ -125,6 +127,13 @@ class FeedStream::ModelMonitor : public StreamModel::Observer {
std::set<ContentRevision> current_content_set_;
};
std::unique_ptr<StreamModelUpdateRequest>
FeedStream::WireResponseTranslator::TranslateWireResponse(
feedwire::Response response,
base::TimeDelta response_time) {
return ::feed::TranslateWireResponse(std::move(response), response_time);
}
FeedStream::FeedStream(
RefreshTaskScheduler* refresh_task_scheduler,
EventObserver* stream_event_observer,
......@@ -146,6 +155,8 @@ FeedStream::FeedStream(
user_classifier_(profile_prefs, clock),
refresh_throttler_(profile_prefs, clock) {
// TODO(harringtond): Use these members.
static WireResponseTranslator default_translator;
wire_response_translator_ = &default_translator;
(void)feed_network_;
}
......@@ -161,10 +172,25 @@ void FeedStream::InitializeScheduling() {
FeedStream::~FeedStream() = default;
void FeedStream::TriggerStreamLoad() {
if (model_monitor_ || model_loading_in_progress_)
return;
model_loading_in_progress_ = true;
task_queue_.AddTask(std::make_unique<LoadStreamTask>(
this, base::BindOnce(&FeedStream::LoadStreamTaskComplete,
base::Unretained(this))));
}
void FeedStream::LoadStreamTaskComplete() {
model_loading_in_progress_ = false;
}
void FeedStream::AttachSurface(SurfaceInterface* surface) {
surfaces_.AddObserver(surface);
if (model_monitor_) {
model_monitor_->SurfaceAdded(surface);
} else {
TriggerStreamLoad();
}
}
......@@ -226,6 +252,9 @@ base::Time FeedStream::GetLastFetchTime() {
void FeedStream::LoadModelForTesting(std::unique_ptr<StreamModel> model) {
LoadModel(std::move(model));
}
offline_pages::TaskQueue* FeedStream::GetTaskQueueForTesting() {
return &task_queue_;
}
void FeedStream::OnTaskQueueIsIdle() {}
......
......@@ -14,6 +14,7 @@
#include "base/task_runner_util.h"
#include "components/feed/core/common/enums.h"
#include "components/feed/core/common/user_classifier.h"
#include "components/feed/core/proto/v2/wire/response.pb.h"
#include "components/feed/core/v2/master_refresh_throttler.h"
#include "components/feed/core/v2/public/feed_stream_api.h"
#include "components/offline_pages/task/task_queue.h"
......@@ -29,6 +30,7 @@ namespace feed {
class StreamModel;
class FeedNetwork;
class RefreshTaskScheduler;
struct StreamModelUpdateRequest;
// Implements FeedStreamApi. |FeedStream| additionally exposes functionality
// needed by other classes within the Feed component.
......@@ -53,6 +55,17 @@ class FeedStream : public FeedStreamApi,
virtual void OnClearAll(base::TimeDelta time_since_last_clear) = 0;
};
// Forwards to |feed::TranslateWireResponse()| by default. Can be overridden
// for testing.
class WireResponseTranslator {
public:
WireResponseTranslator() = default;
~WireResponseTranslator() = default;
virtual std::unique_ptr<StreamModelUpdateRequest> TranslateWireResponse(
feedwire::Response response,
base::TimeDelta response_time);
};
FeedStream(RefreshTaskScheduler* refresh_task_scheduler,
EventObserver* stream_event_observer,
Delegate* delegate,
......@@ -107,6 +120,10 @@ class FeedStream : public FeedStreamApi,
// State shared for the sake of implementing FeedStream. Typically these
// functions are used by tasks.
void LoadModel(std::unique_ptr<StreamModel> model);
FeedNetwork* GetNetwork() { return feed_network_; }
// Returns the computed UserClass for the active user.
UserClass GetUserClass();
......@@ -116,17 +133,29 @@ class FeedStream : public FeedStreamApi,
// Loads |model|. Should be used for testing in place of typical model
// loading from network or storage.
void LoadModelForTesting(std::unique_ptr<StreamModel> model);
offline_pages::TaskQueue* GetTaskQueueForTesting();
// Returns the model if it is loaded, or null otherwise.
StreamModel* GetModel() { return model_.get(); }
WireResponseTranslator* GetWireResponseTranslator() const {
return wire_response_translator_;
}
void SetWireResponseTranslatorForTesting(
WireResponseTranslator* wire_response_translator) {
wire_response_translator_ = wire_response_translator;
}
private:
class ModelMonitor;
void MaybeTriggerRefresh(TriggerType trigger,
bool clear_all_before_refresh = false);
void LoadModel(std::unique_ptr<StreamModel> model);
void TriggerStreamLoad();
void UnloadModel();
void LoadStreamTaskComplete();
// Determines whether or not a fetch should be allowed.
// If a fetch is allowed, quota is reserved with the assumption that a fetch
// will follow shortly.
......@@ -134,6 +163,8 @@ class FeedStream : public FeedStreamApi,
void ClearAll();
// Unowned.
RefreshTaskScheduler* refresh_task_scheduler_;
EventObserver* stream_event_observer_;
Delegate* delegate_;
......@@ -141,10 +172,14 @@ class FeedStream : public FeedStreamApi,
FeedNetwork* feed_network_;
const base::Clock* clock_;
const base::TickClock* tick_clock_;
WireResponseTranslator* wire_response_translator_;
scoped_refptr<base::SequencedTaskRunner> background_task_runner_;
offline_pages::TaskQueue task_queue_;
// Whether the model is being loaded. Used to prevent multiple simultaneous
// attempts to load the model.
bool model_loading_in_progress_ = false;
// Monitors |model_|. Null when |model_| is null.
std::unique_ptr<ModelMonitor> model_monitor_;
// The stream model. Null if not yet loaded.
......
......@@ -13,9 +13,11 @@
#include "base/test/simple_test_clock.h"
#include "base/test/simple_test_tick_clock.h"
#include "base/test/task_environment.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "components/feed/core/common/pref_names.h"
#include "components/feed/core/proto/v2/store.pb.h"
#include "components/feed/core/proto/v2/ui.pb.h"
#include "components/feed/core/proto/v2/wire/request.pb.h"
#include "components/feed/core/shared_prefs/pref_names.h"
#include "components/feed/core/v2/feed_network.h"
#include "components/feed/core/v2/refresh_task_scheduler.h"
......@@ -34,13 +36,20 @@ class TestSurface : public FeedStream::SurfaceInterface {
// FeedStream::SurfaceInterface.
void InitialStreamState(const feedui::StreamUpdate& stream_update) override {
initial_state = stream_update;
if (on_initial_stream_state)
on_initial_stream_state.Run();
}
void StreamUpdate(const feedui::StreamUpdate& stream_update) override {
update = stream_update;
if (on_stream_update)
on_stream_update.Run();
}
base::Optional<feedui::StreamUpdate> initial_state;
base::Optional<feedui::StreamUpdate> update;
base::RepeatingClosure on_initial_stream_state;
base::RepeatingClosure on_stream_update;
};
class TestFeedNetwork : public FeedNetwork {
......@@ -48,11 +57,47 @@ class TestFeedNetwork : public FeedNetwork {
// FeedNetwork implementation.
void SendQueryRequest(
const feedwire::Request& request,
base::OnceCallback<void(QueryRequestResult)> callback) override {}
base::OnceCallback<void(QueryRequestResult)> callback) override {
++send_query_call_count;
// Emulate a successful response.
// The response body is currently an empty message, because most of the
// time we want to inject a translated response for ease of test-writing.
query_request_sent = request;
QueryRequestResult result;
result.status_code = 200;
result.response_body = std::make_unique<feedwire::Response>();
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(std::move(callback), std::move(result)));
}
void SendActionRequest(
const feedwire::ActionRequest& request,
base::OnceCallback<void(ActionRequestResult)> callback) override {}
void CancelRequests() override {}
base::Optional<feedwire::Request> query_request_sent;
int send_query_call_count = 0;
};
// Forwards to |FeedStream::WireResponseTranslator| unless a response is
// injected.
class TestWireResponseTranslator : public FeedStream::WireResponseTranslator {
public:
std::unique_ptr<StreamModelUpdateRequest> TranslateWireResponse(
feedwire::Response response,
base::TimeDelta response_time) override {
if (injected_response_) {
return std::move(injected_response_);
}
return FeedStream::WireResponseTranslator::TranslateWireResponse(
std::move(response), response_time);
}
void InjectResponse(std::unique_ptr<StreamModelUpdateRequest> response) {
injected_response_ = std::move(response);
}
bool InjectedResponseConsumed() const { return !injected_response_; }
private:
std::unique_ptr<StreamModelUpdateRequest> injected_response_;
};
class FakeRefreshTaskScheduler : public RefreshTaskScheduler {
......@@ -94,20 +139,30 @@ class FeedStreamTest : public testing::Test, public FeedStream::Delegate {
stream_ = std::make_unique<FeedStream>(
&refresh_scheduler_, &event_observer_, this, &profile_prefs_, &network_,
&clock_, &tick_clock_, task_environment_.GetMainThreadTaskRunner());
stream_->SetWireResponseTranslatorForTesting(&response_translator_);
}
// FeedStream::Delegate.
bool IsEulaAccepted() override { return true; }
bool IsOffline() override { return false; }
// For test access.
bool IsTaskQueueIdle() const {
return !stream_->GetTaskQueueForTesting()->HasPendingTasks() &&
!stream_->GetTaskQueueForTesting()->HasRunningTask();
}
protected:
base::test::TaskEnvironment task_environment_{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
TestEventObserver event_observer_;
TestingPrefServiceSimple profile_prefs_;
TestFeedNetwork network_;
TestWireResponseTranslator response_translator_;
base::SimpleTestClock clock_;
base::SimpleTestTickClock tick_clock_;
FakeRefreshTaskScheduler refresh_scheduler_;
std::unique_ptr<FeedStream> stream_;
base::test::SingleThreadTaskEnvironment task_environment_;
};
TEST_F(FeedStreamTest, IsArticlesListVisibleByDefault) {
......@@ -284,5 +339,55 @@ TEST_F(FeedStreamTest, DetachSurface) {
EXPECT_FALSE(surface.update);
}
TEST_F(FeedStreamTest, AttachSurfaceTriggersModelLoad) {
response_translator_.InjectResponse(MakeTypicalInitialModelState());
TestSurface surface;
stream_->AttachSurface(&surface);
base::RunLoop run_loop;
surface.on_initial_stream_state = run_loop.QuitClosure();
run_loop.Run();
EXPECT_TRUE(network_.query_request_sent);
EXPECT_TRUE(response_translator_.InjectedResponseConsumed());
ASSERT_TRUE(surface.initial_state);
ASSERT_EQ(2, surface.initial_state->updated_slices().size());
}
TEST_F(FeedStreamTest, DetachSurfaceWhileLoadingModel) {
response_translator_.InjectResponse(MakeTypicalInitialModelState());
TestSurface surface;
stream_->AttachSurface(&surface);
stream_->DetachSurface(&surface);
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(network_.query_request_sent);
EXPECT_FALSE(surface.initial_state);
}
TEST_F(FeedStreamTest, AttachMultipleSurfacesLoadsModelOnce) {
response_translator_.InjectResponse(MakeTypicalInitialModelState());
TestSurface surface;
TestSurface other_surface;
stream_->AttachSurface(&surface);
stream_->AttachSurface(&other_surface);
{
base::RunLoop run_loop;
surface.on_initial_stream_state = run_loop.QuitClosure();
run_loop.Run();
}
EXPECT_EQ(1, network_.send_query_call_count);
base::RunLoop().RunUntilIdle(); // Make sure tasks are complete.
// After load, another surface doesn't trigger any tasks.
TestSurface later_surface;
stream_->AttachSurface(&later_surface);
EXPECT_TRUE(IsTaskQueueIdle());
}
} // namespace
} // namespace feed
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/feed/core/v2/tasks/load_stream_task.h"
#include <memory>
#include <utility>
#include "base/logging.h"
#include "base/time/time.h"
#include "components/feed/core/proto/v2/wire/client_info.pb.h"
#include "components/feed/core/proto/v2/wire/feed_request.pb.h"
#include "components/feed/core/proto/v2/wire/request.pb.h"
#include "components/feed/core/v2/feed_network.h"
#include "components/feed/core/v2/feed_stream.h"
#include "components/feed/core/v2/stream_model.h"
#include "components/feed/core/v2/stream_model_update_request.h"
namespace feed {
LoadStreamTask::LoadStreamTask(FeedStream* stream,
base::OnceClosure done_callback)
: stream_(stream), done_callback_(std::move(done_callback)) {}
LoadStreamTask::~LoadStreamTask() = default;
void LoadStreamTask::Run() {
// TODO(harringtond): This logic is all provisional and should be rewritten.
// Don't load if the model is already loaded.
if (stream_->GetModel()) {
Done();
return;
}
// TODO(harringtond): Request parameters here are all placeholder values.
feedwire::Request request;
feedwire::ClientInfo& client_info =
*request.mutable_feed_request()->mutable_client_info();
client_info.set_platform_type(feedwire::ClientInfo::ANDROID_ID);
client_info.set_app_type(feedwire::ClientInfo::CHROME);
request.mutable_feed_request()->mutable_feed_query()->set_reason(
feedwire::FeedQuery::MANUAL_REFRESH);
fetch_start_time_ = base::TimeTicks::Now();
stream_->GetNetwork()->SendQueryRequest(
request,
base::BindOnce(&LoadStreamTask::QueryRequestComplete, GetWeakPtr()));
}
void LoadStreamTask::QueryRequestComplete(
FeedNetwork::QueryRequestResult result) {
DCHECK(!stream_->GetModel());
if (!result.response_body) {
Done();
return;
}
std::unique_ptr<StreamModelUpdateRequest> update_request =
stream_->GetWireResponseTranslator()->TranslateWireResponse(
*result.response_body, base::TimeTicks::Now() - fetch_start_time_);
if (!update_request) {
Done();
return;
}
auto model = std::make_unique<StreamModel>();
model->Update(std::move(update_request));
stream_->LoadModel(std::move(model));
Done();
}
void LoadStreamTask::Done() {
std::move(done_callback_).Run();
TaskComplete();
}
} // namespace feed
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_FEED_CORE_V2_TASKS_LOAD_STREAM_TASK_H_
#define COMPONENTS_FEED_CORE_V2_TASKS_LOAD_STREAM_TASK_H_
#include "components/offline_pages/task/task.h"
#include "base/callback.h"
#include "base/memory/weak_ptr.h"
#include "components/feed/core/v2/feed_network.h"
namespace feed {
class FeedStream;
// Loads the stream model from storage or network.
// TODO(harringtond): This is ultra-simplified so that we have something in
// in place temporarily. Right now, we just always fetch from network.
class LoadStreamTask : public offline_pages::Task {
public:
explicit LoadStreamTask(FeedStream* stream, base::OnceClosure done_callback);
~LoadStreamTask() override;
LoadStreamTask(const LoadStreamTask&) = delete;
LoadStreamTask& operator=(const LoadStreamTask&) = delete;
private:
void Run() override;
base::WeakPtr<LoadStreamTask> GetWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
void QueryRequestComplete(FeedNetwork::QueryRequestResult result);
void Done();
FeedStream* stream_; // Unowned.
base::TimeTicks fetch_start_time_;
base::OnceClosure done_callback_;
base::WeakPtrFactory<LoadStreamTask> weak_ptr_factory_{this};
};
} // namespace feed
#endif // COMPONENTS_FEED_CORE_V2_TASKS_LOAD_STREAM_TASK_H_
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment