Commit 09721f55 authored by yhirano's avatar yhirano Committed by Commit bot

Handle MOJO_RESULT_BUSY result in URLResponseBodyConsumer

URLResponseBodyConsumer reads data from a mojo data pipe and dispatches it as a
URLResponseBodyConsumer::ReceivedData. When the dispatched data is destructed,
mojo::EndReadDataRaw is called. Until then, mojo::BeginRead will return
MOJO_RESULT_BUSY. This CL implements the correct handling for such a case.

This mis-handling was the cause of the failure of
virtual/mojo-loading/http/tests/inspector/network/network-fetch.html. This CL
also adds text() call in makeFetch function so that the backpressure mechanism
won't stop the loading.

BUG=669357

Review-Url: https://codereview.chromium.org/2573743002
Cr-Commit-Position: refs/heads/master@{#438756}
parent 519baeec
...@@ -95,7 +95,7 @@ class URLLoaderClientImpl final : public mojom::URLLoaderClient { ...@@ -95,7 +95,7 @@ class URLLoaderClientImpl final : public mojom::URLLoaderClient {
mojom::DownloadedTempFilePtr downloaded_file) override { mojom::DownloadedTempFilePtr downloaded_file) override {
has_received_response_ = true; has_received_response_ = true;
if (body_consumer_) if (body_consumer_)
body_consumer_->Start(task_runner_.get()); body_consumer_->Start();
downloaded_file_ = std::move(downloaded_file); downloaded_file_ = std::move(downloaded_file);
resource_dispatcher_->OnMessageReceived( resource_dispatcher_->OnMessageReceived(
ResourceMsg_ReceivedResponse(request_id_, response_head)); ResourceMsg_ReceivedResponse(request_id_, response_head));
...@@ -120,7 +120,7 @@ class URLLoaderClientImpl final : public mojom::URLLoaderClient { ...@@ -120,7 +120,7 @@ class URLLoaderClientImpl final : public mojom::URLLoaderClient {
body_consumer_ = new URLResponseBodyConsumer( body_consumer_ = new URLResponseBodyConsumer(
request_id_, resource_dispatcher_, std::move(body), task_runner_); request_id_, resource_dispatcher_, std::move(body), task_runner_);
if (has_received_response_) if (has_received_response_)
body_consumer_->Start(task_runner_.get()); body_consumer_->Start();
} }
void OnComplete(const ResourceRequestCompletionStatus& status) override { void OnComplete(const ResourceRequestCompletionStatus& status) override {
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "content/child/url_response_body_consumer.h" #include "content/child/url_response_body_consumer.h"
#include "base/auto_reset.h"
#include "base/bind.h" #include "base/bind.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
...@@ -45,17 +46,18 @@ URLResponseBodyConsumer::URLResponseBodyConsumer( ...@@ -45,17 +46,18 @@ URLResponseBodyConsumer::URLResponseBodyConsumer(
resource_dispatcher_(resource_dispatcher), resource_dispatcher_(resource_dispatcher),
handle_(std::move(handle)), handle_(std::move(handle)),
handle_watcher_(task_runner), handle_watcher_(task_runner),
task_runner_(task_runner),
has_seen_end_of_data_(!handle_.is_valid()) {} has_seen_end_of_data_(!handle_.is_valid()) {}
URLResponseBodyConsumer::~URLResponseBodyConsumer() {} URLResponseBodyConsumer::~URLResponseBodyConsumer() {}
void URLResponseBodyConsumer::Start(base::SingleThreadTaskRunner* task_runner) { void URLResponseBodyConsumer::Start() {
if (has_been_cancelled_) if (has_been_cancelled_)
return; return;
handle_watcher_.Start( handle_watcher_.Start(
handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, handle_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&URLResponseBodyConsumer::OnReadable, base::Unretained(this))); base::Bind(&URLResponseBodyConsumer::OnReadable, base::Unretained(this)));
task_runner->PostTask( task_runner_->PostTask(
FROM_HERE, base::Bind(&URLResponseBodyConsumer::OnReadable, AsWeakPtr(), FROM_HERE, base::Bind(&URLResponseBodyConsumer::OnReadable, AsWeakPtr(),
MOJO_RESULT_OK)); MOJO_RESULT_OK));
} }
...@@ -77,14 +79,24 @@ void URLResponseBodyConsumer::Cancel() { ...@@ -77,14 +79,24 @@ void URLResponseBodyConsumer::Cancel() {
void URLResponseBodyConsumer::Reclaim(uint32_t size) { void URLResponseBodyConsumer::Reclaim(uint32_t size) {
MojoResult result = mojo::EndReadDataRaw(handle_.get(), size); MojoResult result = mojo::EndReadDataRaw(handle_.get(), size);
DCHECK_EQ(MOJO_RESULT_OK, result); DCHECK_EQ(MOJO_RESULT_OK, result);
if (is_in_on_readable_)
return;
task_runner_->PostTask(
FROM_HERE, base::Bind(&URLResponseBodyConsumer::OnReadable, AsWeakPtr(),
MOJO_RESULT_OK));
} }
void URLResponseBodyConsumer::OnReadable(MojoResult unused) { void URLResponseBodyConsumer::OnReadable(MojoResult unused) {
DCHECK(!is_in_on_readable_);
if (has_been_cancelled_ || has_seen_end_of_data_) if (has_been_cancelled_ || has_seen_end_of_data_)
return; return;
// Protect |this| as RequestPeer::OnReceivedData may call deref. // Protect |this| as RequestPeer::OnReceivedData may call deref.
scoped_refptr<URLResponseBodyConsumer> protect(this); scoped_refptr<URLResponseBodyConsumer> protect(this);
base::AutoReset<bool> is_in_on_readable(&is_in_on_readable_, true);
// TODO(yhirano): Suppress notification when deferred. // TODO(yhirano): Suppress notification when deferred.
while (!has_been_cancelled_) { while (!has_been_cancelled_) {
...@@ -92,7 +104,7 @@ void URLResponseBodyConsumer::OnReadable(MojoResult unused) { ...@@ -92,7 +104,7 @@ void URLResponseBodyConsumer::OnReadable(MojoResult unused) {
uint32_t available = 0; uint32_t available = 0;
MojoResult result = mojo::BeginReadDataRaw( MojoResult result = mojo::BeginReadDataRaw(
handle_.get(), &buffer, &available, MOJO_READ_DATA_FLAG_NONE); handle_.get(), &buffer, &available, MOJO_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) if (result == MOJO_RESULT_SHOULD_WAIT || result == MOJO_RESULT_BUSY)
return; return;
if (result == MOJO_RESULT_FAILED_PRECONDITION) { if (result == MOJO_RESULT_FAILED_PRECONDITION) {
has_seen_end_of_data_ = true; has_seen_end_of_data_ = true;
......
...@@ -38,7 +38,7 @@ class CONTENT_EXPORT URLResponseBodyConsumer final ...@@ -38,7 +38,7 @@ class CONTENT_EXPORT URLResponseBodyConsumer final
scoped_refptr<base::SingleThreadTaskRunner> task_runner); scoped_refptr<base::SingleThreadTaskRunner> task_runner);
// Starts watching the handle. // Starts watching the handle.
void Start(base::SingleThreadTaskRunner* task_runner); void Start();
// Sets the completion status. The completion status is dispatched to the // Sets the completion status. The completion status is dispatched to the
// ResourceDispatcher when the both following conditions hold: // ResourceDispatcher when the both following conditions hold:
...@@ -66,10 +66,12 @@ class CONTENT_EXPORT URLResponseBodyConsumer final ...@@ -66,10 +66,12 @@ class CONTENT_EXPORT URLResponseBodyConsumer final
mojo::ScopedDataPipeConsumerHandle handle_; mojo::ScopedDataPipeConsumerHandle handle_;
mojo::Watcher handle_watcher_; mojo::Watcher handle_watcher_;
ResourceRequestCompletionStatus completion_status_; ResourceRequestCompletionStatus completion_status_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
bool has_received_completion_ = false; bool has_received_completion_ = false;
bool has_been_cancelled_ = false; bool has_been_cancelled_ = false;
bool has_seen_end_of_data_; bool has_seen_end_of_data_;
bool is_in_on_readable_ = false;
DISALLOW_COPY_AND_ASSIGN(URLResponseBodyConsumer); DISALLOW_COPY_AND_ASSIGN(URLResponseBodyConsumer);
}; };
......
...@@ -29,7 +29,9 @@ namespace { ...@@ -29,7 +29,9 @@ namespace {
class TestRequestPeer : public RequestPeer { class TestRequestPeer : public RequestPeer {
public: public:
struct Context; struct Context;
explicit TestRequestPeer(Context* context) : context_(context) {} TestRequestPeer(Context* context,
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: context_(context), task_runner_(std::move(task_runner)) {}
void OnUploadProgress(uint64_t position, uint64_t size) override { void OnUploadProgress(uint64_t position, uint64_t size) override {
ADD_FAILURE() << "OnUploadProgress should not be called."; ADD_FAILURE() << "OnUploadProgress should not be called.";
...@@ -52,6 +54,8 @@ class TestRequestPeer : public RequestPeer { ...@@ -52,6 +54,8 @@ class TestRequestPeer : public RequestPeer {
void OnReceivedData(std::unique_ptr<ReceivedData> data) override { void OnReceivedData(std::unique_ptr<ReceivedData> data) override {
EXPECT_FALSE(context_->complete); EXPECT_FALSE(context_->complete);
context_->data.append(data->payload(), data->length()); context_->data.append(data->payload(), data->length());
if (context_->release_data_asynchronously)
task_runner_->DeleteSoon(FROM_HERE, data.release());
context_->run_loop_quit_closure.Run(); context_->run_loop_quit_closure.Run();
} }
...@@ -75,10 +79,12 @@ class TestRequestPeer : public RequestPeer { ...@@ -75,10 +79,12 @@ class TestRequestPeer : public RequestPeer {
bool complete = false; bool complete = false;
base::Closure run_loop_quit_closure; base::Closure run_loop_quit_closure;
int error_code = net::OK; int error_code = net::OK;
bool release_data_asynchronously = false;
}; };
private: private:
Context* context_; Context* context_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
DISALLOW_COPY_AND_ASSIGN(TestRequestPeer); DISALLOW_COPY_AND_ASSIGN(TestRequestPeer);
}; };
...@@ -130,7 +136,7 @@ class URLResponseBodyConsumerTest : public ::testing::Test, ...@@ -130,7 +136,7 @@ class URLResponseBodyConsumerTest : public ::testing::Test,
TestRequestPeer::Context* context) { TestRequestPeer::Context* context) {
return dispatcher_->StartAsync( return dispatcher_->StartAsync(
std::move(request), 0, nullptr, url::Origin(), std::move(request), 0, nullptr, url::Origin(),
base::MakeUnique<TestRequestPeer>(context), base::MakeUnique<TestRequestPeer>(context, message_loop_.task_runner()),
blink::WebURLRequest::LoadingIPCType::ChromeIPC, nullptr, nullptr); blink::WebURLRequest::LoadingIPCType::ChromeIPC, nullptr, nullptr);
} }
...@@ -154,7 +160,7 @@ TEST_F(URLResponseBodyConsumerTest, ReceiveData) { ...@@ -154,7 +160,7 @@ TEST_F(URLResponseBodyConsumerTest, ReceiveData) {
scoped_refptr<URLResponseBodyConsumer> consumer(new URLResponseBodyConsumer( scoped_refptr<URLResponseBodyConsumer> consumer(new URLResponseBodyConsumer(
request_id, dispatcher_.get(), std::move(data_pipe.consumer_handle), request_id, dispatcher_.get(), std::move(data_pipe.consumer_handle),
message_loop_.task_runner())); message_loop_.task_runner()));
consumer->Start(message_loop_.task_runner().get()); consumer->Start();
mojo::ScopedDataPipeProducerHandle writer = mojo::ScopedDataPipeProducerHandle writer =
std::move(data_pipe.producer_handle); std::move(data_pipe.producer_handle);
...@@ -180,7 +186,43 @@ TEST_F(URLResponseBodyConsumerTest, OnCompleteThenClose) { ...@@ -180,7 +186,43 @@ TEST_F(URLResponseBodyConsumerTest, OnCompleteThenClose) {
scoped_refptr<URLResponseBodyConsumer> consumer(new URLResponseBodyConsumer( scoped_refptr<URLResponseBodyConsumer> consumer(new URLResponseBodyConsumer(
request_id, dispatcher_.get(), std::move(data_pipe.consumer_handle), request_id, dispatcher_.get(), std::move(data_pipe.consumer_handle),
message_loop_.task_runner())); message_loop_.task_runner()));
consumer->Start(message_loop_.task_runner().get()); consumer->Start();
consumer->OnComplete(ResourceRequestCompletionStatus());
mojo::ScopedDataPipeProducerHandle writer =
std::move(data_pipe.producer_handle);
std::string buffer = "hello";
uint32_t size = buffer.size();
MojoResult result =
mojo::WriteDataRaw(writer.get(), buffer.c_str(), &size, kNone);
ASSERT_EQ(MOJO_RESULT_OK, result);
ASSERT_EQ(buffer.size(), size);
Run(&context);
writer.reset();
EXPECT_FALSE(context.complete);
EXPECT_EQ("hello", context.data);
Run(&context);
EXPECT_TRUE(context.complete);
EXPECT_EQ("hello", context.data);
}
// Release the received data asynchronously. This leads to MOJO_RESULT_BUSY
// from the BeginReadDataRaw call in OnReadable.
TEST_F(URLResponseBodyConsumerTest, OnCompleteThenCloseWithAsyncRelease) {
TestRequestPeer::Context context;
context.release_data_asynchronously = true;
std::unique_ptr<ResourceRequest> request(CreateResourceRequest());
int request_id = SetUpRequestPeer(std::move(request), &context);
mojo::DataPipe data_pipe(CreateDataPipeOptions());
scoped_refptr<URLResponseBodyConsumer> consumer(new URLResponseBodyConsumer(
request_id, dispatcher_.get(), std::move(data_pipe.consumer_handle),
message_loop_.task_runner()));
consumer->Start();
consumer->OnComplete(ResourceRequestCompletionStatus()); consumer->OnComplete(ResourceRequestCompletionStatus());
mojo::ScopedDataPipeProducerHandle writer = mojo::ScopedDataPipeProducerHandle writer =
...@@ -213,7 +255,7 @@ TEST_F(URLResponseBodyConsumerTest, CloseThenOnComplete) { ...@@ -213,7 +255,7 @@ TEST_F(URLResponseBodyConsumerTest, CloseThenOnComplete) {
scoped_refptr<URLResponseBodyConsumer> consumer(new URLResponseBodyConsumer( scoped_refptr<URLResponseBodyConsumer> consumer(new URLResponseBodyConsumer(
request_id, dispatcher_.get(), std::move(data_pipe.consumer_handle), request_id, dispatcher_.get(), std::move(data_pipe.consumer_handle),
message_loop_.task_runner())); message_loop_.task_runner()));
consumer->Start(message_loop_.task_runner().get()); consumer->Start();
ResourceRequestCompletionStatus status; ResourceRequestCompletionStatus status;
status.error_code = net::ERR_FAILED; status.error_code = net::ERR_FAILED;
......
...@@ -1924,7 +1924,6 @@ crbug.com/659917 virtual/mojo-loading/http/tests/xmlhttprequest/workers/xmlhttpr ...@@ -1924,7 +1924,6 @@ crbug.com/659917 virtual/mojo-loading/http/tests/xmlhttprequest/workers/xmlhttpr
crbug.com/659917 virtual/mojo-loading/http/tests/xmlhttprequest/workers/xmlhttprequest-response-type-blob-sync.html [ Pass Timeout ] crbug.com/659917 virtual/mojo-loading/http/tests/xmlhttprequest/workers/xmlhttprequest-response-type-blob-sync.html [ Pass Timeout ]
crbug.com/659917 virtual/mojo-loading/http/tests/xmlhttprequest/workers/shared-worker-response-type-blob-sync.html [ Pass Timeout ] crbug.com/659917 virtual/mojo-loading/http/tests/xmlhttprequest/workers/shared-worker-response-type-blob-sync.html [ Pass Timeout ]
crbug.com/669357 virtual/mojo-loading/http/tests/inspector/network/network-fetch.html [ Failure ]
crbug.com/669357 virtual/mojo-loading/http/tests/inspector-protocol/network-data-length.html [ Failure ] crbug.com/669357 virtual/mojo-loading/http/tests/inspector-protocol/network-data-length.html [ Failure ]
crbug.com/669357 virtual/mojo-loading/http/tests/inspector/network/network-datareceived.html [ Failure ] crbug.com/669357 virtual/mojo-loading/http/tests/inspector/network/network-datareceived.html [ Failure ]
crbug.com/669357 virtual/mojo-loading/http/tests/inspector/tracing/timeline-receive-response-event.html [ Failure ] crbug.com/669357 virtual/mojo-loading/http/tests/inspector/tracing/timeline-receive-response-event.html [ Failure ]
......
...@@ -45,7 +45,11 @@ function makeXHRForJSONArguments(jsonArgs) ...@@ -45,7 +45,11 @@ function makeXHRForJSONArguments(jsonArgs)
function makeFetch(url, requestInitializer) function makeFetch(url, requestInitializer)
{ {
return fetch(url, requestInitializer).catch(e => e); return fetch(url, requestInitializer).then(res => {
// Call text(). Otherwise the backpressure mechanism may block loading.
res.text();
return res;
}).catch(e => e);
} }
var initialize_NetworkTest = function() { var initialize_NetworkTest = function() {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment