Commit 58a0f880 authored by Tarun Bansal's avatar Tarun Bansal Committed by Commit Bot

Use a global throttler for Socket Watcher in the Network Quality Estimator (NQE)

Bug: 786074
Cq-Include-Trybots: master.tryserver.chromium.android:android_cronet_tester;master.tryserver.chromium.mac:ios-simulator-cronet
Change-Id: I63330a3ababd79f58fb19faa3531411d179122a4
Reviewed-on: https://chromium-review.googlesource.com/768525Reviewed-by: default avatarBen Greenstein <bengr@chromium.org>
Commit-Queue: Tarun Bansal <tbansal@chromium.org>
Cr-Commit-Position: refs/heads/master@{#517873}
parent ec602d0e
......@@ -278,6 +278,8 @@ NetworkQualityEstimator::NetworkQualityEstimator(
params_->min_socket_watcher_notification_interval(),
base::Bind(&NetworkQualityEstimator::OnUpdatedTransportRTTAvailable,
base::Unretained(this)),
base::Bind(&NetworkQualityEstimator::ShouldSocketWatcherNotifyRTT,
base::Unretained(this)),
tick_clock_.get()));
// Record accuracy after a 15 second interval. The values used here must
......@@ -795,6 +797,7 @@ void NetworkQualityEstimator::OnConnectionTypeChanged(
new_rtt_observations_since_last_ect_computation_ = 0;
new_throughput_observations_since_last_ect_computation_ = 0;
transport_rtt_observation_count_last_ect_computation_ = 0;
last_socket_watcher_rtt_notification_ = base::TimeTicks();
estimated_quality_at_last_main_frame_ = nqe::internal::NetworkQuality();
cached_estimate_applied_ = false;
......@@ -1657,6 +1660,7 @@ void NetworkQualityEstimator::SetTickClockForTesting(
http_downstream_throughput_kbps_observations_.SetTickClockForTesting(
tick_clock_.get());
throughput_analyzer_->SetTickClockForTesting(tick_clock_.get());
watcher_factory_->SetTickClockForTesting(tick_clock_.get());
}
double NetworkQualityEstimator::RandDouble() const {
......@@ -1705,6 +1709,11 @@ void NetworkQualityEstimator::AddAndNotifyObserversOfRTT(
break;
}
if (observation.source() == NETWORK_QUALITY_OBSERVATION_SOURCE_TCP ||
observation.source() == NETWORK_QUALITY_OBSERVATION_SOURCE_QUIC) {
last_socket_watcher_rtt_notification_ = tick_clock_->NowTicks();
}
UMA_HISTOGRAM_ENUMERATION("NQE.RTT.ObservationSource", observation.source(),
NETWORK_QUALITY_OBSERVATION_SOURCE_MAX);
......@@ -2029,4 +2038,11 @@ bool NetworkQualityEstimator::ShouldAddObservation(
return true;
}
bool NetworkQualityEstimator::ShouldSocketWatcherNotifyRTT(
base::TimeTicks now) {
DCHECK(thread_checker_.CalledOnValidThread());
return (now - last_socket_watcher_rtt_notification_ >=
params_->socket_watchers_min_notification_interval());
}
} // namespace net
......@@ -551,6 +551,10 @@ class NET_EXPORT NetworkQualityEstimator
// Returns true if |observation| should be added to the observation buffer.
bool ShouldAddObservation(const Observation& observation) const;
// Returns true if the socket watcher can run the callback to notify the RTT
// observations.
bool ShouldSocketWatcherNotifyRTT(base::TimeTicks now);
// Determines if the requests to local host can be used in estimating the
// network quality. Set to true only for tests.
bool use_localhost_requests_;
......@@ -678,6 +682,9 @@ class NET_EXPORT NetworkQualityEstimator
// Manages the writing of events to the net log.
nqe::internal::EventCreator event_creator_;
// Time when the last RTT observation from a socket watcher was received.
base::TimeTicks last_socket_watcher_rtt_notification_;
base::WeakPtrFactory<NetworkQualityEstimator> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(NetworkQualityEstimator);
......
......@@ -437,6 +437,11 @@ NetworkQualityEstimatorParams::NetworkQualityEstimatorParams(
params_,
"add_default_platform_observations",
"true") == "true"),
socket_watchers_min_notification_interval_(
base::TimeDelta::FromMilliseconds(GetValueForVariationParam(
params_,
"socket_watchers_min_notification_interval_msec",
200))),
use_small_responses_(false) {
DCHECK_LE(0.0, correlation_uma_logging_probability_);
DCHECK_GE(1.0, correlation_uma_logging_probability_);
......
......@@ -223,6 +223,12 @@ class NET_EXPORT NetworkQualityEstimatorParams {
// ObservationBuffer.
size_t observation_buffer_size() const { return 300; }
// Minimun interval between consecutive notifications from socket
// watchers who live on the same thread as the network quality estimator.
base::TimeDelta socket_watchers_min_notification_interval() const {
return socket_watchers_min_notification_interval_;
}
private:
// Map containing all field trial parameters related to
// NetworkQualityEstimator field trial.
......@@ -245,6 +251,7 @@ class NET_EXPORT NetworkQualityEstimatorParams {
const int hanging_request_duration_http_rtt_multiplier_;
const base::TimeDelta hanging_request_min_duration_;
const bool add_default_platform_observations_;
const base::TimeDelta socket_watchers_min_notification_interval_;
bool use_small_responses_;
......
......@@ -2243,6 +2243,66 @@ TEST(NetworkQualityEstimatorTest, TestRttThroughputObservers) {
estimator.GetRecentTransportRTT(base::TimeTicks(), &rtt, nullptr));
}
TEST(NetworkQualityEstimatorTest, TestGlobalSocketWatcherThrottle) {
std::unique_ptr<base::SimpleTestTickClock> tick_clock(
new base::SimpleTestTickClock());
base::SimpleTestTickClock* tick_clock_ptr = tick_clock.get();
tick_clock_ptr->Advance(base::TimeDelta::FromSeconds(1));
std::map<std::string, std::string> variation_params;
variation_params["add_default_platform_observations"] = "false";
TestNetworkQualityEstimator estimator(variation_params);
estimator.SetTickClockForTesting(std::move(tick_clock));
TestRTTObserver rtt_observer;
estimator.AddRTTObserver(&rtt_observer);
const base::TimeDelta tcp_rtt(base::TimeDelta::FromMilliseconds(1));
TestDelegate test_delegate;
TestURLRequestContext context(true);
context.set_network_quality_estimator(&estimator);
context.Init();
// Use a public IP address so that the socket watcher runs the RTT callback.
IPAddressList ip_list;
IPAddress ip_address;
ASSERT_TRUE(ip_address.AssignFromIPLiteral("157.0.0.1"));
ip_list.push_back(ip_address);
AddressList address_list =
AddressList::CreateFromIPAddressList(ip_list, "canonical.example.com");
std::unique_ptr<SocketPerformanceWatcher> tcp_watcher =
estimator.GetSocketPerformanceWatcherFactory()
->CreateSocketPerformanceWatcher(
SocketPerformanceWatcherFactory::PROTOCOL_TCP, address_list);
EXPECT_EQ(0U, rtt_observer.observations().size());
EXPECT_TRUE(tcp_watcher->ShouldNotifyUpdatedRTT());
std::unique_ptr<URLRequest> request(
context.CreateRequest(estimator.GetEchoURL(), DEFAULT_PRIORITY,
&test_delegate, TRAFFIC_ANNOTATION_FOR_TESTS));
request->SetLoadFlags(request->load_flags() | LOAD_MAIN_FRAME_DEPRECATED);
request->Start();
base::RunLoop().Run();
EXPECT_EQ(1U, rtt_observer.observations().size());
EXPECT_TRUE(tcp_watcher->ShouldNotifyUpdatedRTT());
tcp_watcher->OnUpdatedRTTAvailable(tcp_rtt);
base::RunLoop().RunUntilIdle();
EXPECT_FALSE(tcp_watcher->ShouldNotifyUpdatedRTT());
EXPECT_EQ(2U, rtt_observer.observations().size());
// Advancing the clock should make it possible to notify new RTT
// notifications.
tick_clock_ptr->Advance(
estimator.params()->socket_watchers_min_notification_interval());
EXPECT_TRUE(tcp_watcher->ShouldNotifyUpdatedRTT());
EXPECT_EQ(tcp_rtt.InMilliseconds(), rtt_observer.observations().at(1).rtt_ms);
base::TimeDelta rtt;
EXPECT_TRUE(
estimator.GetRecentTransportRTT(base::TimeTicks(), &rtt, nullptr));
}
// TestTCPSocketRTT requires kernel support for tcp_info struct, and so it is
// enabled only on certain platforms.
#if defined(TCP_INFO) || defined(OS_LINUX)
......@@ -2253,6 +2313,11 @@ TEST(NetworkQualityEstimatorTest, TestRttThroughputObservers) {
// Tests that the TCP socket notifies the Network Quality Estimator of TCP RTTs,
// which in turn notifies registered RTT observers.
TEST(NetworkQualityEstimatorTest, MAYBE_TestTCPSocketRTT) {
std::unique_ptr<base::SimpleTestTickClock> tick_clock(
new base::SimpleTestTickClock());
base::SimpleTestTickClock* tick_clock_ptr = tick_clock.get();
tick_clock_ptr->Advance(base::TimeDelta::FromSeconds(1));
base::HistogramTester histogram_tester;
TestRTTObserver rtt_observer;
......@@ -2262,6 +2327,7 @@ TEST(NetworkQualityEstimatorTest, MAYBE_TestTCPSocketRTT) {
TestNetworkQualityEstimator estimator(
nullptr, variation_params, true, true,
std::make_unique<BoundTestNetLog>());
estimator.SetTickClockForTesting(std::move(tick_clock));
estimator.SimulateNetworkChange(
NetworkChangeNotifier::ConnectionType::CONNECTION_2G, "test");
......@@ -2305,6 +2371,9 @@ TEST(NetworkQualityEstimatorTest, MAYBE_TestTCPSocketRTT) {
&test_delegate, TRAFFIC_ANNOTATION_FOR_TESTS));
request->SetLoadFlags(request->load_flags() | LOAD_MAIN_FRAME_DEPRECATED);
request->Start();
tick_clock_ptr->Advance(
estimator.params()->socket_watchers_min_notification_interval());
base::RunLoop().Run();
size_t after_count_tcp_rtt_observations = 0;
......
......@@ -60,10 +60,12 @@ SocketWatcher::SocketWatcher(
bool allow_rtt_private_address,
scoped_refptr<base::SingleThreadTaskRunner> task_runner,
OnUpdatedRTTAvailableCallback updated_rtt_observation_callback,
ShouldNotifyRTTCallback should_notify_rtt_callback,
base::TickClock* tick_clock)
: protocol_(protocol),
task_runner_(std::move(task_runner)),
updated_rtt_observation_callback_(updated_rtt_observation_callback),
should_notify_rtt_callback_(should_notify_rtt_callback),
rtt_notifications_minimum_interval_(min_notification_interval),
run_rtt_callback_(allow_rtt_private_address ||
(!address_list.empty() &&
......@@ -80,12 +82,25 @@ SocketWatcher::~SocketWatcher() {}
bool SocketWatcher::ShouldNotifyUpdatedRTT() const {
DCHECK(thread_checker_.CalledOnValidThread());
if (!run_rtt_callback_)
return false;
const base::TimeTicks now = tick_clock_->NowTicks();
if (task_runner_->RunsTasksInCurrentSequence()) {
// Enables socket watcher to send more frequent RTT observations when very
// few sockets are receiving data.
if (should_notify_rtt_callback_.Run(now))
return true;
}
// Do not allow incoming notifications if the last notification was more
// recent than |rtt_notifications_minimum_interval_| ago. This helps in
// reducing the overhead of obtaining the RTT values.
return run_rtt_callback_ &&
tick_clock_->NowTicks() - last_rtt_notification_ >=
rtt_notifications_minimum_interval_;
// Enables a socket watcher to send RTT observation, helps in reducing
// starvation by allowing every socket watcher to notify at least one RTT
// notification every |rtt_notifications_minimum_interval_| duration.
return now - last_rtt_notification_ >= rtt_notifications_minimum_interval_;
}
void SocketWatcher::OnUpdatedRTTAvailable(const base::TimeDelta& rtt) {
......
......@@ -35,6 +35,8 @@ typedef base::Callback<void(SocketPerformanceWatcherFactory::Protocol protocol,
const base::Optional<nqe::internal::IPHash>& host)>
OnUpdatedRTTAvailableCallback;
typedef base::Callback<bool(base::TimeTicks)> ShouldNotifyRTTCallback;
} // namespace
namespace nqe {
......@@ -53,12 +55,16 @@ class NET_EXPORT_PRIVATE SocketWatcher : public SocketPerformanceWatcher {
// |allow_rtt_private_address| is true if |updated_rtt_observation_callback|
// should be called when RTT observation from a socket connected to private
// address is received. |tick_clock| is guaranteed to be non-null.
// |should_notify_rtt_callback| callback should be called back on
// |task_runner| by the created socket watchers to check if RTT observation
// should be taken and notified.
SocketWatcher(SocketPerformanceWatcherFactory::Protocol protocol,
const AddressList& address_list,
base::TimeDelta min_notification_interval,
bool allow_rtt_private_address,
scoped_refptr<base::SingleThreadTaskRunner> task_runner,
OnUpdatedRTTAvailableCallback updated_rtt_observation_callback,
ShouldNotifyRTTCallback should_notify_rtt_callback,
base::TickClock* tick_clock);
~SocketWatcher() override;
......@@ -77,6 +83,10 @@ class NET_EXPORT_PRIVATE SocketWatcher : public SocketPerformanceWatcher {
// Called every time a new RTT observation is available.
OnUpdatedRTTAvailableCallback updated_rtt_observation_callback_;
// Called to determine if the RTT notification should be notified using
// |updated_rtt_observation_callback_|.
ShouldNotifyRTTCallback should_notify_rtt_callback_;
// Minimum interval betweeen consecutive incoming notifications.
const base::TimeDelta rtt_notifications_minimum_interval_;
......
......@@ -17,11 +17,13 @@ SocketWatcherFactory::SocketWatcherFactory(
scoped_refptr<base::SingleThreadTaskRunner> task_runner,
base::TimeDelta min_notification_interval,
OnUpdatedRTTAvailableCallback updated_rtt_observation_callback,
ShouldNotifyRTTCallback should_notify_rtt_callback,
base::TickClock* tick_clock)
: task_runner_(std::move(task_runner)),
min_notification_interval_(min_notification_interval),
allow_rtt_private_address_(false),
updated_rtt_observation_callback_(updated_rtt_observation_callback),
should_notify_rtt_callback_(should_notify_rtt_callback),
tick_clock_(tick_clock) {
DCHECK(tick_clock_);
}
......@@ -35,7 +37,12 @@ SocketWatcherFactory::CreateSocketPerformanceWatcher(
return std::make_unique<SocketWatcher>(
protocol, address_list, min_notification_interval_,
allow_rtt_private_address_, task_runner_,
updated_rtt_observation_callback_, tick_clock_);
updated_rtt_observation_callback_, should_notify_rtt_callback_,
tick_clock_);
}
void SocketWatcherFactory::SetTickClockForTesting(base::TickClock* tick_clock) {
tick_clock_ = tick_clock;
}
} // namespace internal
......
......@@ -26,11 +26,15 @@ class TimeDelta;
namespace net {
namespace {
typedef base::Callback<void(SocketPerformanceWatcherFactory::Protocol protocol,
const base::TimeDelta& rtt,
const base::Optional<nqe::internal::IPHash>& host)>
OnUpdatedRTTAvailableCallback;
}
typedef base::Callback<bool(base::TimeTicks)> ShouldNotifyRTTCallback;
} // namespace
namespace nqe {
......@@ -45,11 +49,14 @@ class SocketWatcherFactory : public SocketPerformanceWatcherFactory {
// |task_runner| every time a new RTT observation is available.
// |min_notification_interval| is the minimum interval betweeen consecutive
// notifications to the socket watchers created by this factory. |tick_clock|
// is guaranteed to be non-null.
// is guaranteed to be non-null. |should_notify_rtt_callback| is the callback
// that should be called back on |task_runner| to check if RTT observation
// should be taken and notified.
SocketWatcherFactory(
scoped_refptr<base::SingleThreadTaskRunner> task_runner,
base::TimeDelta min_notification_interval,
OnUpdatedRTTAvailableCallback updated_rtt_observation_callback,
ShouldNotifyRTTCallback should_notify_rtt_callback,
base::TickClock* tick_clock);
~SocketWatcherFactory() override;
......@@ -63,6 +70,9 @@ class SocketWatcherFactory : public SocketPerformanceWatcherFactory {
allow_rtt_private_address_ = use_localhost_requests;
}
// Overrides the tick clock used by |this| for testing.
void SetTickClockForTesting(base::TickClock* tick_clock);
private:
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
......@@ -77,6 +87,10 @@ class SocketWatcherFactory : public SocketPerformanceWatcherFactory {
// Called every time a new RTT observation is available.
OnUpdatedRTTAvailableCallback updated_rtt_observation_callback_;
// Callback that should be called by socket watchers to determine if the RTT
// notification should be notified using |updated_rtt_observation_callback_|.
ShouldNotifyRTTCallback should_notify_rtt_callback_;
base::TickClock* tick_clock_;
DISALLOW_COPY_AND_ASSIGN(SocketWatcherFactory);
......
......@@ -51,6 +51,14 @@ class NetworkQualitySocketWatcherTest : public testing::Test {
callback_executed_ = true;
}
static void SetShouldNotifyRTTCallback(bool value) {
should_notify_rtt_callback_ = value;
}
static bool ShouldNotifyRTTCallback(base::TimeTicks now) {
return should_notify_rtt_callback_;
}
static void VerifyCallbackParams(const base::TimeDelta& rtt,
const base::Optional<IPHash>& host) {
ASSERT_TRUE(callback_executed_);
......@@ -66,6 +74,7 @@ class NetworkQualitySocketWatcherTest : public testing::Test {
callback_rtt_ = base::TimeDelta::FromMilliseconds(0);
callback_host_ = base::nullopt;
callback_executed_ = false;
should_notify_rtt_callback_ = false;
}
static base::TimeDelta callback_rtt() { return callback_rtt_; }
......@@ -74,6 +83,7 @@ class NetworkQualitySocketWatcherTest : public testing::Test {
static base::TimeDelta callback_rtt_;
static base::Optional<IPHash> callback_host_;
static bool callback_executed_;
static bool should_notify_rtt_callback_;
DISALLOW_COPY_AND_ASSIGN(NetworkQualitySocketWatcherTest);
};
......@@ -86,6 +96,8 @@ base::Optional<IPHash> NetworkQualitySocketWatcherTest::callback_host_ =
bool NetworkQualitySocketWatcherTest::callback_executed_ = false;
bool NetworkQualitySocketWatcherTest::should_notify_rtt_callback_ = false;
// Verify that the buffer size is never exceeded.
TEST_F(NetworkQualitySocketWatcherTest, NotificationsThrottled) {
base::SimpleTestTickClock tick_clock;
......@@ -99,11 +111,11 @@ TEST_F(NetworkQualitySocketWatcherTest, NotificationsThrottled) {
AddressList address_list =
AddressList::CreateFromIPAddressList(ip_list, "canonical.example.com");
SocketWatcher socket_watcher(SocketPerformanceWatcherFactory::PROTOCOL_TCP,
address_list,
base::TimeDelta::FromMilliseconds(2000), false,
base::ThreadTaskRunnerHandle::Get(),
base::Bind(OnUpdatedRTTAvailable), &tick_clock);
SocketWatcher socket_watcher(
SocketPerformanceWatcherFactory::PROTOCOL_TCP, address_list,
base::TimeDelta::FromMilliseconds(2000), false,
base::ThreadTaskRunnerHandle::Get(), base::Bind(OnUpdatedRTTAvailable),
base::Bind(ShouldNotifyRTTCallback), &tick_clock);
EXPECT_TRUE(socket_watcher.ShouldNotifyUpdatedRTT());
socket_watcher.OnUpdatedRTTAvailable(base::TimeDelta::FromSeconds(10));
......@@ -120,6 +132,14 @@ TEST_F(NetworkQualitySocketWatcherTest, NotificationsThrottled) {
// 2000 msec more than the last time |socket_watcher| received a notification.
tick_clock.Advance(base::TimeDelta::FromMilliseconds(1000));
EXPECT_TRUE(socket_watcher.ShouldNotifyUpdatedRTT());
ResetExpectedCallbackParams();
socket_watcher.OnUpdatedRTTAvailable(base::TimeDelta::FromSeconds(10));
EXPECT_FALSE(socket_watcher.ShouldNotifyUpdatedRTT());
// RTT notification is allowed by the global check.
SetShouldNotifyRTTCallback(true);
EXPECT_TRUE(socket_watcher.ShouldNotifyUpdatedRTT());
}
TEST_F(NetworkQualitySocketWatcherTest, QuicFirstNotificationDropped) {
......@@ -138,7 +158,8 @@ TEST_F(NetworkQualitySocketWatcherTest, QuicFirstNotificationDropped) {
SocketPerformanceWatcherFactory::PROTOCOL_QUIC, address_list,
base::TimeDelta::FromMilliseconds(2000), false,
base::ThreadTaskRunnerHandle::Get(),
base::Bind(OnUpdatedRTTAvailableStoreParams), &tick_clock);
base::Bind(OnUpdatedRTTAvailableStoreParams),
base::Bind(ShouldNotifyRTTCallback), &tick_clock);
EXPECT_TRUE(socket_watcher.ShouldNotifyUpdatedRTT());
socket_watcher.OnUpdatedRTTAvailable(base::TimeDelta::FromSeconds(10));
......@@ -147,6 +168,7 @@ TEST_F(NetworkQualitySocketWatcherTest, QuicFirstNotificationDropped) {
// be possible to notify the |socket_watcher| again.
EXPECT_TRUE(NetworkQualitySocketWatcherTest::callback_rtt().is_zero());
EXPECT_TRUE(socket_watcher.ShouldNotifyUpdatedRTT());
ResetExpectedCallbackParams();
socket_watcher.OnUpdatedRTTAvailable(base::TimeDelta::FromSeconds(2));
base::RunLoop().RunUntilIdle();
......@@ -191,7 +213,7 @@ TEST_F(NetworkQualitySocketWatcherTest, PrivateAddressRTTNotNotified) {
SocketPerformanceWatcherFactory::PROTOCOL_TCP, address_list,
base::TimeDelta::FromMilliseconds(2000), false,
base::ThreadTaskRunnerHandle::Get(), base::Bind(OnUpdatedRTTAvailable),
&tick_clock);
base::Bind(ShouldNotifyRTTCallback), &tick_clock);
EXPECT_EQ(test.expect_should_notify_rtt,
socket_watcher.ShouldNotifyUpdatedRTT());
......@@ -229,7 +251,8 @@ TEST_F(NetworkQualitySocketWatcherTest, RemoteHostIPHashComputedCorrectly) {
SocketPerformanceWatcherFactory::PROTOCOL_TCP, address_list,
base::TimeDelta::FromMilliseconds(2000), false,
base::ThreadTaskRunnerHandle::Get(),
base::Bind(OnUpdatedRTTAvailableStoreParams), &tick_clock);
base::Bind(OnUpdatedRTTAvailableStoreParams),
base::Bind(ShouldNotifyRTTCallback), &tick_clock);
EXPECT_TRUE(socket_watcher.ShouldNotifyUpdatedRTT());
socket_watcher.OnUpdatedRTTAvailable(base::TimeDelta::FromSeconds(10));
base::RunLoop().RunUntilIdle();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment