Commit e2f98f70 authored by haraken@chromium.org's avatar haraken@chromium.org

Revert 176303 "ThreadableWebSocketChannelClientWrapper don't nee..."

This broke browser_tests (WebSocket2):
http://build.chromium.org/p/chromium.webkit/builders/Mac10.8%20Tests/builds/8151

> ThreadableWebSocketChannelClientWrapper don't need suspend / resume code.
> 
> As the WebSocket class handles suspend / resume, we can remove the suspend /
> resume code from ThreadableWebSocketChannelClientWrapper.
> 
> BUG=384238
> R=tyoshino
> 
> Review URL: https://codereview.chromium.org/333223002

TBR=yhirano@chromium.org

Review URL: https://codereview.chromium.org/338343002

git-svn-id: svn://svn.chromium.org/blink/trunk@176310 bbb929c8-8fbe-4397-9dbb-9b2b20218538
parent 86e91486
...@@ -31,10 +31,21 @@ ...@@ -31,10 +31,21 @@
#include "config.h" #include "config.h"
#include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h" #include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h"
#include "core/dom/CrossThreadTask.h"
#include "core/dom/ExecutionContext.h"
#include "platform/CrossThreadCopier.h"
#include "wtf/PassRefPtr.h"
#include "wtf/RefPtr.h"
namespace WebCore { namespace WebCore {
ThreadableWebSocketChannelClientWrapper::ThreadableWebSocketChannelClientWrapper(WebSocketChannelClient* client) ThreadableWebSocketChannelClientWrapper::ThreadableWebSocketChannelClientWrapper(WebSocketChannelClient* client)
: m_client(client) : m_client(client)
, m_suspended(false)
{
}
ThreadableWebSocketChannelClientWrapper::~ThreadableWebSocketChannelClientWrapper()
{ {
} }
...@@ -50,44 +61,121 @@ void ThreadableWebSocketChannelClientWrapper::clearClient() ...@@ -50,44 +61,121 @@ void ThreadableWebSocketChannelClientWrapper::clearClient()
void ThreadableWebSocketChannelClientWrapper::didConnect(const String& subprotocol, const String& extensions) void ThreadableWebSocketChannelClientWrapper::didConnect(const String& subprotocol, const String& extensions)
{ {
if (m_client) m_pendingTasks.append(createCallbackTask(&didConnectCallback, this, subprotocol, extensions));
m_client->didConnect(subprotocol, extensions); if (!m_suspended)
processPendingTasks();
} }
void ThreadableWebSocketChannelClientWrapper::didReceiveMessage(const String& message) void ThreadableWebSocketChannelClientWrapper::didReceiveMessage(const String& message)
{ {
if (m_client) m_pendingTasks.append(createCallbackTask(&didReceiveMessageCallback, this, message));
m_client->didReceiveMessage(message); if (!m_suspended)
processPendingTasks();
} }
void ThreadableWebSocketChannelClientWrapper::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData) void ThreadableWebSocketChannelClientWrapper::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
{ {
if (m_client) m_pendingTasks.append(createCallbackTask(&didReceiveBinaryDataCallback, this, binaryData));
m_client->didReceiveBinaryData(binaryData); if (!m_suspended)
processPendingTasks();
} }
void ThreadableWebSocketChannelClientWrapper::didConsumeBufferedAmount(unsigned long consumed) void ThreadableWebSocketChannelClientWrapper::didConsumeBufferedAmount(unsigned long consumed)
{ {
if (m_client) m_pendingTasks.append(createCallbackTask(&didConsumeBufferedAmountCallback, this, consumed));
m_client->didConsumeBufferedAmount(consumed); if (!m_suspended)
processPendingTasks();
} }
void ThreadableWebSocketChannelClientWrapper::didStartClosingHandshake() void ThreadableWebSocketChannelClientWrapper::didStartClosingHandshake()
{ {
if (m_client) m_pendingTasks.append(createCallbackTask(&didStartClosingHandshakeCallback, this));
m_client->didStartClosingHandshake(); if (!m_suspended)
processPendingTasks();
} }
void ThreadableWebSocketChannelClientWrapper::didClose(WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) void ThreadableWebSocketChannelClientWrapper::didClose(WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{ {
if (m_client) m_pendingTasks.append(createCallbackTask(&didCloseCallback, this, closingHandshakeCompletion, code, reason));
m_client->didClose(closingHandshakeCompletion, code, reason); if (!m_suspended)
processPendingTasks();
} }
void ThreadableWebSocketChannelClientWrapper::didReceiveMessageError() void ThreadableWebSocketChannelClientWrapper::didReceiveMessageError()
{ {
if (m_client) m_pendingTasks.append(createCallbackTask(&didReceiveMessageErrorCallback, this));
m_client->didReceiveMessageError(); if (!m_suspended)
processPendingTasks();
}
void ThreadableWebSocketChannelClientWrapper::suspend()
{
m_suspended = true;
}
void ThreadableWebSocketChannelClientWrapper::resume()
{
m_suspended = false;
processPendingTasks();
}
void ThreadableWebSocketChannelClientWrapper::processPendingTasks()
{
if (m_suspended)
return;
Vector<OwnPtr<ExecutionContextTask> > tasks;
tasks.swap(m_pendingTasks);
for (Vector<OwnPtr<ExecutionContextTask> >::const_iterator iter = tasks.begin(); iter != tasks.end(); ++iter)
(*iter)->performTask(0);
}
void ThreadableWebSocketChannelClientWrapper::didConnectCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, const String& subprotocol, const String& extensions)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didConnect(subprotocol, extensions);
}
void ThreadableWebSocketChannelClientWrapper::didReceiveMessageCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, const String& message)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didReceiveMessage(message);
}
void ThreadableWebSocketChannelClientWrapper::didReceiveBinaryDataCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, PassOwnPtr<Vector<char> > binaryData)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didReceiveBinaryData(binaryData);
}
void ThreadableWebSocketChannelClientWrapper::didConsumeBufferedAmountCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, unsigned long consumed)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didConsumeBufferedAmount(consumed);
}
void ThreadableWebSocketChannelClientWrapper::didStartClosingHandshakeCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didStartClosingHandshake();
}
void ThreadableWebSocketChannelClientWrapper::didCloseCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didClose(closingHandshakeCompletion, code, reason);
}
void ThreadableWebSocketChannelClientWrapper::didReceiveMessageErrorCallback(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> wrapper)
{
ASSERT_UNUSED(context, !context);
if (wrapper->m_client)
wrapper->m_client->didReceiveMessageError();
} }
} // namespace WebCore } // namespace WebCore
...@@ -31,18 +31,24 @@ ...@@ -31,18 +31,24 @@
#ifndef ThreadableWebSocketChannelClientWrapper_h #ifndef ThreadableWebSocketChannelClientWrapper_h
#define ThreadableWebSocketChannelClientWrapper_h #define ThreadableWebSocketChannelClientWrapper_h
#include "core/dom/ExecutionContextTask.h"
#include "modules/websockets/WebSocketChannel.h"
#include "modules/websockets/WebSocketChannelClient.h" #include "modules/websockets/WebSocketChannelClient.h"
#include "platform/heap/Handle.h"
#include "wtf/Forward.h" #include "wtf/Forward.h"
#include "wtf/OwnPtr.h"
#include "wtf/PassOwnPtr.h" #include "wtf/PassOwnPtr.h"
#include "wtf/ThreadSafeRefCounted.h" #include "wtf/ThreadSafeRefCounted.h"
#include "wtf/Vector.h" #include "wtf/Vector.h"
namespace WebCore { namespace WebCore {
class ThreadableWebSocketChannelClientWrapper : public ThreadSafeRefCountedWillBeGarbageCollected<ThreadableWebSocketChannelClientWrapper> { class ExecutionContext;
class WebSocketChannelClient;
class ThreadableWebSocketChannelClientWrapper : public ThreadSafeRefCountedWillBeGarbageCollectedFinalized<ThreadableWebSocketChannelClientWrapper> {
public: public:
static PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> create(WebSocketChannelClient*); static PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> create(WebSocketChannelClient*);
~ThreadableWebSocketChannelClientWrapper();
void clearClient(); void clearClient();
...@@ -54,12 +60,27 @@ public: ...@@ -54,12 +60,27 @@ public:
void didClose(WebSocketChannelClient::ClosingHandshakeCompletionStatus, unsigned short code, const String& reason); void didClose(WebSocketChannelClient::ClosingHandshakeCompletionStatus, unsigned short code, const String& reason);
void didReceiveMessageError(); void didReceiveMessageError();
void suspend();
void resume();
void trace(Visitor*) { } void trace(Visitor*) { }
private: private:
ThreadableWebSocketChannelClientWrapper(WebSocketChannelClient*); ThreadableWebSocketChannelClientWrapper(WebSocketChannelClient*);
void processPendingTasks();
static void didConnectCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, const String& subprotocol, const String& extensions);
static void didReceiveMessageCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, const String& message);
static void didReceiveBinaryDataCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, PassOwnPtr<Vector<char> >);
static void didConsumeBufferedAmountCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, unsigned long);
static void didStartClosingHandshakeCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>);
static void didCloseCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, WebSocketChannelClient::ClosingHandshakeCompletionStatus, unsigned short code, const String& reason);
static void didReceiveMessageErrorCallback(ExecutionContext*, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>);
WebSocketChannelClient* m_client; WebSocketChannelClient* m_client;
bool m_suspended;
Vector<OwnPtr<ExecutionContextTask> > m_pendingTasks;
}; };
} // namespace WebCore } // namespace WebCore
......
...@@ -187,6 +187,20 @@ void WorkerThreadableWebSocketChannel::disconnect() ...@@ -187,6 +187,20 @@ void WorkerThreadableWebSocketChannel::disconnect()
m_bridge.clear(); m_bridge.clear();
} }
void WorkerThreadableWebSocketChannel::suspend()
{
m_workerClientWrapper->suspend();
if (m_bridge)
m_bridge->suspend();
}
void WorkerThreadableWebSocketChannel::resume()
{
m_workerClientWrapper->resume();
if (m_bridge)
m_bridge->resume();
}
void WorkerThreadableWebSocketChannel::trace(Visitor* visitor) void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
{ {
visitor->trace(m_workerClientWrapper); visitor->trace(m_workerClientWrapper);
...@@ -305,6 +319,22 @@ void WorkerThreadableWebSocketChannel::Peer::disconnect() ...@@ -305,6 +319,22 @@ void WorkerThreadableWebSocketChannel::Peer::disconnect()
m_mainWebSocketChannel = nullptr; m_mainWebSocketChannel = nullptr;
} }
void WorkerThreadableWebSocketChannel::Peer::suspend()
{
ASSERT(isMainThread());
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->suspend();
}
void WorkerThreadableWebSocketChannel::Peer::resume()
{
ASSERT(isMainThread());
if (!m_mainWebSocketChannel)
return;
m_mainWebSocketChannel->resume();
}
static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions) static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
{ {
ASSERT_UNUSED(context, context->isWorkerGlobalScope()); ASSERT_UNUSED(context, context->isWorkerGlobalScope());
...@@ -495,6 +525,22 @@ void WorkerThreadableWebSocketChannel::Bridge::disconnect() ...@@ -495,6 +525,22 @@ void WorkerThreadableWebSocketChannel::Bridge::disconnect()
terminatePeer(); terminatePeer();
} }
void WorkerThreadableWebSocketChannel::Bridge::suspend()
{
if (hasTerminatedPeer())
return;
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::suspend, m_peer)));
}
void WorkerThreadableWebSocketChannel::Bridge::resume()
{
if (hasTerminatedPeer())
return;
m_loaderProxy.postTaskToLoader(CallClosureTask::create(bind(&Peer::resume, m_peer)));
}
void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
{ {
m_workerClientWrapper->clearClient(); m_workerClientWrapper->clearClient();
......
...@@ -84,8 +84,8 @@ public: ...@@ -84,8 +84,8 @@ public:
virtual void close(int code, const String& reason) OVERRIDE; virtual void close(int code, const String& reason) OVERRIDE;
virtual void fail(const String& reason, MessageLevel, const String&, unsigned) OVERRIDE; virtual void fail(const String& reason, MessageLevel, const String&, unsigned) OVERRIDE;
virtual void disconnect() OVERRIDE; // Will suppress didClose(). virtual void disconnect() OVERRIDE; // Will suppress didClose().
virtual void suspend() OVERRIDE { } virtual void suspend() OVERRIDE;
virtual void resume() OVERRIDE { } virtual void resume() OVERRIDE;
virtual void trace(Visitor*) OVERRIDE; virtual void trace(Visitor*) OVERRIDE;
...@@ -110,6 +110,8 @@ public: ...@@ -110,6 +110,8 @@ public:
void close(int code, const String& reason); void close(int code, const String& reason);
void fail(const String& reason, MessageLevel, const String& sourceURL, unsigned lineNumber); void fail(const String& reason, MessageLevel, const String& sourceURL, unsigned lineNumber);
void disconnect(); void disconnect();
void suspend();
void resume();
// WebSocketChannelClient functions. // WebSocketChannelClient functions.
virtual void didConnect(const String& subprotocol, const String& extensions) OVERRIDE; virtual void didConnect(const String& subprotocol, const String& extensions) OVERRIDE;
...@@ -150,6 +152,8 @@ private: ...@@ -150,6 +152,8 @@ private:
void close(int code, const String& reason); void close(int code, const String& reason);
void fail(const String& reason, MessageLevel, const String& sourceURL, unsigned lineNumber); void fail(const String& reason, MessageLevel, const String& sourceURL, unsigned lineNumber);
void disconnect(); void disconnect();
void suspend();
void resume();
private: private:
Bridge(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, WorkerGlobalScope&); Bridge(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper>, WorkerGlobalScope&);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment