Commit ddb15328 authored by mek's avatar mek Committed by Commit bot

Properly queue messages sent to message ports that are transferred to a service worker.

There can be some delay between the message port being transferred to the
browser process and the renderer for the service worker being available, so
this change makes sure that during this period messages sent to these message
ports are queued, rather than get lost/cause assertions to fail.

Additionally, ports that are send to these ports while they are queued will be similarly delayed, and thus also need to be put in this special state.

Finally when for some reason launching the service worker fails all these ports (that are not associated with any renderer) need to be cleaned up, which might have to happen asynchronously, since the source renderer process might not have finished sending the message queue for this port yet.

Some layout tests for this are in https://codereview.chromium.org/729923004/

BUG=432678

Review URL: https://codereview.chromium.org/737833002

Cr-Commit-Position: refs/heads/master@{#305120}
parent 7f4660dc
......@@ -42,6 +42,9 @@ bool MessagePortMessageFilter::OnMessageReceived(const IPC::Message& message) {
IPC_MESSAGE_FORWARD(MessagePortHostMsg_SendQueuedMessages,
MessagePortService::GetInstance(),
MessagePortService::SendQueuedMessages)
IPC_MESSAGE_FORWARD(MessagePortHostMsg_ReleaseMessages,
MessagePortService::GetInstance(),
MessagePortService::ReleaseMessages)
IPC_MESSAGE_UNHANDLED(handled = false)
IPC_END_MESSAGE_MAP()
......
......@@ -27,7 +27,24 @@ struct MessagePortService::MessagePort {
// system.
// This flag should only be set to true in response to
// MessagePortHostMsg_QueueMessages.
bool queue_messages;
bool queue_for_inflight_messages;
// If true, all messages to this message port are queued and not delivered.
// This is needed so that when a message port is sent to a new process all
// messages are held in the browser process until the destination process is
// ready to receive messages. This flag is set true when a message port is
// transferred to a different process but there isn't immediately a
// MessagePortMessageFilter available for that new process. Once the
// destination process is ready to receive messages it sends
// MessagePortHostMsg_ReleaseMessages to set this flag to false.
bool hold_messages_for_destination;
// Returns true if messages should be queued for either reason.
bool queue_messages() const {
return queue_for_inflight_messages || hold_messages_for_destination;
}
// If true, the message port should be destroyed but was currently still
// waiting for a SendQueuedMessages message from a renderer. As soon as that
// message is received the port will actually be destroyed.
bool should_be_destroyed;
QueuedMessages queued_messages;
};
......@@ -79,7 +96,9 @@ void MessagePortService::Create(int route_id,
port.route_id = route_id;
port.message_port_id = *message_port_id;
port.entangled_message_port_id = MSG_ROUTING_NONE;
port.queue_messages = false;
port.queue_for_inflight_messages = false;
port.hold_messages_for_destination = false;
port.should_be_destroyed = false;
message_ports_[*message_port_id] = port;
}
......@@ -90,6 +109,7 @@ void MessagePortService::Destroy(int message_port_id) {
}
DCHECK(message_ports_[message_port_id].queued_messages.empty());
Erase(message_port_id);
}
......@@ -150,7 +170,14 @@ void MessagePortService::PostMessageTo(
for (size_t i = 0; i < sent_message_port_ids.size(); ++i)
sent_ports[i] = &message_ports_[sent_message_port_ids[i]];
if (entangled_port.queue_messages) {
if (entangled_port.queue_messages()) {
// If the target port is currently holding messages because the destination
// renderer isn't available yet, all message ports being sent should also be
// put in this state.
if (entangled_port.hold_messages_for_destination) {
for (int sent_message_port_id : sent_message_port_ids)
HoldMessages(sent_message_port_id);
}
entangled_port.queued_messages.push_back(
std::make_pair(message, sent_message_port_ids));
return;
......@@ -188,7 +215,7 @@ void MessagePortService::QueueMessages(int message_port_id) {
MessagePort& port = message_ports_[message_port_id];
if (port.filter) {
port.filter->Send(new MessagePortMsg_MessagesQueued(port.route_id));
port.queue_messages = true;
port.queue_for_inflight_messages = true;
port.filter = NULL;
}
}
......@@ -204,11 +231,24 @@ void MessagePortService::SendQueuedMessages(
// Send the queued messages to the port again. This time they'll reach the
// new location.
MessagePort& port = message_ports_[message_port_id];
port.queue_messages = false;
port.queue_for_inflight_messages = false;
// If the port is currently holding messages waiting for the target renderer,
// all ports in messages being sent to the port should also be put on hold.
if (port.hold_messages_for_destination) {
for (const auto& message : queued_messages)
for (int sent_message_port_id : message.second)
HoldMessages(sent_message_port_id);
}
port.queued_messages.insert(port.queued_messages.begin(),
queued_messages.begin(),
queued_messages.end());
SendQueuedMessagesIfPossible(message_port_id);
if (port.should_be_destroyed)
ClosePort(message_port_id);
else
SendQueuedMessagesIfPossible(message_port_id);
}
void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
......@@ -218,7 +258,7 @@ void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
}
MessagePort& port = message_ports_[message_port_id];
if (port.queue_messages || !port.filter)
if (port.queue_messages() || !port.filter)
return;
for (QueuedMessages::iterator iter = port.queued_messages.begin();
......@@ -228,6 +268,49 @@ void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
port.queued_messages.clear();
}
void MessagePortService::HoldMessages(int message_port_id) {
if (!message_ports_.count(message_port_id)) {
NOTREACHED();
return;
}
// Any ports in messages currently in the queue should also be put on hold.
for (const auto& message : message_ports_[message_port_id].queued_messages)
for (int sent_message_port_id : message.second)
HoldMessages(sent_message_port_id);
message_ports_[message_port_id].hold_messages_for_destination = true;
}
void MessagePortService::ClosePort(int message_port_id) {
if (!message_ports_.count(message_port_id)) {
NOTREACHED();
return;
}
if (message_ports_[message_port_id].queue_for_inflight_messages) {
message_ports_[message_port_id].should_be_destroyed = true;
return;
}
// First close any message ports in the queue for this message port.
for (const auto& message : message_ports_[message_port_id].queued_messages)
for (int sent_message_port_id : message.second)
ClosePort(sent_message_port_id);
Erase(message_port_id);
}
void MessagePortService::ReleaseMessages(int message_port_id) {
if (!message_ports_.count(message_port_id)) {
NOTREACHED();
return;
}
message_ports_[message_port_id].hold_messages_for_destination = false;
SendQueuedMessagesIfPossible(message_port_id);
}
void MessagePortService::Erase(int message_port_id) {
MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
DCHECK(erase_item != message_ports_.end());
......
......@@ -37,6 +37,7 @@ class MessagePortService {
void QueueMessages(int message_port_id);
void SendQueuedMessages(int message_port_id,
const QueuedMessages& queued_messages);
void ReleaseMessages(int message_port_id);
// Updates the information needed to reach a message port when it's sent to a
// (possibly different) process.
......@@ -45,6 +46,20 @@ class MessagePortService {
MessagePortMessageFilter* filter,
int routing_id);
// The message port is being transferred to a new renderer process, but the
// code doing that isn't able to immediately update the message port with a
// new filter and routing_id. This queues up all messages sent to this port
// until later ReleaseMessages is called for this port (this will happen
// automatically as soon as a WebMessagePortChannelImpl instance is created
// for this port in the renderer. The browser side code is still responsible
// for updating the port with a new filter before that happens. If ultimately
// transfering the port to a new process fails, ClosePort should be called to
// clean up the port.
void HoldMessages(int message_port_id);
// Closes and cleans up the message port.
void ClosePort(int message_port_id);
void OnMessagePortMessageFilterClosing(MessagePortMessageFilter* filter);
// Attempts to send the queued messages for a message port.
......
......@@ -8,6 +8,7 @@
#include "base/stl_util.h"
#include "base/strings/string16.h"
#include "content/browser/message_port_message_filter.h"
#include "content/browser/message_port_service.h"
#include "content/browser/service_worker/embedded_worker_instance.h"
#include "content/browser/service_worker/embedded_worker_registry.h"
#include "content/browser/service_worker/service_worker_context_core.h"
......@@ -88,6 +89,17 @@ void RunErrorFetchCallback(const ServiceWorkerVersion::FetchCallback& callback,
ServiceWorkerResponse());
}
void RunErrorMessageCallback(
const std::vector<int>& sent_message_port_ids,
const ServiceWorkerVersion::StatusCallback& callback,
ServiceWorkerStatusCode status) {
// Transfering the message ports failed, so destroy the ports.
for (int message_port_id : sent_message_port_ids) {
MessagePortService::GetInstance()->ClosePort(message_port_id);
}
callback.Run(status);
}
} // namespace
ServiceWorkerVersion::ServiceWorkerVersion(
......@@ -252,12 +264,25 @@ void ServiceWorkerVersion::DispatchMessageEvent(
const base::string16& message,
const std::vector<int>& sent_message_port_ids,
const StatusCallback& callback) {
for (int message_port_id : sent_message_port_ids) {
MessagePortService::GetInstance()->HoldMessages(message_port_id);
}
DispatchMessageEventInternal(message, sent_message_port_ids, callback);
}
void ServiceWorkerVersion::DispatchMessageEventInternal(
const base::string16& message,
const std::vector<int>& sent_message_port_ids,
const StatusCallback& callback) {
if (running_status() != RUNNING) {
// Schedule calling this method after starting the worker.
StartWorker(base::Bind(
&RunTaskAfterStartWorker, weak_factory_.GetWeakPtr(), callback,
base::Bind(&self::DispatchMessageEvent, weak_factory_.GetWeakPtr(),
message, sent_message_port_ids, callback)));
&RunTaskAfterStartWorker, weak_factory_.GetWeakPtr(),
base::Bind(&RunErrorMessageCallback, sent_message_port_ids, callback),
base::Bind(&self::DispatchMessageEventInternal,
weak_factory_.GetWeakPtr(), message, sent_message_port_ids,
callback)));
return;
}
......
......@@ -272,6 +272,11 @@ class CONTENT_EXPORT ServiceWorkerVersion
const StatusCallback& callback);
void DispatchActivateEventAfterStartWorker(const StatusCallback& callback);
void DispatchMessageEventInternal(
const base::string16& message,
const std::vector<int>& sent_message_port_ids,
const StatusCallback& callback);
// Message handlers.
void OnGetClientDocuments(int request_id);
void OnActivateEventFinished(int request_id,
......
......@@ -165,6 +165,8 @@ void WebMessagePortChannelImpl::Init() {
DCHECK(message_port_id_ == MSG_ROUTING_NONE);
Send(new MessagePortHostMsg_CreateMessagePort(
&route_id_, &message_port_id_));
} else if (message_port_id_ != MSG_ROUTING_NONE) {
Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_));
}
ChildThread::current()->GetRouter()->AddRoute(route_id_, this);
......
......@@ -83,3 +83,9 @@ IPC_MESSAGE_CONTROL1(MessagePortHostMsg_QueueMessages,
IPC_MESSAGE_CONTROL2(MessagePortHostMsg_SendQueuedMessages,
int /* message_port_id */,
std::vector<QueuedMessage> /* queued_messages */)
// Tells the browser this message port is ready to receive messages. If the
// browser was holding messages to this port because no destination for the
// port was available yet this will cause the browser to release those messages.
IPC_MESSAGE_CONTROL1(MessagePortHostMsg_ReleaseMessages,
int /* message_port_id */)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment