Commit 0554cbe7 authored by Carlos Caballero's avatar Carlos Caballero Committed by Commit Bot

[task] New OperationsController class

Create a helper class to manage critical multi-threaded operations
without locks.

This class will be used to remove the lock in
base/task/sequence_manager/task_queue_proxy.h and to replace similar
code in base/task/task_scheduler/task_tracker.h, and
base/message_loop/message_loop_impl.cc

Inspired from MessageLoopImpl::Controller.

Bug: 901345
Change-Id: I56d2f2143e396c7bc81a76a23fe4731d280455d1
Reviewed-on: https://chromium-review.googlesource.com/c/1335942
Commit-Queue: Carlos Caballero <carlscab@google.com>
Reviewed-by: default avatarGabriel Charette <gab@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Reviewed-by: default avatarAlex Clarke <alexclarke@chromium.org>
Reviewed-by: default avatarSami Kyöstilä <skyostil@chromium.org>
Cr-Commit-Position: refs/heads/master@{#609670}
parent 3b0eebbf
...@@ -732,6 +732,8 @@ jumbo_component("base") { ...@@ -732,6 +732,8 @@ jumbo_component("base") {
"task/cancelable_task_tracker.cc", "task/cancelable_task_tracker.cc",
"task/cancelable_task_tracker.h", "task/cancelable_task_tracker.h",
"task/common/intrusive_heap.h", "task/common/intrusive_heap.h",
"task/common/operations_controller.cc",
"task/common/operations_controller.h",
"task/lazy_task_runner.cc", "task/lazy_task_runner.cc",
"task/lazy_task_runner.h", "task/lazy_task_runner.h",
"task/post_task.cc", "task/post_task.cc",
...@@ -2458,6 +2460,7 @@ test("base_unittests") { ...@@ -2458,6 +2460,7 @@ test("base_unittests") {
"system/system_monitor_unittest.cc", "system/system_monitor_unittest.cc",
"task/cancelable_task_tracker_unittest.cc", "task/cancelable_task_tracker_unittest.cc",
"task/common/intrusive_heap_unittest.cc", "task/common/intrusive_heap_unittest.cc",
"task/common/operations_controller_unittest.cc",
"task/lazy_task_runner_unittest.cc", "task/lazy_task_runner_unittest.cc",
"task/post_task_unittest.cc", "task/post_task_unittest.cc",
"task/scoped_set_task_priority_for_current_thread_unittest.cc", "task/scoped_set_task_priority_for_current_thread_unittest.cc",
......
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/common/operations_controller.h"
#include "base/logging.h"
namespace base {
namespace internal {
OperationsController::OperationsController() {
DETACH_FROM_SEQUENCE(owning_sequence_checker_);
}
OperationsController::~OperationsController() {
#if DCHECK_IS_ON()
// An OperationsController may only be deleted when it was either not
// accepting operations or after it was shutdown and there are no in flight
// attempts to perform operations.
auto value = state_and_count_.load();
DCHECK(
ExtractState(value) == State::kRejectingOperations ||
(ExtractState(value) == State::kShuttingDown && ExtractCount(value) == 0))
<< value;
#endif
}
bool OperationsController::StartAcceptingOperations() {
DCHECK_CALLED_ON_VALID_SEQUENCE(owning_sequence_checker_);
// Release semantics are required to ensure that all memory accesses made on
// this thread happen-before any others done on a thread which is later
// allowed to perform an operation.
auto prev_value = state_and_count_.fetch_or(kAcceptingOperationsBitMask,
std::memory_order_release);
DCHECK_EQ(ExtractState(prev_value), State::kRejectingOperations);
// The count is the number of rejected operations, unwind them now.
auto num_rejected = ExtractCount(prev_value);
DecrementBy(num_rejected);
return num_rejected != 0;
}
OperationsController::OperationToken OperationsController::TryBeginOperation() {
// Acquire semantics are required to ensure that a thread which is allowed to
// perform an operation sees all the memory side-effects that happened-before
// StartAcceptingOperations(). They're also required so that no operations on
// this thread (e.g. the operation itself) can be reordered before this one.
auto prev_value = state_and_count_.fetch_add(1, std::memory_order_acquire);
switch (ExtractState(prev_value)) {
case State::kRejectingOperations:
return OperationToken(nullptr);
case State::kAcceptingOperations:
return OperationToken(this);
case State::kShuttingDown:
DecrementBy(1);
return OperationToken(nullptr);
}
}
void OperationsController::ShutdownAndWaitForZeroOperations() {
DCHECK_CALLED_ON_VALID_SEQUENCE(owning_sequence_checker_);
// Acquire semantics are required to guarantee that all memory side-effects
// made by other threads that were allowed to perform operations are
// synchronized with this thread before it returns from this method.
auto prev_value = state_and_count_.fetch_or(kShuttingDownBitMask,
std::memory_order_acquire);
switch (ExtractState(prev_value)) {
case State::kRejectingOperations:
// The count is the number of rejected operations, unwind them now.
DecrementBy(ExtractCount(prev_value));
break;
case State::kAcceptingOperations:
if (ExtractCount(prev_value) != 0) {
shutdown_complete_.Wait();
}
break;
case State::kShuttingDown:
DCHECK(false) << "Multiple calls to ShutdownAndWaitForZeroOperations()";
break;
}
}
OperationsController::State OperationsController::ExtractState(uint32_t value) {
if (value & kShuttingDownBitMask) {
return State::kShuttingDown;
} else if (value & kAcceptingOperationsBitMask) {
return State::kAcceptingOperations;
} else {
return State::kRejectingOperations;
}
}
void OperationsController::DecrementBy(uint32_t n) {
// Release semantics are required to ensure that no operation on the current
// thread (e.g. the operation itself) can be reordered after this one.
auto prev_value = state_and_count_.fetch_sub(n, std::memory_order_release);
DCHECK_LE(n, ExtractCount(prev_value)) << "Decrement underflow";
if (ExtractState(prev_value) == State::kShuttingDown &&
ExtractCount(prev_value) == n) {
shutdown_complete_.Signal();
}
}
} // namespace internal
} // namespace base
\ No newline at end of file
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_COMMON_OPERATIONS_CONTROLLER_H_
#define BASE_TASK_COMMON_OPERATIONS_CONTROLLER_H_
#include <atomic>
#include <cstdint>
#include "base/sequence_checker.h"
#include "base/synchronization/waitable_event.h"
namespace base {
namespace internal {
// A lock-free thread-safe controller to manage critical multi-threaded
// operations without locks.
//
// The controller is used to determine if operations are allowed, and to keep
// track of how many are currently active. Users will call TryBeginOperation()
// before starting such operations. If the call succeeds the user can run the
// operation and the controller will keep track of it until the user signals
// that the operation is completed. No operations are allowed before
// StartAcceptingOperations() is called, or after
// ShutdownAndWaitForZeroOperations() is called.
//
// There is no explicit way of telling the controller when an operation is
// completed, instead for convenience TryBeginOperation() will return a RAII
// like object that will do so on destruction.
//
// For example:
//
// OperationsController controller_;
//
// void SetUp() {
// controller_.StartAcceptingOperations();
// }
//
// void TearDown() {
// controller_.ShutdownAndWaitForZeroOperations();
// }
//
// void MaybeRunOperation() {
// auto operation_token = controller_.TryBeginOperation();
// if (operation_token) {
// Process();
// }
// }
//
// Attention: StartAcceptingOperations() and ShutdownAndWaitForZeroOperations()
// must be called form the same Sequence.
//
// This class is thread-safe (but see attention note above).
class BASE_EXPORT OperationsController {
public:
// The owner of an OperationToken which evaluates to true can safely perform
// an operation while being certain it happens-after
// StartAcceptingOperations() and happens-before
// ShutdownAndWaitForZeroOperations(). Releasing this OperationToken
// relinquishes this right.
//
// This class is thread-safe
class OperationToken {
public:
~OperationToken() {
if (outer_)
outer_->DecrementBy(1);
}
OperationToken(const OperationToken&) = delete;
OperationToken(OperationToken&& other) {
this->outer_ = other.outer_;
other.outer_ = nullptr;
}
operator bool() const { return !!outer_; }
private:
friend class OperationsController;
explicit OperationToken(OperationsController* outer) : outer_(outer) {}
OperationsController* outer_;
};
OperationsController();
// Users must call ShutdownAndWaitForZeroOperations() before destroying an
// instance of this class if StartAcceptingOperations() was called.
~OperationsController();
OperationsController(const OperationsController&) = delete;
OperationsController& operator=(const OperationsController&) = delete;
// Starts to accept operations (before this point TryBeginOperation() returns
// an invalid token). Returns true if an attempt to perform an operation was
// made and denied before StartAcceptingOperations() was called. Can be called
// at most once, never after ShutdownAndWaitForZeroOperations().
//
// Note that if this returns true, the caller may perform an operation to
// replace the ones denied (safe since ShutdownAndWaitForZeroOperations() has
// to be invoked on the same sequence).
bool StartAcceptingOperations();
// Returns a RAII like object that implicitly converts to true if operations
// are allowed i.e. if this call happens-after StartAcceptingOperations() and
// happens-before Shutdown(), otherwise the object will convert to false. On
// successful return, this OperationsController will keep track of the
// operation until the returned object goes out of scope.
OperationToken TryBeginOperation();
// Prevents further calls to TryBeginOperation() from succeeding and waits for
// all the ongoing operations to complete.
//
// Attention: Can only be called once.
// Attention: Must be called from the same Sequence as
// StartAcceptingOperations() (if called).
void ShutdownAndWaitForZeroOperations();
private:
// Atomic representation of the state of this class. We use the upper 2 bits
// to keep track of flag like values and the remainder bits are used as a
// counter. The 2 flags are used to represent 3 different states:
//
// State | AcceptOperations Bit | ShuttingDown Bit
// --------------------------------------------------------------
// kRejectingOperations | 0 | 0
// kAcceptingOperations | 1 | 0
// kShuttingDown | * | 1
//
// The counter keeps track of the rejected operations when we are in
// the kRejectingOperations state, the number of inflight operations
// otherwise. If the count reaches zero and we are in the shutting down state
// |shutdown_complete_| will be signaled.
static constexpr uint32_t kShuttingDownBitMask = uint32_t{1} << 31;
static constexpr uint32_t kAcceptingOperationsBitMask = uint32_t{1} << 30;
static constexpr uint32_t kFlagsBitMask =
(kShuttingDownBitMask | kAcceptingOperationsBitMask);
static constexpr uint32_t kCountBitMask = ~kFlagsBitMask;
enum class State {
kRejectingOperations,
kAcceptingOperations,
kShuttingDown,
};
// Helper methods for the bit fiddling. Pass a |state_and_count_| value to
// extract state or count out of it.
static uint32_t ExtractCount(uint32_t value) { return value & kCountBitMask; }
static State ExtractState(uint32_t value);
// Decrements the counter by |n| and signals |shutdown_complete_| if needed.
void DecrementBy(uint32_t n);
std::atomic<uint32_t> state_and_count_{0};
WaitableEvent shutdown_complete_;
// Verifies that StartAcceptingOperations() and
// ShutdownAndWaitForZeroOperations() are performed on the same sequence.
SEQUENCE_CHECKER(owning_sequence_checker_);
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_COMMON_OPERATIONS_CONTROLLER_H_
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/common/operations_controller.h"
#include <atomic>
#include <cstdint>
#include <utility>
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace internal {
namespace {
class ScopedShutdown {
public:
ScopedShutdown(OperationsController* controller) : controller_(*controller) {}
~ScopedShutdown() { controller_.ShutdownAndWaitForZeroOperations(); }
private:
OperationsController& controller_;
};
TEST(OperationsControllerTest, CanBeDestroyedWithoutWaiting) {
OperationsController controller;
}
TEST(OperationsControllerTest, CanShutdownIfNotStarted) {
OperationsController controller;
controller.ShutdownAndWaitForZeroOperations();
}
TEST(OperationsControllerTest, FailsToBeginWhenNotStarted) {
OperationsController controller;
auto operation_token = controller.TryBeginOperation();
EXPECT_FALSE(operation_token);
}
TEST(OperationsControllerTest, CanShutdownAfterTryCallsIfNotStarted) {
OperationsController controller;
auto operation_token = controller.TryBeginOperation();
ASSERT_FALSE(operation_token);
controller.ShutdownAndWaitForZeroOperations();
}
TEST(OperationsControllerTest,
StartAcceptingOperationsReturnsFalseIfNoRejectedBeginAttempts) {
OperationsController controller;
ScopedShutdown cleanup(&controller);
EXPECT_FALSE(controller.StartAcceptingOperations());
}
TEST(OperationsControllerTest,
StartAcceptingOperationsReturnsTrueIfFailedBeginAttempts) {
OperationsController controller;
ScopedShutdown cleanup(&controller);
auto operation_token = controller.TryBeginOperation();
ASSERT_FALSE(operation_token);
EXPECT_TRUE(controller.StartAcceptingOperations());
}
TEST(OperationsControllerTest, SuccesfulBeginReturnsValidScopedObject) {
OperationsController controller;
ScopedShutdown cleanup(&controller);
controller.StartAcceptingOperations();
auto operation_token = controller.TryBeginOperation();
EXPECT_TRUE(operation_token);
}
TEST(OperationsControllerTest, BeginFailsAfterShutdown) {
OperationsController controller;
controller.StartAcceptingOperations();
controller.ShutdownAndWaitForZeroOperations();
auto operation_token = controller.TryBeginOperation();
EXPECT_FALSE(operation_token);
}
TEST(OperationsControllerTest, ScopedOperationsControllerIsMoveConstructible) {
OperationsController controller;
ScopedShutdown cleanup(&controller);
controller.StartAcceptingOperations();
auto operation_token_1 = controller.TryBeginOperation();
auto operation_token_2 = std::move(operation_token_1);
EXPECT_FALSE(operation_token_1);
EXPECT_TRUE(operation_token_2);
}
// Dummy SimpleThread implementation that periodically begins and ends
// operations until one of them fails.
class TestThread : public SimpleThread {
public:
explicit TestThread(OperationsController* ref_controller,
std::atomic<bool>* started,
std::atomic<int32_t>* thread_counter)
: SimpleThread("TestThread"),
controller_(*ref_controller),
started_(*started),
thread_counter_(*thread_counter) {}
void Run() override {
thread_counter_.fetch_add(1, std::memory_order_relaxed);
while (true) {
PlatformThread::YieldCurrentThread();
bool was_started = started_.load(std::memory_order_relaxed);
std::vector<OperationsController::OperationToken> tokens;
for (int i = 0; i < 100; ++i) {
tokens.push_back(controller_.TryBeginOperation());
}
if (!was_started)
continue;
if (std::any_of(tokens.begin(), tokens.end(),
[](const auto& token) { return !token; })) {
break;
}
}
}
private:
OperationsController& controller_;
std::atomic<bool>& started_;
std::atomic<int32_t>& thread_counter_;
};
TEST(OperationsControllerTest, BeginsFromMultipleThreads) {
constexpr int32_t kNumThreads = 10;
for (int32_t i = 0; i < 10; ++i) {
OperationsController ref_controller;
std::atomic<bool> started(false);
std::atomic<int32_t> running_threads(0);
std::vector<std::unique_ptr<TestThread>> threads;
for (int j = 0; j < kNumThreads; ++j) {
threads.push_back(std::make_unique<TestThread>(&ref_controller, &started,
&running_threads));
threads.back()->Start();
}
// Make sure all threads are running.
while (running_threads.load(std::memory_order_relaxed) != kNumThreads) {
PlatformThread::YieldCurrentThread();
}
// Wait a bit before starting to try to introduce races.
constexpr TimeDelta kRaceInducingTimeout = TimeDelta::FromMicroseconds(50);
PlatformThread::Sleep(kRaceInducingTimeout);
ref_controller.StartAcceptingOperations();
// Signal threads to terminate on TryBeginOperation() failures
started.store(true, std::memory_order_relaxed);
// Let the test run for a while before shuting down.
PlatformThread::Sleep(TimeDelta::FromMilliseconds(5));
ref_controller.ShutdownAndWaitForZeroOperations();
for (const auto& t : threads) {
t->Join();
}
}
}
} // namespace
} // namespace internal
} // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment