Commit 4ba3cada authored by abarth's avatar abarth Committed by Commit bot

Add the ability to observe MessagePumpMojo

We need the ability to observe the Mojo message pump in order to correctly
resolve promises at the end of microtasks triggered by signaling Mojo handles.

Another approach James and I discussed for this CL was to trigger
MessageLoop::TaskObserver, but that requires a PendingTask, which we don't
have. I investigated removing the PendingTask argument from
MessageLoop::TaskObserver, but it's used in a number of tests. Instead, this CL
follows the approach used by MessagePumpLibevent and introduces a specialized
observer interface for the message pump.

R=jamesr@chromium.org

Review URL: https://codereview.chromium.org/663873002

Cr-Commit-Position: refs/heads/master@{#300169}
parent ceb89f23
......@@ -97,6 +97,14 @@ void MessagePumpMojo::RemoveHandler(const Handle& handle) {
handlers_.erase(handle);
}
void MessagePumpMojo::AddObserver(Observer* observer) {
observers_.AddObserver(observer);
}
void MessagePumpMojo::RemoveObserver(Observer* observer) {
observers_.RemoveObserver(observer);
}
void MessagePumpMojo::Run(Delegate* delegate) {
RunState run_state;
// TODO: better deal with error handling.
......@@ -175,8 +183,10 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
} else if (result > 0) {
const size_t index = static_cast<size_t>(result);
DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
WillSignalHandler();
handlers_[wait_state.handles[index]].handler->OnHandleReady(
wait_state.handles[index]);
DidSignalHandler();
} else {
switch (result) {
case MOJO_RESULT_CANCELLED:
......@@ -204,7 +214,9 @@ bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
if (!i->second.deadline.is_null() && i->second.deadline < now &&
handlers_.find(i->first) != handlers_.end() &&
handlers_[i->first].id == i->second.id) {
WillSignalHandler();
i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
DidSignalHandler();
handlers_.erase(i->first);
did_work = true;
}
......@@ -232,7 +244,9 @@ void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
MessagePumpMojoHandler* handler =
handlers_[wait_state.handles[i]].handler;
handlers_.erase(wait_state.handles[i]);
WillSignalHandler();
handler->OnHandleError(wait_state.handles[i], result);
DidSignalHandler();
return;
}
}
......@@ -274,5 +288,13 @@ MojoDeadline MessagePumpMojo::GetDeadlineForWait(
return deadline;
}
void MessagePumpMojo::WillSignalHandler() {
FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
}
void MessagePumpMojo::DidSignalHandler() {
FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
}
} // namespace common
} // namespace mojo
......@@ -10,6 +10,7 @@
#include "base/macros.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_pump.h"
#include "base/observer_list.h"
#include "base/synchronization/lock.h"
#include "base/time/time.h"
#include "mojo/common/mojo_common_export.h"
......@@ -23,6 +24,17 @@ class MessagePumpMojoHandler;
// Mojo implementation of MessagePump.
class MOJO_COMMON_EXPORT MessagePumpMojo : public base::MessagePump {
public:
class Observer {
public:
Observer() {}
virtual void WillSignalHandler() = 0;
virtual void DidSignalHandler() = 0;
protected:
virtual ~Observer() {}
};
MessagePumpMojo();
virtual ~MessagePumpMojo();
......@@ -45,6 +57,9 @@ class MOJO_COMMON_EXPORT MessagePumpMojo : public base::MessagePump {
void RemoveHandler(const Handle& handle);
void AddObserver(Observer*);
void RemoveObserver(Observer*);
// MessagePump:
virtual void Run(Delegate* delegate) override;
virtual void Quit() override;
......@@ -88,6 +103,9 @@ class MOJO_COMMON_EXPORT MessagePumpMojo : public base::MessagePump {
// Returns the deadline for the call to MojoWaitMany().
MojoDeadline GetDeadlineForWait(const RunState& run_state) const;
void WillSignalHandler();
void DidSignalHandler();
// If non-NULL we're running (inside Run()). Member is reference to value on
// stack.
RunState* run_state_;
......@@ -106,6 +124,8 @@ class MOJO_COMMON_EXPORT MessagePumpMojo : public base::MessagePump {
// notify it.
int next_handler_id_;
ObserverList<Observer> observers_;
DISALLOW_COPY_AND_ASSIGN(MessagePumpMojo);
};
......
......@@ -47,6 +47,19 @@ class CountingMojoHandler : public MessagePumpMojoHandler {
DISALLOW_COPY_AND_ASSIGN(CountingMojoHandler);
};
class CountingObserver : public MessagePumpMojo::Observer {
public:
virtual void WillSignalHandler() override {
will_signal_handler_count++;
}
virtual void DidSignalHandler() override {
did_signal_handler_count++;
}
int will_signal_handler_count = 0;
int did_signal_handler_count = 0;
};
TEST(MessagePumpMojo, RunUntilIdle) {
base::MessageLoop message_loop(MessagePumpMojo::Create());
CountingMojoHandler handler;
......@@ -64,6 +77,36 @@ TEST(MessagePumpMojo, RunUntilIdle) {
EXPECT_EQ(2, handler.success_count());
}
TEST(MessagePumpMojo, Observer) {
base::MessageLoop message_loop(MessagePumpMojo::Create());
CountingObserver observer;
MessagePumpMojo::current()->AddObserver(&observer);
CountingMojoHandler handler;
MessagePipe handles;
MessagePumpMojo::current()->AddHandler(&handler,
handles.handle0.get(),
MOJO_HANDLE_SIGNAL_READABLE,
base::TimeTicks());
WriteMessageRaw(
handles.handle1.get(), NULL, 0, NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
base::RunLoop run_loop;
run_loop.RunUntilIdle();
EXPECT_EQ(1, handler.success_count());
EXPECT_EQ(1, observer.will_signal_handler_count);
EXPECT_EQ(1, observer.did_signal_handler_count);
MessagePumpMojo::current()->RemoveObserver(&observer);
WriteMessageRaw(
handles.handle1.get(), NULL, 0, NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
base::RunLoop run_loop2;
run_loop2.RunUntilIdle();
EXPECT_EQ(2, handler.success_count());
EXPECT_EQ(1, observer.will_signal_handler_count);
EXPECT_EQ(1, observer.did_signal_handler_count);
}
TEST(MessagePumpMojo, UnregisterAfterDeadline) {
base::MessageLoop message_loop(MessagePumpMojo::Create());
CountingMojoHandler handler;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment