Commit 636d6b60 authored by Alex Clarke's avatar Alex Clarke Committed by Commit Bot

Fix leak of MessageLoop in base::Thread

This leak was introduced by https://crrev.com/c/1462801/ where
there was a confusion between MessageLoop and MessageLoopBase
(surprisingly the former owns the latter).

This patch fixes the leak and generally clarifies ownership.
We still need to change base::Thread::Options::task_environment to be a
std::unique_ptr but that's a larger refactor.

Change-Id: Ic5380df2fb9dca5cb0818ce8e8b90e05192c6e9d
BUG: 933925, 934088, 933014, 932857
Reviewed-on: https://chromium-review.googlesource.com/c/1480462
Commit-Queue: Alex Clarke <alexclarke@chromium.org>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Cr-Commit-Position: refs/heads/master@{#634462}
parent 3529bb25
......@@ -168,6 +168,12 @@ std::unique_ptr<MessageLoop> MessageLoop::CreateUnbound(Type type) {
return WrapUnique(new MessageLoop(type, nullptr));
}
// static
std::unique_ptr<MessageLoop> MessageLoop::CreateUnbound(
std::unique_ptr<MessagePump> custom_pump) {
return WrapUnique(new MessageLoop(TYPE_CUSTOM, std::move(custom_pump)));
}
MessageLoop::MessageLoop(Type type, std::unique_ptr<MessagePump> custom_pump)
: backend_(sequence_manager::internal::SequenceManagerImpl::CreateUnbound(
sequence_manager::SequenceManager::Settings{.message_loop_type =
......
......@@ -24,6 +24,10 @@
namespace base {
namespace internal {
class MessageLoopTaskEnvironment;
} // namespace internal
class MessageLoopImpl;
namespace sequence_manager {
......@@ -321,6 +325,7 @@ class BASE_EXPORT MessageLoop {
friend class MessageLoopTypedTest;
friend class ScheduleWorkTest;
friend class Thread;
friend class internal::MessageLoopTaskEnvironment;
friend class sequence_manager::internal::SequenceManagerImpl;
FRIEND_TEST_ALL_PREFIXES(MessageLoopTest, DeleteUnboundLoop);
......@@ -333,6 +338,8 @@ class BASE_EXPORT MessageLoop {
// Before BindToCurrentThread() is called, only Post*Task() functions can
// be called on the message loop.
static std::unique_ptr<MessageLoop> CreateUnbound(Type type);
static std::unique_ptr<MessageLoop> CreateUnbound(
std::unique_ptr<MessagePump> pump);
scoped_refptr<sequence_manager::TaskQueue> CreateDefaultTaskQueue();
......
......@@ -27,13 +27,6 @@
namespace base {
class ThreadForTest : public Thread {
public:
ThreadForTest() : Thread("test") {}
using Thread::message_loop_base;
};
class ScheduleWorkTest : public testing::Test {
public:
ScheduleWorkTest() : counter_(0) {}
......@@ -85,8 +78,16 @@ class ScheduleWorkTest : public testing::Test {
} else
#endif
{
target_.reset(new ThreadForTest());
target_->StartWithOptions(Thread::Options(target_type, 0u));
target_.reset(new Thread("test"));
Thread::Options options(target_type, 0u);
std::unique_ptr<MessageLoop> message_loop =
MessageLoop::CreateUnbound(target_type);
message_loop_ = message_loop.get();
options.task_environment =
new internal::MessageLoopTaskEnvironment(std::move(message_loop));
target_->StartWithOptions(options);
// Without this, it's possible for the scheduling threads to start and run
// before the target thread. In this case, the scheduling threads will
......@@ -179,11 +180,12 @@ class ScheduleWorkTest : public testing::Test {
if (java_thread_)
return java_thread_->message_loop()->GetMessageLoopBase();
#endif
return target_->message_loop_base();
return message_loop_->GetMessageLoopBase();
}
private:
std::unique_ptr<ThreadForTest> target_;
std::unique_ptr<Thread> target_;
MessageLoop* message_loop_;
#if defined(OS_ANDROID)
std::unique_ptr<android::JavaHandlerThread> java_thread_;
#endif
......
......@@ -9,6 +9,7 @@
#include "base/lazy_instance.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/third_party/dynamic_annotations/dynamic_annotations.h"
......@@ -78,7 +79,7 @@ bool Thread::Start() {
bool Thread::StartWithOptions(const Options& options) {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
DCHECK(!message_loop_base_);
DCHECK(!task_environment_);
DCHECK(!IsRunning());
DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
<< "not allowed!";
......@@ -93,19 +94,17 @@ bool Thread::StartWithOptions(const Options& options) {
SetThreadWasQuitProperly(false);
MessageLoop::Type type = options.message_loop_type;
if (!options.message_pump_factory.is_null())
type = MessageLoop::TYPE_CUSTOM;
timer_slack_ = options.timer_slack;
std::unique_ptr<MessageLoop> message_loop_owned;
if (options.message_loop_base) {
message_loop_base_ = options.message_loop_base;
if (options.task_environment) {
DCHECK(!options.message_pump_factory);
task_environment_ = WrapUnique(options.task_environment);
} else if (options.message_pump_factory) {
task_environment_ = std::make_unique<internal::MessageLoopTaskEnvironment>(
MessageLoop::CreateUnbound(options.message_pump_factory.Run()));
} else {
message_pump_factory_ = options.message_pump_factory;
message_loop_owned = MessageLoop::CreateUnbound(type);
message_loop_base_ = message_loop_owned->GetMessageLoopBase();
task_environment_ = std::make_unique<internal::MessageLoopTaskEnvironment>(
MessageLoop::CreateUnbound(options.message_loop_type));
}
start_event_.Reset();
......@@ -123,18 +122,12 @@ bool Thread::StartWithOptions(const Options& options) {
options.stack_size, this, options.priority);
if (!success) {
DLOG(ERROR) << "failed to create thread";
message_loop_base_ = nullptr;
return false;
}
}
joinable_ = options.joinable;
// The ownership of |message_loop_| is managed by the newly created thread
// within the ThreadMain.
ignore_result(message_loop_owned.release());
DCHECK(message_loop_base_);
return true;
}
......@@ -149,7 +142,7 @@ bool Thread::StartAndWaitForTesting() {
bool Thread::WaitUntilThreadStarted() const {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (!message_loop_base_)
if (!task_environment_)
return false;
// https://crbug.com/918039
base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
......@@ -159,7 +152,7 @@ bool Thread::WaitUntilThreadStarted() const {
void Thread::FlushForTesting() {
DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (!message_loop_base_)
if (!task_environment_)
return;
WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC,
......@@ -186,14 +179,14 @@ void Thread::Stop() {
// Wait for the thread to exit.
//
// TODO(darin): Unfortunately, we need to keep |message_loop_base_| around
// TODO(darin): Unfortunately, we need to keep |task_environment_| around
// until the thread exits. Some consumers are abusing the API. Make them stop.
PlatformThread::Join(thread_);
thread_ = base::PlatformThreadHandle();
// The thread should nullify |message_loop_base_| on exit (note: Join() adds
// The thread should release |task_environment_| on exit (note: Join() adds
// an implicit memory barrier and no lock is thus required for this check).
DCHECK(!message_loop_base_);
DCHECK(!task_environment_);
stopping_ = false;
}
......@@ -203,7 +196,7 @@ void Thread::StopSoon() {
// enable this check.
// DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (stopping_ || !message_loop_base_)
if (stopping_ || !task_environment_)
return;
stopping_ = true;
......@@ -229,11 +222,11 @@ bool Thread::IsRunning() const {
// enable this check.
// DCHECK(owning_sequence_checker_.CalledOnValidSequence());
// If the thread's already started (i.e. |message_loop_base_| is non-null) and
// If the thread's already started (i.e. |task_environment_| is non-null) and
// not yet requested to stop (i.e. |stopping_| is false) we can just return
// true. (Note that |stopping_| is touched only on the same sequence that
// starts / started the new thread so we need no locking here.)
if (message_loop_base_ && !stopping_)
if (task_environment_ && !stopping_)
return true;
// Otherwise check the |running_| flag, which is set to true by the new thread
// only while it is inside Run().
......@@ -257,7 +250,7 @@ void Thread::SetThreadWasQuitProperly(bool flag) {
// static
bool Thread::GetThreadWasQuitProperly() {
bool quit_properly = true;
#ifndef NDEBUG
#if DCHECK_IS_ON()
quit_properly = lazy_tls_bool.Pointer()->Get();
#endif
return quit_properly;
......@@ -280,23 +273,18 @@ void Thread::ThreadMain() {
ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
// Lazily initialize the |message_loop| so that it can run on this thread.
DCHECK(message_loop_base_);
std::unique_ptr<MessageLoopBase> message_loop_base(message_loop_base_);
DCHECK(task_environment_);
// This binds MessageLoopCurrent and ThreadTaskRunnerHandle.
message_loop_base_->BindToCurrentThread(
message_pump_factory_ ? message_pump_factory_.Run()
: MessageLoop::CreateMessagePumpForType(
message_loop_base_->GetType()));
task_environment_->BindToCurrentThread(timer_slack_);
DCHECK(MessageLoopCurrent::Get());
DCHECK(ThreadTaskRunnerHandle::Get());
message_loop_base_->SetTimerSlack(timer_slack_);
DCHECK(ThreadTaskRunnerHandle::IsSet());
#if defined(OS_POSIX) && !defined(OS_NACL)
// Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API.
std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher;
if (MessageLoopCurrentForIO::IsSet()) {
file_descriptor_watcher.reset(
new FileDescriptorWatcher(message_loop_base_->GetTaskRunner()));
new FileDescriptorWatcher(task_environment_->GetDefaultTaskRunner()));
}
#endif
......@@ -335,16 +323,11 @@ void Thread::ThreadMain() {
com_initializer.reset();
#endif
if (message_loop_base->GetType() != MessageLoop::TYPE_CUSTOM) {
// Assert that RunLoop::QuitWhenIdle was called by ThreadQuitHelper. Don't
// check for custom message pumps, because their shutdown might not allow
// this.
DCHECK(GetThreadWasQuitProperly());
}
DCHECK(GetThreadWasQuitProperly());
// We can't receive messages anymore.
// (The message loop is destructed at the end of this block)
message_loop_base_ = nullptr;
task_environment_.reset();
run_loop_ = nullptr;
}
......@@ -354,4 +337,24 @@ void Thread::ThreadQuitHelper() {
SetThreadWasQuitProperly(true);
}
namespace internal {
MessageLoopTaskEnvironment::MessageLoopTaskEnvironment(
std::unique_ptr<MessageLoop> message_loop)
: message_loop_(std::move(message_loop)) {}
MessageLoopTaskEnvironment::~MessageLoopTaskEnvironment() {}
scoped_refptr<SingleThreadTaskRunner>
MessageLoopTaskEnvironment::GetDefaultTaskRunner() {
return message_loop_->task_runner();
}
void MessageLoopTaskEnvironment::BindToCurrentThread(TimerSlack timer_slack) {
message_loop_->BindToCurrentThread();
message_loop_->SetTimerSlack(timer_slack);
}
} // namespace internal
} // namespace base
......@@ -59,6 +59,18 @@ class RunLoop;
// Thread object (including ~Thread()).
class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
class BASE_EXPORT TaskEnvironment {
public:
virtual ~TaskEnvironment() {}
virtual scoped_refptr<SingleThreadTaskRunner> GetDefaultTaskRunner() = 0;
// Binds a RunLoop::Delegate and TaskRunnerHandle to the thread. The
// underlying MessagePump will have its |timer_slack| set to the specified
// amount.
virtual void BindToCurrentThread(TimerSlack timer_slack) = 0;
};
struct BASE_EXPORT Options {
typedef Callback<std::unique_ptr<MessagePump>()> MessagePumpFactory;
......@@ -71,10 +83,10 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// This is ignored if message_pump_factory.is_null() is false.
MessageLoop::Type message_loop_type = MessageLoop::TYPE_DEFAULT;
// An unbound MessageLoopBase that will be bound to the thread. Ownership
// of |message_loop_base| will be transferred to the thread.
// An unbound TaskEnvironment that will be bound to the thread. Ownership
// of |task_environment| will be transferred to the thread.
// TODO(alexclarke): This should be a std::unique_ptr
MessageLoopBase* message_loop_base = nullptr;
TaskEnvironment* task_environment = nullptr;
// Specifies timer slack for thread message loop.
TimerSlack timer_slack = TIMER_SLACK_NONE;
......@@ -82,7 +94,8 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// Used to create the MessagePump for the MessageLoop. The callback is Run()
// on the thread. If message_pump_factory.is_null(), then a MessagePump
// appropriate for |message_loop_type| is created. Setting this forces the
// MessageLoop::Type to TYPE_CUSTOM.
// MessageLoop::Type to TYPE_CUSTOM. This is not compatible with a non-null
// |task_environment|.
MessagePumpFactory message_pump_factory;
// Specifies the maximum stack size that the thread is allowed to use.
......@@ -123,7 +136,7 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// init_com_with_mta(false) and then StartWithOptions() with any message loop
// type other than TYPE_UI.
void init_com_with_mta(bool use_mta) {
DCHECK(!message_loop_base_);
DCHECK(!task_environment_);
com_status_ = use_mta ? MTA : STA;
}
#endif
......@@ -216,8 +229,9 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// Start().
DCHECK(owning_sequence_checker_.CalledOnValidSequence() ||
(id_event_.IsSignaled() && id_ == PlatformThread::CurrentId()) ||
message_loop_base_);
return message_loop_base_ ? message_loop_base_->GetTaskRunner() : nullptr;
task_environment_);
return task_environment_ ? task_environment_->GetDefaultTaskRunner()
: nullptr;
}
// Returns the name of this thread (for display in debugger too).
......@@ -248,29 +262,10 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
static void SetThreadWasQuitProperly(bool flag);
static bool GetThreadWasQuitProperly();
// Returns the message loop for this thread. Use the MessageLoop's
// PostTask methods to execute code on the thread. This only returns
// non-null after a successful call to Start. After Stop has been called,
// this will return nullptr.
//
// NOTE: You must not call this MessageLoop's Quit method directly. Use
// the Thread's Stop method instead.
//
// In addition to this Thread's owning sequence, this can also safely be
// called from the underlying thread itself.
MessageLoopBase* message_loop_base() const {
// See the comment inside |task_runner()|.
DCHECK(owning_sequence_checker_.CalledOnValidSequence() ||
(id_event_.IsSignaled() && id_ == PlatformThread::CurrentId()) ||
message_loop_base_);
return message_loop_base_;
}
private:
// Friends for message_loop() access:
friend class MessageLoopTaskRunnerTest;
friend class ScheduleWorkTest;
friend class MessageLoopTaskRunnerTest;
#if defined(OS_WIN)
enum ComStatus {
......@@ -313,12 +308,9 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// Protects |id_| which must only be read while it's signaled.
mutable WaitableEvent id_event_;
// Supports creation of MessageLoopType::CUSTOM.
Options::MessagePumpFactory message_pump_factory_;
// The thread's MessageLooBase and RunLoop. Valid only while the thread is
// The thread's TaskEnvironment and RunLoop are valid only while the thread is
// alive. Set by the created thread.
MessageLoopBase* message_loop_base_ = nullptr;
std::unique_ptr<TaskEnvironment> task_environment_;
RunLoop* run_loop_ = nullptr;
// Stores Options::timer_slack_ until the sequence manager has been bound to
......@@ -338,6 +330,25 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
DISALLOW_COPY_AND_ASSIGN(Thread);
};
namespace internal {
class BASE_EXPORT MessageLoopTaskEnvironment : public Thread::TaskEnvironment {
public:
explicit MessageLoopTaskEnvironment(
std::unique_ptr<MessageLoop> message_loop);
~MessageLoopTaskEnvironment() override;
// Thread::TaskEnvironment:
scoped_refptr<SingleThreadTaskRunner> GetDefaultTaskRunner() override;
void BindToCurrentThread(TimerSlack timer_slack) override;
private:
std::unique_ptr<MessageLoop> message_loop_;
};
} // namespace internal
} // namespace base
#endif // BASE_THREADING_THREAD_H_
......@@ -522,25 +522,51 @@ TEST_F(ThreadTest, FlushForTesting) {
a.FlushForTesting();
}
TEST_F(ThreadTest, ProvidedMessageLoopBase) {
Thread thread("ProvidedMessageLoopBase");
std::unique_ptr<base::sequence_manager::internal::SequenceManagerImpl>
sequence_manager =
base::sequence_manager::internal::SequenceManagerImpl::CreateUnbound(
base::sequence_manager::SequenceManager::Settings());
scoped_refptr<base::sequence_manager::TaskQueue> task_queue =
sequence_manager
->CreateTaskQueueWithType<base::sequence_manager::TaskQueue>(
base::sequence_manager::TaskQueue::Spec("default_tq"));
sequence_manager->SetTaskRunner(task_queue->task_runner());
namespace {
class SequenceManagerTaskEnvironment : public Thread::TaskEnvironment {
public:
SequenceManagerTaskEnvironment()
: sequence_manager_(
base::sequence_manager::CreateUnboundSequenceManager()),
task_queue_(
sequence_manager_
->CreateTaskQueueWithType<base::sequence_manager::TaskQueue>(
base::sequence_manager::TaskQueue::Spec("default_tq"))) {
sequence_manager_->SetDefaultTaskRunner(GetDefaultTaskRunner());
}
~SequenceManagerTaskEnvironment() override {}
scoped_refptr<base::SingleThreadTaskRunner> GetDefaultTaskRunner() override {
return task_queue_->task_runner();
}
void BindToCurrentThread(base::TimerSlack timer_slack) override {
sequence_manager_->BindToMessagePump(
base::MessageLoop::CreateMessagePumpForType(
base::MessageLoop::TYPE_DEFAULT));
sequence_manager_->SetTimerSlack(timer_slack);
}
private:
std::unique_ptr<base::sequence_manager::SequenceManager> sequence_manager_;
scoped_refptr<base::sequence_manager::TaskQueue> task_queue_;
DISALLOW_COPY_AND_ASSIGN(SequenceManagerTaskEnvironment);
};
} // namespace
TEST_F(ThreadTest, ProvidedTaskEnvironment) {
Thread thread("TaskEnvironment");
base::Thread::Options options;
options.message_loop_base = sequence_manager.release();
options.task_environment = new SequenceManagerTaskEnvironment();
thread.StartWithOptions(options);
base::WaitableEvent event;
task_queue->task_runner()->PostTask(
options.task_environment->GetDefaultTaskRunner()->PostTask(
FROM_HERE,
base::BindOnce(&base::WaitableEvent::Signal, base::Unretained(&event)));
event.Wait();
......
......@@ -28,6 +28,41 @@
namespace content {
namespace {
class SequenceManagerTaskEnvironment : public base::Thread::TaskEnvironment {
public:
SequenceManagerTaskEnvironment(
std::unique_ptr<base::sequence_manager::SequenceManager> sequence_manager,
scoped_refptr<base::SingleThreadTaskRunner> default_task_runner)
: sequence_manager_(std::move(sequence_manager)),
default_task_runner_(std::move(default_task_runner)) {
sequence_manager_->SetDefaultTaskRunner(default_task_runner_);
}
~SequenceManagerTaskEnvironment() override {}
// Thread::TaskEnvironment:
scoped_refptr<base::SingleThreadTaskRunner> GetDefaultTaskRunner() override {
return default_task_runner_;
}
void BindToCurrentThread(base::TimerSlack timer_slack) override {
sequence_manager_->BindToMessagePump(
base::MessageLoop::CreateMessagePumpForType(
base::MessageLoop::TYPE_DEFAULT));
sequence_manager_->SetTimerSlack(timer_slack);
}
private:
std::unique_ptr<base::sequence_manager::SequenceManager> sequence_manager_;
scoped_refptr<base::SingleThreadTaskRunner> default_task_runner_;
DISALLOW_COPY_AND_ASSIGN(SequenceManagerTaskEnvironment);
};
} // namespace
class BrowserThreadTest : public testing::Test {
public:
void Release() const {
......@@ -50,14 +85,15 @@ class BrowserThreadTest : public testing::Test {
std::unique_ptr<BrowserUIThreadScheduler> browser_ui_thread_scheduler =
BrowserUIThreadScheduler::CreateForTesting(
sequence_manager.get(), sequence_manager->GetRealTimeDomain());
sequence_manager->SetDefaultTaskRunner(
base::Thread::Options ui_options;
ui_options.task_environment = new SequenceManagerTaskEnvironment(
std::move(sequence_manager),
browser_ui_thread_scheduler->GetTaskRunnerForTesting(
BrowserUIThreadScheduler::QueueType::kDefault));
BrowserTaskExecutor::CreateWithBrowserUIThreadSchedulerForTesting(
std::move(browser_ui_thread_scheduler));
base::Thread::Options ui_options;
ui_options.message_loop_base = sequence_manager.release();
ui_thread_->StartWithOptions(ui_options);
io_thread_ = std::make_unique<BrowserProcessSubThread>(BrowserThread::IO);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment