Commit e8a2301b authored by Sigurdur Asgeirsson's avatar Sigurdur Asgeirsson Committed by Commit Bot

Mojo: Bound the number of dispatch tasks posted by Connector.

Change-Id: I2d01fc0a7ffdb3b5b73f02cf07461c4832745904
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1865598Reviewed-by: default avatarKen Rockot <rockot@google.com>
Commit-Queue: Sigurður Ásgeirsson <siggi@chromium.org>
Cr-Commit-Position: refs/heads/master@{#707078}
parent a1c51e8c
......@@ -227,6 +227,13 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// validation).
bool DispatchMessage(Message message);
// Posts a task to dispatch the next message in |dispatch_queue_|. These two
// functions keep |num_pending_dispatch_tasks_| up to date, so as to allow
// bounding the number of posted tasks when the Connector is e.g. paused and
// resumed repeatedly.
void PostDispatchNextMessageInQueue();
void CallDispatchNextMessageInQueue();
// Used to schedule dispatch of a single message from the front of
// |dispatch_queue_|. Returns |true| if the dispatch succeeded and |false|
// otherwise (e.g. if the message failed validation).
......@@ -328,6 +335,9 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// nested dispatch operations.
bool is_dispatching_ = false;
// The number of outstanding tasks for CallDispatchNextMessageInQueue.
size_t num_pending_dispatch_tasks_ = 0;
#if defined(ENABLE_IPC_FUZZER)
std::unique_ptr<MessageReceiver> message_dumper_;
#endif
......
......@@ -359,12 +359,8 @@ void Connector::ResumeIncomingMethodCallProcessing() {
if (!weak_self)
return;
} else {
for (size_t i = 0; i < dispatch_queue_.size(); ++i) {
task_runner_->PostTask(
FROM_HERE, base::BindOnce(base::IgnoreResult(
&Connector::DispatchNextMessageInQueue),
weak_self_));
}
while (num_pending_dispatch_tasks_ < dispatch_queue_.size())
PostDispatchNextMessageInQueue();
}
paused_ = false;
......@@ -619,6 +615,19 @@ bool Connector::DispatchMessage(Message message) {
return true;
}
void Connector::PostDispatchNextMessageInQueue() {
DCHECK_LT(num_pending_dispatch_tasks_, dispatch_queue_.size());
++num_pending_dispatch_tasks_;
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&Connector::CallDispatchNextMessageInQueue, weak_self_));
}
void Connector::CallDispatchNextMessageInQueue() {
--num_pending_dispatch_tasks_;
DispatchNextMessageInQueue();
}
bool Connector::DispatchNextMessageInQueue() {
if (error_ || paused_)
return false;
......@@ -681,10 +690,8 @@ void Connector::ReadAllAvailableMessages() {
return;
} else {
dispatch_queue_.push(std::move(message));
task_runner_->PostTask(
FROM_HERE, base::BindOnce(base::IgnoreResult(
&Connector::DispatchNextMessageInQueue),
weak_self_));
if (num_pending_dispatch_tasks_ < dispatch_queue_.size())
PostDispatchNextMessageInQueue();
}
first_message_in_batch = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment