Mojo: DataPipe: Implement "may discard" for two-phase writes.

Arguably more importantly, make data pipe producers not writable during
two-phase writes and consumers not readable during two-phase reads. This
allows multiple threads to simultaneously use the two-phase APIs in a
sane way; they can respond to a "busy" error by waiting. (Why not
"should wait" instead of "busy"? Because waiting is only an appropriate
response if you know that someone else is going to end the two-phase
read/write -- e.g., if a two-phase read/write is happening on another
thread.)

Also refactor the waiter-awakening stuff so that it's taken care of by
the DataPipe superclass, rather than the LocalDataPipe implementation
subclass.

Note that a two-phase write will only discard if "all-or-none" is
requested. Otherwise, as usual, it'll provide as much space as
available.

R=darin@chromium.org

Review URL: https://codereview.chromium.org/129163003

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@243920 0039d316-1c4b-4281-b951-d872f2087c98
parent fbd9f4bc
......@@ -79,6 +79,7 @@ void DataPipe::ProducerClose() {
<< "Producer closed with active two-phase write";
producer_two_phase_max_num_bytes_written_ = 0;
ProducerCloseImplNoLock();
AwakeConsumerWaitersForStateChangeNoLock();
}
MojoResult DataPipe::ProducerWriteData(const void* elements,
......@@ -99,7 +100,11 @@ MojoResult DataPipe::ProducerWriteData(const void* elements,
if (*num_bytes == 0)
return MOJO_RESULT_OK; // Nothing to do.
return ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock();
MojoResult rv = ProducerWriteDataImplNoLock(elements, num_bytes, all_or_none);
if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags)
AwakeConsumerWaitersForStateChangeNoLock();
return rv;
}
MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
......@@ -117,7 +122,10 @@ MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
all_or_none);
if (rv != MOJO_RESULT_OK)
return rv;
// Note: No need to awake producer waiters, even though we're going from
// writable to non-writable (since you can't wait on non-writability).
// Similarly, though this may have discarded data (in "may discard" mode),
// making it non-readable, there's still no need to awake consumer waiters.
DCHECK(producer_in_two_phase_write_no_lock());
return MOJO_RESULT_OK;
}
......@@ -131,9 +139,16 @@ MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
// Note: Allow successful completion of the two-phase write even if the
// consumer has been closed.
MojoWaitFlags old_consumer_satisfied_flags = ConsumerSatisfiedFlagsNoLock();
MojoResult rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
// Two-phase write ended even on failure.
DCHECK(!producer_in_two_phase_write_no_lock());
// If we're now writable, we *became* writable (since we weren't writable
// during the two-phase write), so awake producer waiters.
if ((ProducerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_WRITABLE))
AwakeProducerWaitersForStateChangeNoLock();
if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags)
AwakeConsumerWaitersForStateChangeNoLock();
return rv;
}
......@@ -175,6 +190,7 @@ void DataPipe::ConsumerClose() {
<< "Consumer closed with active two-phase read";
consumer_two_phase_max_num_bytes_read_ = 0;
ConsumerCloseImplNoLock();
AwakeProducerWaitersForStateChangeNoLock();
}
MojoResult DataPipe::ConsumerReadData(void* elements,
......@@ -192,7 +208,11 @@ MojoResult DataPipe::ConsumerReadData(void* elements,
if (*num_bytes == 0)
return MOJO_RESULT_OK; // Nothing to do.
return ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
MojoResult rv = ConsumerReadDataImplNoLock(elements, num_bytes, all_or_none);
if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
AwakeProducerWaitersForStateChangeNoLock();
return rv;
}
MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
......@@ -209,7 +229,11 @@ MojoResult DataPipe::ConsumerDiscardData(uint32_t* num_bytes,
if (*num_bytes == 0)
return MOJO_RESULT_OK; // Nothing to do.
return ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
MojoResult rv = ConsumerDiscardDataImplNoLock(num_bytes, all_or_none);
if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
AwakeProducerWaitersForStateChangeNoLock();
return rv;
}
MojoResult DataPipe::ConsumerQueryData(uint32_t* num_bytes) {
......@@ -236,7 +260,6 @@ MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
all_or_none);
if (rv != MOJO_RESULT_OK)
return rv;
DCHECK(consumer_in_two_phase_read_no_lock());
return MOJO_RESULT_OK;
}
......@@ -248,9 +271,16 @@ MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
if (!consumer_in_two_phase_read_no_lock())
return MOJO_RESULT_FAILED_PRECONDITION;
MojoWaitFlags old_producer_satisfied_flags = ProducerSatisfiedFlagsNoLock();
MojoResult rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
// Two-phase read ended even on failure.
DCHECK(!consumer_in_two_phase_read_no_lock());
// If we're now readable, we *became* readable (since we weren't readable
// during the two-phase read), so awake consumer waiters.
if ((ConsumerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_READABLE))
AwakeConsumerWaitersForStateChangeNoLock();
if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags)
AwakeProducerWaitersForStateChangeNoLock();
return rv;
}
......
......@@ -83,9 +83,6 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
friend class base::RefCountedThreadSafe<DataPipe>;
virtual ~DataPipe();
void AwakeProducerWaitersForStateChangeNoLock();
void AwakeConsumerWaitersForStateChangeNoLock();
virtual void ProducerCloseImplNoLock() = 0;
// |*num_bytes| will be a nonzero multiple of |element_num_bytes_|.
virtual MojoResult ProducerWriteDataImplNoLock(const void* elements,
......@@ -97,6 +94,7 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
bool all_or_none) = 0;
virtual MojoResult ProducerEndWriteDataImplNoLock(
uint32_t num_bytes_written) = 0;
// Note: A producer should not be writable during a two-phase write.
virtual MojoWaitFlags ProducerSatisfiedFlagsNoLock() = 0;
virtual MojoWaitFlags ProducerSatisfiableFlagsNoLock() = 0;
......@@ -113,6 +111,7 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
uint32_t* buffer_num_bytes,
bool all_or_none) = 0;
virtual MojoResult ConsumerEndReadDataImplNoLock(uint32_t num_bytes_read) = 0;
// Note: A consumer should not be writable during a two-phase read.
virtual MojoWaitFlags ConsumerSatisfiedFlagsNoLock() = 0;
virtual MojoWaitFlags ConsumerSatisfiableFlagsNoLock() = 0;
......@@ -158,6 +157,9 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
}
private:
void AwakeProducerWaitersForStateChangeNoLock();
void AwakeConsumerWaitersForStateChangeNoLock();
bool has_local_producer_no_lock() const {
lock_.AssertAcquired();
return !!producer_waiter_list_.get();
......
......@@ -42,7 +42,6 @@ void LocalDataPipe::ProducerCloseImplNoLock() {
DCHECK(!consumer_in_two_phase_read_no_lock());
DestroyBufferNoLock();
}
AwakeConsumerWaitersForStateChangeNoLock();
}
MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
......@@ -61,11 +60,10 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
capacity_num_bytes());
if (num_bytes_to_write > capacity_num_bytes() - current_num_bytes_) {
// Discard as much as needed (discard oldest first).
size_t num_bytes_to_discard =
num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_);
start_index_ += num_bytes_to_discard;
start_index_ %= capacity_num_bytes();
current_num_bytes_ -= num_bytes_to_discard;
MarkDataAsConsumedNoLock(
num_bytes_to_write - (capacity_num_bytes() - current_num_bytes_));
// No need to wake up write waiters, since we're definitely going to leave
// the buffer full.
}
} else {
if (all_or_none && *num_bytes > capacity_num_bytes() - current_num_bytes_) {
......@@ -96,14 +94,8 @@ MojoResult LocalDataPipe::ProducerWriteDataImplNoLock(const void* elements,
num_bytes_to_write - num_bytes_to_write_first);
}
bool was_empty = (current_num_bytes_ == 0);
current_num_bytes_ += num_bytes_to_write;
DCHECK_LE(current_num_bytes_, capacity_num_bytes());
if (was_empty && num_bytes_to_write > 0)
AwakeConsumerWaitersForStateChangeNoLock();
*num_bytes = static_cast<uint32_t>(num_bytes_to_write);
return MOJO_RESULT_OK;
}
......@@ -114,19 +106,33 @@ MojoResult LocalDataPipe::ProducerBeginWriteDataImplNoLock(
bool all_or_none) {
DCHECK(consumer_open_no_lock());
// The index we need to start writing at.
size_t write_index =
(start_index_ + current_num_bytes_) % capacity_num_bytes();
size_t max_num_bytes_to_write = GetMaxNumBytesToWriteNoLock();
if (all_or_none && *buffer_num_bytes > max_num_bytes_to_write) {
// Don't return "should wait" since you can't wait for a specified amount of
// data.
return MOJO_RESULT_OUT_OF_RANGE;
// In "may discard" mode, we can always write from the write index to the
// end of the buffer.
if (may_discard() &&
*buffer_num_bytes <= capacity_num_bytes() - write_index) {
// To do so, we need to discard an appropriate amount of data.
// We should only reach here if the start index is after the write index!
DCHECK_GE(start_index_, write_index);
DCHECK_GT(*buffer_num_bytes - max_num_bytes_to_write, 0u);
MarkDataAsConsumedNoLock(*buffer_num_bytes - max_num_bytes_to_write);
max_num_bytes_to_write = *buffer_num_bytes;
} else {
// Don't return "should wait" since you can't wait for a specified amount
// of data.
return MOJO_RESULT_OUT_OF_RANGE;
}
}
// Don't go into a two-phase write if there's no room.
if (max_num_bytes_to_write == 0)
return MOJO_RESULT_SHOULD_WAIT;
size_t write_index =
(start_index_ + current_num_bytes_) % capacity_num_bytes();
EnsureBufferNoLock();
*buffer = buffer_.get() + write_index;
*buffer_num_bytes = static_cast<uint32_t>(max_num_bytes_to_write);
......@@ -143,21 +149,17 @@ MojoResult LocalDataPipe::ProducerEndWriteDataImplNoLock(
return MOJO_RESULT_INVALID_ARGUMENT;
}
bool was_empty = (current_num_bytes_ == 0);
current_num_bytes_ += num_bytes_written;
DCHECK_LE(current_num_bytes_, capacity_num_bytes());
set_producer_two_phase_max_num_bytes_written_no_lock(0);
if (was_empty && num_bytes_written > 0)
AwakeConsumerWaitersForStateChangeNoLock();
return MOJO_RESULT_OK;
}
MojoWaitFlags LocalDataPipe::ProducerSatisfiedFlagsNoLock() {
MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
if (consumer_open_no_lock() && current_num_bytes_ < capacity_num_bytes())
if (consumer_open_no_lock() &&
(may_discard() || current_num_bytes_ < capacity_num_bytes()) &&
!producer_in_two_phase_write_no_lock())
rv |= MOJO_WAIT_FLAG_WRITABLE;
return rv;
}
......@@ -176,7 +178,6 @@ void LocalDataPipe::ConsumerCloseImplNoLock() {
if (!producer_open_no_lock() || !producer_in_two_phase_write_no_lock())
DestroyBufferNoLock();
current_num_bytes_ = 0;
AwakeProducerWaitersForStateChangeNoLock();
}
MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements,
......@@ -211,15 +212,7 @@ MojoResult LocalDataPipe::ConsumerReadDataImplNoLock(void* elements,
num_bytes_to_read - num_bytes_to_read_first);
}
bool was_full = (current_num_bytes_ == capacity_num_bytes());
start_index_ += num_bytes_to_read;
start_index_ %= capacity_num_bytes();
current_num_bytes_ -= num_bytes_to_read;
if (was_full && num_bytes_to_read > 0)
AwakeProducerWaitersForStateChangeNoLock();
MarkDataAsConsumedNoLock(num_bytes_to_read);
*num_bytes = static_cast<uint32_t>(num_bytes_to_read);
return MOJO_RESULT_OK;
}
......@@ -242,16 +235,9 @@ MojoResult LocalDataPipe::ConsumerDiscardDataImplNoLock(uint32_t* num_bytes,
MOJO_RESULT_FAILED_PRECONDITION;
}
bool was_full = (current_num_bytes_ == capacity_num_bytes());
size_t num_bytes_to_discard =
std::min(static_cast<size_t>(*num_bytes), current_num_bytes_);
start_index_ = (start_index_ + num_bytes_to_discard) % capacity_num_bytes();
current_num_bytes_ -= num_bytes_to_discard;
if (was_full && num_bytes_to_discard > 0)
AwakeProducerWaitersForStateChangeNoLock();
MarkDataAsConsumedNoLock(num_bytes_to_discard);
*num_bytes = static_cast<uint32_t>(num_bytes_to_discard);
return MOJO_RESULT_OK;
}
......@@ -295,24 +281,15 @@ MojoResult LocalDataPipe::ConsumerEndReadDataImplNoLock(
return MOJO_RESULT_INVALID_ARGUMENT;
}
bool was_full = (current_num_bytes_ == capacity_num_bytes());
start_index_ += num_bytes_read;
DCHECK_LE(start_index_, capacity_num_bytes());
start_index_ %= capacity_num_bytes();
DCHECK_LE(num_bytes_read, current_num_bytes_);
current_num_bytes_ -= num_bytes_read;
DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes());
MarkDataAsConsumedNoLock(num_bytes_read);
set_consumer_two_phase_max_num_bytes_read_no_lock(0);
if (was_full && num_bytes_read > 0)
AwakeProducerWaitersForStateChangeNoLock();
return MOJO_RESULT_OK;
}
MojoWaitFlags LocalDataPipe::ConsumerSatisfiedFlagsNoLock() {
MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
if (current_num_bytes_ > 0)
if (current_num_bytes_ > 0 && !consumer_in_two_phase_read_no_lock())
rv |= MOJO_WAIT_FLAG_READABLE;
return rv;
}
......@@ -360,5 +337,12 @@ size_t LocalDataPipe::GetMaxNumBytesToReadNoLock() {
return current_num_bytes_;
}
void LocalDataPipe::MarkDataAsConsumedNoLock(size_t num_bytes) {
DCHECK_LE(num_bytes, current_num_bytes_);
start_index_ += num_bytes;
start_index_ %= capacity_num_bytes();
current_num_bytes_ -= num_bytes;
}
} // namespace system
} // namespace mojo
......@@ -65,6 +65,10 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalDataPipe : public DataPipe {
size_t GetMaxNumBytesToWriteNoLock();
size_t GetMaxNumBytesToReadNoLock();
// Marks the given number of bytes as consumed/discarded. |num_bytes| must be
// greater than |current_num_bytes_|.
void MarkDataAsConsumedNoLock(size_t num_bytes);
// The members below are protected by |DataPipe|'s |lock_|:
scoped_ptr_malloc<char, base::ScopedPtrAlignedFree> buffer_;
// Circular buffer.
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment