Commit 441b9c3e authored by Daniel Murphy's avatar Daniel Murphy Committed by Commit Bot

[IndexedDB] Enable multi-threaded scopes with blocking close.

This patch enables multi-threaded LevelDBScopes usage by IndexedDB. It
does this by:
* Changing the mode during scopes initialization,
* Changing the RequestDestruction parameters to accept a WaitableEvent
* Modifies the destructor of IDBOriginState to request this destruction
  & then wait on the event.
* Modifies all unittests to match the new API pattern.

This allows cleanup & revert tasks to now take place off of the main
IndexedDB sequence, ensuring that cleanup & deferred range deletion do
not block IndexedDB operations.

See doc for more reasoning & alternatives:
https://docs.google.com/document/d/1xlSye5iCVU2kRsdzpue3-OE-BIBhmhTfMWss4XYxygI/edit#

Bug: 1058977
Change-Id: I679990a8ef057531a9ad3f710c9411377881eb11
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2090630Reviewed-by: default avatarenne <enne@chromium.org>
Reviewed-by: default avatarKen Rockot <rockot@google.com>
Reviewed-by: default avatarMarijn Kruisselbrink <mek@chromium.org>
Commit-Queue: Daniel Murphy <dmurph@chromium.org>
Cr-Commit-Position: refs/heads/master@{#749233}
parent 94e76188
...@@ -4,8 +4,10 @@ ...@@ -4,8 +4,10 @@
#include "components/services/storage/indexed_db/leveldb/leveldb_state.h" #include "components/services/storage/indexed_db/leveldb/leveldb_state.h"
#include "base/location.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/metrics/histogram_functions.h" #include "base/metrics/histogram_functions.h"
#include "base/synchronization/waitable_event.h"
#include "third_party/leveldatabase/src/include/leveldb/env.h" #include "third_party/leveldatabase/src/include/leveldb/env.h"
namespace content { namespace content {
...@@ -43,24 +45,20 @@ LevelDBState::LevelDBState(std::unique_ptr<leveldb::Env> optional_in_memory_env, ...@@ -43,24 +45,20 @@ LevelDBState::LevelDBState(std::unique_ptr<leveldb::Env> optional_in_memory_env,
name_for_tracing_(std::move(name_for_tracing)), name_for_tracing_(std::move(name_for_tracing)),
destruction_requested_(false) {} destruction_requested_(false) {}
bool LevelDBState::RequestDestruction( void LevelDBState::RequestDestruction(
base::OnceClosure on_state_destruction, base::WaitableEvent* signal_on_destruction) {
scoped_refptr<base::SequencedTaskRunner> task_runner) { DCHECK(signal_on_destruction);
if (destruction_requested_.exchange(true, std::memory_order_relaxed)) bool destruct_already_requested =
return false; destruction_requested_.exchange(true, std::memory_order_relaxed);
CHECK(!destruct_already_requested)
DCHECK(!on_destruction_); << "RequestDestruction can only be called one time.";
DCHECK(!on_destruction_task_runner_); DCHECK(!signal_on_destruction_);
on_destruction_ = std::move(on_state_destruction); signal_on_destruction_ = signal_on_destruction;
on_destruction_task_runner_ = std::move(task_runner);
return true;
} }
LevelDBState::~LevelDBState() { LevelDBState::~LevelDBState() {
if (on_destruction_) { if (signal_on_destruction_)
on_destruction_task_runner_->PostTask(FROM_HERE, signal_on_destruction_->Signal();
std::move(on_destruction_));
}
if (!db_) if (!db_)
return; return;
base::TimeTicks begin_time = base::TimeTicks::Now(); base::TimeTicks begin_time = base::TimeTicks::Now();
......
...@@ -9,15 +9,17 @@ ...@@ -9,15 +9,17 @@
#include <memory> #include <memory>
#include <utility> #include <utility>
#include "base/callback.h"
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/sequenced_task_runner.h" #include "base/sequenced_task_runner.h"
#include "third_party/leveldatabase/src/include/leveldb/comparator.h" #include "third_party/leveldatabase/src/include/leveldb/comparator.h"
#include "third_party/leveldatabase/src/include/leveldb/db.h" #include "third_party/leveldatabase/src/include/leveldb/db.h"
#include "third_party/leveldatabase/src/include/leveldb/filter_policy.h" #include "third_party/leveldatabase/src/include/leveldb/filter_policy.h"
namespace base {
class WaitableEvent;
} // namespace base
namespace content { namespace content {
// Encapsulates a leveldb database and comparator, allowing them to be used // Encapsulates a leveldb database and comparator, allowing them to be used
...@@ -35,16 +37,18 @@ class LevelDBState : public base::RefCountedThreadSafe<LevelDBState> { ...@@ -35,16 +37,18 @@ class LevelDBState : public base::RefCountedThreadSafe<LevelDBState> {
std::unique_ptr<leveldb::DB> in_memory_database, std::unique_ptr<leveldb::DB> in_memory_database,
std::string name_for_tracing); std::string name_for_tracing);
// Returns if this call was successfully the first call to request destruction // Can only be called once. |signal_on_destruction| must outlive this class,
// of this state. Can be called on any thread. The given |task_runner| will be // and will be signaled on the destruction of this state (in the destructor).
// used to call the |on_destruction| closure, which is called on the // Can be called on any thread.
// destruction of this state. void RequestDestruction(base::WaitableEvent* signal_on_destruction);
bool RequestDestruction(base::OnceClosure on_destruction,
scoped_refptr<base::SequencedTaskRunner> task_runner);
bool destruction_requested() const { bool destruction_requested() const {
return destruction_requested_.load(std::memory_order_relaxed); return destruction_requested_.load(std::memory_order_relaxed);
} }
// Only valid if destruction_requested() returns true.
base::WaitableEvent* destruction_event() const {
return signal_on_destruction_;
}
const leveldb::Comparator* comparator() const { return comparator_; } const leveldb::Comparator* comparator() const { return comparator_; }
leveldb::DB* db() const { return db_.get(); } leveldb::DB* db() const { return db_.get(); }
...@@ -74,11 +78,10 @@ class LevelDBState : public base::RefCountedThreadSafe<LevelDBState> { ...@@ -74,11 +78,10 @@ class LevelDBState : public base::RefCountedThreadSafe<LevelDBState> {
// This member transitions from false to true at most once in the instance's // This member transitions from false to true at most once in the instance's
// lifetime. // lifetime.
std::atomic_bool destruction_requested_; std::atomic_bool destruction_requested_;
// These members are written only once (when |destruction_requested_| // |signal_on_destruction_| is written only once (when
// transitions from false to true) and read only once in the destructor, so // |destruction_requested_| transitions from false to true) and read only once
// they are thread-compatible. // in the destructor, so it is thread-compatible.
base::OnceClosure on_destruction_; base::WaitableEvent* signal_on_destruction_ = nullptr;
scoped_refptr<base::SequencedTaskRunner> on_destruction_task_runner_;
}; };
} // namespace content } // namespace content
......
...@@ -139,8 +139,7 @@ TEST_F(LevelDBScopesTasksTest, CleanupAbortsOnDestructionRequested) { ...@@ -139,8 +139,7 @@ TEST_F(LevelDBScopesTasksTest, CleanupAbortsOnDestructionRequested) {
CleanupScopeTask::CleanupMode::kExecuteCleanupTasks, CleanupScopeTask::CleanupMode::kExecuteCleanupTasks,
kWriteBatchSizeForTesting); kWriteBatchSizeForTesting);
leveldb_->RequestDestruction(base::DoNothing(), leveldb_->RequestDestruction(&leveldb_close_event_);
base::SequencedTaskRunnerHandle::Get());
leveldb::Status s = task.Run(); leveldb::Status s = task.Run();
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_TRUE(s.ok()) << s.ToString();
EXPECT_TRUE(LoadAt(delete_range_start_key_).ok()); EXPECT_TRUE(LoadAt(delete_range_start_key_).ok());
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "base/run_loop.h" #include "base/run_loop.h"
#include "base/sequenced_task_runner.h" #include "base/sequenced_task_runner.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/synchronization/waitable_event_watcher.h"
#include "base/system/sys_info.h" #include "base/system/sys_info.h"
#include "base/test/bind_test_util.h" #include "base/test/bind_test_util.h"
#include "base/threading/sequenced_task_runner_handle.h" #include "base/threading/sequenced_task_runner_handle.h"
...@@ -64,12 +65,26 @@ void LevelDBScopesTestBase::TearDown() { ...@@ -64,12 +65,26 @@ void LevelDBScopesTestBase::TearDown() {
void LevelDBScopesTestBase::CloseScopesAndDestroyLevelDBState() { void LevelDBScopesTestBase::CloseScopesAndDestroyLevelDBState() {
if (leveldb_) { if (leveldb_) {
base::RunLoop loop; base::RunLoop loop;
if (leveldb_->RequestDestruction(loop.QuitClosure(), base::WaitableEvent* leveldb_close_event_ptr;
base::SequencedTaskRunnerHandle::Get())) { base::WaitableEventWatcher event_watcher;
leveldb_.reset(); if (leveldb_->destruction_requested()) {
loop.Run(); leveldb_close_event_ptr = leveldb_->destruction_event();
} else {
leveldb_close_event_ptr = &leveldb_close_event_;
leveldb_->RequestDestruction(leveldb_close_event_ptr);
} }
event_watcher.StartWatching(
leveldb_close_event_ptr,
base::BindLambdaForTesting([&](base::WaitableEvent*) { loop.Quit(); }),
base::SequencedTaskRunnerHandle::Get());
leveldb_.reset(); leveldb_.reset();
loop.Run();
// There is a possible race in |leveldb_close_event| where the signaling
// thread is still in the WaitableEvent::Signal() method. To ensure that
// the other thread exits their Signal method, any method on the
// WaitableEvent can be called to acquire the internal lock (which will
// subsequently wait for the other thread to exit the Signal method).
EXPECT_TRUE(leveldb_close_event_ptr->IsSignaled());
} }
} }
......
...@@ -103,6 +103,8 @@ class LevelDBScopesTestBase : public testing::Test { ...@@ -103,6 +103,8 @@ class LevelDBScopesTestBase : public testing::Test {
// may need to run cleanup tasks that close files residing in the former. // may need to run cleanup tasks that close files residing in the former.
base::ScopedTempDir temp_directory_; base::ScopedTempDir temp_directory_;
base::test::TaskEnvironment task_env_; base::test::TaskEnvironment task_env_;
// For use with calling leveldb_->RequestDestruction(...);
base::WaitableEvent leveldb_close_event_;
const std::string simple_lock_begin_ = "0000000001"; const std::string simple_lock_begin_ = "0000000001";
const std::string simple_lock_end_ = "0000000010"; const std::string simple_lock_end_ = "0000000010";
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "base/strings/string16.h" #include "base/strings/string16.h"
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
#include "base/strings/utf_string_conversions.h" #include "base/strings/utf_string_conversions.h"
#include "base/synchronization/waitable_event.h"
#include "base/synchronization/waitable_event_watcher.h"
#include "base/task/post_task.h" #include "base/task/post_task.h"
#include "base/task/thread_pool.h" #include "base/task/thread_pool.h"
#include "base/test/bind_test_util.h" #include "base/test/bind_test_util.h"
...@@ -361,15 +363,29 @@ class IndexedDBBackingStoreTest : public testing::Test { ...@@ -361,15 +363,29 @@ class IndexedDBBackingStoreTest : public testing::Test {
base::RunLoop loop; base::RunLoop loop;
IndexedDBOriginState* per_origin_factory = IndexedDBOriginState* per_origin_factory =
factory->GetOriginFactory(origin); factory->GetOriginFactory(origin);
per_origin_factory->backing_store()
->db() auto* leveldb_state =
->leveldb_state() per_origin_factory->backing_store()->db()->leveldb_state();
->RequestDestruction(loop.QuitClosure(),
base::SequencedTaskRunnerHandle::Get()); base::WaitableEvent leveldb_close_event;
base::WaitableEventWatcher event_watcher;
leveldb_state->RequestDestruction(&leveldb_close_event);
event_watcher.StartWatching(
&leveldb_close_event,
base::BindLambdaForTesting(
[&](base::WaitableEvent*) { loop.Quit(); }),
base::SequencedTaskRunnerHandle::Get());
idb_context_->ForceCloseSync( idb_context_->ForceCloseSync(
origin, origin,
storage::mojom::ForceCloseReason::FORCE_CLOSE_DELETE_ORIGIN); storage::mojom::ForceCloseReason::FORCE_CLOSE_DELETE_ORIGIN);
loop.Run(); loop.Run();
// There is a possible race in |leveldb_close_event| where the signaling
// thread is still in the WaitableEvent::Signal() method. To ensure that
// the other thread exits their Signal method, any method on the
// WaitableEvent can be called to acquire the internal lock (which will
// subsequently wait for the other thread to exit the Signal method).
EXPECT_TRUE(leveldb_close_event.IsSignaled());
} }
// All leveldb databases are closed, and they can be deleted. // All leveldb databases are closed, and they can be deleted.
for (auto origin : idb_context_->GetAllOrigins()) { for (auto origin : idb_context_->GetAllOrigins()) {
......
...@@ -769,7 +769,7 @@ IndexedDBFactoryImpl::GetOrOpenOriginFactory( ...@@ -769,7 +769,7 @@ IndexedDBFactoryImpl::GetOrOpenOriginFactory(
// Scopes must be single sequence to keep methods like ForceClose synchronous. // Scopes must be single sequence to keep methods like ForceClose synchronous.
// See https://crbug.com/980685 // See https://crbug.com/980685
s = backing_store->db()->scopes()->StartRecoveryAndCleanupTasks( s = backing_store->db()->scopes()->StartRecoveryAndCleanupTasks(
LevelDBScopes::TaskRunnerMode::kUseCurrentSequence); LevelDBScopes::TaskRunnerMode::kNewCleanupAndRevertSequences);
if (UNLIKELY(!s.ok())) { if (UNLIKELY(!s.ok())) {
ReportOpenStatus(indexed_db::INDEXED_DB_BACKING_STORE_OPEN_NO_RECOVERY, ReportOpenStatus(indexed_db::INDEXED_DB_BACKING_STORE_OPEN_NO_RECOVERY,
......
...@@ -90,32 +90,18 @@ class IndexedDBFactoryTest : public testing::Test { ...@@ -90,32 +90,18 @@ class IndexedDBFactoryTest : public testing::Test {
// deletion of the leveldb state. Once the states are no longer around, // deletion of the leveldb state. Once the states are no longer around,
// delete all of the databases on disk. // delete all of the databases on disk.
auto open_factory_origins = factory->GetOpenOrigins(); auto open_factory_origins = factory->GetOpenOrigins();
base::RunLoop loop;
auto callback = base::BarrierClosure(
open_factory_origins.size(), base::BindLambdaForTesting([&]() {
// All leveldb databases are closed, and they can be deleted.
for (auto origin : context_->GetAllOrigins()) {
bool success = false;
storage::mojom::IndexedDBControlAsyncWaiter waiter(
context_.get());
waiter.DeleteForOrigin(origin, &success);
EXPECT_TRUE(success);
}
loop.Quit();
}));
for (auto origin : open_factory_origins) { for (auto origin : open_factory_origins) {
IndexedDBOriginState* per_origin_factory =
factory->GetOriginFactory(origin);
per_origin_factory->backing_store()
->db()
->leveldb_state()
->RequestDestruction(callback,
base::SequencedTaskRunnerHandle::Get());
context_->ForceCloseSync( context_->ForceCloseSync(
origin, origin,
storage::mojom::ForceCloseReason::FORCE_CLOSE_DELETE_ORIGIN); storage::mojom::ForceCloseReason::FORCE_CLOSE_DELETE_ORIGIN);
} }
loop.Run(); // All leveldb databases are closed, and they can be deleted.
for (auto origin : context_->GetAllOrigins()) {
bool success = false;
storage::mojom::IndexedDBControlAsyncWaiter waiter(context_.get());
waiter.DeleteForOrigin(origin, &success);
EXPECT_TRUE(success);
}
} }
if (temp_dir_.IsValid()) if (temp_dir_.IsValid())
ASSERT_TRUE(temp_dir_.Delete()); ASSERT_TRUE(temp_dir_.Delete());
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "base/feature_list.h" #include "base/feature_list.h"
#include "base/rand_util.h" #include "base/rand_util.h"
#include "base/stl_util.h" #include "base/stl_util.h"
#include "base/synchronization/waitable_event.h"
#include "components/services/storage/indexed_db/transactional_leveldb/transactional_leveldb_database.h" #include "components/services/storage/indexed_db/transactional_leveldb/transactional_leveldb_database.h"
#include "components/services/storage/indexed_db/transactional_leveldb/transactional_leveldb_factory.h" #include "components/services/storage/indexed_db/transactional_leveldb/transactional_leveldb_factory.h"
#include "components/services/storage/indexed_db/transactional_leveldb/transactional_leveldb_transaction.h" #include "components/services/storage/indexed_db/transactional_leveldb/transactional_leveldb_transaction.h"
...@@ -109,8 +110,16 @@ IndexedDBOriginState::IndexedDBOriginState( ...@@ -109,8 +110,16 @@ IndexedDBOriginState::IndexedDBOriginState(
IndexedDBOriginState::~IndexedDBOriginState() { IndexedDBOriginState::~IndexedDBOriginState() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (backing_store_ && backing_store_->IsBlobCleanupPending()) if (!backing_store_)
return;
if (backing_store_->IsBlobCleanupPending())
backing_store_->ForceRunBlobCleanup(); backing_store_->ForceRunBlobCleanup();
base::WaitableEvent leveldb_destruct_event;
backing_store_->db()->leveldb_state()->RequestDestruction(
&leveldb_destruct_event);
backing_store_.reset();
leveldb_destruct_event.Wait();
} }
void IndexedDBOriginState::AbortAllTransactions(bool compact) { void IndexedDBOriginState::AbortAllTransactions(bool compact) {
......
...@@ -213,7 +213,7 @@ class CONTENT_EXPORT IndexedDBOriginState { ...@@ -213,7 +213,7 @@ class CONTENT_EXPORT IndexedDBOriginState {
ClosingState closing_stage_ = ClosingState::kNotClosing; ClosingState closing_stage_ = ClosingState::kNotClosing;
base::OneShotTimer close_timer_; base::OneShotTimer close_timer_;
const std::unique_ptr<DisjointRangeLockManager> lock_manager_; const std::unique_ptr<DisjointRangeLockManager> lock_manager_;
const std::unique_ptr<IndexedDBBackingStore> backing_store_; std::unique_ptr<IndexedDBBackingStore> backing_store_;
OriginDBMap databases_; OriginDBMap databases_;
// This is the refcount for the number of IndexedDBOriginStateHandle's given // This is the refcount for the number of IndexedDBOriginStateHandle's given
......
...@@ -119,33 +119,20 @@ class IndexedDBTest : public testing::Test { ...@@ -119,33 +119,20 @@ class IndexedDBTest : public testing::Test {
// deletion of the leveldb state. Once the states are no longer around, // deletion of the leveldb state. Once the states are no longer around,
// delete all of the databases on disk. // delete all of the databases on disk.
auto open_factory_origins = factory->GetOpenOrigins(); auto open_factory_origins = factory->GetOpenOrigins();
base::RunLoop loop;
auto callback = base::BarrierClosure(
open_factory_origins.size(), base::BindLambdaForTesting([&]() {
// All leveldb databases are closed, and they can be deleted.
for (auto origin : context_->GetAllOrigins()) {
bool success = false;
storage::mojom::IndexedDBControlAsyncWaiter waiter(
context_.get());
waiter.DeleteForOrigin(origin, &success);
EXPECT_TRUE(success);
}
loop.Quit();
}));
for (auto origin : open_factory_origins) { for (auto origin : open_factory_origins) {
IndexedDBOriginState* per_origin_factory =
factory->GetOriginFactory(origin);
per_origin_factory->backing_store()
->db()
->leveldb_state()
->RequestDestruction(callback,
base::SequencedTaskRunnerHandle::Get());
context_->ForceCloseSync( context_->ForceCloseSync(
origin, origin,
storage::mojom::ForceCloseReason::FORCE_CLOSE_DELETE_ORIGIN); storage::mojom::ForceCloseReason::FORCE_CLOSE_DELETE_ORIGIN);
} }
loop.Run(); // All leveldb databases are closed, and they can be deleted.
for (auto origin : context_->GetAllOrigins()) {
bool success = false;
storage::mojom::IndexedDBControlAsyncWaiter waiter(context_.get());
waiter.DeleteForOrigin(origin, &success);
EXPECT_TRUE(success);
}
} }
if (temp_dir_.IsValid()) if (temp_dir_.IsValid())
ASSERT_TRUE(temp_dir_.Delete()); ASSERT_TRUE(temp_dir_.Delete());
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment