Commit b495e7ee authored by Yuwei Huang's avatar Yuwei Huang Committed by Commit Bot

[remoting host] Install a watchdog during peer connection teardown

This is a simpler approach to tackle (workaround) the blocked network
thread issue when compared to CL 2436278.

This CL simply creates a base::Watchdog that arms before closing the
peer connection, and disarms after the peer connection is closed. If
thread join takes longer than expected then it will crash the host so
that the daemon process can respawn it.

Bug: 1130090
Change-Id: Ib2400e17b848ed07df902a02b344efbf5e15c588
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2446840Reviewed-by: default avatarLambros Lambrou <lambroslambrou@chromium.org>
Commit-Queue: Yuwei Huang <yuweih@chromium.org>
Cr-Commit-Position: refs/heads/master@{#813937}
parent f8e6df88
......@@ -22,6 +22,7 @@
#include "base/task_runner_util.h"
#include "base/threading/thread_restrictions.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/threading/watchdog.h"
#include "jingle/glue/thread_wrapper.h"
#include "jingle/glue/utils.h"
#include "remoting/base/constants.h"
......@@ -88,6 +89,11 @@ constexpr base::TimeDelta kDefaultDataChannelStatePollingInterval =
constexpr base::TimeDelta kWaitForDataChannelsClosedTimeout =
base::TimeDelta::FromSeconds(5);
// The maximum amount of time we will wait for a thread join before we crash the
// host.
constexpr base::TimeDelta kWaitForThreadJoinTimeout =
base::TimeDelta::FromSeconds(30);
base::TimeDelta data_channel_state_polling_interval =
kDefaultDataChannelStatePollingInterval;
......@@ -329,6 +335,26 @@ class RtcEventLogOutput : public webrtc::RtcEventLogOutput {
WebrtcEventLogData& event_log_data_;
};
// Helper class to monitor the thread join process (on a temporary thread) when
// tearing down the peer connection, which has been observed to occasionally
// block the network thread and zombify the host. This class crashes the ME2ME
// host if the thread join process takes too long, so that the ME2ME daemon
// process can respawn the host.
// See: crbug.com/1130090
class ThreadJoinWatchdog : public base::Watchdog {
public:
ThreadJoinWatchdog()
: base::Watchdog(kWaitForThreadJoinTimeout,
"WebRTC Thread Join Watchdog",
/* enabled= */ true) {}
~ThreadJoinWatchdog() override = default;
void Alarm() override {
// Crash the host if thread join takes too long.
CHECK(false) << "WebRTC thread join process timed out.";
}
};
} // namespace
class WebrtcTransport::PeerConnectionWrapper
......@@ -383,9 +409,13 @@ class WebrtcTransport::PeerConnectionWrapper
dependencies.allocator = std::move(port_allocator);
peer_connection_ = peer_connection_factory_->CreatePeerConnection(
rtc_config, std::move(dependencies));
thread_join_watchdog_ = std::make_unique<ThreadJoinWatchdog>();
}
~PeerConnectionWrapper() override {
thread_join_watchdog_->Arm();
// PeerConnection creates threads internally, which are joined when the
// connection is closed. See crbug.com/660081.
ScopedAllowThreadJoinForWebRtcTransport allow_thread_join;
......@@ -393,6 +423,11 @@ class WebrtcTransport::PeerConnectionWrapper
peer_connection_ = nullptr;
peer_connection_factory_ = nullptr;
audio_module_ = nullptr;
if (before_disarm_thread_join_watchdog_callback_) {
std::move(before_disarm_thread_join_watchdog_callback_).Run();
}
thread_join_watchdog_->Disarm();
}
WebrtcAudioModule* audio_module() {
......@@ -407,6 +442,14 @@ class WebrtcTransport::PeerConnectionWrapper
return peer_connection_factory_.get();
}
void SetThreadJoinWatchdogForTests(std::unique_ptr<base::Watchdog> watchdog) {
thread_join_watchdog_ = std::move(watchdog);
}
void SetBeforeDisarmThreadJoinWatchdogCallbackForTests(base::OnceClosure cb) {
before_disarm_thread_join_watchdog_callback_ = std::move(cb);
}
// webrtc::PeerConnectionObserver interface.
void OnSignalingChange(
webrtc::PeerConnectionInterface::SignalingState new_state) override {
......@@ -457,6 +500,8 @@ class WebrtcTransport::PeerConnectionWrapper
scoped_refptr<webrtc::PeerConnectionFactoryInterface>
peer_connection_factory_;
scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
std::unique_ptr<base::Watchdog> thread_join_watchdog_;
base::OnceClosure before_disarm_thread_join_watchdog_callback_;
base::WeakPtr<WebrtcTransport> transport_;
......@@ -1274,5 +1319,18 @@ void WebrtcTransport::StopRtcEventLogging() {
}
}
void WebrtcTransport::SetThreadJoinWatchdogForTests(
std::unique_ptr<base::Watchdog> watchdog) {
peer_connection_wrapper_->SetThreadJoinWatchdogForTests( // IN-TEST
std::move(watchdog));
}
void WebrtcTransport::SetBeforeDisarmThreadJoinWatchdogCallbackForTests(
base::OnceClosure cb) {
peer_connection_wrapper_
->SetBeforeDisarmThreadJoinWatchdogCallbackForTests( // IN-TEST
std::move(cb));
}
} // namespace protocol
} // namespace remoting
......@@ -10,6 +10,7 @@
#include <tuple>
#include <vector>
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
......@@ -27,6 +28,12 @@
#include "remoting/signaling/signal_strategy.h"
#include "third_party/webrtc/api/peer_connection_interface.h"
namespace base {
class Watchdog;
} // namespace base
namespace remoting {
namespace protocol {
......@@ -134,6 +141,14 @@ class WebrtcTransport : public Transport,
static void SetDataChannelPollingIntervalForTests(
base::TimeDelta data_channel_state_polling_interval);
// Replaces the watchdog that monitors the thread join process when the peer
// connection is being torn down.
void SetThreadJoinWatchdogForTests(std::unique_ptr<base::Watchdog> watchdog);
// Sets a callback to be executed before disarming the thread join watchdog.
// Only used for testing.
void SetBeforeDisarmThreadJoinWatchdogCallbackForTests(base::OnceClosure cb);
private:
// PeerConnectionWrapper is responsible for PeerConnection creation,
// ownership. It passes all events to the corresponding methods below. This is
......
......@@ -11,7 +11,11 @@
#include "base/macros.h"
#include "base/run_loop.h"
#include "base/strings/string_util.h"
#include "base/test/bind_test_util.h"
#include "base/test/task_environment.h"
#include "base/threading/platform_thread.h"
#include "base/threading/watchdog.h"
#include "base/time/time.h"
#include "jingle/glue/thread_wrapper.h"
#include "net/base/io_buffer.h"
#include "net/url_request/url_request_context_getter.h"
......@@ -35,6 +39,9 @@ namespace {
const char kChannelName[] = "test_channel";
const char kAuthKey[] = "test_auth_key";
constexpr base::TimeDelta kWaitForThreadJoinTimeout =
base::TimeDelta::FromMilliseconds(200);
class TestTransportEventHandler : public WebrtcTransport::EventHandler {
public:
typedef base::RepeatingCallback<void(ErrorCode error)> ErrorCallback;
......@@ -147,6 +154,21 @@ class TestMessagePipeEventHandler : public MessagePipe::EventHandler {
DISALLOW_COPY_AND_ASSIGN(TestMessagePipeEventHandler);
};
class FakeThreadJoinWatchdog : public base::Watchdog {
public:
explicit FakeThreadJoinWatchdog(bool* alarm_triggered)
: base::Watchdog(kWaitForThreadJoinTimeout,
"Fake Thread Join Watchdog",
/* enabled= */ true),
alarm_triggered_(alarm_triggered) {}
~FakeThreadJoinWatchdog() override = default;
void Alarm() override { *alarm_triggered_ = true; }
private:
bool* alarm_triggered_;
};
} // namespace
class WebrtcTransportTest : public testing::Test {
......@@ -184,24 +206,35 @@ class WebrtcTransportTest : public testing::Test {
}
void InitializeConnection() {
host_transport_.reset(
new WebrtcTransport(jingle_glue::JingleThreadWrapper::current(),
TransportContext::ForTests(TransportRole::SERVER),
&host_event_handler_));
host_transport_ = std::make_unique<WebrtcTransport>(
jingle_glue::JingleThreadWrapper::current(),
TransportContext::ForTests(TransportRole::SERVER),
&host_event_handler_);
host_transport_->SetThreadJoinWatchdogForTests(
std::make_unique<FakeThreadJoinWatchdog>(
&host_thread_join_alarm_triggered_));
// If offer_to_receive_video and offer_to_receive_audio are both false,
// there must be a stream present in order to generate a valid SDP offer.
host_transport_->peer_connection()->AddTransceiver(
cricket::MEDIA_TYPE_VIDEO);
host_authenticator_.reset(new FakeAuthenticator(FakeAuthenticator::ACCEPT));
host_authenticator_ =
std::make_unique<FakeAuthenticator>(FakeAuthenticator::ACCEPT);
host_authenticator_->set_auth_key(kAuthKey);
client_transport_.reset(
new WebrtcTransport(jingle_glue::JingleThreadWrapper::current(),
TransportContext::ForTests(TransportRole::CLIENT),
&client_event_handler_));
client_authenticator_.reset(
new FakeAuthenticator(FakeAuthenticator::ACCEPT));
client_transport_ = std::make_unique<WebrtcTransport>(
jingle_glue::JingleThreadWrapper::current(),
TransportContext::ForTests(TransportRole::CLIENT),
&client_event_handler_);
client_transport_->SetThreadJoinWatchdogForTests(
std::make_unique<FakeThreadJoinWatchdog>(
&client_thread_join_alarm_triggered_));
client_authenticator_ =
std::make_unique<FakeAuthenticator>(FakeAuthenticator::ACCEPT);
client_authenticator_->set_auth_key(kAuthKey);
}
......@@ -236,7 +269,7 @@ class WebrtcTransportTest : public testing::Test {
base::BindRepeating(&WebrtcTransportTest::QuitRunLoopOnCounter,
base::Unretained(this), &counter));
run_loop_.reset(new base::RunLoop());
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
host_event_handler_.set_connected_callback({});
......@@ -311,10 +344,12 @@ class WebrtcTransportTest : public testing::Test {
std::unique_ptr<WebrtcTransport> host_transport_;
TestTransportEventHandler host_event_handler_;
std::unique_ptr<FakeAuthenticator> host_authenticator_;
bool host_thread_join_alarm_triggered_ = false;
std::unique_ptr<WebrtcTransport> client_transport_;
TestTransportEventHandler client_event_handler_;
std::unique_ptr<FakeAuthenticator> client_authenticator_;
bool client_thread_join_alarm_triggered_ = false;
std::unique_ptr<MessagePipe> client_message_pipe_;
TestMessagePipeEventHandler client_message_pipe_event_handler_;
......@@ -338,7 +373,7 @@ TEST_F(WebrtcTransportTest, InvalidAuthKey) {
client_authenticator_->set_auth_key("Incorrect Key");
StartConnection();
run_loop_.reset(new base::RunLoop());
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
EXPECT_EQ(AUTHENTICATION_FAILED, client_error_);
......@@ -353,7 +388,7 @@ TEST_F(WebrtcTransportTest, DataStream) {
InitializeConnection();
StartConnection();
run_loop_.reset(new base::RunLoop());
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
EXPECT_TRUE(client_message_pipe_);
......@@ -363,7 +398,7 @@ TEST_F(WebrtcTransportTest, DataStream) {
message.set_text("Hello");
host_message_pipe_->Send(&message, {});
run_loop_.reset(new base::RunLoop());
run_loop_ = std::make_unique<base::RunLoop>();
client_message_pipe_event_handler_.set_message_callback(
run_loop_->QuitClosure());
run_loop_->Run();
......@@ -384,7 +419,7 @@ TEST_F(WebrtcTransportTest, DataStreamLate) {
ExpectClientDataStream();
CreateHostDataStream();
run_loop_.reset(new base::RunLoop());
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
EXPECT_TRUE(client_message_pipe_);
......@@ -399,7 +434,7 @@ TEST_F(WebrtcTransportTest, TerminateDataChannel) {
ExpectClientDataStream();
CreateHostDataStream();
run_loop_.reset(new base::RunLoop());
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
EXPECT_TRUE(client_message_pipe_);
......@@ -416,7 +451,7 @@ TEST_F(WebrtcTransportTest, TerminateDataChannel) {
// the other side.
client_message_pipe_.reset();
run_loop_.reset(new base::RunLoop());
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
// Check that OnHostChannelClosed() has been called.
......@@ -424,5 +459,73 @@ TEST_F(WebrtcTransportTest, TerminateDataChannel) {
EXPECT_FALSE(host_message_pipe_);
}
TEST_F(WebrtcTransportTest,
ThreadJoinBlockedDuringConnectionTeardown_WatchdogFired) {
InitializeConnection();
int counter = 2;
auto block_before_disarm = base::BindLambdaForTesting([&]() {
base::PlatformThread::Sleep(kWaitForThreadJoinTimeout * 2);
QuitRunLoopOnCounter(&counter);
});
host_transport_->SetBeforeDisarmThreadJoinWatchdogCallbackForTests(
block_before_disarm);
client_transport_->SetBeforeDisarmThreadJoinWatchdogCallbackForTests(
block_before_disarm);
StartConnection();
WaitUntilConnected();
ExpectClientDataStream();
CreateHostDataStream();
// Run loop for starting the data stream.
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
// Run loop for deleting the transports.
run_loop_ = std::make_unique<base::RunLoop>();
host_transport_.reset();
client_transport_.reset();
run_loop_->Run();
EXPECT_EQ(true, host_thread_join_alarm_triggered_);
EXPECT_EQ(true, client_thread_join_alarm_triggered_);
}
TEST_F(WebrtcTransportTest,
ThreadJoinNotBlockedDuringConnectionTeardown_WatchdogNotFired) {
InitializeConnection();
int counter = 2;
auto not_block_before_disarm =
base::BindLambdaForTesting([&]() { QuitRunLoopOnCounter(&counter); });
host_transport_->SetBeforeDisarmThreadJoinWatchdogCallbackForTests(
not_block_before_disarm);
client_transport_->SetBeforeDisarmThreadJoinWatchdogCallbackForTests(
not_block_before_disarm);
StartConnection();
WaitUntilConnected();
ExpectClientDataStream();
CreateHostDataStream();
// Run loop for starting the data stream.
run_loop_ = std::make_unique<base::RunLoop>();
run_loop_->Run();
// Run loop for deleting the transports.
run_loop_ = std::make_unique<base::RunLoop>();
host_transport_.reset();
client_transport_.reset();
run_loop_->Run();
EXPECT_EQ(false, host_thread_join_alarm_triggered_);
EXPECT_EQ(false, client_thread_join_alarm_triggered_);
}
} // namespace protocol
} // namespace remoting
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment