Commit 029de6ba authored by tyoshino@chromium.org's avatar tyoshino@chromium.org

Cleanup WorkerThreadableWebSocketChannel's logic to wait for the main thread

- Pass WorkerGlobalScope as a reference until we set it to RefPtr
- Wrap m_syncHelper with a function hasTerminatedPeer() for readability
- Stop holding reference on WorkerGlobalScope in
  WorkerThreadableWebSocketChannel
- Move ASSERT(m_syncHelper) to waitForMethodCompletion()
- Return a value indicating failure when waitForMethodCompletion()
  returns because of shutdown
- Make the methods not calling waitForMethodCompletion() also return
  without doing anything if the peer is already terminated
- Move task posting code into waitForMethodCompletion()
- More comments for readability

BUG=none

Review URL: https://codereview.chromium.org/265713004

git-svn-id: svn://svn.chromium.org/blink/trunk@173608 bbb929c8-8fbe-4397-9dbb-9b2b20218538
parent 94c4812d
......@@ -37,7 +37,6 @@
#include "core/dom/CrossThreadTask.h"
#include "core/dom/Document.h"
#include "core/dom/ExecutionContext.h"
#include "core/dom/ExecutionContextTask.h"
#include "core/fileapi/Blob.h"
#include "core/inspector/ScriptCallFrame.h"
#include "core/inspector/ScriptCallStack.h"
......@@ -120,10 +119,9 @@ private:
unsigned long m_bufferedAmount;
};
WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
: m_workerGlobalScope(context)
, m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
, m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope))
WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
: m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
, m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
, m_sourceURLAtConnection(sourceURL)
, m_lineNumberAtConnection(lineNumber)
{
......@@ -229,12 +227,6 @@ void WorkerThreadableWebSocketChannel::resume()
m_bridge->resume();
}
void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
{
visitor->trace(m_workerGlobalScope);
WebSocketChannel::trace(visitor);
}
WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<WeakReference<Peer> > reference, PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
: m_workerClientWrapper(clientWrapper)
, m_loaderProxy(loaderProxy)
......@@ -463,7 +455,7 @@ void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
m_loaderProxy.postTaskToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper));
}
WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtrWillBeRawPtr<WorkerGlobalScope> workerGlobalScope)
WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
: m_workerClientWrapper(workerClientWrapper)
, m_workerGlobalScope(workerGlobalScope)
, m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
......@@ -487,8 +479,7 @@ void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceUR
m_syncHelper = syncHelper.get();
RefPtr<Bridge> protect(this);
m_loaderProxy.postTaskToLoader(createCallbackTask(&Peer::initialize, reference.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, sourceURL, lineNumber, syncHelper.release()));
if (!waitForMethodCompletion()) {
if (!waitForMethodCompletion(createCallbackTask(&Peer::initialize, reference.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, sourceURL, lineNumber, syncHelper.release()))) {
// The worker thread has been signalled to shutdown before method completion.
terminatePeer();
}
......@@ -496,71 +487,82 @@ void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceUR
bool WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
{
if (!m_workerGlobalScope)
if (hasTerminatedPeer())
return false;
ASSERT(m_syncHelper);
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy())));
RefPtr<Bridge> protect(this);
waitForMethodCompletion();
if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy()))))
return false;
return m_syncHelper->connectRequestResult();
}
WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
{
if (!m_workerGlobalScope)
if (hasTerminatedPeer())
return WebSocketChannel::SendFail;
ASSERT(m_syncHelper);
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::send, m_peer, message.isolatedCopy())));
RefPtr<Bridge> protect(this);
waitForMethodCompletion();
if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::send, m_peer, message.isolatedCopy()))))
return WebSocketChannel::SendFail;
return m_syncHelper->sendRequestResult();
}
WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
{
if (!m_workerGlobalScope)
if (hasTerminatedPeer())
return WebSocketChannel::SendFail;
ASSERT(m_syncHelper);
// ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
if (binaryData.byteLength())
memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::sendArrayBuffer, m_peer, data.release())));
RefPtr<Bridge> protect(this);
waitForMethodCompletion();
if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendArrayBuffer, m_peer, data.release()))))
return WebSocketChannel::SendFail;
return m_syncHelper->sendRequestResult();
}
WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(PassRefPtr<BlobDataHandle> data)
{
if (!m_workerGlobalScope)
if (hasTerminatedPeer())
return WebSocketChannel::SendFail;
ASSERT(m_syncHelper);
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data)));
RefPtr<Bridge> protect(this);
waitForMethodCompletion();
if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data))))
return WebSocketChannel::SendFail;
return m_syncHelper->sendRequestResult();
}
unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
{
if (!m_workerGlobalScope)
if (hasTerminatedPeer())
return 0;
ASSERT(m_syncHelper);
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::bufferedAmount, m_peer)));
RefPtr<Bridge> protect(this);
waitForMethodCompletion();
if (!waitForMethodCompletion(CallClosureTask::create(bind(&Peer::bufferedAmount, m_peer))))
return 0;
return m_syncHelper->bufferedAmount();
}
void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
{
if (hasTerminatedPeer())
return;
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::close, m_peer, code, reason.isolatedCopy())));
}
void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
{
if (hasTerminatedPeer())
return;
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::fail, m_peer, reason.isolatedCopy(), level, sourceURL.isolatedCopy(), lineNumber)));
}
......@@ -572,11 +574,17 @@ void WorkerThreadableWebSocketChannel::Bridge::disconnect()
void WorkerThreadableWebSocketChannel::Bridge::suspend()
{
if (hasTerminatedPeer())
return;
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::suspend, m_peer)));
}
void WorkerThreadableWebSocketChannel::Bridge::resume()
{
if (hasTerminatedPeer())
return;
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::resume, m_peer)));
}
......@@ -587,10 +595,12 @@ void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
// which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
{
if (!m_syncHelper)
return true;
ASSERT(m_workerGlobalScope);
ASSERT(m_syncHelper);
m_loaderProxy.postTaskToLoader(task);
blink::WebWaitableEvent* shutdownEvent = m_workerGlobalScope->thread()->shutdownEvent();
Vector<blink::WebWaitableEvent*> events;
......@@ -605,8 +615,12 @@ bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
void WorkerThreadableWebSocketChannel::Bridge::terminatePeer()
{
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::destroy, m_peer)));
m_workerGlobalScope = nullptr;
// Peer::destroy() deletes m_peer and then m_syncHelper will be released.
// We must not touch m_syncHelper any more.
m_syncHelper = 0;
// We won't use this any more.
m_workerGlobalScope = nullptr;
}
} // namespace WebCore
......@@ -31,6 +31,7 @@
#ifndef WorkerThreadableWebSocketChannel_h
#define WorkerThreadableWebSocketChannel_h
#include "core/dom/ExecutionContextTask.h"
#include "core/frame/ConsoleTypes.h"
#include "core/workers/WorkerGlobalScope.h"
#include "modules/websockets/WebSocketChannel.h"
......@@ -84,8 +85,6 @@ public:
virtual void suspend() OVERRIDE;
virtual void resume() OVERRIDE;
virtual void trace(Visitor*) OVERRIDE;
// Generated by the bridge. The Peer is destructed by an async call from
// Bridge, and may outlive the bridge. All methods of this class must
// be called on the main thread.
......@@ -133,7 +132,7 @@ private:
// Bridge for Peer. Running on the worker thread.
class Bridge : public RefCounted<Bridge> {
public:
static PassRefPtr<Bridge> create(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtrWillBeRawPtr<WorkerGlobalScope> workerGlobalScope)
static PassRefPtr<Bridge> create(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
{
return adoptRef(new Bridge(workerClientWrapper, workerGlobalScope));
}
......@@ -153,7 +152,7 @@ private:
void resume();
private:
Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper>, PassRefPtrWillBeRawPtr<WorkerGlobalScope>);
Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper>, WorkerGlobalScope&);
static void setWebSocketChannel(ExecutionContext*, Bridge* thisPtr, Peer*, PassRefPtr<ThreadableWebSocketChannelClientWrapper>);
......@@ -161,10 +160,12 @@ private:
void clearClientWrapper();
// Returns false if shutdown event is received before method completion.
bool waitForMethodCompletion();
bool waitForMethodCompletion(PassOwnPtr<ExecutionContextTask>);
void terminatePeer();
bool hasTerminatedPeer() { return !m_syncHelper; }
const RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper;
RefPtrWillBePersistent<WorkerGlobalScope> m_workerGlobalScope;
WorkerLoaderProxy& m_loaderProxy;
......@@ -174,7 +175,6 @@ private:
WorkerThreadableWebSocketChannel(WorkerGlobalScope&, WebSocketChannelClient*, const String& sourceURL, unsigned lineNumber);
RefPtrWillBeMember<WorkerGlobalScope> m_workerGlobalScope;
const RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper;
RefPtr<Bridge> m_bridge;
String m_sourceURLAtConnection;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment