Commit 89b9278e authored by ikilpatrick's avatar ikilpatrick Committed by Commit bot

[worklets] Split up InProcessWorkerMessagingProxy into a base+worker class.

This pulls out all of the lifetime logic for the WorkerThread into the base class ThreadedMessagingProxyBase.

This will allow us to re-use all of the lifetime logic (I've created a followup demo patch which does this https://codereview.chromium.org/2312493002/)

BUG=567358

Review-Url: https://codereview.chromium.org/2310673002
Cr-Commit-Position: refs/heads/master@{#417161}
parent 8dbd4fb5
...@@ -33,6 +33,8 @@ blink_core_sources("workers") { ...@@ -33,6 +33,8 @@ blink_core_sources("workers") {
"SharedWorkerRepositoryClient.h", "SharedWorkerRepositoryClient.h",
"SharedWorkerThread.cpp", "SharedWorkerThread.cpp",
"SharedWorkerThread.h", "SharedWorkerThread.h",
"ThreadedMessagingProxyBase.cpp",
"ThreadedMessagingProxyBase.h",
"ThreadedWorkletGlobalScope.cpp", "ThreadedWorkletGlobalScope.cpp",
"ThreadedWorkletGlobalScope.h", "ThreadedWorkletGlobalScope.h",
"ThreadedWorkletGlobalScopeProxy.h", "ThreadedWorkletGlobalScopeProxy.h",
......
...@@ -30,7 +30,7 @@ InProcessWorkerBase::~InProcessWorkerBase() ...@@ -30,7 +30,7 @@ InProcessWorkerBase::~InProcessWorkerBase()
DCHECK(isMainThread()); DCHECK(isMainThread());
if (!m_contextProxy) if (!m_contextProxy)
return; return;
m_contextProxy->workerObjectDestroyed(); m_contextProxy->parentObjectDestroyed();
} }
void InProcessWorkerBase::postMessage(ExecutionContext* context, PassRefPtr<SerializedScriptValue> message, const MessagePortArray& ports, ExceptionState& exceptionState) void InProcessWorkerBase::postMessage(ExecutionContext* context, PassRefPtr<SerializedScriptValue> message, const MessagePortArray& ports, ExceptionState& exceptionState)
...@@ -69,7 +69,7 @@ bool InProcessWorkerBase::initialize(ExecutionContext* context, const String& ur ...@@ -69,7 +69,7 @@ bool InProcessWorkerBase::initialize(ExecutionContext* context, const String& ur
void InProcessWorkerBase::terminate() void InProcessWorkerBase::terminate()
{ {
if (m_contextProxy) if (m_contextProxy)
m_contextProxy->terminateWorkerGlobalScope(); m_contextProxy->terminateGlobalScope();
} }
void InProcessWorkerBase::stop() void InProcessWorkerBase::stop()
......
...@@ -27,21 +27,17 @@ ...@@ -27,21 +27,17 @@
#include "core/workers/InProcessWorkerMessagingProxy.h" #include "core/workers/InProcessWorkerMessagingProxy.h"
#include "core/dom/Document.h"
#include "core/dom/ExecutionContextTask.h" #include "core/dom/ExecutionContextTask.h"
#include "core/dom/SecurityContext.h" #include "core/dom/SecurityContext.h"
#include "core/events/ErrorEvent.h" #include "core/events/ErrorEvent.h"
#include "core/events/MessageEvent.h" #include "core/events/MessageEvent.h"
#include "core/frame/FrameConsole.h"
#include "core/frame/LocalFrame.h" #include "core/frame/LocalFrame.h"
#include "core/frame/csp/ContentSecurityPolicy.h" #include "core/frame/csp/ContentSecurityPolicy.h"
#include "core/inspector/ConsoleMessage.h"
#include "core/loader/DocumentLoadTiming.h" #include "core/loader/DocumentLoadTiming.h"
#include "core/loader/DocumentLoader.h" #include "core/loader/DocumentLoader.h"
#include "core/origin_trials/OriginTrialContext.h" #include "core/origin_trials/OriginTrialContext.h"
#include "core/workers/InProcessWorkerBase.h" #include "core/workers/InProcessWorkerBase.h"
#include "core/workers/InProcessWorkerObjectProxy.h" #include "core/workers/InProcessWorkerObjectProxy.h"
#include "core/workers/ParentFrameTaskRunners.h"
#include "core/workers/WorkerClients.h" #include "core/workers/WorkerClients.h"
#include "core/workers/WorkerGlobalScope.h" #include "core/workers/WorkerGlobalScope.h"
#include "core/workers/WorkerInspectorProxy.h" #include "core/workers/WorkerInspectorProxy.h"
...@@ -69,8 +65,6 @@ void processMessageOnWorkerGlobalScope(PassRefPtr<SerializedScriptValue> message ...@@ -69,8 +65,6 @@ void processMessageOnWorkerGlobalScope(PassRefPtr<SerializedScriptValue> message
workerObjectProxy->startPendingActivityTimer(); workerObjectProxy->startPendingActivityTimer();
} }
static int s_liveMessagingProxyCount = 0;
} // namespace } // namespace
InProcessWorkerMessagingProxy::InProcessWorkerMessagingProxy(InProcessWorkerBase* workerObject, WorkerClients* workerClients) InProcessWorkerMessagingProxy::InProcessWorkerMessagingProxy(InProcessWorkerBase* workerObject, WorkerClients* workerClients)
...@@ -79,25 +73,25 @@ InProcessWorkerMessagingProxy::InProcessWorkerMessagingProxy(InProcessWorkerBase ...@@ -79,25 +73,25 @@ InProcessWorkerMessagingProxy::InProcessWorkerMessagingProxy(InProcessWorkerBase
DCHECK(m_workerObject); DCHECK(m_workerObject);
} }
InProcessWorkerMessagingProxy::~InProcessWorkerMessagingProxy() InProcessWorkerMessagingProxy::InProcessWorkerMessagingProxy(ExecutionContext* executionContext, InProcessWorkerBase* workerObject, WorkerClients* workerClients)
: ThreadedMessagingProxyBase(executionContext)
, m_workerObjectProxy(InProcessWorkerObjectProxy::create(this))
, m_workerObject(workerObject)
, m_workerClients(workerClients)
, m_unconfirmedMessageCount(0)
, m_workerGlobalScopeMayHavePendingActivity(false)
{ {
DCHECK(isParentContextThread());
DCHECK(!m_workerObject);
if (m_loaderProxy)
m_loaderProxy->detachProvider(this);
s_liveMessagingProxyCount--;
} }
int InProcessWorkerMessagingProxy::proxyCount() InProcessWorkerMessagingProxy::~InProcessWorkerMessagingProxy()
{ {
DCHECK(isMainThread()); DCHECK(!m_workerObject);
return s_liveMessagingProxyCount;
} }
void InProcessWorkerMessagingProxy::startWorkerGlobalScope(const KURL& scriptURL, const String& userAgent, const String& sourceCode) void InProcessWorkerMessagingProxy::startWorkerGlobalScope(const KURL& scriptURL, const String& userAgent, const String& sourceCode)
{ {
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
if (m_askedToTerminate) { if (askedToTerminate()) {
// Worker.terminate() could be called from JS before the thread was // Worker.terminate() could be called from JS before the thread was
// created. // created.
return; return;
...@@ -109,22 +103,18 @@ void InProcessWorkerMessagingProxy::startWorkerGlobalScope(const KURL& scriptURL ...@@ -109,22 +103,18 @@ void InProcessWorkerMessagingProxy::startWorkerGlobalScope(const KURL& scriptURL
ContentSecurityPolicy* csp = m_workerObject->contentSecurityPolicy() ? m_workerObject->contentSecurityPolicy() : document->contentSecurityPolicy(); ContentSecurityPolicy* csp = m_workerObject->contentSecurityPolicy() ? m_workerObject->contentSecurityPolicy() : document->contentSecurityPolicy();
DCHECK(csp); DCHECK(csp);
WorkerThreadStartMode startMode = m_workerInspectorProxy->workerStartMode(document); WorkerThreadStartMode startMode = workerInspectorProxy()->workerStartMode(document);
std::unique_ptr<WorkerSettings> workerSettings = wrapUnique(new WorkerSettings(document->settings())); std::unique_ptr<WorkerSettings> workerSettings = wrapUnique(new WorkerSettings(document->settings()));
std::unique_ptr<WorkerThreadStartupData> startupData = WorkerThreadStartupData::create(scriptURL, userAgent, sourceCode, nullptr, startMode, csp->headers().get(), m_workerObject->referrerPolicy(), starterOrigin, m_workerClients.release(), document->addressSpace(), OriginTrialContext::getTokens(document).get(), std::move(workerSettings)); std::unique_ptr<WorkerThreadStartupData> startupData = WorkerThreadStartupData::create(scriptURL, userAgent, sourceCode, nullptr, startMode, csp->headers().get(), m_workerObject->referrerPolicy(), starterOrigin, m_workerClients.release(), document->addressSpace(), OriginTrialContext::getTokens(document).get(), std::move(workerSettings));
double originTime = document->loader() ? document->loader()->timing().referenceMonotonicTime() : monotonicallyIncreasingTime();
m_loaderProxy = WorkerLoaderProxy::create(this); initializeWorkerThread(std::move(startupData));
m_workerThread = createWorkerThread(originTime); workerInspectorProxy()->workerThreadCreated(document, workerThread(), scriptURL);
m_workerThread->start(std::move(startupData));
workerThreadCreated();
m_workerInspectorProxy->workerThreadCreated(document, m_workerThread.get(), scriptURL);
} }
void InProcessWorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<SerializedScriptValue> message, std::unique_ptr<MessagePortChannelArray> channels) void InProcessWorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<SerializedScriptValue> message, std::unique_ptr<MessagePortChannelArray> channels)
{ {
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
if (!m_workerObject || m_askedToTerminate) if (!m_workerObject || askedToTerminate())
return; return;
MessagePortArray* ports = MessagePort::entanglePorts(*getExecutionContext(), std::move(channels)); MessagePortArray* ports = MessagePort::entanglePorts(*getExecutionContext(), std::move(channels));
...@@ -134,37 +124,20 @@ void InProcessWorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<Seriali ...@@ -134,37 +124,20 @@ void InProcessWorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<Seriali
void InProcessWorkerMessagingProxy::postMessageToWorkerGlobalScope(PassRefPtr<SerializedScriptValue> message, std::unique_ptr<MessagePortChannelArray> channels) void InProcessWorkerMessagingProxy::postMessageToWorkerGlobalScope(PassRefPtr<SerializedScriptValue> message, std::unique_ptr<MessagePortChannelArray> channels)
{ {
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
if (m_askedToTerminate) if (askedToTerminate())
return; return;
std::unique_ptr<ExecutionContextTask> task = createCrossThreadTask(&processMessageOnWorkerGlobalScope, message, passed(std::move(channels)), crossThreadUnretained(&workerObjectProxy())); std::unique_ptr<ExecutionContextTask> task = createCrossThreadTask(&processMessageOnWorkerGlobalScope, message, passed(std::move(channels)), crossThreadUnretained(&workerObjectProxy()));
if (m_workerThread) { if (workerThread()) {
// A message event is an activity and may initiate another activity. // A message event is an activity and may initiate another activity.
m_workerGlobalScopeMayHavePendingActivity = true; m_workerGlobalScopeMayHavePendingActivity = true;
++m_unconfirmedMessageCount; ++m_unconfirmedMessageCount;
m_workerThread->postTask(BLINK_FROM_HERE, std::move(task)); workerThread()->postTask(BLINK_FROM_HERE, std::move(task));
} else { } else {
m_queuedEarlyTasks.append(std::move(task)); m_queuedEarlyTasks.append(std::move(task));
} }
} }
void InProcessWorkerMessagingProxy::postTaskToWorkerGlobalScope(const WebTraceLocation& location, std::unique_ptr<ExecutionContextTask> task)
{
if (m_askedToTerminate)
return;
DCHECK(m_workerThread);
m_workerThread->postTask(location, std::move(task));
}
void InProcessWorkerMessagingProxy::postTaskToLoader(const WebTraceLocation& location, std::unique_ptr<ExecutionContextTask> task)
{
DCHECK(getExecutionContext()->isDocument());
// TODO(hiroshige,yuryu): Make this not use ExecutionContextTask and use
// m_parentFrameTaskRunners->get(TaskType::Networking) instead.
getExecutionContext()->postTask(location, std::move(task));
}
void InProcessWorkerMessagingProxy::dispatchErrorEvent(const String& errorMessage, std::unique_ptr<SourceLocation> location, int exceptionId) void InProcessWorkerMessagingProxy::dispatchErrorEvent(const String& errorMessage, std::unique_ptr<SourceLocation> location, int exceptionId)
{ {
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
...@@ -182,20 +155,11 @@ void InProcessWorkerMessagingProxy::dispatchErrorEvent(const String& errorMessag ...@@ -182,20 +155,11 @@ void InProcessWorkerMessagingProxy::dispatchErrorEvent(const String& errorMessag
postTaskToWorkerGlobalScope(BLINK_FROM_HERE, createCrossThreadTask(&processUnhandledExceptionOnWorkerGlobalScope, exceptionId)); postTaskToWorkerGlobalScope(BLINK_FROM_HERE, createCrossThreadTask(&processUnhandledExceptionOnWorkerGlobalScope, exceptionId));
} }
void InProcessWorkerMessagingProxy::reportConsoleMessage(MessageSource source, MessageLevel level, const String& message, std::unique_ptr<SourceLocation> location)
{
DCHECK(isParentContextThread());
if (m_askedToTerminate)
return;
if (m_workerInspectorProxy)
m_workerInspectorProxy->addConsoleMessageFromWorker(level, message, std::move(location));
}
void InProcessWorkerMessagingProxy::workerThreadCreated() void InProcessWorkerMessagingProxy::workerThreadCreated()
{ {
ThreadedMessagingProxyBase::workerThreadCreated();
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
DCHECK(!m_askedToTerminate);
DCHECK(m_workerThread);
DCHECK(!m_unconfirmedMessageCount); DCHECK(!m_unconfirmedMessageCount);
m_unconfirmedMessageCount = m_queuedEarlyTasks.size(); m_unconfirmedMessageCount = m_queuedEarlyTasks.size();
...@@ -203,88 +167,27 @@ void InProcessWorkerMessagingProxy::workerThreadCreated() ...@@ -203,88 +167,27 @@ void InProcessWorkerMessagingProxy::workerThreadCreated()
// Worker initialization means a pending activity. // Worker initialization means a pending activity.
m_workerGlobalScopeMayHavePendingActivity = true; m_workerGlobalScopeMayHavePendingActivity = true;
for (auto& earlyTasks : m_queuedEarlyTasks) for (auto& earlyTask : m_queuedEarlyTasks)
m_workerThread->postTask(BLINK_FROM_HERE, std::move(earlyTasks)); workerThread()->postTask(BLINK_FROM_HERE, std::move(earlyTask));
m_queuedEarlyTasks.clear(); m_queuedEarlyTasks.clear();
} }
InProcessWorkerMessagingProxy::InProcessWorkerMessagingProxy(ExecutionContext* executionContext, InProcessWorkerBase* workerObject, WorkerClients* workerClients) void InProcessWorkerMessagingProxy::parentObjectDestroyed()
: m_executionContext(executionContext)
, m_workerObjectProxy(InProcessWorkerObjectProxy::create(this))
, m_workerObject(workerObject)
, m_mayBeDestroyed(false)
, m_unconfirmedMessageCount(0)
, m_workerGlobalScopeMayHavePendingActivity(false)
, m_askedToTerminate(false)
, m_workerInspectorProxy(WorkerInspectorProxy::create())
, m_workerClients(workerClients)
, m_parentFrameTaskRunners(ParentFrameTaskRunners::create(toDocument(m_executionContext.get())->frame()))
{
DCHECK(isParentContextThread());
s_liveMessagingProxyCount++;
}
void InProcessWorkerMessagingProxy::workerObjectDestroyed()
{ {
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
// workerObjectDestroyed() is called in InProcessWorkerBase's destructor. // parentObjectDestroyed() is called in InProcessWorkerBase's destructor.
// Thus it should be guaranteed that a weak pointer m_workerObject has been // Thus it should be guaranteed that a weak pointer m_workerObject has been
// cleared before this method gets called. // cleared before this method gets called.
DCHECK(!m_workerObject); DCHECK(!m_workerObject);
m_parentFrameTaskRunners->get(TaskType::Internal)->postTask(BLINK_FROM_HERE, WTF::bind(&InProcessWorkerMessagingProxy::workerObjectDestroyedInternal, unretained(this))); ThreadedMessagingProxyBase::parentObjectDestroyed();
}
void InProcessWorkerMessagingProxy::workerObjectDestroyedInternal()
{
DCHECK(isParentContextThread());
m_mayBeDestroyed = true;
if (m_workerThread)
terminateWorkerGlobalScope();
else
workerThreadTerminated();
}
void InProcessWorkerMessagingProxy::workerThreadTerminated()
{
DCHECK(isParentContextThread());
// This method is always the last to be performed, so the proxy is not
// needed for communication in either side any more. However, the Worker
// object may still exist, and it assumes that the proxy exists, too.
m_askedToTerminate = true;
m_workerThread = nullptr;
m_workerInspectorProxy->workerThreadTerminated();
if (m_mayBeDestroyed)
delete this;
}
void InProcessWorkerMessagingProxy::terminateWorkerGlobalScope()
{
DCHECK(isParentContextThread());
if (m_askedToTerminate)
return;
m_askedToTerminate = true;
if (m_workerThread)
m_workerThread->terminate();
m_workerInspectorProxy->workerThreadTerminated();
}
void InProcessWorkerMessagingProxy::postMessageToPageInspector(const String& message)
{
DCHECK(isParentContextThread());
if (m_workerInspectorProxy)
m_workerInspectorProxy->dispatchMessageFromWorker(message);
} }
void InProcessWorkerMessagingProxy::confirmMessageFromWorkerObject() void InProcessWorkerMessagingProxy::confirmMessageFromWorkerObject()
{ {
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
if (m_askedToTerminate) if (askedToTerminate())
return; return;
DCHECK(m_unconfirmedMessageCount); DCHECK(m_unconfirmedMessageCount);
--m_unconfirmedMessageCount; --m_unconfirmedMessageCount;
...@@ -305,17 +208,9 @@ void InProcessWorkerMessagingProxy::pendingActivityFinished() ...@@ -305,17 +208,9 @@ void InProcessWorkerMessagingProxy::pendingActivityFinished()
bool InProcessWorkerMessagingProxy::hasPendingActivity() const bool InProcessWorkerMessagingProxy::hasPendingActivity() const
{ {
DCHECK(isParentContextThread()); DCHECK(isParentContextThread());
if (m_askedToTerminate) if (askedToTerminate())
return false; return false;
return m_unconfirmedMessageCount || m_workerGlobalScopeMayHavePendingActivity; return m_unconfirmedMessageCount || m_workerGlobalScopeMayHavePendingActivity;
} }
bool InProcessWorkerMessagingProxy::isParentContextThread() const
{
// TODO(nhiroki): Nested worker is not supported yet, so the parent context
// thread should be equal to the main thread (http://crbug.com/31666).
DCHECK(getExecutionContext()->isDocument());
return isMainThread();
}
} // namespace blink } // namespace blink
...@@ -30,14 +30,11 @@ ...@@ -30,14 +30,11 @@
#include "core/CoreExport.h" #include "core/CoreExport.h"
#include "core/dom/ExecutionContext.h" #include "core/dom/ExecutionContext.h"
#include "core/dom/MessagePort.h" #include "core/dom/MessagePort.h"
#include "core/inspector/ConsoleTypes.h" #include "core/workers/ThreadedMessagingProxyBase.h"
#include "core/workers/WorkerLoaderProxy.h" #include "core/workers/WorkerLoaderProxy.h"
#include "platform/heap/Handle.h" #include "platform/heap/Handle.h"
#include "wtf/Forward.h"
#include "wtf/Noncopyable.h" #include "wtf/Noncopyable.h"
#include "wtf/PassRefPtr.h" #include "wtf/PassRefPtr.h"
#include "wtf/RefPtr.h"
#include "wtf/Vector.h"
#include <memory> #include <memory>
namespace blink { namespace blink {
...@@ -45,91 +42,54 @@ namespace blink { ...@@ -45,91 +42,54 @@ namespace blink {
class ExecutionContext; class ExecutionContext;
class InProcessWorkerBase; class InProcessWorkerBase;
class InProcessWorkerObjectProxy; class InProcessWorkerObjectProxy;
class ParentFrameTaskRunners; class SerializedScriptValue;
class WorkerClients; class WorkerClients;
class WorkerInspectorProxy;
class WorkerThread;
// TODO(nhiroki): "MessagingProxy" is not well-defined term among worker // TODO(nhiroki): "MessagingProxy" is not well-defined term among worker
// components. Probably we should rename this to something more suitable. // components. Probably we should rename this to something more suitable.
// (http://crbug.com/603785) // (http://crbug.com/603785)
class CORE_EXPORT InProcessWorkerMessagingProxy : private WorkerLoaderProxyProvider { class CORE_EXPORT InProcessWorkerMessagingProxy : public ThreadedMessagingProxyBase {
WTF_MAKE_NONCOPYABLE(InProcessWorkerMessagingProxy); WTF_MAKE_NONCOPYABLE(InProcessWorkerMessagingProxy);
public: public:
// These methods should only be used on the parent context thread. // These methods should only be used on the parent context thread.
void startWorkerGlobalScope(const KURL& scriptURL, const String& userAgent, const String& sourceCode); void startWorkerGlobalScope(const KURL& scriptURL, const String& userAgent, const String& sourceCode);
void terminateWorkerGlobalScope();
void postMessageToWorkerGlobalScope(PassRefPtr<SerializedScriptValue>, std::unique_ptr<MessagePortChannelArray>); void postMessageToWorkerGlobalScope(PassRefPtr<SerializedScriptValue>, std::unique_ptr<MessagePortChannelArray>);
void workerThreadCreated() override;
void parentObjectDestroyed() override;
bool hasPendingActivity() const; bool hasPendingActivity() const;
void workerObjectDestroyed();
// These methods come from worker context thread via // These methods come from worker context thread via
// InProcessWorkerObjectProxy and are called on the parent context thread. // InProcessWorkerObjectProxy and are called on the parent context thread.
void postMessageToWorkerObject(PassRefPtr<SerializedScriptValue>, std::unique_ptr<MessagePortChannelArray>); void postMessageToWorkerObject(PassRefPtr<SerializedScriptValue>, std::unique_ptr<MessagePortChannelArray>);
void dispatchErrorEvent(const String& errorMessage, std::unique_ptr<SourceLocation>, int exceptionId); void dispatchErrorEvent(const String& errorMessage, std::unique_ptr<SourceLocation>, int exceptionId);
void reportConsoleMessage(MessageSource, MessageLevel, const String& message, std::unique_ptr<SourceLocation>);
void postMessageToPageInspector(const String&);
// 'virtual' for testing. // 'virtual' for testing.
virtual void confirmMessageFromWorkerObject(); virtual void confirmMessageFromWorkerObject();
virtual void pendingActivityFinished(); virtual void pendingActivityFinished();
virtual void workerThreadTerminated();
void workerThreadCreated();
ExecutionContext* getExecutionContext() const { return m_executionContext.get(); }
ParentFrameTaskRunners* getParentFrameTaskRunners() { return m_parentFrameTaskRunners.get(); }
// Number of live messaging proxies, used by leak detection.
static int proxyCount();
protected: protected:
InProcessWorkerMessagingProxy(InProcessWorkerBase*, WorkerClients*); InProcessWorkerMessagingProxy(InProcessWorkerBase*, WorkerClients*);
~InProcessWorkerMessagingProxy() override; ~InProcessWorkerMessagingProxy() override;
virtual std::unique_ptr<WorkerThread> createWorkerThread(double originTime) = 0;
PassRefPtr<WorkerLoaderProxy> loaderProxy() { return m_loaderProxy; }
InProcessWorkerObjectProxy& workerObjectProxy() { return *m_workerObjectProxy.get(); } InProcessWorkerObjectProxy& workerObjectProxy() { return *m_workerObjectProxy.get(); }
private: private:
friend class InProcessWorkerMessagingProxyForTest; friend class InProcessWorkerMessagingProxyForTest;
InProcessWorkerMessagingProxy(ExecutionContext*, InProcessWorkerBase*, WorkerClients*); InProcessWorkerMessagingProxy(ExecutionContext*, InProcessWorkerBase*, WorkerClients*);
void workerObjectDestroyedInternal();
// WorkerLoaderProxyProvider
// These methods are called on different threads to schedule loading
// requests and to send callbacks back to WorkerGlobalScope.
void postTaskToLoader(const WebTraceLocation&, std::unique_ptr<ExecutionContextTask>) override;
void postTaskToWorkerGlobalScope(const WebTraceLocation&, std::unique_ptr<ExecutionContextTask>) override;
// Returns true if this is called on the parent context thread.
bool isParentContextThread() const;
Persistent<ExecutionContext> m_executionContext;
std::unique_ptr<InProcessWorkerObjectProxy> m_workerObjectProxy; std::unique_ptr<InProcessWorkerObjectProxy> m_workerObjectProxy;
WeakPersistent<InProcessWorkerBase> m_workerObject; WeakPersistent<InProcessWorkerBase> m_workerObject;
bool m_mayBeDestroyed; Persistent<WorkerClients> m_workerClients;
std::unique_ptr<WorkerThread> m_workerThread;
// Unconfirmed messages from the parent context thread to the worker thread.
unsigned m_unconfirmedMessageCount;
bool m_workerGlobalScopeMayHavePendingActivity;
bool m_askedToTerminate;
// Tasks are queued here until there's a thread object created. // Tasks are queued here until there's a thread object created.
Vector<std::unique_ptr<ExecutionContextTask>> m_queuedEarlyTasks; Vector<std::unique_ptr<ExecutionContextTask>> m_queuedEarlyTasks;
Persistent<WorkerInspectorProxy> m_workerInspectorProxy; // Unconfirmed messages from the parent context thread to the worker thread.
unsigned m_unconfirmedMessageCount;
Persistent<WorkerClients> m_workerClients;
Persistent<ParentFrameTaskRunners> m_parentFrameTaskRunners;
RefPtr<WorkerLoaderProxy> m_loaderProxy; bool m_workerGlobalScopeMayHavePendingActivity;
}; };
} // namespace blink } // namespace blink
......
...@@ -124,7 +124,7 @@ void InProcessWorkerObjectProxy::workerGlobalScopeStarted(WorkerOrWorkletGlobalS ...@@ -124,7 +124,7 @@ void InProcessWorkerObjectProxy::workerGlobalScopeStarted(WorkerOrWorkletGlobalS
void InProcessWorkerObjectProxy::workerGlobalScopeClosed() void InProcessWorkerObjectProxy::workerGlobalScopeClosed()
{ {
getParentFrameTaskRunners()->get(TaskType::Internal)->postTask(BLINK_FROM_HERE, crossThreadBind(&InProcessWorkerMessagingProxy::terminateWorkerGlobalScope, crossThreadUnretained(m_messagingProxy))); getParentFrameTaskRunners()->get(TaskType::Internal)->postTask(BLINK_FROM_HERE, crossThreadBind(&InProcessWorkerMessagingProxy::terminateGlobalScope, crossThreadUnretained(m_messagingProxy)));
} }
void InProcessWorkerObjectProxy::workerThreadTerminated() void InProcessWorkerObjectProxy::workerThreadTerminated()
......
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "core/workers/ThreadedMessagingProxyBase.h"
#include "bindings/core/v8/SourceLocation.h"
#include "core/dom/Document.h"
#include "core/loader/DocumentLoader.h"
#include "core/workers/ParentFrameTaskRunners.h"
#include "core/workers/WorkerInspectorProxy.h"
#include "core/workers/WorkerThreadStartupData.h"
#include "wtf/CurrentTime.h"
namespace blink {
namespace {
static int s_liveMessagingProxyCount = 0;
} // namespace
ThreadedMessagingProxyBase::ThreadedMessagingProxyBase(ExecutionContext* executionContext)
: m_executionContext(executionContext)
, m_workerInspectorProxy(WorkerInspectorProxy::create())
, m_parentFrameTaskRunners(ParentFrameTaskRunners::create(toDocument(m_executionContext.get())->frame()))
, m_mayBeDestroyed(false)
, m_askedToTerminate(false)
{
DCHECK(isParentContextThread());
s_liveMessagingProxyCount++;
}
ThreadedMessagingProxyBase::~ThreadedMessagingProxyBase()
{
DCHECK(isParentContextThread());
if (m_loaderProxy)
m_loaderProxy->detachProvider(this);
s_liveMessagingProxyCount--;
}
int ThreadedMessagingProxyBase::proxyCount()
{
DCHECK(isMainThread());
return s_liveMessagingProxyCount;
}
void ThreadedMessagingProxyBase::initializeWorkerThread(std::unique_ptr<WorkerThreadStartupData> startupData)
{
DCHECK(isParentContextThread());
Document* document = toDocument(getExecutionContext());
double originTime = document->loader() ? document->loader()->timing().referenceMonotonicTime() : monotonicallyIncreasingTime();
m_loaderProxy = WorkerLoaderProxy::create(this);
m_workerThread = createWorkerThread(originTime);
m_workerThread->start(std::move(startupData));
workerThreadCreated();
}
void ThreadedMessagingProxyBase::postTaskToWorkerGlobalScope(const WebTraceLocation& location, std::unique_ptr<ExecutionContextTask> task)
{
if (m_askedToTerminate)
return;
DCHECK(m_workerThread);
m_workerThread->postTask(location, std::move(task));
}
void ThreadedMessagingProxyBase::postTaskToLoader(const WebTraceLocation& location, std::unique_ptr<ExecutionContextTask> task)
{
DCHECK(getExecutionContext()->isDocument());
// TODO(hiroshige,yuryu): Make this not use ExecutionContextTask and use
// m_parentFrameTaskRunners->get(TaskType::Networking) instead.
getExecutionContext()->postTask(location, std::move(task));
}
void ThreadedMessagingProxyBase::reportConsoleMessage(MessageSource source, MessageLevel level, const String& message, std::unique_ptr<SourceLocation> location)
{
DCHECK(isParentContextThread());
if (m_askedToTerminate)
return;
if (m_workerInspectorProxy)
m_workerInspectorProxy->addConsoleMessageFromWorker(level, message, std::move(location));
}
void ThreadedMessagingProxyBase::workerThreadCreated()
{
DCHECK(isParentContextThread());
DCHECK(!m_askedToTerminate);
DCHECK(m_workerThread);
}
void ThreadedMessagingProxyBase::parentObjectDestroyed()
{
DCHECK(isParentContextThread());
m_parentFrameTaskRunners->get(TaskType::Internal)->postTask(BLINK_FROM_HERE, WTF::bind(&ThreadedMessagingProxyBase::parentObjectDestroyedInternal, unretained(this)));
}
void ThreadedMessagingProxyBase::parentObjectDestroyedInternal()
{
DCHECK(isParentContextThread());
m_mayBeDestroyed = true;
if (m_workerThread)
terminateGlobalScope();
else
workerThreadTerminated();
}
void ThreadedMessagingProxyBase::workerThreadTerminated()
{
DCHECK(isParentContextThread());
// This method is always the last to be performed, so the proxy is not
// needed for communication in either side any more. However, the Worker
// object may still exist, and it assumes that the proxy exists, too.
m_askedToTerminate = true;
m_workerThread = nullptr;
m_workerInspectorProxy->workerThreadTerminated();
if (m_mayBeDestroyed)
delete this;
}
void ThreadedMessagingProxyBase::terminateGlobalScope()
{
DCHECK(isParentContextThread());
if (m_askedToTerminate)
return;
m_askedToTerminate = true;
if (m_workerThread)
m_workerThread->terminate();
m_workerInspectorProxy->workerThreadTerminated();
}
void ThreadedMessagingProxyBase::postMessageToPageInspector(const String& message)
{
DCHECK(isParentContextThread());
if (m_workerInspectorProxy)
m_workerInspectorProxy->dispatchMessageFromWorker(message);
}
bool ThreadedMessagingProxyBase::isParentContextThread() const
{
// TODO(nhiroki): Nested worker is not supported yet, so the parent context
// thread should be equal to the main thread (http://crbug.com/31666).
DCHECK(getExecutionContext()->isDocument());
return isMainThread();
}
} // namespace blink
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef ThreadedMessagingProxyBase_h
#define ThreadedMessagingProxyBase_h
#include "core/CoreExport.h"
#include "core/inspector/ConsoleTypes.h"
#include "core/workers/WorkerLoaderProxy.h"
#include "wtf/Forward.h"
namespace blink {
class ExecutionContext;
class ParentFrameTaskRunners;
class WorkerInspectorProxy;
class WorkerLoaderProxy;
class WorkerThread;
class WorkerThreadStartupData;
class CORE_EXPORT ThreadedMessagingProxyBase : private WorkerLoaderProxyProvider {
public:
void terminateGlobalScope();
virtual void workerThreadCreated();
// This method should be called in the destructor of the object which
// initially created it. This object could either be a Worker or a Worklet.
virtual void parentObjectDestroyed();
void reportConsoleMessage(MessageSource, MessageLevel, const String& message, std::unique_ptr<SourceLocation>);
void postMessageToPageInspector(const String&);
// 'virtual' for testing.
virtual void workerThreadTerminated();
ExecutionContext* getExecutionContext() const { return m_executionContext.get(); }
ParentFrameTaskRunners* getParentFrameTaskRunners() { return m_parentFrameTaskRunners.get(); }
// Number of live messaging proxies, used by leak detection.
static int proxyCount();
protected:
ThreadedMessagingProxyBase(ExecutionContext*);
~ThreadedMessagingProxyBase() override;
void initializeWorkerThread(std::unique_ptr<WorkerThreadStartupData>);
virtual std::unique_ptr<WorkerThread> createWorkerThread(double originTime) = 0;
WorkerThread* workerThread() const { return m_workerThread.get(); }
bool askedToTerminate() const { return m_askedToTerminate; }
PassRefPtr<WorkerLoaderProxy> loaderProxy() { return m_loaderProxy; }
WorkerInspectorProxy* workerInspectorProxy() { return m_workerInspectorProxy.get(); }
// Returns true if this is called on the parent context thread.
bool isParentContextThread() const;
// WorkerLoaderProxyProvider
// These methods are called on different threads to schedule loading
// requests and to send callbacks back to WorkerGlobalScope.
void postTaskToLoader(const WebTraceLocation&, std::unique_ptr<ExecutionContextTask>) override;
void postTaskToWorkerGlobalScope(const WebTraceLocation&, std::unique_ptr<ExecutionContextTask>) override;
private:
friend class InProcessWorkerMessagingProxyForTest;
void parentObjectDestroyedInternal();
Persistent<ExecutionContext> m_executionContext;
Persistent<WorkerInspectorProxy> m_workerInspectorProxy;
Persistent<ParentFrameTaskRunners> m_parentFrameTaskRunners;
std::unique_ptr<WorkerThread> m_workerThread;
RefPtr<WorkerLoaderProxy> m_loaderProxy;
bool m_mayBeDestroyed;
bool m_askedToTerminate;
};
} // namespace blink
#endif // ThreadedMessagingProxyBase_h
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment