Commit 8866b1dd authored by Yuzu Saijo's avatar Yuzu Saijo Committed by Commit Bot

Store per-frame task runners for IPC messages task execution

This CL stores per-frame task runners for IPC messages to be executed on
using routing_id. The task runners are stored on IPC::ChannelProxy::Context and will be
looked up each time when IPC messages arrives to the renderer.

Code-wise, this CL
- Introduces IPC::ChannelProxy::Context::{AddListenerTaskRunner, GetTaskRunner}
- {Associate, query} a task runner {to, from} a IPC routing_id
- Before this CL, all IPC tasks were PostTask to {ipc, listener} task runner, but after this CL,
 they are PostTask to the task runner associated to their IPC message's routing_id.
- Renames kInternalNavigation to kInternalNavigationAssociated,
- Associate frame routing_id to per-frame task runner, so the frame IPC messages are routed to
 the task runner.

Design-doc: https://docs.google.com/document/d/1dfQEPeeqOBigyzaeLR5dYaAgeeNDhCkWsPyxodD5xTA/edit?usp=sharing

Change-Id: Ie7500355ca489ee192f9de53f05d07774cf286fb
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1526067
Commit-Queue: Yuzu Saijo <yuzus@chromium.org>
Reviewed-by: default avatarKen Rockot <rockot@google.com>
Reviewed-by: default avatarKouhei Ueno <kouhei@chromium.org>
Reviewed-by: default avatarDaniel Cheng <dcheng@chromium.org>
Reviewed-by: default avatarAlexander Timin <altimin@chromium.org>
Reviewed-by: default avatarKentaro Hara <haraken@chromium.org>
Cr-Commit-Position: refs/heads/master@{#662948}
parent c6391e1a
...@@ -58,8 +58,8 @@ void NavigationClient::CommitFailedNavigation( ...@@ -58,8 +58,8 @@ void NavigationClient::CommitFailedNavigation(
void NavigationClient::Bind(mojom::NavigationClientAssociatedRequest request) { void NavigationClient::Bind(mojom::NavigationClientAssociatedRequest request) {
navigation_client_binding_.Bind( navigation_client_binding_.Bind(
std::move(request), std::move(request), render_frame_->GetTaskRunner(
render_frame_->GetTaskRunner(blink::TaskType::kInternalNavigation)); blink::TaskType::kInternalNavigationAssociated));
SetDisconnectionHandler(); SetDisconnectionHandler();
} }
......
...@@ -1069,6 +1069,10 @@ void RenderThreadImpl::AddRoute(int32_t routing_id, IPC::Listener* listener) { ...@@ -1069,6 +1069,10 @@ void RenderThreadImpl::AddRoute(int32_t routing_id, IPC::Listener* listener) {
if (!frame) if (!frame)
return; return;
GetChannel()->AddListenerTaskRunner(
routing_id,
frame->GetTaskRunner(blink::TaskType::kInternalNavigationAssociated));
scoped_refptr<PendingFrameCreate> create(it->second); scoped_refptr<PendingFrameCreate> create(it->second);
frame->BindFrame(it->second->browser_info(), it->second->TakeFrameRequest()); frame->BindFrame(it->second->browser_info(), it->second->TakeFrameRequest());
pending_frame_creates_.erase(it); pending_frame_creates_.erase(it);
...@@ -1076,6 +1080,7 @@ void RenderThreadImpl::AddRoute(int32_t routing_id, IPC::Listener* listener) { ...@@ -1076,6 +1080,7 @@ void RenderThreadImpl::AddRoute(int32_t routing_id, IPC::Listener* listener) {
void RenderThreadImpl::RemoveRoute(int32_t routing_id) { void RenderThreadImpl::RemoveRoute(int32_t routing_id) {
ChildThreadImpl::GetRouter()->RemoveRoute(routing_id); ChildThreadImpl::GetRouter()->RemoveRoute(routing_id);
GetChannel()->RemoveListenerTaskRunner(routing_id);
} }
void RenderThreadImpl::RegisterPendingFrameCreate( void RenderThreadImpl::RegisterPendingFrameCreate(
......
...@@ -32,7 +32,7 @@ ChannelProxy::Context::Context( ...@@ -32,7 +32,7 @@ ChannelProxy::Context::Context(
Listener* listener, Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner)
: listener_task_runner_(listener_task_runner), : default_listener_task_runner_(listener_task_runner),
listener_(listener), listener_(listener),
ipc_task_runner_(ipc_task_runner), ipc_task_runner_(ipc_task_runner),
channel_connected_called_(false), channel_connected_called_(false),
...@@ -47,7 +47,8 @@ ChannelProxy::Context::Context( ...@@ -47,7 +47,8 @@ ChannelProxy::Context::Context(
// Note, we currently make an exception for a NULL listener. That usage // Note, we currently make an exception for a NULL listener. That usage
// basically works, but is outside the intent of ChannelProxy. This support // basically works, but is outside the intent of ChannelProxy. This support
// will disappear, so please don't rely on it. See crbug.com/364241 // will disappear, so please don't rely on it. See crbug.com/364241
DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get())); DCHECK(!listener ||
(ipc_task_runner_.get() != default_listener_task_runner_.get()));
} }
ChannelProxy::Context::~Context() = default; ChannelProxy::Context::~Context() = default;
...@@ -85,9 +86,9 @@ bool ChannelProxy::Context::TryFilters(const Message& message) { ...@@ -85,9 +86,9 @@ bool ChannelProxy::Context::TryFilters(const Message& message) {
if (message_filter_router_->TryFilters(message)) { if (message_filter_router_->TryFilters(message)) {
if (message.dispatch_error()) { if (message.dispatch_error()) {
listener_task_runner_->PostTask( GetTaskRunner(message.routing_id())
FROM_HERE, ->PostTask(FROM_HERE, base::BindOnce(&Context::OnDispatchBadMessage,
base::BindOnce(&Context::OnDispatchBadMessage, this, message)); this, message));
} }
#if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED) #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
if (logger->Enabled()) if (logger->Enabled())
...@@ -126,8 +127,9 @@ bool ChannelProxy::Context::OnMessageReceived(const Message& message) { ...@@ -126,8 +127,9 @@ bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
// Called on the IPC::Channel thread // Called on the IPC::Channel thread
bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
listener_task_runner_->PostTask( GetTaskRunner(message.routing_id())
FROM_HERE, base::BindOnce(&Context::OnDispatchMessage, this, message)); ->PostTask(FROM_HERE,
base::BindOnce(&Context::OnDispatchMessage, this, message));
return true; return true;
} }
...@@ -145,8 +147,8 @@ void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) { ...@@ -145,8 +147,8 @@ void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) {
// the filter is run on the IO thread. // the filter is run on the IO thread.
OnAddFilter(); OnAddFilter();
// See above comment about using listener_task_runner_ here. // See above comment about using default_listener_task_runner_ here.
listener_task_runner_->PostTask( default_listener_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&Context::OnDispatchConnected, this)); FROM_HERE, base::BindOnce(&Context::OnDispatchConnected, this));
} }
...@@ -155,8 +157,8 @@ void ChannelProxy::Context::OnChannelError() { ...@@ -155,8 +157,8 @@ void ChannelProxy::Context::OnChannelError() {
for (size_t i = 0; i < filters_.size(); ++i) for (size_t i = 0; i < filters_.size(); ++i)
filters_[i]->OnChannelError(); filters_[i]->OnChannelError();
// See above comment about using listener_task_runner_ here. // See above comment about using default_listener_task_runner_ here.
listener_task_runner_->PostTask( default_listener_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&Context::OnDispatchError, this)); FROM_HERE, base::BindOnce(&Context::OnDispatchError, this));
} }
...@@ -164,7 +166,7 @@ void ChannelProxy::Context::OnChannelError() { ...@@ -164,7 +166,7 @@ void ChannelProxy::Context::OnChannelError() {
void ChannelProxy::Context::OnAssociatedInterfaceRequest( void ChannelProxy::Context::OnAssociatedInterfaceRequest(
const std::string& interface_name, const std::string& interface_name,
mojo::ScopedInterfaceEndpointHandle handle) { mojo::ScopedInterfaceEndpointHandle handle) {
listener_task_runner_->PostTask( default_listener_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&Context::OnDispatchAssociatedInterfaceRequest, FROM_HERE, base::BindOnce(&Context::OnDispatchAssociatedInterfaceRequest,
this, interface_name, std::move(handle))); this, interface_name, std::move(handle)));
} }
...@@ -331,6 +333,40 @@ void ChannelProxy::Context::OnDispatchMessage(const Message& message) { ...@@ -331,6 +333,40 @@ void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
#endif #endif
} }
// Called on the listener's thread.
void ChannelProxy::Context::AddListenerTaskRunner(
int32_t routing_id,
scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
DCHECK(default_listener_task_runner_->BelongsToCurrentThread());
DCHECK(task_runner);
base::AutoLock lock(listener_thread_task_runners_lock_);
if (!base::ContainsKey(listener_thread_task_runners_, routing_id))
listener_thread_task_runners_.insert({routing_id, std::move(task_runner)});
}
// Called on the listener's thread.
void ChannelProxy::Context::RemoveListenerTaskRunner(int32_t routing_id) {
DCHECK(default_listener_task_runner_->BelongsToCurrentThread());
base::AutoLock lock(listener_thread_task_runners_lock_);
if (base::ContainsKey(listener_thread_task_runners_, routing_id))
listener_thread_task_runners_.erase(routing_id);
}
// Called on the IPC::Channel thread.
base::SingleThreadTaskRunner* ChannelProxy::Context::GetTaskRunner(
int32_t routing_id) {
DCHECK(ipc_task_runner_->BelongsToCurrentThread());
if (routing_id == MSG_ROUTING_NONE)
return default_listener_task_runner_.get();
base::AutoLock lock(listener_thread_task_runners_lock_);
base::SingleThreadTaskRunner* task_runner =
listener_thread_task_runners_[routing_id].get();
if (task_runner)
return task_runner;
return default_listener_task_runner_.get();
}
// Called on the listener's thread // Called on the listener's thread
void ChannelProxy::Context::OnDispatchConnected() { void ChannelProxy::Context::OnDispatchConnected() {
if (channel_connected_called_) if (channel_connected_called_)
......
...@@ -263,7 +263,7 @@ class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender { ...@@ -263,7 +263,7 @@ class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender {
} }
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner() { scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner() {
return listener_task_runner_; return default_listener_task_runner_;
} }
// Dispatches a message on the listener thread. // Dispatches a message on the listener thread.
...@@ -272,6 +272,18 @@ class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender { ...@@ -272,6 +272,18 @@ class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender {
// Sends |message| from appropriate thread. // Sends |message| from appropriate thread.
void Send(Message* message, const char* debug_name); void Send(Message* message, const char* debug_name);
// Adds |task_runner| for the task to be executed later.
void AddListenerTaskRunner(
int32_t routing_id,
scoped_refptr<base::SingleThreadTaskRunner> task_runner);
// Removes task runner for |routing_id|.
void RemoveListenerTaskRunner(int32_t routing_id);
// Called on the IPC::Channel thread.
// Returns the task runner associated with |routing_id|.
base::SingleThreadTaskRunner* GetTaskRunner(int32_t routing_id);
protected: protected:
friend class base::RefCountedThreadSafe<Context>; friend class base::RefCountedThreadSafe<Context>;
~Context() override; ~Context() override;
...@@ -336,7 +348,13 @@ class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender { ...@@ -336,7 +348,13 @@ class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender {
const std::string& name, const std::string& name,
const GenericAssociatedInterfaceFactory& factory); const GenericAssociatedInterfaceFactory& factory);
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; base::Lock listener_thread_task_runners_lock_;
// Map of routing_id and listener's thread task runner.
std::map<int32_t, scoped_refptr<base::SingleThreadTaskRunner>>
listener_thread_task_runners_
GUARDED_BY(listener_thread_task_runners_lock_);
scoped_refptr<base::SingleThreadTaskRunner> default_listener_task_runner_;
Listener* listener_; Listener* listener_;
// List of filters. This is only accessed on the IPC thread. // List of filters. This is only accessed on the IPC thread.
......
...@@ -578,6 +578,16 @@ SyncChannel::SyncChannel( ...@@ -578,6 +578,16 @@ SyncChannel::SyncChannel(
StartWatching(); StartWatching();
} }
void SyncChannel::AddListenerTaskRunner(
int32_t routing_id,
scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
context()->AddListenerTaskRunner(routing_id, std::move(task_runner));
}
void SyncChannel::RemoveListenerTaskRunner(int32_t routing_id) {
context()->RemoveListenerTaskRunner(routing_id);
}
SyncChannel::~SyncChannel() = default; SyncChannel::~SyncChannel() = default;
void SyncChannel::SetRestrictDispatchChannelGroup(int group) { void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
......
...@@ -96,6 +96,12 @@ class COMPONENT_EXPORT(IPC) SyncChannel : public ChannelProxy { ...@@ -96,6 +96,12 @@ class COMPONENT_EXPORT(IPC) SyncChannel : public ChannelProxy {
const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner, const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
base::WaitableEvent* shutdown_event); base::WaitableEvent* shutdown_event);
void AddListenerTaskRunner(
int32_t routing_id,
scoped_refptr<base::SingleThreadTaskRunner> task_runner);
void RemoveListenerTaskRunner(int32_t routing_id);
~SyncChannel() override; ~SyncChannel() override;
bool Send(Message* message) override; bool Send(Message* message) override;
......
...@@ -203,8 +203,12 @@ enum class TaskType : unsigned char { ...@@ -203,8 +203,12 @@ enum class TaskType : unsigned char {
// Task used for ContentCapture. // Task used for ContentCapture.
kInternalContentCapture = 61, kInternalContentCapture = 61,
// Task used for Navigations. // Navigation tasks and tasks which have to run in order with them, including
kInternalNavigation = 63, // legacy IPCs and channel associated interfaces.
// Note that the ordering between tasks related to different frames is not
// always guaranteed - tasks belonging to different frames can be reordered
// when one of the frames is frozen.
kInternalNavigationAssociated = 63,
/////////////////////////////////////// ///////////////////////////////////////
// The following task types are only for thread-local queues. // The following task types are only for thread-local queues.
......
...@@ -470,7 +470,7 @@ base::Optional<QueueTraits> FrameSchedulerImpl::CreateQueueTraitsForTaskType( ...@@ -470,7 +470,7 @@ base::Optional<QueueTraits> FrameSchedulerImpl::CreateQueueTraitsForTaskType(
case TaskType::kInternalTranslation: case TaskType::kInternalTranslation:
return ForegroundOnlyTaskQueueTraits(); return ForegroundOnlyTaskQueueTraits();
// Navigation IPCs do not run using virtual time to avoid hanging. // Navigation IPCs do not run using virtual time to avoid hanging.
case TaskType::kInternalNavigation: case TaskType::kInternalNavigationAssociated:
return DoesNotUseVirtualTimeTaskQueueTraits(); return DoesNotUseVirtualTimeTaskQueueTraits();
case TaskType::kDeprecatedNone: case TaskType::kDeprecatedNone:
case TaskType::kMainThreadTaskQueueV8: case TaskType::kMainThreadTaskQueueV8:
......
...@@ -129,8 +129,8 @@ const char* TaskTypeNames::TaskTypeToString(TaskType task_type) { ...@@ -129,8 +129,8 @@ const char* TaskTypeNames::TaskTypeToString(TaskType task_type) {
return "InternalTranslation"; return "InternalTranslation";
case TaskType::kInternalContentCapture: case TaskType::kInternalContentCapture:
return "InternalContentCapture"; return "InternalContentCapture";
case TaskType::kInternalNavigation: case TaskType::kInternalNavigationAssociated:
return "InternalNavigation"; return "InternalNavigationAssociated";
case TaskType::kCount: case TaskType::kCount:
return "Count"; return "Count";
} }
......
...@@ -165,7 +165,7 @@ scoped_refptr<base::SingleThreadTaskRunner> WorkerScheduler::GetTaskRunner( ...@@ -165,7 +165,7 @@ scoped_refptr<base::SingleThreadTaskRunner> WorkerScheduler::GetTaskRunner(
case TaskType::kInternalIPC: case TaskType::kInternalIPC:
case TaskType::kInternalInspector: case TaskType::kInternalInspector:
case TaskType::kInternalTest: case TaskType::kInternalTest:
case TaskType::kInternalNavigation: case TaskType::kInternalNavigationAssociated:
// UnthrottledTaskRunner is generally discouraged in future. // UnthrottledTaskRunner is generally discouraged in future.
// TODO(nhiroki): Identify which tasks can be throttled / suspendable and // TODO(nhiroki): Identify which tasks can be throttled / suspendable and
// move them into other task runners. See also comments in // move them into other task runners. See also comments in
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment