Commit 8a408722 authored by bauerb's avatar bauerb Committed by Commit bot

Add SequencedTaskRunnerHandle to get a SequencedTaskRunner for the current thread / sequence.

BUG=546596

Review URL: https://codereview.chromium.org/1423773003

Cr-Commit-Position: refs/heads/master@{#357067}
parent fff0ba0a
...@@ -547,6 +547,8 @@ component("base") { ...@@ -547,6 +547,8 @@ component("base") {
"threading/platform_thread_win.cc", "threading/platform_thread_win.cc",
"threading/post_task_and_reply_impl.cc", "threading/post_task_and_reply_impl.cc",
"threading/post_task_and_reply_impl.h", "threading/post_task_and_reply_impl.h",
"threading/sequenced_task_runner_handle.cc",
"threading/sequenced_task_runner_handle.h",
"threading/sequenced_worker_pool.cc", "threading/sequenced_worker_pool.cc",
"threading/sequenced_worker_pool.h", "threading/sequenced_worker_pool.h",
"threading/simple_thread.cc", "threading/simple_thread.cc",
...@@ -1435,6 +1437,7 @@ test("base_unittests") { ...@@ -1435,6 +1437,7 @@ test("base_unittests") {
"test/user_action_tester_unittest.cc", "test/user_action_tester_unittest.cc",
"threading/non_thread_safe_unittest.cc", "threading/non_thread_safe_unittest.cc",
"threading/platform_thread_unittest.cc", "threading/platform_thread_unittest.cc",
"threading/sequenced_task_runner_handle_unittest.cc",
"threading/sequenced_worker_pool_unittest.cc", "threading/sequenced_worker_pool_unittest.cc",
"threading/simple_thread_unittest.cc", "threading/simple_thread_unittest.cc",
"threading/thread_checker_unittest.cc", "threading/thread_checker_unittest.cc",
......
...@@ -624,6 +624,7 @@ ...@@ -624,6 +624,7 @@
'threading/non_thread_safe_unittest.cc', 'threading/non_thread_safe_unittest.cc',
'threading/platform_thread_unittest.cc', 'threading/platform_thread_unittest.cc',
'threading/sequenced_worker_pool_unittest.cc', 'threading/sequenced_worker_pool_unittest.cc',
'threading/sequenced_task_runner_handle_unittest.cc',
'threading/simple_thread_unittest.cc', 'threading/simple_thread_unittest.cc',
'threading/thread_checker_unittest.cc', 'threading/thread_checker_unittest.cc',
'threading/thread_collision_warner_unittest.cc', 'threading/thread_collision_warner_unittest.cc',
......
...@@ -645,6 +645,8 @@ ...@@ -645,6 +645,8 @@
'threading/platform_thread_win.cc', 'threading/platform_thread_win.cc',
'threading/post_task_and_reply_impl.cc', 'threading/post_task_and_reply_impl.cc',
'threading/post_task_and_reply_impl.h', 'threading/post_task_and_reply_impl.h',
'threading/sequenced_task_runner_handle.cc',
'threading/sequenced_task_runner_handle.h',
'threading/sequenced_worker_pool.cc', 'threading/sequenced_worker_pool.cc',
'threading/sequenced_worker_pool.h', 'threading/sequenced_worker_pool.h',
'threading/simple_thread.cc', 'threading/simple_thread.cc',
......
...@@ -16,6 +16,7 @@ class SingleThreadTaskRunner; ...@@ -16,6 +16,7 @@ class SingleThreadTaskRunner;
// in thread-local storage. Callers can then retrieve the TaskRunner // in thread-local storage. Callers can then retrieve the TaskRunner
// for the current thread by calling ThreadTaskRunnerHandle::Get(). // for the current thread by calling ThreadTaskRunnerHandle::Get().
// At most one TaskRunner may be bound to each thread at a time. // At most one TaskRunner may be bound to each thread at a time.
// Prefer SequenceTaskRunnerHandle to this unless thread affinity is required.
class BASE_EXPORT ThreadTaskRunnerHandle { class BASE_EXPORT ThreadTaskRunnerHandle {
public: public:
// Gets the SingleThreadTaskRunner for the current thread. // Gets the SingleThreadTaskRunner for the current thread.
......
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/sequenced_task_runner.h"
#include "base/thread_task_runner_handle.h"
#include "base/threading/sequenced_worker_pool.h"
namespace base {
// static
scoped_refptr<SequencedTaskRunner> SequencedTaskRunnerHandle::Get() {
// If we are on a worker thread for a SequencedBlockingPool that is running a
// sequenced task, return a SequencedTaskRunner for it.
scoped_refptr<base::SequencedWorkerPool> pool =
SequencedWorkerPool::GetWorkerPoolForCurrentThread();
if (pool) {
SequencedWorkerPool::SequenceToken sequence_token =
SequencedWorkerPool::GetSequenceTokenForCurrentThread();
DCHECK(sequence_token.IsValid());
DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
return pool->GetSequencedTaskRunner(sequence_token);
}
// Otherwise, return a SingleThreadTaskRunner for the current thread.
return base::ThreadTaskRunnerHandle::Get();
}
// static
bool SequencedTaskRunnerHandle::IsSet() {
return (SequencedWorkerPool::GetWorkerPoolForCurrentThread() &&
SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid()) ||
base::ThreadTaskRunnerHandle::IsSet();
}
} // namespace base
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_
#define BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
namespace base {
class SequencedTaskRunner;
class BASE_EXPORT SequencedTaskRunnerHandle {
public:
// Returns a SequencedTaskRunner which guarantees that posted tasks will only
// run after the current task is finished and will satisfy a SequenceChecker.
// It should only be called if IsSet() returns true (see the comment there for
// the requirements).
static scoped_refptr<SequencedTaskRunner> Get();
// Returns true if one of the following conditions is fulfilled:
// a) The current thread has a ThreadTaskRunnerHandle (which includes any
// thread that has a MessageLoop associated with it), or
// b) The current thread is a worker thread belonging to a SequencedWorkerPool
// *and* is currently running a sequenced task.
static bool IsSet();
private:
DISALLOW_IMPLICIT_CONSTRUCTORS(SequencedTaskRunnerHandle);
};
} // namespace base
#endif // BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/bind.h"
#include "base/callback.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/sequence_checker_impl.h"
#include "base/sequenced_task_runner.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/sequenced_worker_pool.h"
#include "base/threading/simple_thread.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace {
class SequencedTaskRunnerHandleTest : public ::testing::Test {
protected:
static void GetTaskRunner(const Closure& callback) {
// Use SequenceCheckerImpl to make sure it's not a no-op in Release builds.
scoped_ptr<SequenceCheckerImpl> sequence_checker(new SequenceCheckerImpl);
ASSERT_TRUE(SequencedTaskRunnerHandle::IsSet());
scoped_refptr<SequencedTaskRunner> task_runner =
SequencedTaskRunnerHandle::Get();
ASSERT_TRUE(task_runner);
task_runner->PostTask(
FROM_HERE, base::Bind(&SequencedTaskRunnerHandleTest::CheckValidThread,
base::Passed(&sequence_checker), callback));
}
private:
static void CheckValidThread(scoped_ptr<SequenceCheckerImpl> sequence_checker,
const Closure& callback) {
EXPECT_TRUE(sequence_checker->CalledOnValidSequencedThread());
callback.Run();
}
MessageLoop message_loop_;
};
TEST_F(SequencedTaskRunnerHandleTest, FromMessageLoop) {
RunLoop run_loop;
GetTaskRunner(run_loop.QuitClosure());
run_loop.Run();
}
TEST_F(SequencedTaskRunnerHandleTest, FromSequencedWorkerPool) {
scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Test"));
WaitableEvent event(false, false);
pool->PostSequencedWorkerTask(
pool->GetSequenceToken(), FROM_HERE,
base::Bind(&SequencedTaskRunnerHandleTest::GetTaskRunner,
base::Bind(&WaitableEvent::Signal, base::Unretained(&event))));
event.Wait();
}
class ThreadRunner : public DelegateSimpleThread::Delegate {
public:
void Run() override {
ASSERT_FALSE(SequencedTaskRunnerHandle::IsSet());
}
private:
Closure callback_;
};
TEST_F(SequencedTaskRunnerHandleTest, FromSimpleThread) {
ThreadRunner thread_runner;
DelegateSimpleThread thread(&thread_runner, "Background thread");
thread.Start();
thread.Join();
}
} // namespace
} // namespace base
...@@ -219,10 +219,6 @@ uint64 GetTaskTraceID(const SequencedTask& task, ...@@ -219,10 +219,6 @@ uint64 GetTaskTraceID(const SequencedTask& task,
static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
} }
base::LazyInstance<base::ThreadLocalPointer<
SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
LAZY_INSTANCE_INITIALIZER;
} // namespace } // namespace
// Worker --------------------------------------------------------------------- // Worker ---------------------------------------------------------------------
...@@ -239,6 +235,9 @@ class SequencedWorkerPool::Worker : public SimpleThread { ...@@ -239,6 +235,9 @@ class SequencedWorkerPool::Worker : public SimpleThread {
// SimpleThread implementation. This actually runs the background thread. // SimpleThread implementation. This actually runs the background thread.
void Run() override; void Run() override;
// Gets the worker for the current thread out of thread-local storage.
static Worker* GetForCurrentThread();
// Indicates that a task is about to be run. The parameters provide // Indicates that a task is about to be run. The parameters provide
// additional metainformation about the task being run. // additional metainformation about the task being run.
void set_running_task_info(SequenceToken token, void set_running_task_info(SequenceToken token,
...@@ -264,7 +263,14 @@ class SequencedWorkerPool::Worker : public SimpleThread { ...@@ -264,7 +263,14 @@ class SequencedWorkerPool::Worker : public SimpleThread {
return task_shutdown_behavior_; return task_shutdown_behavior_;
} }
scoped_refptr<SequencedWorkerPool> worker_pool() const {
return worker_pool_;
}
private: private:
static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
lazy_tls_ptr_;
scoped_refptr<SequencedWorkerPool> worker_pool_; scoped_refptr<SequencedWorkerPool> worker_pool_;
// The sequence token of the task being processed. Only valid when // The sequence token of the task being processed. Only valid when
// is_processing_task_ is true. // is_processing_task_ is true.
...@@ -508,9 +514,10 @@ void SequencedWorkerPool::Worker::Run() { ...@@ -508,9 +514,10 @@ void SequencedWorkerPool::Worker::Run() {
win::ScopedCOMInitializer com_initializer; win::ScopedCOMInitializer com_initializer;
#endif #endif
// Store a pointer to the running sequence in thread local storage for // Store a pointer to this worker in thread local storage for static function
// static function access. // access.
g_lazy_tls_ptr.Get().Set(&task_sequence_token_); DCHECK(!lazy_tls_ptr_.Get().Get());
lazy_tls_ptr_.Get().Set(this);
// Just jump back to the Inner object to run the thread, since it has all the // Just jump back to the Inner object to run the thread, since it has all the
// tracking information and queues. It might be more natural to implement // tracking information and queues. It might be more natural to implement
...@@ -519,9 +526,23 @@ void SequencedWorkerPool::Worker::Run() { ...@@ -519,9 +526,23 @@ void SequencedWorkerPool::Worker::Run() {
// send thread-specific information easily to the thread loop. // send thread-specific information easily to the thread loop.
worker_pool_->inner_->ThreadLoop(this); worker_pool_->inner_->ThreadLoop(this);
// Release our cyclic reference once we're done. // Release our cyclic reference once we're done.
worker_pool_ = NULL; worker_pool_ = nullptr;
}
// static
SequencedWorkerPool::Worker*
SequencedWorkerPool::Worker::GetForCurrentThread() {
// Don't construct lazy instance on check.
if (lazy_tls_ptr_ == nullptr)
return nullptr;
return lazy_tls_ptr_.Get().Get();
} }
// static
LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;
// Inner definitions --------------------------------------------------------- // Inner definitions ---------------------------------------------------------
SequencedWorkerPool::Inner::Inner( SequencedWorkerPool::Inner::Inner(
...@@ -1145,17 +1166,28 @@ SequencedWorkerPool::Inner::g_last_sequence_number_; ...@@ -1145,17 +1166,28 @@ SequencedWorkerPool::Inner::g_last_sequence_number_;
// SequencedWorkerPool -------------------------------------------------------- // SequencedWorkerPool --------------------------------------------------------
std::string SequencedWorkerPool::SequenceToken::ToString() const {
return base::StringPrintf("[%d]", id_);
}
// static // static
SequencedWorkerPool::SequenceToken SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceTokenForCurrentThread() { SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
// Don't construct lazy instance on check. Worker* worker = Worker::GetForCurrentThread();
if (g_lazy_tls_ptr == NULL) if (!worker)
return SequenceToken(); return SequenceToken();
SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); return worker->task_sequence_token();
if (!token) }
return SequenceToken();
return *token; // static
scoped_refptr<SequencedWorkerPool>
SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
Worker* worker = Worker::GetForCurrentThread();
if (!worker)
return nullptr;
return worker->worker_pool();
} }
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
......
...@@ -121,7 +121,7 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { ...@@ -121,7 +121,7 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// Opaque identifier that defines sequencing of tasks posted to the worker // Opaque identifier that defines sequencing of tasks posted to the worker
// pool. // pool.
class SequenceToken { class BASE_EXPORT SequenceToken {
public: public:
SequenceToken() : id_(0) {} SequenceToken() : id_(0) {}
~SequenceToken() {} ~SequenceToken() {}
...@@ -135,6 +135,10 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { ...@@ -135,6 +135,10 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
return id_ != 0; return id_ != 0;
} }
// Returns a string representation of this token. This method should only be
// used for debugging.
std::string ToString() const;
private: private:
friend class SequencedWorkerPool; friend class SequencedWorkerPool;
...@@ -157,17 +161,21 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { ...@@ -157,17 +161,21 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// an unsequenced task, returns an invalid SequenceToken. // an unsequenced task, returns an invalid SequenceToken.
static SequenceToken GetSequenceTokenForCurrentThread(); static SequenceToken GetSequenceTokenForCurrentThread();
// Returns the SequencedWorkerPool that owns this thread, or null if the
// current thread is not a SequencedWorkerPool worker thread.
static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();
// When constructing a SequencedWorkerPool, there must be a // When constructing a SequencedWorkerPool, there must be a
// MessageLoop on the current thread unless you plan to deliberately // ThreadTaskRunnerHandle on the current thread unless you plan to
// leak it. // deliberately leak it.
// Pass the maximum number of threads (they will be lazily created as needed) // Pass the maximum number of threads (they will be lazily created as needed)
// and a prefix for the thread name to aid in debugging. // and a prefix for the thread name to aid in debugging.
SequencedWorkerPool(size_t max_threads, SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix); const std::string& thread_name_prefix);
// Like above, but with |observer| for testing. Does not take // Like above, but with |observer| for testing. Does not take ownership of
// ownership of |observer|. // |observer|.
SequencedWorkerPool(size_t max_threads, SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix, const std::string& thread_name_prefix,
TestingObserver* observer); TestingObserver* observer);
......
...@@ -904,6 +904,49 @@ TEST_F(SequencedWorkerPoolTest, FlushForTesting) { ...@@ -904,6 +904,49 @@ TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
pool()->FlushForTesting(); pool()->FlushForTesting();
} }
namespace {
void CheckWorkerPoolAndSequenceToken(
const scoped_refptr<SequencedWorkerPool>& expected_pool,
SequencedWorkerPool::SequenceToken expected_token) {
SequencedWorkerPool::SequenceToken token =
SequencedWorkerPool::GetSequenceTokenForCurrentThread();
EXPECT_EQ(expected_token.ToString(), token.ToString());
scoped_refptr<SequencedWorkerPool> pool =
SequencedWorkerPool::GetWorkerPoolForCurrentThread();
EXPECT_EQ(expected_pool, pool);
}
} // namespace
TEST_F(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) {
EnsureAllWorkersCreated();
// The current thread should have neither a worker pool nor a sequence token.
SequencedWorkerPool::SequenceToken local_token =
SequencedWorkerPool::GetSequenceTokenForCurrentThread();
scoped_refptr<SequencedWorkerPool> local_pool =
SequencedWorkerPool::GetWorkerPoolForCurrentThread();
EXPECT_FALSE(local_token.IsValid()) << local_token.ToString();
EXPECT_FALSE(local_pool);
SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
pool()->PostSequencedWorkerTask(
token1, FROM_HERE,
base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token1));
pool()->PostSequencedWorkerTask(
token2, FROM_HERE,
base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token2));
pool()->PostWorkerTask(FROM_HERE,
base::Bind(&CheckWorkerPoolAndSequenceToken, pool(),
SequencedWorkerPool::SequenceToken()));
pool()->FlushForTesting();
}
TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) { TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
MessageLoop loop; MessageLoop loop;
scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool")); scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment