Commit dc3ecbb0 authored by fdoray's avatar fdoray Committed by Commit bot

Allow URLFetcher to be used from sequenced tasks.

Before this CL, URLFetcher could only be instantiated from a
single-threaded task. The response was dispatched on the origin thread.

With this CL, URLFetcher can also be instantiated from a sequenced
task. The response is dispatched on the origin sequence.

Since SequencedTaskRunnerHandle behaves the same way as
ThreadTaskRunnerHandle when used from a single-threaded task, this
CL does not affect the behavior of existing URLFetcher call sites
(they are necessarily in single-threaded tasks because
base::ThreadTaskRunnerHandle::Get() crashes when called from
a non-single-threaded task).

BUG=675631

Review-Url: https://codereview.chromium.org/2599873002
Cr-Commit-Position: refs/heads/master@{#443224}
parent 91a4892f
...@@ -67,8 +67,8 @@ class URLRequestStatus; ...@@ -67,8 +67,8 @@ class URLRequestStatus;
// the URLFetcher instance. If the URLFetcher instance is destroyed before the // the URLFetcher instance. If the URLFetcher instance is destroyed before the
// callback happens, the fetch will be canceled and no callback will occur. // callback happens, the fetch will be canceled and no callback will occur.
// //
// You may create the URLFetcher instance on any thread; OnURLFetchComplete() // You may create the URLFetcher instance on any sequence; OnURLFetchComplete()
// will be called back on the same thread you use to create the instance. // will be called back on the same sequence you use to create the instance.
// //
// //
// NOTE: By default URLFetcher requests are NOT intercepted, except when // NOTE: By default URLFetcher requests are NOT intercepted, except when
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#include "base/sequenced_task_runner.h" #include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/stl_util.h" #include "base/stl_util.h"
#include "base/threading/thread_task_runner_handle.h" #include "base/threading/sequenced_task_runner_handle.h"
#include "base/tracked_objects.h" #include "base/tracked_objects.h"
#include "net/base/elements_upload_data_stream.h" #include "net/base/elements_upload_data_stream.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
...@@ -79,7 +79,7 @@ URLFetcherCore::URLFetcherCore(URLFetcher* fetcher, ...@@ -79,7 +79,7 @@ URLFetcherCore::URLFetcherCore(URLFetcher* fetcher,
original_url_(original_url), original_url_(original_url),
request_type_(request_type), request_type_(request_type),
delegate_(d), delegate_(d),
delegate_task_runner_(base::ThreadTaskRunnerHandle::Get()), delegate_task_runner_(base::SequencedTaskRunnerHandle::Get()),
load_flags_(LOAD_NORMAL), load_flags_(LOAD_NORMAL),
response_code_(URLFetcher::RESPONSE_CODE_INVALID), response_code_(URLFetcher::RESPONSE_CODE_INVALID),
buffer_(new IOBuffer(kBufferSize)), buffer_(new IOBuffer(kBufferSize)),
...@@ -109,7 +109,7 @@ URLFetcherCore::URLFetcherCore(URLFetcher* fetcher, ...@@ -109,7 +109,7 @@ URLFetcherCore::URLFetcherCore(URLFetcher* fetcher,
} }
void URLFetcherCore::Start() { void URLFetcherCore::Start() {
DCHECK(delegate_task_runner_.get()); DCHECK(delegate_task_runner_);
DCHECK(request_context_getter_.get()) << "We need an URLRequestContext!"; DCHECK(request_context_getter_.get()) << "We need an URLRequestContext!";
if (network_task_runner_.get()) { if (network_task_runner_.get()) {
DCHECK_EQ(network_task_runner_, DCHECK_EQ(network_task_runner_,
...@@ -124,8 +124,8 @@ void URLFetcherCore::Start() { ...@@ -124,8 +124,8 @@ void URLFetcherCore::Start() {
} }
void URLFetcherCore::Stop() { void URLFetcherCore::Stop() {
if (delegate_task_runner_.get()) // May be NULL in tests. if (delegate_task_runner_) // May be NULL in tests.
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
delegate_ = NULL; delegate_ = NULL;
fetcher_ = NULL; fetcher_ = NULL;
...@@ -204,7 +204,7 @@ void URLFetcherCore::SetChunkedUpload(const std::string& content_type) { ...@@ -204,7 +204,7 @@ void URLFetcherCore::SetChunkedUpload(const std::string& content_type) {
void URLFetcherCore::AppendChunkToUpload(const std::string& content, void URLFetcherCore::AppendChunkToUpload(const std::string& content,
bool is_last_chunk) { bool is_last_chunk) {
DCHECK(delegate_task_runner_.get()); DCHECK(delegate_task_runner_);
DCHECK(network_task_runner_.get()); DCHECK(network_task_runner_.get());
DCHECK(is_chunked_upload_); DCHECK(is_chunked_upload_);
network_task_runner_->PostTask( network_task_runner_->PostTask(
...@@ -289,21 +289,21 @@ void URLFetcherCore::SetAutomaticallyRetryOnNetworkChanges(int max_retries) { ...@@ -289,21 +289,21 @@ void URLFetcherCore::SetAutomaticallyRetryOnNetworkChanges(int max_retries) {
void URLFetcherCore::SaveResponseToFileAtPath( void URLFetcherCore::SaveResponseToFileAtPath(
const base::FilePath& file_path, const base::FilePath& file_path,
scoped_refptr<base::SequencedTaskRunner> file_task_runner) { scoped_refptr<base::SequencedTaskRunner> file_task_runner) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
SaveResponseWithWriter(std::unique_ptr<URLFetcherResponseWriter>( SaveResponseWithWriter(std::unique_ptr<URLFetcherResponseWriter>(
new URLFetcherFileWriter(file_task_runner, file_path))); new URLFetcherFileWriter(file_task_runner, file_path)));
} }
void URLFetcherCore::SaveResponseToTemporaryFile( void URLFetcherCore::SaveResponseToTemporaryFile(
scoped_refptr<base::SequencedTaskRunner> file_task_runner) { scoped_refptr<base::SequencedTaskRunner> file_task_runner) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
SaveResponseWithWriter(std::unique_ptr<URLFetcherResponseWriter>( SaveResponseWithWriter(std::unique_ptr<URLFetcherResponseWriter>(
new URLFetcherFileWriter(file_task_runner, base::FilePath()))); new URLFetcherFileWriter(file_task_runner, base::FilePath())));
} }
void URLFetcherCore::SaveResponseWithWriter( void URLFetcherCore::SaveResponseWithWriter(
std::unique_ptr<URLFetcherResponseWriter> response_writer) { std::unique_ptr<URLFetcherResponseWriter> response_writer) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
response_writer_ = std::move(response_writer); response_writer_ = std::move(response_writer);
} }
...@@ -351,7 +351,7 @@ int URLFetcherCore::GetResponseCode() const { ...@@ -351,7 +351,7 @@ int URLFetcherCore::GetResponseCode() const {
} }
void URLFetcherCore::ReceivedContentWasMalformed() { void URLFetcherCore::ReceivedContentWasMalformed() {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
if (network_task_runner_.get()) { if (network_task_runner_.get()) {
network_task_runner_->PostTask( network_task_runner_->PostTask(
FROM_HERE, base::Bind(&URLFetcherCore::NotifyMalformedContent, this)); FROM_HERE, base::Bind(&URLFetcherCore::NotifyMalformedContent, this));
...@@ -373,7 +373,7 @@ bool URLFetcherCore::GetResponseAsString( ...@@ -373,7 +373,7 @@ bool URLFetcherCore::GetResponseAsString(
bool URLFetcherCore::GetResponseAsFilePath(bool take_ownership, bool URLFetcherCore::GetResponseAsFilePath(bool take_ownership,
base::FilePath* out_response_path) { base::FilePath* out_response_path) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
URLFetcherFileWriter* file_writer = URLFetcherFileWriter* file_writer =
response_writer_ ? response_writer_->AsFileWriter() : NULL; response_writer_ ? response_writer_->AsFileWriter() : NULL;
...@@ -670,7 +670,7 @@ void URLFetcherCore::StartURLRequestWhenAppropriate() { ...@@ -670,7 +670,7 @@ void URLFetcherCore::StartURLRequestWhenAppropriate() {
original_url_throttler_entry_->ReserveSendingTimeForNextRequest( original_url_throttler_entry_->ReserveSendingTimeForNextRequest(
GetBackoffReleaseTime()); GetBackoffReleaseTime());
if (delay != 0) { if (delay != 0) {
base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( network_task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&URLFetcherCore::StartURLRequest, this), FROM_HERE, base::Bind(&URLFetcherCore::StartURLRequest, this),
base::TimeDelta::FromMilliseconds(delay)); base::TimeDelta::FromMilliseconds(delay));
return; return;
...@@ -710,7 +710,7 @@ void URLFetcherCore::CancelURLRequest(int error) { ...@@ -710,7 +710,7 @@ void URLFetcherCore::CancelURLRequest(int error) {
void URLFetcherCore::OnCompletedURLRequest( void URLFetcherCore::OnCompletedURLRequest(
base::TimeDelta backoff_delay) { base::TimeDelta backoff_delay) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
// Save the status and backoff_delay so that delegates can read it. // Save the status and backoff_delay so that delegates can read it.
if (delegate_) { if (delegate_) {
...@@ -720,7 +720,7 @@ void URLFetcherCore::OnCompletedURLRequest( ...@@ -720,7 +720,7 @@ void URLFetcherCore::OnCompletedURLRequest(
} }
void URLFetcherCore::InformDelegateFetchIsComplete() { void URLFetcherCore::InformDelegateFetchIsComplete() {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
if (delegate_) if (delegate_)
delegate_->OnURLFetchComplete(fetcher_); delegate_->OnURLFetchComplete(fetcher_);
} }
...@@ -911,16 +911,16 @@ void URLFetcherCore::InformDelegateUploadProgress() { ...@@ -911,16 +911,16 @@ void URLFetcherCore::InformDelegateUploadProgress() {
delegate_task_runner_->PostTask( delegate_task_runner_->PostTask(
FROM_HERE, FROM_HERE,
base::Bind( base::Bind(
&URLFetcherCore::InformDelegateUploadProgressInDelegateThread, &URLFetcherCore::InformDelegateUploadProgressInDelegateSequence,
this, current, total)); this, current, total));
} }
} }
} }
void URLFetcherCore::InformDelegateUploadProgressInDelegateThread( void URLFetcherCore::InformDelegateUploadProgressInDelegateSequence(
int64_t current, int64_t current,
int64_t total) { int64_t total) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
if (delegate_) if (delegate_)
delegate_->OnURLFetchUploadProgress(fetcher_, current, total); delegate_->OnURLFetchUploadProgress(fetcher_, current, total);
} }
...@@ -936,16 +936,16 @@ void URLFetcherCore::InformDelegateDownloadProgress() { ...@@ -936,16 +936,16 @@ void URLFetcherCore::InformDelegateDownloadProgress() {
delegate_task_runner_->PostTask( delegate_task_runner_->PostTask(
FROM_HERE, FROM_HERE,
base::Bind( base::Bind(
&URLFetcherCore::InformDelegateDownloadProgressInDelegateThread, this, &URLFetcherCore::InformDelegateDownloadProgressInDelegateSequence,
current_response_bytes_, total_response_bytes_, this, current_response_bytes_, total_response_bytes_,
request_->GetTotalReceivedBytes())); request_->GetTotalReceivedBytes()));
} }
void URLFetcherCore::InformDelegateDownloadProgressInDelegateThread( void URLFetcherCore::InformDelegateDownloadProgressInDelegateSequence(
int64_t current, int64_t current,
int64_t total, int64_t total,
int64_t current_network_bytes) { int64_t current_network_bytes) {
DCHECK(delegate_task_runner_->BelongsToCurrentThread()); DCHECK(delegate_task_runner_->RunsTasksOnCurrentThread());
if (delegate_) if (delegate_)
delegate_->OnURLFetchDownloadProgress(fetcher_, current, total, delegate_->OnURLFetchDownloadProgress(fetcher_, current, total,
current_network_bytes); current_network_bytes);
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "url/gurl.h" #include "url/gurl.h"
namespace base { namespace base {
class SequencedTaskRunner;
class SingleThreadTaskRunner; class SingleThreadTaskRunner;
} // namespace base } // namespace base
...@@ -216,10 +217,10 @@ class URLFetcherCore : public base::RefCountedThreadSafe<URLFetcherCore>, ...@@ -216,10 +217,10 @@ class URLFetcherCore : public base::RefCountedThreadSafe<URLFetcherCore>,
// Notify Delegate about the progress of upload/download. // Notify Delegate about the progress of upload/download.
void InformDelegateUploadProgress(); void InformDelegateUploadProgress();
void InformDelegateUploadProgressInDelegateThread(int64_t current, void InformDelegateUploadProgressInDelegateSequence(int64_t current,
int64_t total); int64_t total);
void InformDelegateDownloadProgress(); void InformDelegateDownloadProgress();
void InformDelegateDownloadProgressInDelegateThread( void InformDelegateDownloadProgressInDelegateSequence(
int64_t current, int64_t current,
int64_t total, int64_t total,
int64_t current_network_bytes); int64_t current_network_bytes);
...@@ -233,8 +234,8 @@ class URLFetcherCore : public base::RefCountedThreadSafe<URLFetcherCore>, ...@@ -233,8 +234,8 @@ class URLFetcherCore : public base::RefCountedThreadSafe<URLFetcherCore>,
URLFetcher::RequestType request_type_; // What type of request is this? URLFetcher::RequestType request_type_; // What type of request is this?
URLRequestStatus status_; // Status of the request URLRequestStatus status_; // Status of the request
URLFetcherDelegate* delegate_; // Object to notify on completion URLFetcherDelegate* delegate_; // Object to notify on completion
// Task runner for the creating thread. Used to interact with the delegate. // Task runner for the creating sequence. Used to interact with the delegate.
scoped_refptr<base::SingleThreadTaskRunner> delegate_task_runner_; const scoped_refptr<base::SequencedTaskRunner> delegate_task_runner_;
// Task runner for network operations. // Task runner for network operations.
scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_; scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
// Task runner for upload file access. // Task runner for upload file access.
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
#include "base/strings/stringprintf.h" #include "base/strings/stringprintf.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#include "base/task_scheduler/post_task.h"
#include "base/test/scoped_task_scheduler.h"
#include "base/test/test_timeouts.h" #include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h" #include "base/threading/platform_thread.h"
#include "base/threading/thread.h" #include "base/threading/thread.h"
...@@ -515,6 +517,33 @@ TEST_F(URLFetcherTest, DifferentThreadsTest) { ...@@ -515,6 +517,33 @@ TEST_F(URLFetcherTest, DifferentThreadsTest) {
EXPECT_EQ(kDefaultResponseBody, data); EXPECT_EQ(kDefaultResponseBody, data);
} }
// Create the fetcher from a sequenced (not single-threaded) task. Verify that
// the expected response is received.
TEST_F(URLFetcherTest, SequencedTaskTest) {
base::test::ScopedTaskScheduler scoped_task_scheduler(
base::MessageLoop::current());
auto sequenced_task_runner =
base::CreateSequencedTaskRunnerWithTraits(base::TaskTraits());
auto delegate = base::MakeUnique<WaitingURLFetcherDelegate>();
sequenced_task_runner->PostTask(
FROM_HERE, base::Bind(&WaitingURLFetcherDelegate::CreateFetcher,
base::Unretained(delegate.get()),
test_server_->GetURL(kDefaultResponsePath),
URLFetcher::GET, CreateCrossThreadContextGetter()));
base::RunLoop().RunUntilIdle();
delegate->StartFetcherAndWait();
EXPECT_TRUE(delegate->fetcher()->GetStatus().is_success());
EXPECT_EQ(200, delegate->fetcher()->GetResponseCode());
std::string data;
ASSERT_TRUE(delegate->fetcher()->GetResponseAsString(&data));
EXPECT_EQ(kDefaultResponseBody, data);
sequenced_task_runner->DeleteSoon(FROM_HERE, delegate.release());
base::RunLoop().RunUntilIdle();
}
// Tests to make sure CancelAll() will successfully cancel existing URLFetchers. // Tests to make sure CancelAll() will successfully cancel existing URLFetchers.
TEST_F(URLFetcherTest, CancelAll) { TEST_F(URLFetcherTest, CancelAll) {
EXPECT_EQ(0, GetNumFetcherCores()); EXPECT_EQ(0, GetNumFetcherCores());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment