Commit c47efc0b authored by felipeg@chromium.org

Support overlapping operations on the SimpleEntryImpl.

SimpleEntryImpl now serializes its operations in a std::queue<base::Closure>, running each one in the order it was issued.

Review URL: https://chromiumcodereview.appspot.com/14130015

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@194908 0039d316-1c4b-4281-b951-d872f2087c98
parent 7d392b9e
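The core of the change is a simple pattern: each public entry operation is wrapped in a closure and pushed onto a FIFO queue, and the next closure runs only after the previous operation reports completion. Below is a minimal, self-contained sketch of that pattern (hypothetical names throughout: OperationQueue, Enqueue, and OnOperationComplete are illustrative, and std::function stands in for base::Closure; this is not the SimpleEntryImpl API):

// Sketch of serializing overlapping operations with a queue of closures.
// Assumption: single-threaded use, mirroring io_thread_checker_ in the patch;
// completion is signalled explicitly, as EntryOperationComplete() does.
#include <functional>
#include <iostream>
#include <queue>

class OperationQueue {
 public:
  using Closure = std::function<void()>;

  // Queues an operation; it runs immediately if nothing else is in flight.
  void Enqueue(Closure op) {
    pending_operations_.push(std::move(op));
    RunNextOperationIfNeeded();
  }

  // Each operation calls this exactly once when it finishes, unblocking the
  // next queued operation.
  void OnOperationComplete() {
    operation_running_ = false;
    RunNextOperationIfNeeded();
  }

 private:
  // Runs the next queued operation unless one is already running.
  bool RunNextOperationIfNeeded() {
    if (pending_operations_.empty() || operation_running_)
      return false;
    Closure operation = std::move(pending_operations_.front());
    pending_operations_.pop();
    operation_running_ = true;
    operation();  // Reports back via OnOperationComplete().
    return true;
  }

  bool operation_running_ = false;
  std::queue<Closure> pending_operations_;
};

int main() {
  OperationQueue queue;
  // Even when operations are issued back to back, the second never starts
  // before the first signals completion.
  queue.Enqueue([&queue] { std::cout << "read\n"; queue.OnOperationComplete(); });
  queue.Enqueue([&queue] { std::cout << "write\n"; queue.OnOperationComplete(); });
  return 0;
}

In the patch itself, the closures are base::Bind callbacks targeting the *Internal methods, the "running" flag is operation_running_, and completion arrives on the IO thread via EntryOperationComplete(), which clears the flag and calls RunNextOperationIfNeeded() again.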
@@ -2118,3 +2118,97 @@ TEST_F(DiskCacheEntryTest, KeySanityCheck) {
  ASSERT_NE(net::OK, OpenEntry(key, &entry));
  DisableIntegrityCheck();
}

// The simple cache backend isn't intended to work on Windows, which has very
// different file system guarantees from Linux.
#if !defined(OS_WIN)

TEST_F(DiskCacheEntryTest, SimpleCacheInternalAsyncIO) {
  SetSimpleCacheMode();
  InitCache();
  InternalAsyncIO();
}

TEST_F(DiskCacheEntryTest, SimpleCacheExternalAsyncIO) {
  SetSimpleCacheMode();
  InitCache();
  ExternalAsyncIO();
}

// TODO(felipeg): flaky, failing to WritePlatformFile in
// simple_synchronous_entry.cc. It failed on the linux_asan bot.
TEST_F(DiskCacheEntryTest, DISABLED_SimpleCacheReleaseBuffer) {
  SetSimpleCacheMode();
  InitCache();
  ReleaseBuffer();
}

TEST_F(DiskCacheEntryTest, SimpleCacheStreamAccess) {
  SetSimpleCacheMode();
  InitCache();
  StreamAccess();
}

TEST_F(DiskCacheEntryTest, SimpleCacheGetKey) {
  SetSimpleCacheMode();
  InitCache();
  GetKey();
}

TEST_F(DiskCacheEntryTest, DISABLED_SimpleCacheGetTimes) {
  SetSimpleCacheMode();
  InitCache();
  GetTimes();
}

TEST_F(DiskCacheEntryTest, DISABLED_SimpleCacheGrowData) {
  SetSimpleCacheMode();
  InitCache();
  GrowData();
}

TEST_F(DiskCacheEntryTest, SimpleCacheTruncateData) {
  SetSimpleCacheMode();
  InitCache();
  TruncateData();
}

TEST_F(DiskCacheEntryTest, DISABLED_SimpleCacheZeroLengthIO) {
  SetSimpleCacheMode();
  InitCache();
  ZeroLengthIO();
}

TEST_F(DiskCacheEntryTest, SimpleCacheReuseExternalEntry) {
  SetSimpleCacheMode();
  SetMaxSize(200 * 1024);
  InitCache();
  ReuseEntry(20 * 1024);
}

TEST_F(DiskCacheEntryTest, SimpleCacheReuseInternalEntry) {
  SetSimpleCacheMode();
  SetMaxSize(100 * 1024);
  InitCache();
  ReuseEntry(10 * 1024);
}

TEST_F(DiskCacheEntryTest, SimpleCacheInvalidData) {
  SetSimpleCacheMode();
  InitCache();
  InvalidData();
}

TEST_F(DiskCacheEntryTest, SimpleCacheDoomEntry) {
  SetSimpleCacheMode();
  InitCache();
  DoomNormalEntry();
}

TEST_F(DiskCacheEntryTest, SimpleCacheDoomedEntry) {
  SetSimpleCacheMode();
  InitCache();
  DoomedEntry();
}

#endif  // !defined(OS_WIN)
@@ -105,7 +105,11 @@ void SimpleEntryImpl::Doom() {
void SimpleEntryImpl::Close() {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  Release();  // Balanced in CreationOperationCompleted().
  // Postpone the close operation: push it to the back of the queue so that all
  // pending operations run before the entry actually closes.
  pending_operations_.push(base::Bind(&SimpleEntryImpl::CloseInternal, this));
  RunNextOperationIfNeeded();
}

std::string SimpleEntryImpl::GetKey() const {
@@ -134,26 +138,21 @@ int SimpleEntryImpl::ReadData(int index,
                              int buf_len,
                              const CompletionCallback& callback) {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  // TODO(gavinp): Add support for overlapping reads. The net::HttpCache does
  // make overlapping read requests when multiple transactions access the same
  // entry as read only. This might make calling SimpleSynchronousEntry::Close()
  // correctly more tricky (see SimpleEntryImpl::EntryOperationComplete).
  if (synchronous_entry_in_use_by_worker_) {
    NOTIMPLEMENTED();
    CHECK(false);
  }
  synchronous_entry_in_use_by_worker_ = true;
  if (index_)
    index_->UseIfExists(key_);
  SynchronousOperationCallback sync_operation_callback =
      base::Bind(&SimpleEntryImpl::EntryOperationComplete,
                 this, callback);
  WorkerPool::PostTask(FROM_HERE,
                       base::Bind(&SimpleSynchronousEntry::ReadData,
                                  base::Unretained(synchronous_entry_),
                                  index, offset, make_scoped_refptr(buf),
                                  buf_len, sync_operation_callback),
                       true);
  if (index < 0 || index >= kSimpleEntryFileCount || buf_len < 0)
    return net::ERR_INVALID_ARGUMENT;
  if (offset >= data_size_[index] || offset < 0 || !buf_len)
    return 0;
  // TODO(felipeg): Optimization: Add support for truly parallel read
  // operations.
  pending_operations_.push(
      base::Bind(&SimpleEntryImpl::ReadDataInternal,
                 this,
                 index,
                 offset,
                 make_scoped_refptr(buf),
                 buf_len,
                 callback));
  RunNextOperationIfNeeded();
  return net::ERR_IO_PENDING;
}
@@ -164,22 +163,20 @@ int SimpleEntryImpl::WriteData(int index,
                               const CompletionCallback& callback,
                               bool truncate) {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  if (synchronous_entry_in_use_by_worker_) {
    NOTIMPLEMENTED();
    CHECK(false);
  }
  synchronous_entry_in_use_by_worker_ = true;
  if (index_)
    index_->UseIfExists(key_);
  SynchronousOperationCallback sync_operation_callback =
      base::Bind(&SimpleEntryImpl::EntryOperationComplete,
                 this, callback);
  WorkerPool::PostTask(FROM_HERE,
                       base::Bind(&SimpleSynchronousEntry::WriteData,
                                  base::Unretained(synchronous_entry_),
                                  index, offset, make_scoped_refptr(buf),
                                  buf_len, sync_operation_callback, truncate),
                       true);
  if (index < 0 || index >= kSimpleEntryFileCount || offset < 0 || buf_len < 0)
    return net::ERR_INVALID_ARGUMENT;
  pending_operations_.push(
      base::Bind(&SimpleEntryImpl::WriteDataInternal,
                 this,
                 index,
                 offset,
                 make_scoped_refptr(buf),
                 buf_len,
                 callback,
                 truncate));
  RunNextOperationIfNeeded();
  // TODO(felipeg): Optimization: Add support for optimistic writes, quickly
  // returning net::OK here.
  return net::ERR_IO_PENDING;
}
@@ -235,27 +232,86 @@ int SimpleEntryImpl::ReadyForSparseIO(const CompletionCallback& callback) {
SimpleEntryImpl::SimpleEntryImpl(SimpleIndex* index,
                                 const FilePath& path,
                                 const std::string& key)
    : constructor_thread_(MessageLoopProxy::current()),
      index_(index->AsWeakPtr()),
      path_(path),
      key_(key),
      synchronous_entry_(NULL),
      synchronous_entry_in_use_by_worker_(false) {
    : index_(index->AsWeakPtr()),
      path_(path),
      key_(key),
      synchronous_entry_(NULL),
      operation_running_(false) {
}

SimpleEntryImpl::~SimpleEntryImpl() {
  if (synchronous_entry_) {
    base::Closure close_sync_entry =
        base::Bind(&SimpleSynchronousEntry::Close,
                   base::Unretained(synchronous_entry_));
    // We aren't guaranteed to be able to run IO on our constructor thread, but
    // we are also not guaranteed to be allowed to run WorkerPool::PostTask on
    // our other threads.
    if (constructor_thread_->BelongsToCurrentThread())
      WorkerPool::PostTask(FROM_HERE, close_sync_entry, true);
    else
      close_sync_entry.Run();
  }
  DCHECK_EQ(0U, pending_operations_.size());
  DCHECK(!operation_running_);
}

bool SimpleEntryImpl::RunNextOperationIfNeeded() {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  if (pending_operations_.size() <= 0 || operation_running_)
    return false;
  base::Closure operation = pending_operations_.front();
  pending_operations_.pop();
  operation.Run();
  return true;
}

void SimpleEntryImpl::CloseInternal() {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  DCHECK(pending_operations_.size() == 0);
  DCHECK(!operation_running_);
  DCHECK(synchronous_entry_);
  WorkerPool::PostTask(FROM_HERE,
                       base::Bind(&SimpleSynchronousEntry::Close,
                                  base::Unretained(synchronous_entry_)),
                       true);
  // Entry::Close() is expected to delete this entry. See disk_cache.h for
  // details.
  Release();  // Balanced in CreationOperationCompleted().
}

void SimpleEntryImpl::ReadDataInternal(int index,
                                       int offset,
                                       scoped_refptr<net::IOBuffer> buf,
                                       int buf_len,
                                       const CompletionCallback& callback) {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  DCHECK(!operation_running_);
  operation_running_ = true;
  if (index_)
    index_->UseIfExists(key_);
  SynchronousOperationCallback sync_operation_callback =
      base::Bind(&SimpleEntryImpl::EntryOperationComplete,
                 this, callback);
  WorkerPool::PostTask(FROM_HERE,
                       base::Bind(&SimpleSynchronousEntry::ReadData,
                                  base::Unretained(synchronous_entry_),
                                  index, offset, buf,
                                  buf_len, sync_operation_callback),
                       true);
}

void SimpleEntryImpl::WriteDataInternal(int index,
                                        int offset,
                                        scoped_refptr<net::IOBuffer> buf,
                                        int buf_len,
                                        const CompletionCallback& callback,
                                        bool truncate) {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  DCHECK(!operation_running_);
  operation_running_ = true;
  if (index_)
    index_->UseIfExists(key_);
  // TODO(felipeg): When adding support for optimistic writes we need to set
  // data_size_[index] = buf_len here.
  SynchronousOperationCallback sync_operation_callback =
      base::Bind(&SimpleEntryImpl::EntryOperationComplete,
                 this, callback);
  WorkerPool::PostTask(FROM_HERE,
                       base::Bind(&SimpleSynchronousEntry::WriteData,
                                  base::Unretained(synchronous_entry_),
                                  index, offset, buf,
                                  buf_len, sync_operation_callback, truncate),
                       true);
}

void SimpleEntryImpl::CreationOperationComplete(
@@ -275,7 +331,7 @@ void SimpleEntryImpl::CreationOperationComplete(
  // The Backend interface requires us to return |this|, and keep the Entry
  // alive until Entry::Close(). Adding a reference to self will keep |this|
  // alive after the scope of the Callback calling us is destroyed.
  AddRef();  // Balanced in Close().
  AddRef();  // Balanced in CloseInternal().
  synchronous_entry_ = sync_entry;
  SetSynchronousData();
  if (index_)
@@ -289,8 +345,9 @@ void SimpleEntryImpl::EntryOperationComplete(
    int result) {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  DCHECK(synchronous_entry_);
  DCHECK(synchronous_entry_in_use_by_worker_);
  synchronous_entry_in_use_by_worker_ = false;
  DCHECK(operation_running_);
  operation_running_ = false;
  SetSynchronousData();
  if (index_) {
    if (result >= 0) {
@@ -301,11 +358,12 @@ }
    }
  }
  completion_callback.Run(result);
  RunNextOperationIfNeeded();
}

void SimpleEntryImpl::SetSynchronousData() {
  DCHECK(io_thread_checker_.CalledOnValidThread());
  DCHECK(!synchronous_entry_in_use_by_worker_);
  DCHECK(!operation_running_);
  // TODO(felipeg): These copies to avoid data races are not optimal. While
  // adding an IO thread index (for fast misses etc...), we can store this data
  // in that structure. This also solves problems with last_used() on ext4
@@ -5,6 +5,7 @@
#ifndef NET_DISK_CACHE_SIMPLE_SIMPLE_ENTRY_IMPL_H_
#define NET_DISK_CACHE_SIMPLE_SIMPLE_ENTRY_IMPL_H_

#include <queue>
#include <string>

#include "base/files/file_path.h"
@@ -92,6 +93,25 @@ class SimpleEntryImpl : public Entry,
  virtual ~SimpleEntryImpl();

  // Runs the next operation in the queue, if there is one and no other
  // operation is running at the moment. Returns true if an operation has run.
  bool RunNextOperationIfNeeded();

  void CloseInternal();

  void ReadDataInternal(int index,
                        int offset,
                        scoped_refptr<net::IOBuffer> buf,
                        int buf_len,
                        const CompletionCallback& callback);

  void WriteDataInternal(int index,
                         int offset,
                         scoped_refptr<net::IOBuffer> buf,
                         int buf_len,
                         const CompletionCallback& callback,
                         bool truncate);

  // Called after a SimpleSynchronousEntry has completed CreateEntry() or
  // OpenEntry(). If |sync_entry| is non-NULL, creation is successful and we
  // can return |this| SimpleEntryImpl to |*out_entry|. Runs
@@ -117,7 +137,6 @@ class SimpleEntryImpl : public Entry,
  // thread, in all cases. |io_thread_checker_| documents and enforces this.
  base::ThreadChecker io_thread_checker_;

  const scoped_refptr<base::SingleThreadTaskRunner> constructor_thread_;
  const base::WeakPtr<SimpleIndex> index_;
  const base::FilePath path_;
  const std::string key_;
@@ -129,16 +148,15 @@ class SimpleEntryImpl : public Entry,
  int32 data_size_[kSimpleEntryFileCount];

  // The |synchronous_entry_| is the worker thread object that performs IO on
  // entries. It's owned by this SimpleEntryImpl whenever
  // |synchronous_entry_in_use_by_worker_| is false (i.e. when an operation
  // is not pending on the worker pool). When an operation is pending on the
  // worker pool, the |synchronous_entry_| is owned by itself.
  // entries. It's owned by this SimpleEntryImpl whenever |operation_running_|
  // is false (i.e. when an operation is not pending on the worker pool).
  SimpleSynchronousEntry* synchronous_entry_;

  // Set to true when a worker operation is posted on the |synchronous_entry_|,
  // and false after. Used to ensure thread safety by not allowing multiple
  // threads to access the |synchronous_entry_| simultaneously.
  bool synchronous_entry_in_use_by_worker_;
  bool operation_running_;

  std::queue<base::Closure> pending_operations_;
};

}  // namespace disk_cache