Commit 6b4bf550 authored by vkuzkokov's avatar vkuzkokov Committed by Commit bot

DevTools: Merged AndroidWebSocket and AndroidWebSocketImpl

This is done so that ADM::Device could have access to AndroidWebSocket's private members in order to close all sockets when device is released.

BUG=387067

Review URL: https://codereview.chromium.org/620183002

Cr-Commit-Position: refs/heads/master@{#300663}
parent b7a7ed1a
...@@ -53,6 +53,7 @@ class AndroidDeviceManager ...@@ -53,6 +53,7 @@ class AndroidDeviceManager
}; };
typedef base::Callback<void(const DeviceInfo&)> DeviceInfoCallback; typedef base::Callback<void(const DeviceInfo&)> DeviceInfoCallback;
class Device;
class AndroidWebSocket { class AndroidWebSocket {
public: public:
...@@ -66,9 +67,28 @@ class AndroidDeviceManager ...@@ -66,9 +67,28 @@ class AndroidDeviceManager
virtual ~Delegate() {} virtual ~Delegate() {}
}; };
virtual ~AndroidWebSocket() {} ~AndroidWebSocket();
virtual void SendFrame(const std::string& message) = 0; void SendFrame(const std::string& message);
private:
friend class Device;
class WebSocketImpl;
AndroidWebSocket(
scoped_refptr<Device> device,
const std::string& socket_name,
const std::string& url,
AndroidWebSocket::Delegate* delegate);
void Connected(int result, scoped_ptr<net::StreamSocket> socket);
void OnFrameRead(const std::string& message);
void OnSocketClosed();
scoped_refptr<Device> device_;
WebSocketImpl* socket_impl_;
Delegate* delegate_;
base::WeakPtrFactory<AndroidWebSocket> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(AndroidWebSocket);
}; };
class DeviceProvider; class DeviceProvider;
...@@ -76,10 +96,6 @@ class AndroidDeviceManager ...@@ -76,10 +96,6 @@ class AndroidDeviceManager
class Device : public base::RefCountedThreadSafe<Device>, class Device : public base::RefCountedThreadSafe<Device>,
public base::NonThreadSafe { public base::NonThreadSafe {
public: public:
typedef AndroidDeviceManager::DeviceInfoCallback DeviceInfoCallback;
typedef AndroidDeviceManager::CommandCallback CommandCallback;
typedef AndroidDeviceManager::SocketCallback SocketCallback;
void QueryDeviceInfo(const DeviceInfoCallback& callback); void QueryDeviceInfo(const DeviceInfoCallback& callback);
void OpenSocket(const std::string& socket_name, void OpenSocket(const std::string& socket_name,
...@@ -92,7 +108,6 @@ class AndroidDeviceManager ...@@ -92,7 +108,6 @@ class AndroidDeviceManager
void HttpUpgrade(const std::string& socket_name, void HttpUpgrade(const std::string& socket_name,
const std::string& url, const std::string& url,
const SocketCallback& callback); const SocketCallback& callback);
AndroidWebSocket* CreateWebSocket( AndroidWebSocket* CreateWebSocket(
const std::string& socket_name, const std::string& socket_name,
const std::string& url, const std::string& url,
...@@ -101,12 +116,14 @@ class AndroidDeviceManager ...@@ -101,12 +116,14 @@ class AndroidDeviceManager
std::string serial() { return serial_; } std::string serial() { return serial_; }
private: private:
friend class base::RefCountedThreadSafe<Device>;
friend class AndroidDeviceManager; friend class AndroidDeviceManager;
friend class AndroidWebSocket;
Device(scoped_refptr<base::MessageLoopProxy> device_message_loop, Device(scoped_refptr<base::MessageLoopProxy> device_message_loop,
scoped_refptr<DeviceProvider> provider, scoped_refptr<DeviceProvider> provider,
const std::string& serial); const std::string& serial);
friend class base::RefCountedThreadSafe<Device>;
virtual ~Device(); virtual ~Device();
scoped_refptr<base::MessageLoopProxy> device_message_loop_; scoped_refptr<base::MessageLoopProxy> device_message_loop_;
......
...@@ -19,262 +19,179 @@ namespace { ...@@ -19,262 +19,179 @@ namespace {
const int kBufferSize = 16 * 1024; const int kBufferSize = 16 * 1024;
class WebSocketImpl { } // namespace
public:
typedef AndroidDeviceManager::AndroidWebSocket::Delegate Delegate;
WebSocketImpl(Delegate* delegate,
scoped_ptr<net::StreamSocket> socket);
void StartListening();
void SendFrame(const std::string& message);
private:
void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result);
void SendPendingRequests(int result);
void Disconnect();
Delegate* delegate_;
scoped_ptr<net::StreamSocket> socket_;
std::string response_buffer_;
std::string request_buffer_;
base::ThreadChecker thread_checker_;
DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
};
class DelegateWrapper class AndroidDeviceManager::AndroidWebSocket::WebSocketImpl {
: public AndroidDeviceManager::AndroidWebSocket::Delegate {
public: public:
DelegateWrapper(base::WeakPtr<Delegate> weak_delegate, WebSocketImpl(scoped_refptr<base::MessageLoopProxy> response_message_loop,
scoped_refptr<base::MessageLoopProxy> message_loop) base::WeakPtr<AndroidWebSocket> weak_socket,
: weak_delegate_(weak_delegate), scoped_ptr<net::StreamSocket> socket)
message_loop_(message_loop) { : response_message_loop_(response_message_loop),
} weak_socket_(weak_socket),
socket_(socket.Pass()) {
~DelegateWrapper() override {} thread_checker_.DetachFromThread();
// AndroidWebSocket::Delegate implementation
void OnSocketOpened() override {
message_loop_->PostTask(FROM_HERE,
base::Bind(&Delegate::OnSocketOpened, weak_delegate_));
} }
void OnFrameRead(const std::string& message) override { void StartListening() {
message_loop_->PostTask(FROM_HERE, DCHECK(thread_checker_.CalledOnValidThread());
base::Bind(&Delegate::OnFrameRead, weak_delegate_, message)); DCHECK(socket_);
scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize));
Read(buffer);
} }
void OnSocketClosed() override { void SendFrame(const std::string& message) {
message_loop_->PostTask(FROM_HERE, DCHECK(thread_checker_.CalledOnValidThread());
base::Bind(&Delegate::OnSocketClosed, weak_delegate_)); if (!socket_)
return;
int mask = base::RandInt(0, 0x7FFFFFFF);
std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask);
request_buffer_ += encoded_frame;
if (request_buffer_.length() == encoded_frame.length())
SendPendingRequests(0);
} }
private: private:
base::WeakPtr<Delegate> weak_delegate_; void Read(scoped_refptr<net::IOBuffer> response_buffer) {
scoped_refptr<base::MessageLoopProxy> message_loop_; int result = socket_->Read(
}; response_buffer.get(),
kBufferSize,
class AndroidWebSocketImpl base::Bind(&WebSocketImpl::OnBytesRead,
: public AndroidDeviceManager::AndroidWebSocket, base::Unretained(this), response_buffer));
public AndroidDeviceManager::AndroidWebSocket::Delegate { if (result != net::ERR_IO_PENDING)
public: OnBytesRead(response_buffer, result);
typedef AndroidDeviceManager::Device Device; }
AndroidWebSocketImpl(
scoped_refptr<base::MessageLoopProxy> device_message_loop,
scoped_refptr<Device> device,
const std::string& socket_name,
const std::string& url,
AndroidWebSocket::Delegate* delegate);
~AndroidWebSocketImpl() override; void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result) {
DCHECK(thread_checker_.CalledOnValidThread());
if (result <= 0) {
Disconnect();
return;
}
response_buffer_.append(response_buffer->data(), result);
int bytes_consumed;
std::string output;
WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17(
response_buffer_, false, &bytes_consumed, &output);
// AndroidWebSocket implementation while (parse_result == WebSocket::FRAME_OK) {
void SendFrame(const std::string& message) override; response_buffer_ = response_buffer_.substr(bytes_consumed);
response_message_loop_->PostTask(
FROM_HERE,
base::Bind(&AndroidWebSocket::OnFrameRead, weak_socket_, output));
parse_result = WebSocket::DecodeFrameHybi17(
response_buffer_, false, &bytes_consumed, &output);
}
if (parse_result == WebSocket::FRAME_ERROR ||
parse_result == WebSocket::FRAME_CLOSE) {
Disconnect();
return;
}
Read(response_buffer);
}
// AndroidWebSocket::Delegate implementation void SendPendingRequests(int result) {
void OnSocketOpened() override; DCHECK(thread_checker_.CalledOnValidThread());
void OnFrameRead(const std::string& message) override; if (result < 0) {
void OnSocketClosed() override; Disconnect();
return;
}
request_buffer_ = request_buffer_.substr(result);
if (request_buffer_.empty())
return;
scoped_refptr<net::StringIOBuffer> buffer =
new net::StringIOBuffer(request_buffer_);
result = socket_->Write(buffer.get(), buffer->size(),
base::Bind(&WebSocketImpl::SendPendingRequests,
base::Unretained(this)));
if (result != net::ERR_IO_PENDING)
SendPendingRequests(result);
}
private: void Disconnect() {
void Connected(int result, scoped_ptr<net::StreamSocket> socket); DCHECK(thread_checker_.CalledOnValidThread());
socket_.reset();
response_message_loop_->PostTask(
FROM_HERE,
base::Bind(&AndroidWebSocket::OnSocketClosed, weak_socket_));
}
scoped_refptr<base::MessageLoopProxy> device_message_loop_; scoped_refptr<base::MessageLoopProxy> response_message_loop_;
scoped_refptr<Device> device_; base::WeakPtr<AndroidWebSocket> weak_socket_;
std::string socket_name_; scoped_ptr<net::StreamSocket> socket_;
std::string url_; std::string response_buffer_;
WebSocketImpl* connection_; std::string request_buffer_;
DelegateWrapper* delegate_wrapper_; base::ThreadChecker thread_checker_;
AndroidWebSocket::Delegate* delegate_; DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
base::WeakPtrFactory<AndroidWebSocketImpl> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(AndroidWebSocketImpl);
}; };
AndroidWebSocketImpl::AndroidWebSocketImpl( AndroidDeviceManager::AndroidWebSocket::AndroidWebSocket(
scoped_refptr<base::MessageLoopProxy> device_message_loop,
scoped_refptr<Device> device, scoped_refptr<Device> device,
const std::string& socket_name, const std::string& socket_name,
const std::string& url, const std::string& url,
AndroidWebSocket::Delegate* delegate) Delegate* delegate)
: device_message_loop_(device_message_loop), : device_(device),
device_(device), socket_impl_(nullptr),
socket_name_(socket_name),
url_(url),
delegate_(delegate), delegate_(delegate),
weak_factory_(this) { weak_factory_(this) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
DCHECK(delegate_); DCHECK(delegate_);
device_->HttpUpgrade( device_->HttpUpgrade(
socket_name_, url_, socket_name, url,
base::Bind(&AndroidWebSocketImpl::Connected, weak_factory_.GetWeakPtr())); base::Bind(&AndroidWebSocket::Connected, weak_factory_.GetWeakPtr()));
} }
void AndroidWebSocketImpl::SendFrame(const std::string& message) { AndroidDeviceManager::AndroidWebSocket::~AndroidWebSocket() {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
device_message_loop_->PostTask( if (socket_impl_)
FROM_HERE, device_->device_message_loop_->DeleteSoon(FROM_HERE, socket_impl_);
base::Bind(&WebSocketImpl::SendFrame,
base::Unretained(connection_), message));
}
void WebSocketImpl::SendFrame(const std::string& message) {
DCHECK(thread_checker_.CalledOnValidThread());
if (!socket_)
return;
int mask = base::RandInt(0, 0x7FFFFFFF);
std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask);
request_buffer_ += encoded_frame;
if (request_buffer_.length() == encoded_frame.length())
SendPendingRequests(0);
} }
AndroidWebSocketImpl::~AndroidWebSocketImpl() { void AndroidDeviceManager::AndroidWebSocket::SendFrame(
const std::string& message) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
device_message_loop_->DeleteSoon(FROM_HERE, connection_); DCHECK(socket_impl_);
device_message_loop_->DeleteSoon(FROM_HERE, delegate_wrapper_); device_->device_message_loop_->PostTask(
} FROM_HERE,
base::Bind(&WebSocketImpl::SendFrame,
WebSocketImpl::WebSocketImpl(Delegate* delegate, base::Unretained(socket_impl_), message));
scoped_ptr<net::StreamSocket> socket)
: delegate_(delegate),
socket_(socket.Pass()) {
thread_checker_.DetachFromThread();
} }
void AndroidWebSocketImpl::Connected(int result, void AndroidDeviceManager::AndroidWebSocket::Connected(
scoped_ptr<net::StreamSocket> socket) { int result,
scoped_ptr<net::StreamSocket> socket) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
if (result != net::OK || socket == NULL) { if (result != net::OK || !socket.get()) {
OnSocketClosed(); OnSocketClosed();
return; return;
} }
delegate_wrapper_ = new DelegateWrapper(weak_factory_.GetWeakPtr(), socket_impl_ = new WebSocketImpl(base::MessageLoopProxy::current(),
base::MessageLoopProxy::current()); weak_factory_.GetWeakPtr(),
connection_ = new WebSocketImpl(delegate_wrapper_, socket.Pass()); socket.Pass());
device_message_loop_->PostTask( device_->device_message_loop_->PostTask(
FROM_HERE, FROM_HERE,
base::Bind(&WebSocketImpl::StartListening, base::Bind(&WebSocketImpl::StartListening,
base::Unretained(connection_))); base::Unretained(socket_impl_)));
OnSocketOpened();
}
void WebSocketImpl::StartListening() {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(socket_);
scoped_refptr<net::IOBuffer> response_buffer =
new net::IOBuffer(kBufferSize);
int result = socket_->Read(
response_buffer.get(),
kBufferSize,
base::Bind(&WebSocketImpl::OnBytesRead,
base::Unretained(this), response_buffer));
if (result != net::ERR_IO_PENDING)
OnBytesRead(response_buffer, result);
}
void WebSocketImpl::OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer,
int result) {
DCHECK(thread_checker_.CalledOnValidThread());
if (result <= 0) {
Disconnect();
return;
}
response_buffer_.append(response_buffer->data(), result);
int bytes_consumed;
std::string output;
WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17(
response_buffer_, false, &bytes_consumed, &output);
while (parse_result == WebSocket::FRAME_OK) {
response_buffer_ = response_buffer_.substr(bytes_consumed);
delegate_->OnFrameRead(output);
parse_result = WebSocket::DecodeFrameHybi17(
response_buffer_, false, &bytes_consumed, &output);
}
if (parse_result == WebSocket::FRAME_ERROR ||
parse_result == WebSocket::FRAME_CLOSE) {
Disconnect();
return;
}
result = socket_->Read(
response_buffer.get(),
kBufferSize,
base::Bind(&WebSocketImpl::OnBytesRead,
base::Unretained(this), response_buffer));
if (result != net::ERR_IO_PENDING)
OnBytesRead(response_buffer, result);
}
void WebSocketImpl::SendPendingRequests(int result) {
DCHECK(thread_checker_.CalledOnValidThread());
if (result < 0) {
Disconnect();
return;
}
request_buffer_ = request_buffer_.substr(result);
if (request_buffer_.empty())
return;
scoped_refptr<net::StringIOBuffer> buffer =
new net::StringIOBuffer(request_buffer_);
result = socket_->Write(buffer.get(), buffer->size(),
base::Bind(&WebSocketImpl::SendPendingRequests,
base::Unretained(this)));
if (result != net::ERR_IO_PENDING)
SendPendingRequests(result);
}
void WebSocketImpl::Disconnect() {
DCHECK(thread_checker_.CalledOnValidThread());
socket_.reset();
delegate_->OnSocketClosed();
}
void AndroidWebSocketImpl::OnSocketOpened() {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
delegate_->OnSocketOpened(); delegate_->OnSocketOpened();
} }
void AndroidWebSocketImpl::OnFrameRead(const std::string& message) { void AndroidDeviceManager::AndroidWebSocket::OnFrameRead(
const std::string& message) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
delegate_->OnFrameRead(message); delegate_->OnFrameRead(message);
} }
void AndroidWebSocketImpl::OnSocketClosed() { void AndroidDeviceManager::AndroidWebSocket::OnSocketClosed() {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
delegate_->OnSocketClosed(); delegate_->OnSocketClosed();
} }
} // namespace
AndroidDeviceManager::AndroidWebSocket* AndroidDeviceManager::AndroidWebSocket*
AndroidDeviceManager::Device::CreateWebSocket( AndroidDeviceManager::Device::CreateWebSocket(
const std::string& socket, const std::string& socket,
const std::string& url, const std::string& url,
AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) { AndroidWebSocket::Delegate* delegate) {
return new AndroidWebSocketImpl( return new AndroidWebSocket(this, socket, url, delegate);
device_message_loop_, this, socket, url, delegate);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment