Commit 9971f557 authored by Alexander Timin's avatar Alexander Timin Committed by Commit Bot

[blink] Add a time limit to message processing in MessagePort

In addition to existing limit of 200 messages in a single task also
limit it to 10 milliseconds to avoid starvation.

This logic will be removed when message-per-task mojo dispatching
will be implemented.

BUG=867133
R=mek@chromium.org

Change-Id: I397d60d06e62507e8a322ee9e68be36093fefa57
Reviewed-on: https://chromium-review.googlesource.com/1152815Reviewed-by: default avatarMarijn Kruisselbrink <mek@chromium.org>
Commit-Queue: Alexander Timin <altimin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#579751}
parent d0ad806d
......@@ -47,8 +47,11 @@
namespace blink {
// TODO(altimin): Remove these after per-task mojo dispatching.
// The maximum number of MessageEvents to dispatch from one task.
static const int kMaximumMessagesPerTask = 200;
constexpr int kMaximumMessagesPerTask = 200;
// The threshold to stop processing new tasks.
constexpr base::TimeDelta kYieldThreshold = base::TimeDelta::FromMilliseconds(10);
MessagePort* MessagePort::Create(ExecutionContext& execution_context) {
return new MessagePort(execution_context);
......@@ -239,12 +242,13 @@ bool MessagePort::Accept(mojo::Message* mojo_message) {
// the connector is temporarily paused after |kMaximumMessagesPerTask| have
// been received without other tasks having had a chance to run (in particular
// the ResetMessageCount task posted here).
// TODO(altimin): Remove this after per-task mojo dispatching lands[1].
// [1] https://chromium-review.googlesource.com/c/chromium/src/+/1145692
if (messages_in_current_task_ == 0) {
task_runner_->PostTask(FROM_HERE, WTF::Bind(&MessagePort::ResetMessageCount,
WrapWeakPersistent(this)));
}
++messages_in_current_task_;
if (messages_in_current_task_ > kMaximumMessagesPerTask) {
if (ShouldYieldAfterNewMessage()) {
connector_->PauseIncomingMethodCallProcessing();
}
......@@ -278,9 +282,20 @@ bool MessagePort::Accept(mojo::Message* mojo_message) {
void MessagePort::ResetMessageCount() {
DCHECK_GT(messages_in_current_task_, 0);
messages_in_current_task_ = 0;
task_start_time_ = base::nullopt;
// No-op if not paused already.
if (connector_)
connector_->ResumeIncomingMethodCallProcessing();
}
bool MessagePort::ShouldYieldAfterNewMessage() {
++messages_in_current_task_;
if (messages_in_current_task_ > kMaximumMessagesPerTask)
return true;
base::TimeTicks now = base::TimeTicks::Now();
if (!task_start_time_)
task_start_time_ = now;
return now - task_start_time_.value() > kYieldThreshold;
}
} // namespace blink
......@@ -135,6 +135,7 @@ class CORE_EXPORT MessagePort : public EventTargetWithInlineData,
// mojo::MessageReceiver implementation.
bool Accept(mojo::Message*) override;
void ResetMessageCount();
bool ShouldYieldAfterNewMessage();
std::unique_ptr<mojo::Connector> connector_;
int messages_in_current_task_ = 0;
......@@ -142,6 +143,8 @@ class CORE_EXPORT MessagePort : public EventTargetWithInlineData,
bool started_ = false;
bool closed_ = false;
base::Optional<base::TimeTicks> task_start_time_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment