Commit 97579817 authored by Marijn Kruisselbrink's avatar Marijn Kruisselbrink Committed by Commit Bot

[FileSystem] Add support for passing ReadableStream to write.

Bug: 872462
Change-Id: I0f20f1c71d3d9aee53a2f875198e1e4b3461d90c
Reviewed-on: https://chromium-review.googlesource.com/1232397Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Reviewed-by: default avatarYutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarDaniel Murphy <dmurph@chromium.org>
Commit-Queue: Marijn Kruisselbrink <mek@chromium.org>
Cr-Commit-Position: refs/heads/master@{#595711}
parent 2afc38d9
......@@ -206,7 +206,7 @@ void SyncableFileSystemOperation::Remove(const FileSystemURL& url,
operation_runner_->PostOperationTask(std::move(task));
}
void SyncableFileSystemOperation::Write(
void SyncableFileSystemOperation::WriteBlob(
const FileSystemURL& url,
std::unique_ptr<storage::FileWriterDelegate> writer_delegate,
std::unique_ptr<storage::BlobReader> blob_reader,
......@@ -222,12 +222,34 @@ void SyncableFileSystemOperation::Write(
std::unique_ptr<SyncableFileOperationRunner::Task> task(new QueueableTask(
weak_factory_.GetWeakPtr(),
base::Bind(
&FileSystemOperation::Write, base::Unretained(impl_.get()), url,
&FileSystemOperation::WriteBlob, base::Unretained(impl_.get()), url,
base::Passed(&writer_delegate), base::Passed(&blob_reader),
base::Bind(&self::DidWrite, weak_factory_.GetWeakPtr(), callback))));
operation_runner_->PostOperationTask(std::move(task));
}
void SyncableFileSystemOperation::Write(
const FileSystemURL& url,
std::unique_ptr<storage::FileWriterDelegate> writer_delegate,
mojo::ScopedDataPipeConsumerHandle data_pipe,
const WriteCallback& callback) {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
if (!operation_runner_.get()) {
callback.Run(base::File::FILE_ERROR_NOT_FOUND, 0, true);
return;
}
DCHECK(operation_runner_.get());
target_paths_.push_back(url);
completion_callback_ = base::BindOnce(&WriteCallbackAdapter, callback);
std::unique_ptr<SyncableFileOperationRunner::Task> task(new QueueableTask(
weak_factory_.GetWeakPtr(),
base::Bind(
&FileSystemOperation::Write, base::Unretained(impl_.get()), url,
base::Passed(&writer_delegate), base::Passed(&data_pipe),
base::Bind(&self::DidWrite, weak_factory_.GetWeakPtr(), callback))));
operation_runner_->PostOperationTask(std::move(task));
}
void SyncableFileSystemOperation::Truncate(const FileSystemURL& url,
int64_t length,
StatusCallback callback) {
......
......@@ -61,9 +61,13 @@ class SyncableFileSystemOperation : public storage::FileSystemOperation {
void Remove(const storage::FileSystemURL& url,
bool recursive,
StatusCallback callback) override;
void WriteBlob(const storage::FileSystemURL& url,
std::unique_ptr<storage::FileWriterDelegate> writer_delegate,
std::unique_ptr<storage::BlobReader> blob_reader,
const WriteCallback& callback) override;
void Write(const storage::FileSystemURL& url,
std::unique_ptr<storage::FileWriterDelegate> writer_delegate,
std::unique_ptr<storage::BlobReader> blob_reader,
mojo::ScopedDataPipeConsumerHandle data_pipe,
const WriteCallback& callback) override;
void Truncate(const storage::FileSystemURL& url,
int64_t length,
......
......@@ -320,9 +320,15 @@ class FileSystemOperation {
StatusCallback callback) = 0;
// Writes the data read from |blob_reader| using |writer_delegate|.
virtual void WriteBlob(const FileSystemURL& url,
std::unique_ptr<FileWriterDelegate> writer_delegate,
std::unique_ptr<BlobReader> blob_reader,
const WriteCallback& callback) = 0;
// Writes the data read from |data_pipe| using |writer_delegate|.
virtual void Write(const FileSystemURL& url,
std::unique_ptr<FileWriterDelegate> writer_delegate,
std::unique_ptr<BlobReader> blob_reader,
mojo::ScopedDataPipeConsumerHandle data_pipe,
const WriteCallback& callback) = 0;
// Truncates a file at |path| to |length|. If |length| is larger than
......
......@@ -193,7 +193,7 @@ void FileSystemOperationImpl::Remove(const FileSystemURL& url,
recursive_operation_delegate_->Run();
}
void FileSystemOperationImpl::Write(
void FileSystemOperationImpl::WriteBlob(
const FileSystemURL& url,
std::unique_ptr<FileWriterDelegate> writer_delegate,
std::unique_ptr<BlobReader> blob_reader,
......@@ -206,6 +206,19 @@ void FileSystemOperationImpl::Write(
url, callback));
}
void FileSystemOperationImpl::Write(
const FileSystemURL& url,
std::unique_ptr<FileWriterDelegate> writer_delegate,
mojo::ScopedDataPipeConsumerHandle data_pipe,
const WriteCallback& callback) {
DCHECK(SetPendingOperationType(kOperationWrite));
file_writer_delegate_ = std::move(writer_delegate);
file_writer_delegate_->Start(
std::move(data_pipe),
base::BindRepeating(&FileSystemOperationImpl::DidWrite,
weak_factory_.GetWeakPtr(), url, callback));
}
void FileSystemOperationImpl::Truncate(const FileSystemURL& url,
int64_t length,
StatusCallback callback) {
......
......@@ -61,9 +61,13 @@ class STORAGE_EXPORT FileSystemOperationImpl : public FileSystemOperation {
void Remove(const FileSystemURL& url,
bool recursive,
StatusCallback callback) override;
void WriteBlob(const FileSystemURL& url,
std::unique_ptr<FileWriterDelegate> writer_delegate,
std::unique_ptr<BlobReader> blob_reader,
const WriteCallback& callback) override;
void Write(const FileSystemURL& url,
std::unique_ptr<FileWriterDelegate> writer_delegate,
std::unique_ptr<BlobReader> blob_reader,
mojo::ScopedDataPipeConsumerHandle data_pipe,
const WriteCallback& callback) override;
void Truncate(const FileSystemURL& url,
int64_t length,
......
......@@ -266,9 +266,44 @@ OperationID FileSystemOperationRunner::Write(
blob_reader = blob->CreateReader();
PrepareForWrite(id, url);
operation_raw->Write(url, std::move(writer_delegate), std::move(blob_reader),
base::Bind(&FileSystemOperationRunner::DidWrite,
weak_ptr_, id, callback));
operation_raw->WriteBlob(
url, std::move(writer_delegate), std::move(blob_reader),
base::BindRepeating(&FileSystemOperationRunner::DidWrite, weak_ptr_, id,
callback));
return id;
}
OperationID FileSystemOperationRunner::Write(
const FileSystemURL& url,
mojo::ScopedDataPipeConsumerHandle data_pipe,
int64_t offset,
const WriteCallback& callback) {
base::File::Error error = base::File::FILE_OK;
std::unique_ptr<FileSystemOperation> operation = base::WrapUnique(
file_system_context_->CreateFileSystemOperation(url, &error));
FileSystemOperation* operation_raw = operation.get();
OperationID id = BeginOperation(std::move(operation));
base::AutoReset<bool> beginning(&is_beginning_operation_, true);
if (!operation_raw) {
DidWrite(id, callback, error, 0, true);
return id;
}
std::unique_ptr<FileStreamWriter> writer(
file_system_context_->CreateFileStreamWriter(url, offset));
if (!writer) {
// Write is not supported.
DidWrite(id, callback, base::File::FILE_ERROR_SECURITY, 0, true);
return id;
}
std::unique_ptr<FileWriterDelegate> writer_delegate(new FileWriterDelegate(
std::move(writer), url.mount_option().flush_policy()));
PrepareForWrite(id, url);
operation_raw->Write(url, std::move(writer_delegate), std::move(data_pipe),
base::BindRepeating(&FileSystemOperationRunner::DidWrite,
weak_ptr_, id, callback));
return id;
}
......
......@@ -109,12 +109,18 @@ class STORAGE_EXPORT FileSystemOperationRunner {
bool recursive,
StatusCallback callback);
// Writes contents of |blob_url| to |url| at |offset|.
// Writes contents of |blob| to |url| at |offset|.
OperationID Write(const FileSystemURL& url,
std::unique_ptr<storage::BlobDataHandle> blob,
int64_t offset,
const WriteCallback& callback);
// Writes contents of |data_pipe| to |url| at |offset|.
OperationID Write(const FileSystemURL& url,
mojo::ScopedDataPipeConsumerHandle data_pipe,
int64_t offset,
const WriteCallback& callback);
// Truncates a file at |url| to |length|. If |length| is larger than
// the original file size, the file will be extended, and the extended
// part is filled with null bytes.
......
......@@ -36,6 +36,7 @@ FileWriterDelegate::FileWriterDelegate(
bytes_written_(0),
bytes_read_(0),
io_buffer_(base::MakeRefCounted<net::IOBufferWithSize>(kReadBufSize)),
data_pipe_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
weak_factory_(this) {}
FileWriterDelegate::~FileWriterDelegate() = default;
......@@ -66,9 +67,30 @@ void FileWriterDelegate::Start(std::unique_ptr<BlobReader> blob_reader,
NOTREACHED();
}
void FileWriterDelegate::Start(mojo::ScopedDataPipeConsumerHandle data_pipe,
const DelegateWriteCallback& write_callback) {
write_callback_ = write_callback;
if (!data_pipe) {
OnReadError(base::File::FILE_ERROR_FAILED);
return;
}
data_pipe_ = std::move(data_pipe);
data_pipe_watcher_.Watch(
data_pipe_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&FileWriterDelegate::OnDataPipeReady,
weak_factory_.GetWeakPtr()));
data_pipe_watcher_.ArmOrNotify();
}
void FileWriterDelegate::Cancel() {
// Destroy the reader and invalidate weak ptrs to prevent pending callbacks.
blob_reader_ = nullptr;
data_pipe_watcher_.Cancel();
data_pipe_.reset();
weak_factory_.InvalidateWeakPtrs();
const int status = file_stream_writer_->Cancel(
......@@ -104,24 +126,49 @@ void FileWriterDelegate::OnReadCompleted(int bytes_read) {
void FileWriterDelegate::Read() {
bytes_written_ = 0;
BlobReader::Status status =
blob_reader_->Read(io_buffer_.get(), io_buffer_->size(), &bytes_read_,
base::BindOnce(&FileWriterDelegate::OnReadCompleted,
weak_factory_.GetWeakPtr()));
switch (status) {
case BlobReader::Status::NET_ERROR:
OnReadCompleted(blob_reader_->net_error());
return;
case BlobReader::Status::DONE:
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&FileWriterDelegate::OnReadCompleted,
weak_factory_.GetWeakPtr(), bytes_read_));
return;
case BlobReader::Status::IO_PENDING:
// Do nothing.
return;
if (blob_reader_) {
BlobReader::Status status =
blob_reader_->Read(io_buffer_.get(), io_buffer_->size(), &bytes_read_,
base::BindOnce(&FileWriterDelegate::OnReadCompleted,
weak_factory_.GetWeakPtr()));
switch (status) {
case BlobReader::Status::NET_ERROR:
OnReadCompleted(blob_reader_->net_error());
return;
case BlobReader::Status::DONE:
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&FileWriterDelegate::OnReadCompleted,
weak_factory_.GetWeakPtr(), bytes_read_));
return;
case BlobReader::Status::IO_PENDING:
// Do nothing.
return;
}
NOTREACHED();
return;
}
DCHECK(data_pipe_);
uint32_t num_bytes = io_buffer_->size();
MojoResult result = data_pipe_->ReadData(io_buffer_->data(), &num_bytes,
MOJO_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
data_pipe_watcher_.ArmOrNotify();
return;
}
if (result == MOJO_RESULT_OK) {
bytes_read_ = num_bytes;
OnReadCompleted(bytes_read_);
return;
}
if (result == MOJO_RESULT_FAILED_PRECONDITION) {
// Pipe closed, done reading.
OnReadCompleted(0);
return;
}
// Some unknown error, this shouldn't happen.
NOTREACHED();
OnReadError(base::File::FILE_ERROR_FAILED);
}
void FileWriterDelegate::OnDataReceived(int bytes_read) {
......@@ -192,6 +239,8 @@ void FileWriterDelegate::OnReadError(base::File::Error error) {
// Destroy the reader and invalidate weak ptrs to prevent pending callbacks.
blob_reader_.reset();
data_pipe_watcher_.Cancel();
data_pipe_.reset();
weak_factory_.InvalidateWeakPtrs();
if (writing_started_)
......@@ -203,6 +252,8 @@ void FileWriterDelegate::OnReadError(base::File::Error error) {
void FileWriterDelegate::OnWriteError(base::File::Error error) {
// Destroy the reader and invalidate weak ptrs to prevent pending callbacks.
blob_reader_.reset();
data_pipe_watcher_.Cancel();
data_pipe_.reset();
weak_factory_.InvalidateWeakPtrs();
// Errors when writing are not recoverable, so don't bother flushing.
......@@ -270,4 +321,10 @@ void FileWriterDelegate::OnFlushed(base::File::Error error,
write_callback_.Run(error, bytes_written, progress_status);
}
void FileWriterDelegate::OnDataPipeReady(
MojoResult result,
const mojo::HandleSignalsState& state) {
Read();
}
} // namespace storage
......@@ -14,6 +14,8 @@
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/time/time.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "net/base/file_stream.h"
#include "net/base/io_buffer.h"
#include "storage/browser/blob/blob_reader.h"
......@@ -44,6 +46,8 @@ class STORAGE_EXPORT FileWriterDelegate {
void Start(std::unique_ptr<BlobReader> blob_reader,
const DelegateWriteCallback& write_callback);
void Start(mojo::ScopedDataPipeConsumerHandle data_pipe,
const DelegateWriteCallback& write_callback);
// Cancels the current write operation. This will synchronously or
// asynchronously call the given write callback (which may result in
......@@ -56,7 +60,6 @@ class STORAGE_EXPORT FileWriterDelegate {
private:
void OnDidCalculateSize(int net_error);
void Read();
void OnReadCompleted(int bytes_read);
void Write();
......@@ -73,6 +76,9 @@ class STORAGE_EXPORT FileWriterDelegate {
WriteProgressStatus progress_status,
int flush_error);
void OnDataPipeReady(MojoResult result,
const mojo::HandleSignalsState& state);
WriteProgressStatus GetCompletionStatusOnError() const;
DelegateWriteCallback write_callback_;
......@@ -87,8 +93,14 @@ class STORAGE_EXPORT FileWriterDelegate {
base::File::Error saved_read_error_ = base::File::FILE_OK;
scoped_refptr<net::IOBufferWithSize> io_buffer_;
scoped_refptr<net::DrainableIOBuffer> cursor_;
// Used when reading from a blob.
std::unique_ptr<BlobReader> blob_reader_;
// Used when reading from a data pipe.
mojo::ScopedDataPipeConsumerHandle data_pipe_;
mojo::SimpleWatcher data_pipe_watcher_;
base::WeakPtrFactory<FileWriterDelegate> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(FileWriterDelegate);
......
......@@ -5,6 +5,7 @@
#include "storage/browser/fileapi/file_writer_impl.h"
#include "base/callback_helpers.h"
#include "mojo/public/cpp/system/data_pipe_drainer.h"
#include "storage/browser/blob/blob_data_handle.h"
#include "storage/browser/blob/blob_storage_context.h"
......@@ -31,6 +32,25 @@ void FileWriterImpl::Write(uint64_t position,
std::move(callback), position));
}
void FileWriterImpl::WriteStream(uint64_t position,
mojo::ScopedDataPipeConsumerHandle stream,
WriteStreamCallback callback) {
// FileSystemOperationRunner assumes that positions passed to Write are always
// valid, and will NOTREACHED() if that is not the case, so first check the
// size of the file to make sure the position passed in from the renderer is
// in fact valid.
// Of course the file could still change between checking its size and the
// write operation being started, but this is at least a lot better than the
// old implementation where the renderer only checks against how big it thinks
// the file currently is.
operation_runner_->GetMetadata(
url_, FileSystemOperation::GET_METADATA_FIELD_SIZE,
base::BindRepeating(&FileWriterImpl::DoWriteStreamWithFileInfo,
base::Unretained(this),
base::AdaptCallbackForRepeating(std::move(callback)),
position, base::Passed(std::move(stream))));
}
void FileWriterImpl::Truncate(uint64_t length, TruncateCallback callback) {
operation_runner_->Truncate(
url_, length,
......@@ -79,6 +99,23 @@ void FileWriterImpl::DoWriteWithFileInfo(WriteCallback callback,
base::Owned(new WriteState())));
}
void FileWriterImpl::DoWriteStreamWithFileInfo(
WriteStreamCallback callback,
uint64_t position,
mojo::ScopedDataPipeConsumerHandle data_pipe,
base::File::Error result,
const base::File::Info& file_info) {
if (file_info.size < 0 || position > static_cast<uint64_t>(file_info.size)) {
std::move(callback).Run(base::File::FILE_ERROR_FAILED, 0);
return;
}
operation_runner_->Write(
url_, std::move(data_pipe), position,
base::BindRepeating(&FileWriterImpl::DidWrite, base::Unretained(this),
base::AdaptCallbackForRepeating(std::move(callback)),
base::Owned(new WriteState())));
}
void FileWriterImpl::DidWrite(WriteCallback callback,
WriteState* state,
base::File::Error result,
......
......@@ -26,6 +26,9 @@ class STORAGE_EXPORT FileWriterImpl : public blink::mojom::FileWriter {
void Write(uint64_t position,
blink::mojom::BlobPtr blob,
WriteCallback callback) override;
void WriteStream(uint64_t position,
mojo::ScopedDataPipeConsumerHandle stream,
WriteStreamCallback callback) override;
void Truncate(uint64_t length, TruncateCallback callback) override;
private:
......@@ -37,6 +40,11 @@ class STORAGE_EXPORT FileWriterImpl : public blink::mojom::FileWriter {
std::unique_ptr<BlobDataHandle> blob,
base::File::Error result,
const base::File::Info& file_info);
void DoWriteStreamWithFileInfo(WriteCallback callback,
uint64_t position,
mojo::ScopedDataPipeConsumerHandle data_pipe,
base::File::Error result,
const base::File::Info& file_info);
struct WriteState {
uint64_t bytes_written = 0;
......
......@@ -9,6 +9,7 @@
#include "base/guid.h"
#include "base/test/bind_test_util.h"
#include "base/test/scoped_task_environment.h"
#include "mojo/public/cpp/system/string_data_pipe_producer.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/base/test_completion_callback.h"
......@@ -60,6 +61,25 @@ class FileWriterImplTest : public testing::Test {
return result;
}
mojo::ScopedDataPipeConsumerHandle CreateStream(const std::string& contents) {
// Test with a relatively low capacity pipe to make sure it isn't all
// written/read in one go.
mojo::DataPipe pipe(16);
CHECK(pipe.producer_handle.is_valid());
auto producer = std::make_unique<mojo::StringDataPipeProducer>(
std::move(pipe.producer_handle));
auto* producer_raw = producer.get();
producer_raw->Write(
contents,
mojo::StringDataPipeProducer::AsyncWritingMode::
STRING_MAY_BE_INVALIDATED_BEFORE_COMPLETION,
base::BindOnce(
base::DoNothing::Once<std::unique_ptr<mojo::StringDataPipeProducer>,
MojoResult>(),
std::move(producer)));
return std::move(pipe.consumer_handle);
}
std::string ReadFile(const FileSystemURL& url) {
std::unique_ptr<FileStreamReader> reader =
file_system_context_->CreateFileStreamReader(
......@@ -80,9 +100,9 @@ class FileWriterImplTest : public testing::Test {
}
}
base::File::Error WriteSync(uint64_t position,
blink::mojom::BlobPtr blob,
uint64_t* bytes_written_out) {
base::File::Error WriteBlobSync(uint64_t position,
blink::mojom::BlobPtr blob,
uint64_t* bytes_written_out) {
base::RunLoop loop;
base::File::Error result_out;
writer_->Write(position, std::move(blob),
......@@ -96,6 +116,24 @@ class FileWriterImplTest : public testing::Test {
return result_out;
}
base::File::Error WriteStreamSync(
uint64_t position,
mojo::ScopedDataPipeConsumerHandle data_pipe,
uint64_t* bytes_written_out) {
base::RunLoop loop;
base::File::Error result_out;
writer_->WriteStream(
position, std::move(data_pipe),
base::BindLambdaForTesting(
[&](base::File::Error result, uint64_t bytes_written) {
result_out = result;
*bytes_written_out = bytes_written;
loop.Quit();
}));
loop.Run();
return result_out;
}
base::File::Error TruncateSync(uint64_t length) {
base::RunLoop loop;
base::File::Error result_out;
......@@ -108,6 +146,16 @@ class FileWriterImplTest : public testing::Test {
return result_out;
}
virtual bool WriteUsingBlobs() { return true; }
base::File::Error WriteSync(uint64_t position,
const std::string& contents,
uint64_t* bytes_written_out) {
if (WriteUsingBlobs())
return WriteBlobSync(position, CreateBlob(contents), bytes_written_out);
return WriteStreamSync(position, CreateStream(contents), bytes_written_out);
}
protected:
base::test::ScopedTaskEnvironment scoped_task_environment_;
......@@ -120,55 +168,65 @@ class FileWriterImplTest : public testing::Test {
std::unique_ptr<FileWriterImpl> writer_;
};
class FileWriterImplWriteTest : public FileWriterImplTest,
public testing::WithParamInterface<bool> {
public:
bool WriteUsingBlobs() override { return GetParam(); }
};
INSTANTIATE_TEST_CASE_P(FileWriterImplTest,
FileWriterImplWriteTest,
::testing::Bool());
TEST_F(FileWriterImplTest, WriteInvalidBlob) {
blink::mojom::BlobPtr blob;
MakeRequest(&blob);
uint64_t bytes_written;
base::File::Error result = WriteSync(0, std::move(blob), &bytes_written);
base::File::Error result = WriteBlobSync(0, std::move(blob), &bytes_written);
EXPECT_EQ(result, base::File::FILE_ERROR_FAILED);
EXPECT_EQ(bytes_written, 0u);
EXPECT_EQ("", ReadFile(test_url_));
}
TEST_F(FileWriterImplTest, WriteValidEmptyBlob) {
TEST_P(FileWriterImplWriteTest, WriteValidEmptyString) {
uint64_t bytes_written;
base::File::Error result = WriteSync(0, CreateBlob(""), &bytes_written);
base::File::Error result = WriteSync(0, "", &bytes_written);
EXPECT_EQ(result, base::File::FILE_OK);
EXPECT_EQ(bytes_written, 0u);
EXPECT_EQ("", ReadFile(test_url_));
}
TEST_F(FileWriterImplTest, WriteValidBlob) {
TEST_P(FileWriterImplWriteTest, WriteValidNonEmpty) {
std::string test_data("abcdefghijklmnopqrstuvwxyz");
uint64_t bytes_written;
base::File::Error result =
WriteSync(0, CreateBlob("1234567890"), &bytes_written);
base::File::Error result = WriteSync(0, test_data, &bytes_written);
EXPECT_EQ(result, base::File::FILE_OK);
EXPECT_EQ(bytes_written, 10u);
EXPECT_EQ(bytes_written, test_data.size());
EXPECT_EQ("1234567890", ReadFile(test_url_));
EXPECT_EQ(test_data, ReadFile(test_url_));
}
TEST_F(FileWriterImplTest, WriteWithOffsetInFile) {
TEST_P(FileWriterImplWriteTest, WriteWithOffsetInFile) {
uint64_t bytes_written;
base::File::Error result;
result = WriteSync(0, CreateBlob("1234567890"), &bytes_written);
result = WriteSync(0, "1234567890", &bytes_written);
EXPECT_EQ(result, base::File::FILE_OK);
EXPECT_EQ(bytes_written, 10u);
result = WriteSync(4, CreateBlob("abc"), &bytes_written);
result = WriteSync(4, "abc", &bytes_written);
EXPECT_EQ(result, base::File::FILE_OK);
EXPECT_EQ(bytes_written, 3u);
EXPECT_EQ("1234abc890", ReadFile(test_url_));
}
TEST_F(FileWriterImplTest, WriteWithOffsetPastFile) {
TEST_P(FileWriterImplWriteTest, WriteWithOffsetPastFile) {
uint64_t bytes_written;
base::File::Error result = WriteSync(4, CreateBlob("abc"), &bytes_written);
base::File::Error result = WriteSync(4, "abc", &bytes_written);
EXPECT_EQ(result, base::File::FILE_ERROR_FAILED);
EXPECT_EQ(bytes_written, 0u);
......@@ -179,7 +237,7 @@ TEST_F(FileWriterImplTest, TruncateShrink) {
uint64_t bytes_written;
base::File::Error result;
result = WriteSync(0, CreateBlob("1234567890"), &bytes_written);
result = WriteSync(0, "1234567890", &bytes_written);
EXPECT_EQ(result, base::File::FILE_OK);
EXPECT_EQ(bytes_written, 10u);
......@@ -193,7 +251,7 @@ TEST_F(FileWriterImplTest, TruncateGrow) {
uint64_t bytes_written;
base::File::Error result;
result = WriteSync(0, CreateBlob("abc"), &bytes_written);
result = WriteSync(0, "abc", &bytes_written);
EXPECT_EQ(result, base::File::FILE_OK);
EXPECT_EQ(bytes_written, 3u);
......
......@@ -69,4 +69,34 @@ promise_test(async t => {
assert_equals(await getFileContents(handle), 'abc\0\0');
assert_equals(await getFileSize(handle), 5);
}, 'truncate() to grow a file');
promise_test(async t => {
const handle = await createEmptyFile(t, 'write_stream');
const writer = await handle.createWriter();
const stream = new Response('1234567890').body;
await writer.write(0, stream);
assert_equals(await getFileContents(handle), '1234567890');
assert_equals(await getFileSize(handle), 10);
}, 'write() called with a ReadableStream');
promise_test(async t => {
const handle = await createEmptyFile(t, 'write_stream');
const handle_writer = await handle.createWriter();
const { writable, readable } = new TransformStream();
const write_result = handle_writer.write(0, readable);
const stream_writer = writable.getWriter();
stream_writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
garbageCollect();
stream_writer.write(new Uint8Array([0x21, 0x21]));
stream_writer.close();
await write_result;
assert_equals(await getFileContents(handle), 'streams!!!');
assert_equals(await getFileSize(handle), 10);
}, 'Using a WritableStream writer to write');
</script>
......@@ -58,4 +58,20 @@ async function createFileWithContents(test, name, contents, parent) {
const writer = await handle.createWriter();
await writer.write(0, new Blob([contents]));
return handle;
}
\ No newline at end of file
}
function garbageCollect() {
if (self.gc) {
// Use --expose_gc for V8 (and Node.js)
// Exposed in SpiderMonkey shell as well
self.gc();
} else if (self.GCController) {
// Present in some WebKit development environments
GCController.collect();
} else {
/* eslint-disable no-console */
console.warn('Tests are running without the ability to do manual garbage collection. They will still work, but ' +
'coverage will be suboptimal.');
/* eslint-enable no-console */
}
};
\ No newline at end of file
......@@ -17,6 +17,14 @@ interface FileWriter {
Write(uint64 position, Blob blob) => (mojo_base.mojom.FileError result,
uint64 bytes_written);
// Write data from |stream| to the given |position| in the file being written
// to. Returns whether the operation succeeded and if so how many bytes were
// written.
// TODO(mek): This might need some way of reporting progress events back to
// the renderer.
WriteStream(uint64 position, handle<data_pipe_consumer> stream) =>
(mojo_base.mojom.FileError result, uint64 bytes_written);
// Changes the length of the file to be |length|. If |length| is larger than
// the current size of the file, the file will be extended, and the extended
// part is filled with null bytes.
......
......@@ -5,9 +5,13 @@
#include "third_party/blink/renderer/modules/filesystem/file_system_writer.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_blob.h"
#include "third_party/blink/renderer/core/dom/dom_exception.h"
#include "third_party/blink/renderer/core/fetch/fetch_data_loader.h"
#include "third_party/blink/renderer/core/fetch/readable_stream_bytes_consumer.h"
#include "third_party/blink/renderer/core/fileapi/blob.h"
#include "third_party/blink/renderer/core/fileapi/file_error.h"
#include "third_party/blink/renderer/core/streams/readable_stream_operations.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
namespace blink {
......@@ -19,7 +23,26 @@ FileSystemWriter::FileSystemWriter(mojom::blink::FileWriterPtr writer)
ScriptPromise FileSystemWriter::write(ScriptState* script_state,
uint64_t position,
Blob* blob) {
ScriptValue data,
ExceptionState& exception_state) {
v8::Isolate* isolate = script_state->GetIsolate();
if (V8Blob::hasInstance(data.V8Value(), isolate)) {
Blob* blob = V8Blob::ToImpl(data.V8Value().As<v8::Object>());
return WriteBlob(script_state, position, blob);
}
if (!ReadableStreamOperations::IsReadableStream(script_state, data,
exception_state)
.value_or(false)) {
if (!exception_state.HadException())
exception_state.ThrowTypeError("data should be a Blob or ReadableStream");
return ScriptPromise();
}
return WriteStream(script_state, position, data, exception_state);
}
ScriptPromise FileSystemWriter::WriteBlob(ScriptState* script_state,
uint64_t position,
Blob* blob) {
if (!writer_ || pending_operation_) {
return ScriptPromise::RejectWithDOMException(
script_state,
......@@ -33,6 +56,117 @@ ScriptPromise FileSystemWriter::write(ScriptState* script_state,
return result;
}
class FileSystemWriter::StreamWriterClient
: public GarbageCollectedFinalized<StreamWriterClient>,
public FetchDataLoader::Client {
USING_GARBAGE_COLLECTED_MIXIN(StreamWriterClient);
public:
explicit StreamWriterClient(FileSystemWriter* writer) : writer_(writer) {}
void DidFetchDataLoadedDataPipe() override {
// WriteComplete could have been called with an error before we reach this
// point, in that case just return.
if (did_complete_)
return;
DCHECK(!did_finish_writing_to_pipe_);
DCHECK(writer_->pending_operation_);
did_finish_writing_to_pipe_ = true;
}
void DidFetchDataLoadFailed() override {
// WriteComplete could have been called with an error before we reach this
// point, in that case just return.
if (did_complete_)
return;
DCHECK(writer_->pending_operation_);
did_complete_ = true;
writer_->pending_operation_->Reject(
FileError::CreateDOMException(base::File::FILE_ERROR_FAILED));
Reset();
}
void Abort() override {
// WriteComplete could have been called with an error before we reach this
// point, in that case just return.
if (did_complete_)
return;
DCHECK(writer_->pending_operation_);
did_complete_ = true;
writer_->pending_operation_->Reject(
FileError::CreateDOMException(base::File::FILE_ERROR_ABORT));
Reset();
}
void WriteComplete(base::File::Error result, uint64_t bytes_written) {
// Early return if we already completed (with an error) before.
if (did_complete_)
return;
DCHECK(writer_->pending_operation_);
did_complete_ = true;
if (result != base::File::FILE_OK) {
writer_->pending_operation_->Reject(
FileError::CreateDOMException(result));
} else {
DCHECK(did_finish_writing_to_pipe_);
writer_->pending_operation_->Resolve();
}
Reset();
}
void Trace(Visitor* visitor) override {
Client::Trace(visitor);
visitor->Trace(writer_);
}
private:
void Reset() {
writer_->pending_operation_ = nullptr;
writer_->stream_loader_ = nullptr;
}
Member<FileSystemWriter> writer_;
bool did_finish_writing_to_pipe_ = false;
bool did_complete_ = false;
};
ScriptPromise FileSystemWriter::WriteStream(ScriptState* script_state,
uint64_t position,
ScriptValue stream,
ExceptionState& exception_state) {
if (!writer_ || pending_operation_) {
return ScriptPromise::RejectWithDOMException(
script_state,
DOMException::Create(DOMExceptionCode::kInvalidStateError));
}
DCHECK(!stream_loader_);
auto reader = ReadableStreamOperations::GetReader(script_state, stream,
exception_state);
if (exception_state.HadException())
return ScriptPromise();
auto* consumer = new ReadableStreamBytesConsumer(script_state, reader);
mojo::DataPipe pipe;
if (!pipe.consumer_handle.is_valid()) {
return ScriptPromise::RejectWithDOMException(
script_state, DOMException::Create(DOMExceptionCode::kInvalidStateError,
"Failed to create data pipe"));
}
stream_loader_ = FetchDataLoader::CreateLoaderAsDataPipe(
std::move(pipe.producer_handle),
ExecutionContext::From(script_state)
->GetTaskRunner(TaskType::kInternalDefault));
pending_operation_ = ScriptPromiseResolver::Create(script_state);
ScriptPromise result = pending_operation_->Promise();
auto* client = new StreamWriterClient(this);
stream_loader_->Start(consumer, client);
writer_->WriteStream(
position, std::move(pipe.consumer_handle),
WTF::Bind(&StreamWriterClient::WriteComplete, WrapPersistent(client)));
return result;
}
ScriptPromise FileSystemWriter::truncate(ScriptState* script_state,
uint64_t size) {
if (!writer_ || pending_operation_) {
......@@ -60,6 +194,7 @@ ScriptPromise FileSystemWriter::close(ScriptState* script_state) {
void FileSystemWriter::Trace(Visitor* visitor) {
ScriptWrappable::Trace(visitor);
visitor->Trace(pending_operation_);
visitor->Trace(stream_loader_);
}
void FileSystemWriter::WriteComplete(base::File::Error result,
......
......@@ -7,13 +7,17 @@
#include "third_party/blink/public/mojom/filesystem/file_writer.mojom-blink.h"
#include "third_party/blink/renderer/platform/bindings/script_wrappable.h"
#include "third_party/blink/renderer/platform/bindings/trace_wrapper_member.h"
namespace blink {
class Blob;
class ExceptionState;
class FetchDataLoader;
class ScriptPromise;
class ScriptPromiseResolver;
class ScriptState;
class ScriptValue;
class FileSystemWriter final : public ScriptWrappable {
DEFINE_WRAPPERTYPEINFO();
......@@ -21,19 +25,31 @@ class FileSystemWriter final : public ScriptWrappable {
public:
explicit FileSystemWriter(mojom::blink::FileWriterPtr);
ScriptPromise write(ScriptState*, uint64_t position, Blob*);
ScriptPromise write(ScriptState*,
uint64_t position,
ScriptValue data,
ExceptionState&);
ScriptPromise truncate(ScriptState*, uint64_t size);
ScriptPromise close(ScriptState*);
void Trace(Visitor*) override;
private:
class StreamWriterClient;
ScriptPromise WriteBlob(ScriptState*, uint64_t position, Blob*);
ScriptPromise WriteStream(ScriptState*,
uint64_t position,
ScriptValue stream,
ExceptionState&);
void WriteComplete(base::File::Error result, uint64_t bytes_written);
void TruncateComplete(base::File::Error result);
mojom::blink::FileWriterPtr writer_;
Member<ScriptPromiseResolver> pending_operation_;
TraceWrapperMember<FetchDataLoader> stream_loader_;
};
} // namespace blink
......
......@@ -7,8 +7,9 @@
NoInterfaceObject,
RuntimeEnabled=WritableFiles
] interface FileSystemWriter {
// TODO(mek): Support other types, such as ReadableStream, by using 'any'.
[CallWith=ScriptState] Promise<void> write(unsigned long long position, Blob data);
// TODO(mek): 'any' really is 'Blob or ReadableStream', but that's not
// currently supported by our bindings.
[CallWith=ScriptState, RaisesException] Promise<void> write(unsigned long long position, any data);
[CallWith=ScriptState] Promise<void> truncate(unsigned long long size);
[CallWith=ScriptState] Promise<void> close();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment