Commit 8e4e785c authored by Hajime Hoshi's avatar Hajime Hoshi Committed by Commit Bot

Use more appropriate task runner for MessagePortChannel::State

This is part of efforts to replace base::ThreadTaskRunnerHandle::Get()
with other appropriate task runners in the renderer.

Bug: 786332
Change-Id: Id3a68d0efd803d9ab3c1670bfa641ca9641c3af9
Reviewed-on: https://chromium-review.googlesource.com/776641Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Reviewed-by: default avatarKinuko Yasuda <kinuko@chromium.org>
Commit-Queue: Hajime Hoshi <hajimehoshi@chromium.org>
Cr-Commit-Position: refs/heads/master@{#519596}
parent 35e271db
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "base/android/jni_android.h" #include "base/android/jni_android.h"
#include "base/android/jni_string.h" #include "base/android/jni_string.h"
#include "base/bind.h" #include "base/bind.h"
#include "base/threading/thread_task_runner_handle.h"
#include "content/browser/android/string_message_codec.h" #include "content/browser/android/string_message_codec.h"
#include "jni/AppWebMessagePort_jni.h" #include "jni/AppWebMessagePort_jni.h"
...@@ -121,7 +122,8 @@ void AppWebMessagePort::StartReceivingMessages( ...@@ -121,7 +122,8 @@ void AppWebMessagePort::StartReceivingMessages(
JNIEnv* env, JNIEnv* env,
const base::android::JavaParamRef<jobject>& jcaller) { const base::android::JavaParamRef<jobject>& jcaller) {
channel_.SetCallback(base::Bind(&AppWebMessagePort::OnMessagesAvailable, channel_.SetCallback(base::Bind(&AppWebMessagePort::OnMessagesAvailable,
base::Unretained(this))); base::Unretained(this)),
base::ThreadTaskRunnerHandle::Get());
} }
AppWebMessagePort::AppWebMessagePort( AppWebMessagePort::AppWebMessagePort(
......
...@@ -126,7 +126,7 @@ std::vector<uint8_t> StringPieceToVector(base::StringPiece s) { ...@@ -126,7 +126,7 @@ std::vector<uint8_t> StringPieceToVector(base::StringPiece s) {
void BlockingReadFromMessagePort(MessagePortChannel port, void BlockingReadFromMessagePort(MessagePortChannel port,
std::vector<uint8_t>* message) { std::vector<uint8_t>* message) {
base::RunLoop run_loop; base::RunLoop run_loop;
port.SetCallback(run_loop.QuitClosure()); port.SetCallback(run_loop.QuitClosure(), base::ThreadTaskRunnerHandle::Get());
run_loop.Run(); run_loop.Run();
std::vector<MessagePortChannel> should_be_empty; std::vector<MessagePortChannel> should_be_empty;
......
...@@ -124,8 +124,10 @@ void MessagePort::start() { ...@@ -124,8 +124,10 @@ void MessagePort::start() {
return; return;
// Note that MessagePortChannel may call this callback on any thread. // Note that MessagePortChannel may call this callback on any thread.
channel_.SetCallback(ConvertToBaseCallback(CrossThreadBind( channel_.SetCallback(
&MessagePort::MessageAvailable, WrapCrossThreadWeakPersistent(this)))); ConvertToBaseCallback(CrossThreadBind(
&MessagePort::MessageAvailable, WrapCrossThreadWeakPersistent(this))),
task_runner_->ToSingleThreadTaskRunner());
started_ = true; started_ = true;
MessageAvailable(); MessageAvailable();
} }
......
...@@ -106,9 +106,11 @@ bool MessagePortChannel::GetMojoMessage(mojo::Message* message) { ...@@ -106,9 +106,11 @@ bool MessagePortChannel::GetMojoMessage(mojo::Message* message) {
return true; return true;
} }
void MessagePortChannel::SetCallback(const base::Closure& callback) { void MessagePortChannel::SetCallback(
const base::Closure& callback,
scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
state_->StopWatching(); state_->StopWatching();
state_->StartWatching(callback); state_->StartWatching(callback, task_runner);
} }
void MessagePortChannel::ClearCallback() { void MessagePortChannel::ClearCallback() {
...@@ -120,11 +122,15 @@ MessagePortChannel::State::State() {} ...@@ -120,11 +122,15 @@ MessagePortChannel::State::State() {}
MessagePortChannel::State::State(mojo::ScopedMessagePipeHandle handle) MessagePortChannel::State::State(mojo::ScopedMessagePipeHandle handle)
: handle_(std::move(handle)) {} : handle_(std::move(handle)) {}
void MessagePortChannel::State::StartWatching(const base::Closure& callback) { void MessagePortChannel::State::StartWatching(
const base::Closure& callback,
scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
base::AutoLock lock(lock_); base::AutoLock lock(lock_);
DCHECK(!callback_); DCHECK(!callback_);
DCHECK(handle_.is_valid()); DCHECK(handle_.is_valid());
DCHECK(task_runner);
callback_ = callback; callback_ = callback;
task_runner_ = task_runner;
DCHECK(!watcher_handle_.is_valid()); DCHECK(!watcher_handle_.is_valid());
MojoResult rv = CreateWatcher(&State::CallOnHandleReady, &watcher_handle_); MojoResult rv = CreateWatcher(&State::CallOnHandleReady, &watcher_handle_);
...@@ -152,6 +158,7 @@ void MessagePortChannel::State::StopWatching() { ...@@ -152,6 +158,7 @@ void MessagePortChannel::State::StopWatching() {
base::AutoLock lock(lock_); base::AutoLock lock(lock_);
watcher_handle = std::move(watcher_handle_); watcher_handle = std::move(watcher_handle_);
callback_.Reset(); callback_.Reset();
task_runner_ = nullptr;
} }
} }
...@@ -186,7 +193,7 @@ void MessagePortChannel::State::ArmWatcher() { ...@@ -186,7 +193,7 @@ void MessagePortChannel::State::ArmWatcher() {
if (ready_result == MOJO_RESULT_OK) { if (ready_result == MOJO_RESULT_OK) {
// The handle is already signaled, so we trigger a callback now. // The handle is already signaled, so we trigger a callback now.
base::ThreadTaskRunnerHandle::Get()->PostTask( task_runner_->PostTask(
FROM_HERE, base::BindOnce(&State::OnHandleReady, this, MOJO_RESULT_OK)); FROM_HERE, base::BindOnce(&State::OnHandleReady, this, MOJO_RESULT_OK));
return; return;
} }
......
...@@ -83,7 +83,8 @@ class BLINK_COMMON_EXPORT MessagePortChannel { ...@@ -83,7 +83,8 @@ class BLINK_COMMON_EXPORT MessagePortChannel {
// This callback will be invoked on a background thread when messages are // This callback will be invoked on a background thread when messages are
// available to be read via GetMessage. It must not synchronously call back // available to be read via GetMessage. It must not synchronously call back
// into the MessagePortChannel instance. // into the MessagePortChannel instance.
void SetCallback(const base::Closure& callback); void SetCallback(const base::Closure& callback,
scoped_refptr<base::SingleThreadTaskRunner> task_runner);
// Clears any callback specified by a prior call to SetCallback. // Clears any callback specified by a prior call to SetCallback.
void ClearCallback(); void ClearCallback();
...@@ -94,7 +95,8 @@ class BLINK_COMMON_EXPORT MessagePortChannel { ...@@ -94,7 +95,8 @@ class BLINK_COMMON_EXPORT MessagePortChannel {
State(); State();
explicit State(mojo::ScopedMessagePipeHandle handle); explicit State(mojo::ScopedMessagePipeHandle handle);
void StartWatching(const base::Closure& callback); void StartWatching(const base::Closure& callback,
scoped_refptr<base::SingleThreadTaskRunner> task_runner);
void StopWatching(); void StopWatching();
mojo::ScopedMessagePipeHandle TakeHandle(); mojo::ScopedMessagePipeHandle TakeHandle();
...@@ -123,6 +125,8 @@ class BLINK_COMMON_EXPORT MessagePortChannel { ...@@ -123,6 +125,8 @@ class BLINK_COMMON_EXPORT MessagePortChannel {
// Callback to invoke when the State is notified about a change to // Callback to invoke when the State is notified about a change to
// |handle_|'s signaling state. // |handle_|'s signaling state.
base::Closure callback_; base::Closure callback_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
}; };
mutable scoped_refptr<State> state_; mutable scoped_refptr<State> state_;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment