Commit 4393dc6b authored by Robert Liao's avatar Robert Liao Committed by Commit Bot

Add Optional COM MTA Support to All SchedulerWorkerPoolImpl Threads

This change initializes the Windows Runtime as an MTA for versions at
or above Windows 8 and COM as an MTA for versions below Windows 8 if
the caller requests for COM MTA.

BUG=706479

Change-Id: I55e08bb270ac9d1ebbce35647e8ae23ec8504f95
Reviewed-on: https://chromium-review.googlesource.com/741455
Commit-Queue: Robert Liao <robliao@chromium.org>
Reviewed-by: default avatarFrançois Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#513235}
parent 46dd8d1d
...@@ -24,6 +24,13 @@ ...@@ -24,6 +24,13 @@
#include "base/threading/scoped_blocking_call.h" #include "base/threading/scoped_blocking_call.h"
#include "base/threading/thread_restrictions.h" #include "base/threading/thread_restrictions.h"
#if defined(OS_WIN)
#include "base/win/scoped_com_initializer.h"
#include "base/win/scoped_windows_thread_environment.h"
#include "base/win/scoped_winrt_initializer.h"
#include "base/win/windows_version.h"
#endif // defined(OS_WIN)
namespace base { namespace base {
namespace internal { namespace internal {
...@@ -138,6 +145,10 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl ...@@ -138,6 +145,10 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// returned a non-empty sequence and DidRunTask() hasn't been called yet). // returned a non-empty sequence and DidRunTask() hasn't been called yet).
bool is_running_task_ = false; bool is_running_task_ = false;
#if defined(OS_WIN)
std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment_;
#endif // defined(OS_WIN)
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
}; };
...@@ -182,7 +193,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( ...@@ -182,7 +193,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
void SchedulerWorkerPoolImpl::Start( void SchedulerWorkerPoolImpl::Start(
const SchedulerWorkerPoolParams& params, const SchedulerWorkerPoolParams& params,
scoped_refptr<TaskRunner> service_thread_task_runner) { scoped_refptr<TaskRunner> service_thread_task_runner,
WorkerEnvironment worker_environment) {
AutoSchedulerLock auto_lock(lock_); AutoSchedulerLock auto_lock(lock_);
DCHECK(workers_.empty()); DCHECK(workers_.empty());
...@@ -191,6 +203,7 @@ void SchedulerWorkerPoolImpl::Start( ...@@ -191,6 +203,7 @@ void SchedulerWorkerPoolImpl::Start(
initial_worker_capacity_ = worker_capacity_; initial_worker_capacity_ = worker_capacity_;
suggested_reclaim_time_ = params.suggested_reclaim_time(); suggested_reclaim_time_ = params.suggested_reclaim_time();
backward_compatibility_ = params.backward_compatibility(); backward_compatibility_ = params.backward_compatibility();
worker_environment_ = worker_environment;
service_thread_task_runner_ = std::move(service_thread_task_runner); service_thread_task_runner_ = std::move(service_thread_task_runner);
...@@ -338,6 +351,18 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( ...@@ -338,6 +351,18 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
#endif #endif
} }
#if defined(OS_WIN)
if (outer_->worker_environment_ == WorkerEnvironment::COM_MTA) {
if (win::GetVersion() >= win::VERSION_WIN8) {
win_thread_environment_ = std::make_unique<win::ScopedWinrtInitializer>();
} else {
win_thread_environment_ = std::make_unique<win::ScopedCOMInitializer>(
win::ScopedCOMInitializer::kMTA);
}
DCHECK(win_thread_environment_->Succeeded());
}
#endif // defined(OS_WIN)
DCHECK_EQ(num_tasks_since_last_wait_, 0U); DCHECK_EQ(num_tasks_since_last_wait_, 0U);
PlatformThread::SetName( PlatformThread::SetName(
...@@ -498,6 +523,10 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainExit( ...@@ -498,6 +523,10 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainExit(
DCHECK(!ContainsWorker(outer_->workers_, worker)); DCHECK(!ContainsWorker(outer_->workers_, worker));
} }
#endif #endif
#if defined(OS_WIN)
win_thread_environment_.reset();
#endif // defined(OS_WIN)
} }
void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "base/task_scheduler/sequence.h" #include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/task.h" #include "base/task_scheduler/task.h"
#include "base/time/time.h" #include "base/time/time.h"
#include "build/build_config.h"
namespace base { namespace base {
...@@ -46,6 +47,15 @@ class TaskTracker; ...@@ -46,6 +47,15 @@ class TaskTracker;
// This class is thread-safe. // This class is thread-safe.
class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
public: public:
enum class WorkerEnvironment {
// No special worker environment required.
NONE,
#if defined(OS_WIN)
// Initialize a COM MTA on the worker.
COM_MTA,
#endif // defined(OS_WIN)
};
// Constructs a pool without workers. // Constructs a pool without workers.
// //
// |name| is used to label the pool's threads ("TaskScheduler" + |name| + // |name| is used to label the pool's threads ("TaskScheduler" + |name| +
...@@ -62,9 +72,11 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { ...@@ -62,9 +72,11 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// Creates workers following the |params| specification, allowing existing and // Creates workers following the |params| specification, allowing existing and
// future tasks to run. Uses |service_thread_task_runner| to monitor for // future tasks to run. Uses |service_thread_task_runner| to monitor for
// blocked threads in the pool. Can only be called once. CHECKs on failure. // blocked threads in the pool. |thread_environment| Can only be called once.
// CHECKs on failure.
void Start(const SchedulerWorkerPoolParams& params, void Start(const SchedulerWorkerPoolParams& params,
scoped_refptr<TaskRunner> service_thread_task_runner); scoped_refptr<TaskRunner> service_thread_task_runner,
WorkerEnvironment worker_environment);
// Destroying a SchedulerWorkerPoolImpl returned by Create() is not allowed in // Destroying a SchedulerWorkerPoolImpl returned by Create() is not allowed in
// production; it is always leaked. In tests, it can only be destroyed after // production; it is always leaked. In tests, it can only be destroyed after
...@@ -229,6 +241,9 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { ...@@ -229,6 +241,9 @@ class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
// but haven't caused a worker capacity increase yet. // but haven't caused a worker capacity increase yet.
int num_pending_may_block_workers_ = 0; int num_pending_may_block_workers_ = 0;
// Environment to be initialized per worker.
WorkerEnvironment worker_environment_ = WorkerEnvironment::NONE;
// Stack of idle workers. Initially, all workers are on this stack. A worker // Stack of idle workers. Initially, all workers are on this stack. A worker
// is removed from the stack before its WakeUp() function is called and when // is removed from the stack before its WakeUp() function is called and when
// it receives work from GetWork() (a worker calls GetWork() when its sleep // it receives work from GetWork() (a worker calls GetWork() when its sleep
......
...@@ -43,8 +43,13 @@ ...@@ -43,8 +43,13 @@
#include "base/threading/thread_local_storage.h" #include "base/threading/thread_local_storage.h"
#include "base/threading/thread_restrictions.h" #include "base/threading/thread_restrictions.h"
#include "base/time/time.h" #include "base/time/time.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h" #include "testing/gtest/include/gtest/gtest.h"
#if defined(OS_WIN)
#include "base/win/com_init_util.h"
#endif // defined(OS_WIN)
namespace base { namespace base {
namespace internal { namespace internal {
namespace { namespace {
...@@ -85,11 +90,13 @@ class TaskSchedulerWorkerPoolImplTestBase { ...@@ -85,11 +90,13 @@ class TaskSchedulerWorkerPoolImplTestBase {
ASSERT_TRUE(worker_pool_); ASSERT_TRUE(worker_pool_);
} }
void StartWorkerPool(TimeDelta suggested_reclaim_time, size_t num_workers) { virtual void StartWorkerPool(TimeDelta suggested_reclaim_time,
size_t num_workers) {
ASSERT_TRUE(worker_pool_); ASSERT_TRUE(worker_pool_);
worker_pool_->Start( worker_pool_->Start(
SchedulerWorkerPoolParams(num_workers, suggested_reclaim_time), SchedulerWorkerPoolParams(num_workers, suggested_reclaim_time),
service_thread_.task_runner()); service_thread_.task_runner(),
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
} }
void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time, void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time,
...@@ -255,6 +262,29 @@ TEST_P(TaskSchedulerWorkerPoolImplTestParam, Saturate) { ...@@ -255,6 +262,29 @@ TEST_P(TaskSchedulerWorkerPoolImplTestParam, Saturate) {
worker_pool_->WaitForAllWorkersIdleForTesting(); worker_pool_->WaitForAllWorkersIdleForTesting();
} }
#if defined(OS_WIN)
TEST_P(TaskSchedulerWorkerPoolImplTestParam, NoEnvironment) {
// Verify that COM is not initialized in a SchedulerWorkerPoolImpl initialized
// with SchedulerWorkerPoolImpl::WorkerEnvironment::NONE.
scoped_refptr<TaskRunner> task_runner =
CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());
WaitableEvent task_running(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED);
task_runner->PostTask(
FROM_HERE, BindOnce(
[](WaitableEvent* task_running) {
win::AssertComApartmentType(win::ComApartmentType::NONE);
task_running->Signal();
},
&task_running));
task_running.Wait();
worker_pool_->WaitForAllWorkersIdleForTesting();
}
#endif // defined(OS_WIN)
INSTANTIATE_TEST_CASE_P(Parallel, INSTANTIATE_TEST_CASE_P(Parallel,
TaskSchedulerWorkerPoolImplTestParam, TaskSchedulerWorkerPoolImplTestParam,
::testing::Values(test::ExecutionMode::PARALLEL)); ::testing::Values(test::ExecutionMode::PARALLEL));
...@@ -262,6 +292,64 @@ INSTANTIATE_TEST_CASE_P(Sequenced, ...@@ -262,6 +292,64 @@ INSTANTIATE_TEST_CASE_P(Sequenced,
TaskSchedulerWorkerPoolImplTestParam, TaskSchedulerWorkerPoolImplTestParam,
::testing::Values(test::ExecutionMode::SEQUENCED)); ::testing::Values(test::ExecutionMode::SEQUENCED));
#if defined(OS_WIN)
namespace {
class TaskSchedulerWorkerPoolImplTestCOMMTAParam
: public TaskSchedulerWorkerPoolImplTestBase,
public testing::TestWithParam<test::ExecutionMode> {
protected:
TaskSchedulerWorkerPoolImplTestCOMMTAParam() = default;
void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::SetUp(); }
void TearDown() override { TaskSchedulerWorkerPoolImplTestBase::TearDown(); }
private:
void StartWorkerPool(TimeDelta suggested_reclaim_time,
size_t num_workers) override {
ASSERT_TRUE(worker_pool_);
worker_pool_->Start(
SchedulerWorkerPoolParams(num_workers, suggested_reclaim_time),
service_thread_.task_runner(),
SchedulerWorkerPoolImpl::WorkerEnvironment::COM_MTA);
}
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestCOMMTAParam);
};
} // namespace
TEST_P(TaskSchedulerWorkerPoolImplTestCOMMTAParam, COMMTAInitialized) {
// Verify that SchedulerWorkerPoolImpl workers have a COM MTA available.
scoped_refptr<TaskRunner> task_runner =
CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());
WaitableEvent task_running(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED);
task_runner->PostTask(
FROM_HERE, BindOnce(
[](WaitableEvent* task_running) {
win::AssertComApartmentType(win::ComApartmentType::MTA);
task_running->Signal();
},
&task_running));
task_running.Wait();
worker_pool_->WaitForAllWorkersIdleForTesting();
}
INSTANTIATE_TEST_CASE_P(Parallel,
TaskSchedulerWorkerPoolImplTestCOMMTAParam,
::testing::Values(test::ExecutionMode::PARALLEL));
INSTANTIATE_TEST_CASE_P(Sequenced,
TaskSchedulerWorkerPoolImplTestCOMMTAParam,
::testing::Values(test::ExecutionMode::SEQUENCED));
#endif // defined(OS_WIN)
namespace { namespace {
class TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest class TaskSchedulerWorkerPoolImplPostTaskBeforeStartTest
...@@ -694,7 +782,8 @@ TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) { ...@@ -694,7 +782,8 @@ TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) {
"OnePolicyWorkerPool", ThreadPriority::NORMAL, &task_tracker, "OnePolicyWorkerPool", ThreadPriority::NORMAL, &task_tracker,
&delayed_task_manager); &delayed_task_manager);
worker_pool->Start(SchedulerWorkerPoolParams(8U, TimeDelta::Max()), worker_pool->Start(SchedulerWorkerPoolParams(8U, TimeDelta::Max()),
service_thread_task_runner); service_thread_task_runner,
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
ASSERT_TRUE(worker_pool); ASSERT_TRUE(worker_pool);
EXPECT_EQ(1U, worker_pool->NumberOfWorkersForTesting()); EXPECT_EQ(1U, worker_pool->NumberOfWorkersForTesting());
worker_pool->JoinForTesting(); worker_pool->JoinForTesting();
...@@ -715,7 +804,8 @@ TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, VerifyStandbyThread) { ...@@ -715,7 +804,8 @@ TEST(TaskSchedulerWorkerPoolStandbyPolicyTest, VerifyStandbyThread) {
&delayed_task_manager); &delayed_task_manager);
worker_pool->Start( worker_pool->Start(
SchedulerWorkerPoolParams(worker_capacity, kReclaimTimeForCleanupTests), SchedulerWorkerPoolParams(worker_capacity, kReclaimTimeForCleanupTests),
service_thread_task_runner); service_thread_task_runner,
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
ASSERT_TRUE(worker_pool); ASSERT_TRUE(worker_pool);
EXPECT_EQ(1U, worker_pool->NumberOfWorkersForTesting()); EXPECT_EQ(1U, worker_pool->NumberOfWorkersForTesting());
...@@ -1237,7 +1327,8 @@ TEST(TaskSchedulerWorkerPoolOverWorkerCapacityTest, VerifyCleanup) { ...@@ -1237,7 +1327,8 @@ TEST(TaskSchedulerWorkerPoolOverWorkerCapacityTest, VerifyCleanup) {
&delayed_task_manager); &delayed_task_manager);
worker_pool.Start( worker_pool.Start(
SchedulerWorkerPoolParams(kWorkerCapacity, kReclaimTimeForCleanupTests), SchedulerWorkerPoolParams(kWorkerCapacity, kReclaimTimeForCleanupTests),
service_thread_task_runner); service_thread_task_runner,
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
scoped_refptr<TaskRunner> task_runner = scoped_refptr<TaskRunner> task_runner =
worker_pool.CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()}); worker_pool.CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
......
...@@ -133,7 +133,8 @@ class TaskSchedulerWorkerPoolTest ...@@ -133,7 +133,8 @@ class TaskSchedulerWorkerPoolTest
scheduler_worker_pool_impl->Start( scheduler_worker_pool_impl->Start(
SchedulerWorkerPoolParams(kNumWorkersInWorkerPool, SchedulerWorkerPoolParams(kNumWorkersInWorkerPool,
TimeDelta::Max()), TimeDelta::Max()),
service_thread_.task_runner()); service_thread_.task_runner(),
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
break; break;
} }
#if defined(OS_WIN) #if defined(OS_WIN)
......
...@@ -27,13 +27,15 @@ TaskScheduler::InitParams::InitParams( ...@@ -27,13 +27,15 @@ TaskScheduler::InitParams::InitParams(
const SchedulerWorkerPoolParams& background_worker_pool_params_in, const SchedulerWorkerPoolParams& background_worker_pool_params_in,
const SchedulerWorkerPoolParams& background_blocking_worker_pool_params_in, const SchedulerWorkerPoolParams& background_blocking_worker_pool_params_in,
const SchedulerWorkerPoolParams& foreground_worker_pool_params_in, const SchedulerWorkerPoolParams& foreground_worker_pool_params_in,
const SchedulerWorkerPoolParams& foreground_blocking_worker_pool_params_in) const SchedulerWorkerPoolParams& foreground_blocking_worker_pool_params_in,
SharedWorkerPoolEnvironment shared_worker_pool_environment_in)
: background_worker_pool_params(background_worker_pool_params_in), : background_worker_pool_params(background_worker_pool_params_in),
background_blocking_worker_pool_params( background_blocking_worker_pool_params(
background_blocking_worker_pool_params_in), background_blocking_worker_pool_params_in),
foreground_worker_pool_params(foreground_worker_pool_params_in), foreground_worker_pool_params(foreground_worker_pool_params_in),
foreground_blocking_worker_pool_params( foreground_blocking_worker_pool_params(
foreground_blocking_worker_pool_params_in) {} foreground_blocking_worker_pool_params_in),
shared_worker_pool_environment(shared_worker_pool_environment_in) {}
TaskScheduler::InitParams::~InitParams() = default; TaskScheduler::InitParams::~InitParams() = default;
......
...@@ -51,19 +51,31 @@ class Location; ...@@ -51,19 +51,31 @@ class Location;
class BASE_EXPORT TaskScheduler { class BASE_EXPORT TaskScheduler {
public: public:
struct BASE_EXPORT InitParams { struct BASE_EXPORT InitParams {
enum class SharedWorkerPoolEnvironment {
// Use the default environment (no environment).
DEFAULT,
#if defined(OS_WIN)
// Place the worker in a COM MTA.
COM_MTA,
#endif // defined(OS_WIN)
};
InitParams( InitParams(
const SchedulerWorkerPoolParams& background_worker_pool_params_in, const SchedulerWorkerPoolParams& background_worker_pool_params_in,
const SchedulerWorkerPoolParams& const SchedulerWorkerPoolParams&
background_blocking_worker_pool_params_in, background_blocking_worker_pool_params_in,
const SchedulerWorkerPoolParams& foreground_worker_pool_params_in, const SchedulerWorkerPoolParams& foreground_worker_pool_params_in,
const SchedulerWorkerPoolParams& const SchedulerWorkerPoolParams&
foreground_blocking_worker_pool_params_in); foreground_blocking_worker_pool_params_in,
SharedWorkerPoolEnvironment shared_worker_pool_environment_in =
SharedWorkerPoolEnvironment::DEFAULT);
~InitParams(); ~InitParams();
SchedulerWorkerPoolParams background_worker_pool_params; SchedulerWorkerPoolParams background_worker_pool_params;
SchedulerWorkerPoolParams background_blocking_worker_pool_params; SchedulerWorkerPoolParams background_blocking_worker_pool_params;
SchedulerWorkerPoolParams foreground_worker_pool_params; SchedulerWorkerPoolParams foreground_worker_pool_params;
SchedulerWorkerPoolParams foreground_blocking_worker_pool_params; SchedulerWorkerPoolParams foreground_blocking_worker_pool_params;
SharedWorkerPoolEnvironment shared_worker_pool_environment;
}; };
// Destroying a TaskScheduler is not allowed in production; it is always // Destroying a TaskScheduler is not allowed in production; it is always
......
...@@ -86,16 +86,28 @@ void TaskSchedulerImpl::Start(const TaskScheduler::InitParams& init_params) { ...@@ -86,16 +86,28 @@ void TaskSchedulerImpl::Start(const TaskScheduler::InitParams& init_params) {
single_thread_task_runner_manager_.Start(); single_thread_task_runner_manager_.Start();
const SchedulerWorkerPoolImpl::WorkerEnvironment worker_environment =
#if defined(OS_WIN)
init_params.shared_worker_pool_environment ==
InitParams::SharedWorkerPoolEnvironment::COM_MTA
? SchedulerWorkerPoolImpl::WorkerEnvironment::COM_MTA
: SchedulerWorkerPoolImpl::WorkerEnvironment::NONE;
#else
SchedulerWorkerPoolImpl::WorkerEnvironment::NONE;
#endif
worker_pools_[BACKGROUND]->Start(init_params.background_worker_pool_params, worker_pools_[BACKGROUND]->Start(init_params.background_worker_pool_params,
service_thread_task_runner); service_thread_task_runner,
worker_environment);
worker_pools_[BACKGROUND_BLOCKING]->Start( worker_pools_[BACKGROUND_BLOCKING]->Start(
init_params.background_blocking_worker_pool_params, init_params.background_blocking_worker_pool_params,
service_thread_task_runner); service_thread_task_runner, worker_environment);
worker_pools_[FOREGROUND]->Start(init_params.foreground_worker_pool_params, worker_pools_[FOREGROUND]->Start(init_params.foreground_worker_pool_params,
service_thread_task_runner); service_thread_task_runner,
worker_environment);
worker_pools_[FOREGROUND_BLOCKING]->Start( worker_pools_[FOREGROUND_BLOCKING]->Start(
init_params.foreground_blocking_worker_pool_params, init_params.foreground_blocking_worker_pool_params,
service_thread_task_runner); service_thread_task_runner, worker_environment);
} }
void TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here, void TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment