Commit a07ef3d0 authored by sergeyu@chromium.org's avatar sergeyu@chromium.org

Refactor channel dispatchers on the host side.

The new HostControlDispatcher manages reading and writing to and from control 
channel on the host side. Similarly HostEventDispatcher is responsible for
reading and, in future, writing to and from the event channel.


Review URL: http://codereview.chromium.org/8468022

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@110625 0039d316-1c4b-4281-b951-d872f2087c98
parent 0b6fba80
...@@ -14,8 +14,7 @@ package remoting.protocol; ...@@ -14,8 +14,7 @@ package remoting.protocol;
// starts. Legacy clients expect to receive this message at the // starts. Legacy clients expect to receive this message at the
// beginning of each session. Current clients ignore it. // beginning of each session. Current clients ignore it.
// //
// TODO(sergeyu): Remove it once all clients are upgraded to the new // TODO(sergeyu): Remove this message. See http://crbug.com/104670 .
// version.
message LocalLoginStatusDeprecated { message LocalLoginStatusDeprecated {
optional bool success = 1; optional bool success = 1;
} }
......
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Implementation of ClientStub using sockets created from jingle connection.
// It sends messages through the socket after serializing it.
//
// Object of this class can only be created by ConnectionToClient.
//
// This class can be used on any thread.
#ifndef REMOTING_PROTOCOL_CLIENT_CONTROL_SENDER_H_
#define REMOTING_PROTOCOL_CLIENT_CONTROL_SENDER_H_
#include "base/basictypes.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "remoting/protocol/client_stub.h"
namespace base {
class MessageLoopProxy;
} // namespace base
namespace net {
class Socket;
} // namespace net
namespace remoting {
namespace protocol {
class BufferedSocketWriter;
// Implementation of ClientStub that sends commands on a socket. Must
// be created and closed on the network thread, but can be used on any
// other thread.
class ClientControlSender : public ClientStub {
public:
explicit ClientControlSender(base::MessageLoopProxy* message_loop,
net::Socket* socket);
virtual ~ClientControlSender();
// Stop writing. Must be called on the network thread when the
// underlying socket is being destroyed.
void Close();
private:
// Buffered socket writer holds the serialized message and send it on the
// right thread.
scoped_refptr<BufferedSocketWriter> buffered_writer_;
DISALLOW_COPY_AND_ASSIGN(ClientControlSender);
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_CLIENT_CONTROL_SENDER_H_
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
#include "base/message_loop_proxy.h" #include "base/message_loop_proxy.h"
#include "google/protobuf/message.h" #include "google/protobuf/message.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
#include "remoting/protocol/client_control_sender.h" #include "remoting/protocol/host_control_dispatcher.h"
#include "remoting/protocol/host_message_dispatcher.h" #include "remoting/protocol/host_event_dispatcher.h"
#include "remoting/protocol/host_stub.h" #include "remoting/protocol/host_stub.h"
#include "remoting/protocol/input_stub.h" #include "remoting/protocol/input_stub.h"
...@@ -78,7 +78,7 @@ VideoStub* ConnectionToClient::video_stub() { ...@@ -78,7 +78,7 @@ VideoStub* ConnectionToClient::video_stub() {
// Return pointer to ClientStub. // Return pointer to ClientStub.
ClientStub* ConnectionToClient::client_stub() { ClientStub* ConnectionToClient::client_stub() {
DCHECK(CalledOnValidThread()); DCHECK(CalledOnValidThread());
return client_control_sender_.get(); return control_dispatcher_.get();
} }
void ConnectionToClient::set_host_stub(protocol::HostStub* host_stub) { void ConnectionToClient::set_host_stub(protocol::HostStub* host_stub) {
...@@ -110,11 +110,14 @@ void ConnectionToClient::OnSessionStateChange(protocol::Session::State state) { ...@@ -110,11 +110,14 @@ void ConnectionToClient::OnSessionStateChange(protocol::Session::State state) {
break; break;
case protocol::Session::CONNECTED_CHANNELS: case protocol::Session::CONNECTED_CHANNELS:
client_control_sender_.reset( control_dispatcher_.reset(new HostControlDispatcher());
new ClientControlSender(base::MessageLoopProxy::current(), control_dispatcher_->Init(session_.get());
session_->control_channel())); control_dispatcher_->set_host_stub(host_stub_);
dispatcher_.reset(new HostMessageDispatcher()); input_dispatcher_.reset(new HostEventDispatcher());
dispatcher_->Initialize(this, host_stub_, input_stub_); input_dispatcher_->Init(session_.get());
input_dispatcher_->set_input_stub(input_stub_);
input_dispatcher_->set_sequence_number_callback(base::Bind(
&ConnectionToClient::UpdateSequenceNumber, base::Unretained(this)));
control_connected_ = true; control_connected_ = true;
input_connected_ = true; input_connected_ = true;
...@@ -162,9 +165,9 @@ void ConnectionToClient::CloseOnError() { ...@@ -162,9 +165,9 @@ void ConnectionToClient::CloseOnError() {
} }
void ConnectionToClient::CloseChannels() { void ConnectionToClient::CloseChannels() {
control_dispatcher_.reset();
input_dispatcher_.reset();
video_writer_.reset(); video_writer_.reset();
client_control_sender_.reset();
dispatcher_.reset();
} }
} // namespace protocol } // namespace protocol
......
...@@ -21,11 +21,11 @@ class MessageLoopProxy; ...@@ -21,11 +21,11 @@ class MessageLoopProxy;
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
class ClientControlSender;
class ClientStub; class ClientStub;
class HostStub; class HostStub;
class InputStub; class InputStub;
class HostMessageDispatcher; class HostControlDispatcher;
class HostEventDispatcher;
// This class represents a remote viewer connection to the chromoting // This class represents a remote viewer connection to the chromoting
// host. It sets up all protocol channels and connects them to the // host. It sets up all protocol channels and connects them to the
...@@ -104,12 +104,9 @@ class ConnectionToClient : public base::NonThreadSafe { ...@@ -104,12 +104,9 @@ class ConnectionToClient : public base::NonThreadSafe {
// The libjingle channel used to send and receive data from the remote client. // The libjingle channel used to send and receive data from the remote client.
scoped_ptr<Session> session_; scoped_ptr<Session> session_;
// Writers for outgoing channels.
scoped_ptr<VideoWriter> video_writer_; scoped_ptr<VideoWriter> video_writer_;
scoped_ptr<ClientControlSender> client_control_sender_; scoped_ptr<HostControlDispatcher> control_dispatcher_;
scoped_ptr<HostEventDispatcher> input_dispatcher_;
// Dispatcher for incoming messages.
scoped_ptr<HostMessageDispatcher> dispatcher_;
// State of the channels. // State of the channels.
bool control_connected_; bool control_connected_;
......
...@@ -2,38 +2,49 @@ ...@@ -2,38 +2,49 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
// This stub is thread safe because of the use of BufferedSocketWriter. #include "remoting/protocol/host_control_dispatcher.h"
// BufferedSocketWriter buffers messages and send them on them right thread.
#include "remoting/protocol/client_control_sender.h" #include "base/message_loop_proxy.h"
#include "base/task.h"
#include "remoting/protocol/buffered_socket_writer.h"
#include "remoting/proto/control.pb.h" #include "remoting/proto/control.pb.h"
#include "remoting/proto/internal.pb.h" #include "remoting/proto/internal.pb.h"
#include "remoting/protocol/buffered_socket_writer.h"
#include "remoting/protocol/host_stub.h"
#include "remoting/protocol/session.h"
#include "remoting/protocol/util.h" #include "remoting/protocol/util.h"
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
ClientControlSender::ClientControlSender(base::MessageLoopProxy* message_loop, HostControlDispatcher::HostControlDispatcher()
net::Socket* socket) : host_stub_(NULL),
: buffered_writer_(new BufferedSocketWriter(message_loop)) { writer_(new BufferedSocketWriter(base::MessageLoopProxy::current())) {
buffered_writer_->Init(socket, BufferedSocketWriter::WriteFailedCallback()); }
HostControlDispatcher::~HostControlDispatcher() {
writer_->Close();
}
void HostControlDispatcher::Init(Session* session) {
DCHECK(session);
reader_.Init(session->control_channel(), base::Bind(
&HostControlDispatcher::OnMessageReceived, base::Unretained(this)));
writer_->Init(session->control_channel(),
BufferedSocketWriter::WriteFailedCallback());
// Write legacy BeginSession message. // Write legacy BeginSession message.
// TODO(sergeyu): Remove it. See http://crbug.com/104670 .
protocol::ControlMessage message; protocol::ControlMessage message;
message.mutable_begin_session_deprecated()->mutable_login_status()-> message.mutable_begin_session_deprecated()->mutable_login_status()->
set_success(true); set_success(true);
buffered_writer_->Write(SerializeAndFrameMessage(message), base::Closure()); writer_->Write(SerializeAndFrameMessage(message), base::Closure());
} }
ClientControlSender::~ClientControlSender() { void HostControlDispatcher::OnMessageReceived(
} ControlMessage* message, const base::Closure& done_task) {
DCHECK(host_stub_);
LOG(WARNING) << "Unknown control message received.";
void ClientControlSender::Close() { done_task.Run();
buffered_writer_->Close();
} }
} // namespace protocol } // namespace protocol
......
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_HOST_CONTROL_DISPATCHER_H_
#define REMOTING_PROTOCOL_HOST_CONTROL_DISPATCHER_H_
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "remoting/protocol/client_stub.h"
#include "remoting/protocol/message_reader.h"
namespace base {
class MessageLoopProxy;
} // namespace base
namespace remoting {
namespace protocol {
class BufferedSocketWriter;
class ControlMessage;
class HostStub;
class Session;
// HostControlDispatcher dispatches incoming messages on the control
// channel to HostStub, and also implements ClientStub for outgoing
// messages.
class HostControlDispatcher : public ClientStub {
public:
HostControlDispatcher();
virtual ~HostControlDispatcher();
// Initialize the control channel and the dispatcher for the
// |session|. Doesn't take ownership of |session|.
void Init(Session* session);
// Sets HostStub that will be called for each incoming control
// message. Doesn't take ownership of |host_stub|. It must outlive
// this dispatcher.
void set_host_stub(HostStub* host_stub) { host_stub_ = host_stub; }
private:
// This method is called by |reader_| when a message is received.
void OnMessageReceived(ControlMessage* message,
const base::Closure& done_task);
HostStub* host_stub_;
ProtobufMessageReader<ControlMessage> reader_;
scoped_refptr<BufferedSocketWriter> writer_;
DISALLOW_COPY_AND_ASSIGN(HostControlDispatcher);
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_HOST_CONTROL_DISPATCHER_H_
...@@ -2,80 +2,49 @@ ...@@ -2,80 +2,49 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "base/memory/ref_counted.h" #include "remoting/protocol/host_event_dispatcher.h"
#include "net/base/io_buffer.h"
#include "remoting/proto/control.pb.h"
#include "remoting/proto/event.pb.h" #include "remoting/proto/event.pb.h"
#include "remoting/proto/internal.pb.h" #include "remoting/proto/internal.pb.h"
#include "remoting/protocol/connection_to_client.h"
#include "remoting/protocol/host_message_dispatcher.h"
#include "remoting/protocol/host_stub.h"
#include "remoting/protocol/input_stub.h" #include "remoting/protocol/input_stub.h"
#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/session.h" #include "remoting/protocol/session.h"
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
HostMessageDispatcher::HostMessageDispatcher() HostEventDispatcher::HostEventDispatcher()
: connection_(NULL), : input_stub_(NULL) {
host_stub_(NULL),
input_stub_(NULL) {
}
HostMessageDispatcher::~HostMessageDispatcher() {
} }
void HostMessageDispatcher::Initialize( HostEventDispatcher::~HostEventDispatcher() {
ConnectionToClient* connection,
HostStub* host_stub, InputStub* input_stub) {
if (!connection || !host_stub || !input_stub ||
!connection->session()->event_channel() ||
!connection->session()->control_channel()) {
return;
}
control_message_reader_.reset(new ProtobufMessageReader<ControlMessage>());
event_message_reader_.reset(new ProtobufMessageReader<EventMessage>());
connection_ = connection;
host_stub_ = host_stub;
input_stub_ = input_stub;
// Initialize the readers on the sockets provided by channels.
event_message_reader_->Init(
connection->session()->event_channel(),
base::Bind(&HostMessageDispatcher::OnEventMessageReceived,
base::Unretained(this)));
control_message_reader_->Init(
connection->session()->control_channel(),
base::Bind(&HostMessageDispatcher::OnControlMessageReceived,
base::Unretained(this)));
} }
void HostMessageDispatcher::OnControlMessageReceived( void HostEventDispatcher::Init(Session* session) {
ControlMessage* message, const base::Closure& done_task) { DCHECK(session);
LOG(WARNING) << "Invalid control message received."; reader_.Init(session->event_channel(), base::Bind(
done_task.Run(); &HostEventDispatcher::OnMessageReceived, base::Unretained(this)));
} }
void HostMessageDispatcher::OnEventMessageReceived( void HostEventDispatcher::OnMessageReceived(
EventMessage* message, const base::Closure& done_task) { EventMessage* message, const base::Closure& done_task) {
DCHECK(input_stub_);
base::ScopedClosureRunner done_runner(done_task); base::ScopedClosureRunner done_runner(done_task);
connection_->UpdateSequenceNumber(message->sequence_number()); sequence_number_callback_.Run(message->sequence_number());
if (message->has_key_event()) { if (message->has_key_event()) {
const KeyEvent& event = message->key_event(); const KeyEvent& event = message->key_event();
if (event.has_keycode() && event.has_pressed()) { if (event.has_keycode() && event.has_pressed()) {
input_stub_->InjectKeyEvent(event); input_stub_->InjectKeyEvent(event);
return; } else {
LOG(WARNING) << "Received invalid key event.";
} }
} else if (message->has_mouse_event()) { } else if (message->has_mouse_event()) {
input_stub_->InjectMouseEvent(message->mouse_event()); input_stub_->InjectMouseEvent(message->mouse_event());
return; } else {
LOG(WARNING) << "Unknown event message received.";
} }
LOG(WARNING) << "Unknown event message received.";
} }
} // namespace protocol } // namespace protocol
......
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_HOST_EVENT_DISPATCHER_H_
#define REMOTING_PROTOCOL_HOST_EVENT_DISPATCHER_H_
#include "base/basictypes.h"
#include "remoting/protocol/message_reader.h"
namespace remoting {
namespace protocol {
class EventMessage;
class InputStub;
class Session;
// HostEventDispatcher dispatches incoming messages on the event
// channel to InputStub.
class HostEventDispatcher {
public:
typedef base::Callback<void(int64)> SequenceNumberCallback;
HostEventDispatcher();
virtual ~HostEventDispatcher();
// Initialize the event channel and the dispatcher for the
// |session|. Caller retains ownership of |session|.
void Init(Session* session);
// Set InputStub that will be called for each incoming input
// message. Doesn't take ownership of |input_stub|. It must outlive
// the dispatcher.
void set_input_stub(InputStub* input_stub) { input_stub_ = input_stub; }
// Set callback to notify of each message's sequence number. The
// callback cannot tear down this object.
void set_sequence_number_callback(const SequenceNumberCallback& value) {
sequence_number_callback_ = value;
}
private:
// This method is called by |reader_| when a message is received.
void OnMessageReceived(EventMessage* message,
const base::Closure& done_task);
InputStub* input_stub_;
SequenceNumberCallback sequence_number_callback_;
ProtobufMessageReader<EventMessage> reader_;
DISALLOW_COPY_AND_ASSIGN(HostEventDispatcher);
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_HOST_EVENT_DISPATCHER_H_
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_HOST_MESSAGE_DISPATCHER_H_
#define REMOTING_PROTOCOL_HOST_MESSAGE_DISPATCHER_H_
#include "base/basictypes.h"
#include "base/memory/scoped_ptr.h"
#include "base/task.h"
#include "remoting/protocol/message_reader.h"
namespace remoting {
namespace protocol {
class ConnectionToClient;
class ControlMessage;
class EventMessage;
class HostStub;
class InputStub;
class Session;
// A message dispatcher used to listen for messages received in
// protocol::Session. It dispatches messages to the corresponding
// handler.
//
// Internally it contains an EventStreamReader that decodes data on
// communications channels into protocol buffer messages.
// EventStreamReader is registered with protocol::Session given to it.
//
// Object of this class is owned by ConnectionToClient to dispatch messages
// to itself.
class HostMessageDispatcher {
public:
// Construct a message dispatcher.
HostMessageDispatcher();
virtual ~HostMessageDispatcher();
// Initialize the message dispatcher with the given connection and
// message handlers.
void Initialize(ConnectionToClient* connection,
HostStub* host_stub, InputStub* input_stub);
private:
// This method is called by |control_channel_reader_| when a control
// message is received.
void OnControlMessageReceived(ControlMessage* message,
const base::Closure& done_task);
// This method is called by |event_channel_reader_| when a event
// message is received.
void OnEventMessageReceived(EventMessage* message,
const base::Closure& done_task);
// MessageReader that runs on the control channel. It runs a loop
// that parses data on the channel and then delegates the message to this
// class.
scoped_ptr<ProtobufMessageReader<ControlMessage> > control_message_reader_;
// MessageReader that runs on the event channel.
scoped_ptr<ProtobufMessageReader<EventMessage> > event_message_reader_;
// Connection that this object belongs to.
ConnectionToClient* connection_;
// Stubs for host and input. These objects are not owned.
// They are called on the thread there data is received, i.e. jingle thread.
HostStub* host_stub_;
InputStub* input_stub_;
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_HOST_MESSAGE_DISPATCHER_H_
...@@ -9,7 +9,6 @@ ...@@ -9,7 +9,6 @@
#include "net/socket/stream_socket.h" #include "net/socket/stream_socket.h"
#include "remoting/base/constants.h" #include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h" #include "remoting/proto/video.pb.h"
#include "remoting/protocol/rtp_writer.h"
#include "remoting/protocol/session.h" #include "remoting/protocol/session.h"
#include "remoting/protocol/util.h" #include "remoting/protocol/util.h"
......
...@@ -740,8 +740,6 @@ ...@@ -740,8 +740,6 @@
'protocol/buffered_socket_writer.h', 'protocol/buffered_socket_writer.h',
'protocol/channel_authenticator.cc', 'protocol/channel_authenticator.cc',
'protocol/channel_authenticator.h', 'protocol/channel_authenticator.h',
'protocol/client_control_sender.cc',
'protocol/client_control_sender.h',
'protocol/client_message_dispatcher.cc', 'protocol/client_message_dispatcher.cc',
'protocol/client_message_dispatcher.h', 'protocol/client_message_dispatcher.h',
'protocol/client_stub.h', 'protocol/client_stub.h',
...@@ -751,10 +749,12 @@ ...@@ -751,10 +749,12 @@
'protocol/connection_to_host.h', 'protocol/connection_to_host.h',
'protocol/content_description.cc', 'protocol/content_description.cc',
'protocol/content_description.h', 'protocol/content_description.h',
'protocol/host_control_dispatcher.cc',
'protocol/host_control_dispatcher.h',
'protocol/host_control_sender.cc', 'protocol/host_control_sender.cc',
'protocol/host_control_sender.h', 'protocol/host_control_sender.h',
'protocol/host_message_dispatcher.cc', 'protocol/host_event_dispatcher.cc',
'protocol/host_message_dispatcher.h', 'protocol/host_event_dispatcher.h',
'protocol/host_stub.h', 'protocol/host_stub.h',
'protocol/input_sender.cc', 'protocol/input_sender.cc',
'protocol/input_sender.h', 'protocol/input_sender.h',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment