Commit 0a002495 authored by Marijn Kruisselbrink's avatar Marijn Kruisselbrink Committed by Commit Bot

Simplify MessagePort by using mojo::Connector.

Bug: 750468
Change-Id: I4b6367418a8f11de8a6702cb137c02343be3b233
Reviewed-on: https://chromium-review.googlesource.com/842856
Commit-Queue: Marijn Kruisselbrink <mek@chromium.org>
Reviewed-by: default avatarDaniel Cheng <dcheng@chromium.org>
Reviewed-by: default avatarJeremy Roman <jbroman@chromium.org>
Cr-Commit-Position: refs/heads/master@{#527050}
parent 4decf32c
......@@ -88,32 +88,18 @@ void MessagePort::postMessage(ScriptState* script_state,
if (exception_state.HadException())
return;
channel_.PostMojoMessage(
mojom::blink::TransferableMessage::WrapAsMessage(std::move(msg)));
mojo::Message mojo_message =
mojom::blink::TransferableMessage::WrapAsMessage(std::move(msg));
connector_->Accept(&mojo_message);
}
MessagePortChannel MessagePort::Disentangle() {
DCHECK(!IsNeutered());
channel_.ClearCallback();
auto result = std::move(channel_);
channel_ = MessagePortChannel();
auto result = MessagePortChannel(connector_->PassMessagePipe());
connector_ = nullptr;
return result;
}
// Invoked to notify us that there are messages available for this port.
// This code may be called from another thread, and so should not call any
// non-threadsafe APIs (i.e. should not call into the entangled channel or
// access mutable variables).
void MessagePort::MessageAvailable() {
// Don't post another task if there's an identical one pending.
if (AtomicTestAndSetToOne(&pending_dispatch_task_))
return;
PostCrossThreadTask(*task_runner_, FROM_HERE,
CrossThreadBind(&MessagePort::DispatchMessages,
WrapCrossThreadPersistent(this)));
}
void MessagePort::start() {
// Do nothing if we've been cloned or closed.
if (!IsEntangled())
......@@ -123,98 +109,39 @@ void MessagePort::start() {
if (started_)
return;
// Note that MessagePortChannel may call this callback on any thread.
channel_.SetCallback(
ConvertToBaseCallback(CrossThreadBind(
&MessagePort::MessageAvailable, WrapCrossThreadWeakPersistent(this))),
task_runner_);
started_ = true;
MessageAvailable();
connector_->ResumeIncomingMethodCallProcessing();
}
void MessagePort::close() {
// A closed port should not be neutered, so rather than merely disconnecting
// from the mojo message pipe, also entangle with a new dangling message pipe.
channel_.ClearCallback();
if (IsEntangled())
channel_ = MessagePortChannel(mojo::MessagePipe().handle0);
if (!IsNeutered()) {
connector_ = nullptr;
Entangle(mojo::MessagePipe().handle0);
}
closed_ = true;
}
void MessagePort::Entangle(mojo::ScopedMessagePipeHandle handle) {
Entangle(MessagePortChannel(std::move(handle)));
}
void MessagePort::Entangle(MessagePortChannel channel) {
// Only invoked to set our initial entanglement.
DCHECK(!channel_.GetHandle().is_valid());
DCHECK(channel.GetHandle().is_valid());
DCHECK(handle.is_valid());
DCHECK(!connector_);
DCHECK(GetExecutionContext());
connector_ = std::make_unique<mojo::Connector>(
std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, task_runner_);
connector_->PauseIncomingMethodCallProcessing();
connector_->set_incoming_receiver(this);
}
channel_ = std::move(channel);
void MessagePort::Entangle(MessagePortChannel channel) {
Entangle(channel.ReleaseHandle());
}
const AtomicString& MessagePort::InterfaceName() const {
return EventTargetNames::MessagePort;
}
bool MessagePort::TryGetMessage(BlinkTransferableMessage& message) {
if (IsNeutered())
return false;
mojo::Message mojo_message;
if (!channel_.GetMojoMessage(&mojo_message))
return false;
if (!mojom::blink::TransferableMessage::DeserializeFromMessage(
std::move(mojo_message), &message)) {
return false;
}
return true;
}
void MessagePort::DispatchMessages() {
// Signal to |MessageAvailable()| that there are no ongoing
// dispatches of messages. This can cause redundantly posted
// tasks, but safely avoids messages languishing.
ReleaseStore(&pending_dispatch_task_, 0);
// Messages for contexts that are not fully active get dispatched too, but
// JSAbstractEventListener::handleEvent() doesn't call handlers for these.
// The HTML5 spec specifies that any messages sent to a document that is not
// fully active should be dropped, so this behavior is OK.
if (!Started())
return;
// There's an upper bound on the loop iterations in one DispatchMessages call,
// otherwise page JS could make it loop forever or starve other work.
for (int i = 0; i < kMaximumMessagesPerTask; ++i) {
// Because close() doesn't cancel any in flight calls to dispatchMessages(),
// and can be triggered by the onmessage event handler, we need to check if
// the port is still open before each dispatch.
if (closed_)
break;
// WorkerGlobalScope::close() in Worker onmessage handler should prevent
// the next message from dispatching.
if (GetExecutionContext()->IsWorkerGlobalScope() &&
ToWorkerGlobalScope(GetExecutionContext())->IsClosing()) {
break;
}
BlinkTransferableMessage message;
if (!TryGetMessage(message))
break;
MessagePortArray* ports = MessagePort::EntanglePorts(
*GetExecutionContext(), std::move(message.ports));
Event* evt = MessageEvent::Create(ports, std::move(message.message));
DispatchEvent(evt);
}
}
bool MessagePort::HasPendingActivity() const {
// The spec says that entangled message ports should always be treated as if
// they have a strong reference.
......@@ -285,7 +212,7 @@ MessagePortArray* MessagePort::EntanglePorts(
}
MojoHandle MessagePort::EntangledHandleForTesting() const {
return channel_.GetHandle().get().value();
return connector_->handle().value();
}
void MessagePort::Trace(blink::Visitor* visitor) {
......@@ -293,4 +220,47 @@ void MessagePort::Trace(blink::Visitor* visitor) {
EventTargetWithInlineData::Trace(visitor);
}
bool MessagePort::Accept(mojo::Message* mojo_message) {
// Connector repeatedly calls Accept as long as any messages are available. To
// avoid completely starving the event loop and give some time for other tasks
// the connector is temporarily paused after |kMaximumMessagesPerTask| have
// been received without other tasks having had a chance to run (in particular
// the ResetMessageCount task posted here).
if (messages_in_current_task_ == 0) {
task_runner_->PostTask(FROM_HERE, WTF::Bind(&MessagePort::ResetMessageCount,
WrapWeakPersistent(this)));
}
++messages_in_current_task_;
if (messages_in_current_task_ > kMaximumMessagesPerTask) {
connector_->PauseIncomingMethodCallProcessing();
}
BlinkTransferableMessage message;
if (!mojom::blink::TransferableMessage::DeserializeFromMessage(
std::move(*mojo_message), &message)) {
return false;
}
// WorkerGlobalScope::close() in Worker onmessage handler should prevent
// the next message from dispatching.
if (GetExecutionContext()->IsWorkerGlobalScope() &&
ToWorkerGlobalScope(GetExecutionContext())->IsClosing()) {
return true;
}
MessagePortArray* ports = MessagePort::EntanglePorts(
*GetExecutionContext(), std::move(message.ports));
Event* evt = MessageEvent::Create(ports, std::move(message.message));
DispatchEvent(evt);
return true;
}
void MessagePort::ResetMessageCount() {
DCHECK_GT(messages_in_current_task_, 0);
messages_in_current_task_ = 0;
// No-op if not paused already.
if (connector_)
connector_->ResumeIncomingMethodCallProcessing();
}
} // namespace blink
......@@ -42,13 +42,13 @@
namespace blink {
struct BlinkTransferableMessage;
class ExceptionState;
class ExecutionContext;
class ScriptState;
class SerializedScriptValue;
class CORE_EXPORT MessagePort : public EventTargetWithInlineData,
public mojo::MessageReceiver,
public ActiveScriptWrappable<MessagePort>,
public ContextLifecycleObserver {
DEFINE_WRAPPERTYPEINFO();
......@@ -119,7 +119,7 @@ class CORE_EXPORT MessagePort : public EventTargetWithInlineData,
// A port gets neutered when it is transferred to a new owner via
// postMessage().
bool IsNeutered() const { return !channel_.GetHandle().is_valid(); }
bool IsNeutered() const { return !connector_ || !connector_->is_valid(); }
// For testing only: allows inspection of the entangled channel.
MojoHandle EntangledHandleForTesting() const;
......@@ -128,15 +128,15 @@ class CORE_EXPORT MessagePort : public EventTargetWithInlineData,
protected:
explicit MessagePort(ExecutionContext&);
bool TryGetMessage(BlinkTransferableMessage&);
private:
void MessageAvailable();
void DispatchMessages();
// mojo::MessageReceiver implementation.
bool Accept(mojo::Message*) override;
void ResetMessageCount();
MessagePortChannel channel_;
std::unique_ptr<mojo::Connector> connector_;
int messages_in_current_task_ = 0;
int pending_dispatch_task_ = 0;
bool started_ = false;
bool closed_ = false;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment