Commit 558ef0cf authored by Adrienne Walker's avatar Adrienne Walker Committed by Commit Bot

indexeddb: remove IO thread functions from ChainedBlobWriter

These functions are unnecessary and make it more complicated to
remove the use of BrowserThread::IO from idb code.

In particular, the delegate can be owned by the callback rather
than by the ChainedBlobWriter, and the flush policy can be
accessed from the idb sequence instead.

Change-Id: I8d73085a9aa374699305cd37457a2f075bb2a791
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1954524
Commit-Queue: enne <enne@chromium.org>
Reviewed-by: default avatarDaniel Murphy <dmurph@chromium.org>
Cr-Commit-Position: refs/heads/master@{#722608}
parent b5675935
...@@ -1386,11 +1386,11 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl ...@@ -1386,11 +1386,11 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl
static scoped_refptr<ChainedBlobWriterImpl> Create( static scoped_refptr<ChainedBlobWriterImpl> Create(
int64_t database_id, int64_t database_id,
WriteDescriptorVec* blobs, WriteDescriptorVec* blobs,
blink::mojom::IDBTransactionDurability durability, storage::FlushPolicy flush_policy,
WriteBlobFileCallback write_file_callback, WriteBlobFileCallback write_file_callback,
BlobWriteCallback callback) { BlobWriteCallback callback) {
auto writer = base::WrapRefCounted(new ChainedBlobWriterImpl( auto writer = base::WrapRefCounted(new ChainedBlobWriterImpl(
database_id, durability, std::move(write_file_callback), database_id, flush_policy, std::move(write_file_callback),
std::move(callback))); std::move(callback)));
writer->blobs_.swap(*blobs); writer->blobs_.swap(*blobs);
writer->iter_ = writer->blobs_.begin(); writer->iter_ = writer->blobs_.begin();
...@@ -1400,19 +1400,11 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl ...@@ -1400,19 +1400,11 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl
return writer; return writer;
} }
void set_delegate(std::unique_ptr<FileWriterDelegate> delegate) override {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
delegate_.reset(delegate.release());
}
void ReportWriteCompletion(bool succeeded, int64_t bytes_written) override { void ReportWriteCompletion(bool succeeded, int64_t bytes_written) override {
DCHECK_CALLED_ON_VALID_SEQUENCE(idb_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(idb_sequence_checker_);
DCHECK(waiting_for_callback_); DCHECK(waiting_for_callback_);
DCHECK(!succeeded || bytes_written >= 0); DCHECK(!succeeded || bytes_written >= 0);
waiting_for_callback_ = false; waiting_for_callback_ = false;
if (delegate_.get()) // Only present for Blob, not File.
base::DeleteSoon(FROM_HERE, {content::BrowserThread::IO},
delegate_.release());
if (aborted_) { if (aborted_) {
self_ref_ = nullptr; self_ref_ = nullptr;
return; return;
...@@ -1436,24 +1428,20 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl ...@@ -1436,24 +1428,20 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl
} }
storage::FlushPolicy GetFlushPolicy() const override { storage::FlushPolicy GetFlushPolicy() const override {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO); DCHECK_CALLED_ON_VALID_SEQUENCE(idb_sequence_checker_);
return IndexedDBBackingStore::ShouldSyncOnCommit(durability_) return flush_policy_;
? storage::FlushPolicy::FLUSH_ON_COMPLETION
: storage::FlushPolicy::NO_FLUSH_ON_COMPLETION;
} }
private: private:
// Must be called on the IDB task runner. // Must be called on the IDB task runner.
ChainedBlobWriterImpl(int64_t database_id, ChainedBlobWriterImpl(int64_t database_id,
blink::mojom::IDBTransactionDurability durability, storage::FlushPolicy flush_policy,
WriteBlobFileCallback write_file_callback, WriteBlobFileCallback write_file_callback,
BlobWriteCallback callback) BlobWriteCallback callback)
: waiting_for_callback_(false), : flush_policy_(flush_policy),
durability_(durability),
database_id_(database_id), database_id_(database_id),
write_file_callback_(std::move(write_file_callback)), write_file_callback_(std::move(write_file_callback)),
callback_(std::move(callback)), callback_(std::move(callback)) {}
aborted_(false) {}
~ChainedBlobWriterImpl() override { ~ChainedBlobWriterImpl() override {
DCHECK_CALLED_ON_VALID_SEQUENCE(idb_sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(idb_sequence_checker_);
} }
...@@ -1478,8 +1466,7 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl ...@@ -1478,8 +1466,7 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl
} }
} }
bool waiting_for_callback_; storage::FlushPolicy flush_policy_;
blink::mojom::IDBTransactionDurability durability_;
scoped_refptr<ChainedBlobWriterImpl> self_ref_; scoped_refptr<ChainedBlobWriterImpl> self_ref_;
WriteDescriptorVec blobs_; WriteDescriptorVec blobs_;
WriteDescriptorVec::const_iterator iter_; WriteDescriptorVec::const_iterator iter_;
...@@ -1489,8 +1476,8 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl ...@@ -1489,8 +1476,8 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl
// operations queue. Errors are instead handled in // operations queue. Errors are instead handled in
// IndexedDBTransaction::BlobWriteComplete. // IndexedDBTransaction::BlobWriteComplete.
BlobWriteCallback callback_; BlobWriteCallback callback_;
std::unique_ptr<FileWriterDelegate> delegate_; bool aborted_ = false;
bool aborted_; bool waiting_for_callback_ = false;
SEQUENCE_CHECKER(idb_sequence_checker_); SEQUENCE_CHECKER(idb_sequence_checker_);
...@@ -1507,10 +1494,12 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl ...@@ -1507,10 +1494,12 @@ class IndexedDBBackingStore::Transaction::ChainedBlobWriterImpl
class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> { class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> {
public: public:
LocalWriteClosure(scoped_refptr<ChainedBlobWriter> chained_blob_writer, LocalWriteClosure(scoped_refptr<ChainedBlobWriter> chained_blob_writer,
std::unique_ptr<FileWriterDelegate> delegate,
scoped_refptr<base::SequencedTaskRunner> idb_task_runner, scoped_refptr<base::SequencedTaskRunner> idb_task_runner,
FilePath file_path, FilePath file_path,
base::Time last_modified) base::Time last_modified)
: chained_blob_writer_(std::move(chained_blob_writer)), : chained_blob_writer_(std::move(chained_blob_writer)),
delegate_(std::move(delegate)),
idb_task_runner_(std::move(idb_task_runner)), idb_task_runner_(std::move(idb_task_runner)),
file_path_(std::move(file_path)), file_path_(std::move(file_path)),
last_modified_(last_modified) { last_modified_(last_modified) {
...@@ -1522,6 +1511,7 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> { ...@@ -1522,6 +1511,7 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> {
int64_t bytes, int64_t bytes,
FileWriterDelegate::WriteProgressStatus write_status) { FileWriterDelegate::WriteProgressStatus write_status) {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO); DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
DCHECK_GE(bytes, 0); DCHECK_GE(bytes, 0);
bytes_written_ += bytes; bytes_written_ += bytes;
if (write_status == FileWriterDelegate::SUCCESS_IO_PENDING) if (write_status == FileWriterDelegate::SUCCESS_IO_PENDING)
...@@ -1567,10 +1557,14 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> { ...@@ -1567,10 +1557,14 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> {
}, },
success, file_path_, last_modified_, bytes_written_, success, file_path_, last_modified_, bytes_written_,
chained_blob_writer_)); chained_blob_writer_));
// Explicitly delete FileWriterDelegate to break reference cycle.
delegate_.reset();
} }
static void WriteBlobToFileOnIOThread( static void WriteBlobToFileOnIOThread(
scoped_refptr<ChainedBlobWriter> chained_blob_writer, scoped_refptr<ChainedBlobWriter> chained_blob_writer,
storage::FlushPolicy flush_policy,
scoped_refptr<base::SequencedTaskRunner> idb_task_runner, scoped_refptr<base::SequencedTaskRunner> idb_task_runner,
const FilePath& file_path, const FilePath& file_path,
mojo::SharedRemote<blink::mojom::Blob> blob, mojo::SharedRemote<blink::mojom::Blob> blob,
...@@ -1581,8 +1575,7 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> { ...@@ -1581,8 +1575,7 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> {
idb_task_runner.get(), file_path, 0, idb_task_runner.get(), file_path, 0,
storage::FileStreamWriter::CREATE_NEW_FILE); storage::FileStreamWriter::CREATE_NEW_FILE);
std::unique_ptr<FileWriterDelegate> delegate( std::unique_ptr<FileWriterDelegate> delegate(
std::make_unique<FileWriterDelegate>( std::make_unique<FileWriterDelegate>(std::move(writer), flush_policy));
std::move(writer), chained_blob_writer->GetFlushPolicy()));
DCHECK(blob); DCHECK(blob);
MojoCreateDataPipeOptions options; MojoCreateDataPipeOptions options;
...@@ -1602,12 +1595,14 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> { ...@@ -1602,12 +1595,14 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> {
} }
blob->ReadAll(std::move(producer_handle), mojo::NullRemote()); blob->ReadAll(std::move(producer_handle), mojo::NullRemote());
scoped_refptr<LocalWriteClosure> write_closure(new LocalWriteClosure( auto* raw_delegate = delegate.get();
chained_blob_writer, idb_task_runner, file_path, last_modified)); scoped_refptr<LocalWriteClosure> write_closure(
delegate->Start( new LocalWriteClosure(chained_blob_writer, std::move(delegate),
idb_task_runner, file_path, last_modified));
raw_delegate->Start(
std::move(consumer_handle), std::move(consumer_handle),
base::BindRepeating(&LocalWriteClosure::Run, std::move(write_closure))); base::BindRepeating(&LocalWriteClosure::Run, std::move(write_closure)));
chained_blob_writer->set_delegate(std::move(delegate));
} }
private: private:
...@@ -1621,6 +1616,10 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> { ...@@ -1621,6 +1616,10 @@ class LocalWriteClosure : public base::RefCountedThreadSafe<LocalWriteClosure> {
friend class base::RefCountedThreadSafe<LocalWriteClosure>; friend class base::RefCountedThreadSafe<LocalWriteClosure>;
scoped_refptr<ChainedBlobWriter> chained_blob_writer_; scoped_refptr<ChainedBlobWriter> chained_blob_writer_;
// Preserve lifetime of delegate until Run is called. Note, this is a
// circular reference in that: |this| owns |delegate_| which owns a callback
// which owns |this|. This is broken during the Run callback explicitly.
std::unique_ptr<FileWriterDelegate> delegate_;
scoped_refptr<base::SequencedTaskRunner> idb_task_runner_; scoped_refptr<base::SequencedTaskRunner> idb_task_runner_;
int64_t bytes_written_ = 0; int64_t bytes_written_ = 0;
...@@ -1675,7 +1674,8 @@ bool IndexedDBBackingStore::WriteBlobFile( ...@@ -1675,7 +1674,8 @@ bool IndexedDBBackingStore::WriteBlobFile(
base::PostTask( base::PostTask(
FROM_HERE, {content::BrowserThread::IO}, FROM_HERE, {content::BrowserThread::IO},
base::BindOnce(&LocalWriteClosure::WriteBlobToFileOnIOThread, base::BindOnce(&LocalWriteClosure::WriteBlobToFileOnIOThread,
base::WrapRefCounted(chained_blob_writer), task_runner_, base::WrapRefCounted(chained_blob_writer),
chained_blob_writer->GetFlushPolicy(), task_runner_,
path, descriptor.blob(), descriptor.last_modified())); path, descriptor.blob(), descriptor.last_modified()));
} }
return true; return true;
...@@ -3289,8 +3289,12 @@ leveldb::Status IndexedDBBackingStore::Transaction::WriteNewBlobs( ...@@ -3289,8 +3289,12 @@ leveldb::Status IndexedDBBackingStore::Transaction::WriteNewBlobs(
} }
// Creating the writer will start it going asynchronously. The transaction // Creating the writer will start it going asynchronously. The transaction
// can be destructed before the callback is triggered. // can be destructed before the callback is triggered.
auto flush_policy = IndexedDBBackingStore::ShouldSyncOnCommit(durability_)
? storage::FlushPolicy::FLUSH_ON_COMPLETION
: storage::FlushPolicy::NO_FLUSH_ON_COMPLETION;
chained_blob_writer_ = ChainedBlobWriterImpl::Create( chained_blob_writer_ = ChainedBlobWriterImpl::Create(
database_id_, new_files_to_write, durability_, database_id_, new_files_to_write, flush_policy,
base::BindRepeating( base::BindRepeating(
[](base::WeakPtr<IndexedDBBackingStore> backing_store, [](base::WeakPtr<IndexedDBBackingStore> backing_store,
int64_t database_id, const WriteDescriptor& descriptor, int64_t database_id, const WriteDescriptor& descriptor,
......
...@@ -20,10 +20,6 @@ ...@@ -20,10 +20,6 @@
#include "storage/common/file_system/file_system_mount_option.h" #include "storage/common/file_system/file_system_mount_option.h"
#include "third_party/leveldatabase/src/include/leveldb/status.h" #include "third_party/leveldatabase/src/include/leveldb/status.h"
namespace storage {
class FileWriterDelegate;
} // namespace storage
namespace content { namespace content {
// This file contains all of the classes & types used to store blobs in // This file contains all of the classes & types used to store blobs in
...@@ -118,10 +114,6 @@ typedef std::vector<WriteDescriptor> WriteDescriptorVec; ...@@ -118,10 +114,6 @@ typedef std::vector<WriteDescriptor> WriteDescriptorVec;
// This class facilitates writing multiple blobs to files. // This class facilitates writing multiple blobs to files.
class ChainedBlobWriter : public base::RefCountedThreadSafe<ChainedBlobWriter> { class ChainedBlobWriter : public base::RefCountedThreadSafe<ChainedBlobWriter> {
public: public:
// Called on the IO thread.
virtual void set_delegate(
std::unique_ptr<storage::FileWriterDelegate> delegate) = 0;
// Called on the IDB task runner. // Called on the IDB task runner.
virtual void ReportWriteCompletion(bool succeeded, int64_t bytes_written) = 0; virtual void ReportWriteCompletion(bool succeeded, int64_t bytes_written) = 0;
...@@ -129,7 +121,7 @@ class ChainedBlobWriter : public base::RefCountedThreadSafe<ChainedBlobWriter> { ...@@ -129,7 +121,7 @@ class ChainedBlobWriter : public base::RefCountedThreadSafe<ChainedBlobWriter> {
virtual void Abort() = 0; virtual void Abort() = 0;
// Whether to flush to the file system when writing or not. // Whether to flush to the file system when writing or not.
// Called on the IO thread. // Called on the IDB task runner.
virtual storage::FlushPolicy GetFlushPolicy() const = 0; virtual storage::FlushPolicy GetFlushPolicy() const = 0;
protected: protected:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment