Commit 9e0e8b2b authored by xunjieli's avatar xunjieli Committed by Commit bot

Add EOF signal in FilterSourceStream::FilterData()

Add a bool argument in FilterData() to signal that upstream's eof has been
reached.

R=rdsmith@chromium.org

BUG=474859

Review-Url: https://codereview.chromium.org/2331193005
Cr-Commit-Position: refs/heads/master@{#418713}
parent 9f16719b
......@@ -126,8 +126,9 @@ int FilterSourceStream::DoFilterData() {
DCHECK(output_buffer_);
DCHECK(drainable_input_buffer_);
int bytes_output = FilterData(output_buffer_.get(), output_buffer_size_,
drainable_input_buffer_.get());
int bytes_output =
FilterData(output_buffer_.get(), output_buffer_size_,
drainable_input_buffer_.get(), upstream_end_reached_);
if (bytes_output == ERR_CONTENT_DECODING_FAILED) {
UMA_HISTOGRAM_ENUMERATION("Net.ContentDecodingFailed.FilterType", type(),
TYPE_MAX);
......
......@@ -69,12 +69,13 @@ class NET_EXPORT_PRIVATE FilterSourceStream : public SourceStream {
// FilterData() will be repeatedly invoked with the same |input_buffer| until
// FilterData() returns 0 or an error. If FilterData() returns 0,
// |input_buffer| must be fully drained. Upstream EOF is reached when
// FilterData() is called with |input_buffer->BytesRemaining() == 0|.
// FilterData() is called with |upstream_eof_reached| = true.
// TODO(xunjieli): consider allowing asynchronous response via callback
// to support off-thread decompression.
virtual int FilterData(IOBuffer* output_buffer,
int output_buffer_size,
DrainableIOBuffer* input_buffer) = 0;
DrainableIOBuffer* input_buffer,
bool upstream_eof_reached) = 0;
// Returns a string representation of the type of this FilterSourceStream.
// This is for UMA logging.
......
......@@ -66,16 +66,17 @@ class NeedsAllInputFilterSourceStream : public TestFilterSourceStreamBase {
expected_input_bytes_(expected_input_bytes) {}
int FilterData(IOBuffer* output_buffer,
int output_buffer_size,
DrainableIOBuffer* input_buffer) override {
DrainableIOBuffer* input_buffer,
bool upstream_eof_reached) override {
buffer_.append(input_buffer->data(), input_buffer->BytesRemaining());
EXPECT_GE(expected_input_bytes_, input_buffer->BytesRemaining());
expected_input_bytes_ -= input_buffer->BytesRemaining();
input_buffer->DidConsume(input_buffer->BytesRemaining());
if (expected_input_bytes_ > 0) {
// Keep returning 0 bytes read until all |expected_input_bytes| have
// been read from |upstream|.
if (!upstream_eof_reached) {
// Keep returning 0 bytes read until all input has been consumed.
return 0;
}
EXPECT_EQ(0, expected_input_bytes_);
return WriteBufferToOutput(output_buffer, output_buffer_size);
}
......@@ -95,7 +96,8 @@ class MultiplySourceStream : public TestFilterSourceStreamBase {
multiplier_(multiplier) {}
int FilterData(IOBuffer* output_buffer,
int output_buffer_size,
DrainableIOBuffer* input_buffer) override {
DrainableIOBuffer* input_buffer,
bool /*upstream_eof_reached*/) override {
for (int i = 0; i < input_buffer->BytesRemaining(); i++) {
for (int j = 0; j < multiplier_; j++)
buffer_.append(input_buffer->data() + i, 1);
......@@ -117,7 +119,8 @@ class PassThroughFilterSourceStream : public TestFilterSourceStreamBase {
: TestFilterSourceStreamBase(std::move(upstream)) {}
int FilterData(IOBuffer* output_buffer,
int output_buffer_size,
DrainableIOBuffer* input_buffer) override {
DrainableIOBuffer* input_buffer,
bool /*upstream_eof_reached*/) override {
buffer_.append(input_buffer->data(), input_buffer->BytesRemaining());
input_buffer->DidConsume(input_buffer->BytesRemaining());
return WriteBufferToOutput(output_buffer, output_buffer_size);
......@@ -135,7 +138,8 @@ class ThrottleSourceStream : public TestFilterSourceStreamBase {
: TestFilterSourceStreamBase(std::move(upstream)) {}
int FilterData(IOBuffer* output_buffer,
int output_buffer_size,
DrainableIOBuffer* input_buffer) override {
DrainableIOBuffer* input_buffer,
bool /*upstream_eof_reached*/) override {
buffer_.append(input_buffer->data(), input_buffer->BytesRemaining());
input_buffer->DidConsume(input_buffer->BytesRemaining());
int bytes_to_read = std::min(1, static_cast<int>(buffer_.size()));
......@@ -158,7 +162,8 @@ class NoOutputSourceStream : public TestFilterSourceStreamBase {
consumed_all_input_(false) {}
int FilterData(IOBuffer* output_buffer,
int output_buffer_size,
DrainableIOBuffer* input_buffer) override {
DrainableIOBuffer* input_buffer,
bool /*upstream_eof_reached*/) override {
expected_input_size_ -= input_buffer->BytesRemaining();
input_buffer->DidConsume(input_buffer->BytesRemaining());
EXPECT_LE(0, expected_input_size_);
......@@ -184,7 +189,8 @@ class ErrorFilterSourceStream : public FilterSourceStream {
int FilterData(IOBuffer* output_buffer,
int output_buffer_size,
DrainableIOBuffer* input_buffer) override {
DrainableIOBuffer* input_buffer,
bool /*upstream_eof_reached*/) override {
return ERR_CONTENT_DECODING_FAILED;
}
std::string GetTypeAsString() const override { return ""; }
......@@ -238,16 +244,26 @@ TEST_P(FilterSourceStreamTest, FilterDataReturnNoBytesExceptLast) {
GetParam());
num_reads++;
}
source->AddReadResult(input.data(), 0, OK, GetParam()); // EOF
num_reads++;
MockSourceStream* mock_stream = source.get();
NeedsAllInputFilterSourceStream stream(std::move(source), input.length());
scoped_refptr<IOBufferWithSize> output_buffer =
new IOBufferWithSize(kDefaultBufferSize);
TestCompletionCallback callback;
int rv = stream.Read(output_buffer.get(), output_buffer->size(),
callback.callback());
rv = CompleteReadIfAsync(rv, &callback, mock_stream, num_reads);
ASSERT_EQ(static_cast<int>(input.length()), rv);
EXPECT_EQ(input, std::string(output_buffer->data(), rv));
std::string actual_output;
while (true) {
int rv = stream.Read(output_buffer.get(), output_buffer->size(),
callback.callback());
if (rv == ERR_IO_PENDING)
rv = CompleteReadIfAsync(rv, &callback, mock_stream, num_reads);
if (rv == OK)
break;
ASSERT_GT(rv, OK);
actual_output.append(output_buffer->data(), rv);
}
EXPECT_EQ(input, actual_output);
}
// Tests that FilterData() returns 0 byte read because the upstream gives an
......@@ -423,6 +439,8 @@ TEST_P(FilterSourceStreamTest, FilterChaining) {
std::unique_ptr<MockSourceStream> source(new MockSourceStream);
std::string input = "hello, world!";
source->AddReadResult(input.data(), input.length(), OK, GetParam());
source->AddReadResult(input.data(), 0, OK, GetParam()); // EOF
MockSourceStream* mock_stream = source.get();
std::unique_ptr<PassThroughFilterSourceStream> pass_through_source(
new PassThroughFilterSourceStream(std::move(source)));
......@@ -438,13 +456,18 @@ TEST_P(FilterSourceStreamTest, FilterChaining) {
new IOBufferWithSize(kDefaultBufferSize);
TestCompletionCallback callback;
int bytes_read = second_pass_through_source->Read(
output_buffer.get(), output_buffer->size(), callback.callback());
bytes_read =
CompleteReadIfAsync(bytes_read, &callback, mock_stream, /*num_reads=*/1);
ASSERT_EQ(input.length(), static_cast<size_t>(bytes_read));
EXPECT_EQ(input, std::string(output_buffer->data(), bytes_read));
std::string actual_output;
while (true) {
int rv = second_pass_through_source->Read(
output_buffer.get(), output_buffer->size(), callback.callback());
if (rv == ERR_IO_PENDING)
rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/2);
if (rv == OK)
break;
ASSERT_GT(rv, OK);
actual_output.append(output_buffer->data(), rv);
}
EXPECT_EQ(input, actual_output);
// Type string (from left to right) should be the order of data flow.
EXPECT_EQ("FIRST_PASS_THROUGH,NEEDS_ALL,SECOND_PASS_THROUGH",
second_pass_through_source->Description());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment