Commit 409ac617 authored by sergeyu@chromium.org's avatar sergeyu@chromium.org

Refactor client channel dispatchers.

The new ClientControlDispatcher and ClientEventDispatcher manage control 
and event channels.


Review URL: http://codereview.chromium.org/8574025

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@110648 0039d316-1c4b-4281-b951-d872f2087c98
parent 04ef5154
...@@ -2,12 +2,14 @@ ...@@ -2,12 +2,14 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "remoting/protocol/client_control_dispatcher.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/message_loop_proxy.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
#include "remoting/proto/control.pb.h" #include "remoting/proto/control.pb.h"
#include "remoting/proto/event.pb.h" #include "remoting/proto/event.pb.h"
#include "remoting/proto/internal.pb.h" #include "remoting/proto/internal.pb.h"
#include "remoting/protocol/client_message_dispatcher.h"
#include "remoting/protocol/client_stub.h" #include "remoting/protocol/client_stub.h"
#include "remoting/protocol/input_stub.h" #include "remoting/protocol/input_stub.h"
#include "remoting/protocol/message_reader.h" #include "remoting/protocol/message_reader.h"
...@@ -16,33 +18,37 @@ ...@@ -16,33 +18,37 @@
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
ClientMessageDispatcher::ClientMessageDispatcher() : client_stub_(NULL) { ClientControlDispatcher::ClientControlDispatcher()
: client_stub_(NULL),
writer_(new BufferedSocketWriter(base::MessageLoopProxy::current())) {
} }
ClientMessageDispatcher::~ClientMessageDispatcher() { ClientControlDispatcher::~ClientControlDispatcher() {
writer_->Close();
} }
void ClientMessageDispatcher::Initialize( void ClientControlDispatcher::Init(protocol::Session* session) {
protocol::Session* session, ClientStub* client_stub) { DCHECK(session);
if (!session || !client_stub || !session->control_channel()) {
return;
}
control_message_reader_.reset(new ProtobufMessageReader<ControlMessage>());
client_stub_ = client_stub;
control_message_reader_->Init( // TODO(garykac): Set write failed callback.
session->control_channel(), writer_->Init(session->control_channel(),
base::Bind(&ClientMessageDispatcher::OnControlMessageReceived, BufferedSocketWriter::WriteFailedCallback());
base::Unretained(this))); reader_.Init(session->control_channel(), base::Bind(
return; &ClientControlDispatcher::OnMessageReceived, base::Unretained(this)));
} }
void ClientMessageDispatcher::OnControlMessageReceived( void ClientControlDispatcher::OnMessageReceived(
ControlMessage* message, const base::Closure& done_task) { ControlMessage* message, const base::Closure& done_task) {
DCHECK(client_stub_);
LOG(WARNING) << "Invalid control message received."; base::ScopedClosureRunner done_runner(done_task);
done_task.Run();
if (message->has_begin_session_deprecated()) {
// Host sends legacy BeginSession message for compatibility with
// older clients. Ignore it without warning.
} else {
LOG(WARNING) << "Unknown control message received.";
}
} }
} // namespace protocol } // namespace protocol
......
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_CLIENT_CONTROL_DISPATCHER_H_
#define REMOTING_PROTOCOL_CLIENT_CONTROL_DISPATCHER_H_
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "remoting/protocol/host_stub.h"
#include "remoting/protocol/message_reader.h"
namespace base {
class MessageLoopProxy;
} // namespace base
namespace remoting {
namespace protocol {
class ClientStub;
class ControlMessage;
class BufferedSocketWriter;
class Session;
// ClientControlDispatcher dispatches incoming messages on the control
// channel to ClientStub, and also implements HostStub for outgoing
// messages.
class ClientControlDispatcher : public HostStub {
public:
ClientControlDispatcher();
virtual ~ClientControlDispatcher();
// Initialize the control channel and the dispatcher for the
// |session|. Doesn't take ownership of |session|.
void Init(protocol::Session* session);
// Sets ClientStub that will be called for each incoming control
// message. Doesn't take ownership of |client_stub|. It must outlive
// this dispatcher.
void set_client_stub(ClientStub* client_stub) { client_stub_ = client_stub; }
private:
void OnMessageReceived(ControlMessage* message,
const base::Closure& done_task);
ClientStub* client_stub_;
ProtobufMessageReader<ControlMessage> reader_;
scoped_refptr<BufferedSocketWriter> writer_;
DISALLOW_COPY_AND_ASSIGN(ClientControlDispatcher);
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_CLIENT_CONTROL_DISPATCHER_H_
...@@ -2,48 +2,47 @@ ...@@ -2,48 +2,47 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
// This stub is thread safe because of the use of BufferedSocketWriter. #include "remoting/protocol/client_event_dispatcher.h"
// BufferedSocketWriter buffers messages and send them on the right thread.
#include "remoting/protocol/input_sender.h" #include "base/message_loop_proxy.h"
#include "base/task.h"
#include "base/time.h" #include "base/time.h"
#include "remoting/proto/event.pb.h" #include "remoting/proto/event.pb.h"
#include "remoting/proto/internal.pb.h" #include "remoting/proto/internal.pb.h"
#include "remoting/protocol/buffered_socket_writer.h" #include "remoting/protocol/buffered_socket_writer.h"
#include "remoting/protocol/session.h"
#include "remoting/protocol/util.h" #include "remoting/protocol/util.h"
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
InputSender::InputSender(base::MessageLoopProxy* message_loop, ClientEventDispatcher::ClientEventDispatcher()
net::Socket* socket) : writer_(new BufferedSocketWriter(base::MessageLoopProxy::current())) {
: buffered_writer_(new BufferedSocketWriter(message_loop)) { }
// TODO(garykac) Set write failed callback.
DCHECK(socket); ClientEventDispatcher::~ClientEventDispatcher() {
buffered_writer_->Init(socket, BufferedSocketWriter::WriteFailedCallback()); writer_->Close();
} }
InputSender::~InputSender() { void ClientEventDispatcher::Init(Session* session) {
DCHECK(session);
// TODO(garykac): Set write failed callback.
writer_->Init(session->event_channel(),
BufferedSocketWriter::WriteFailedCallback());
} }
void InputSender::InjectKeyEvent(const KeyEvent& event) { void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) {
EventMessage message; EventMessage message;
message.set_sequence_number(base::Time::Now().ToInternalValue()); message.set_sequence_number(base::Time::Now().ToInternalValue());
message.mutable_key_event()->CopyFrom(event); message.mutable_key_event()->CopyFrom(event);
buffered_writer_->Write(SerializeAndFrameMessage(message), base::Closure()); writer_->Write(SerializeAndFrameMessage(message), base::Closure());
} }
void InputSender::InjectMouseEvent(const MouseEvent& event) { void ClientEventDispatcher::InjectMouseEvent(const MouseEvent& event) {
EventMessage message; EventMessage message;
message.set_sequence_number(base::Time::Now().ToInternalValue()); message.set_sequence_number(base::Time::Now().ToInternalValue());
message.mutable_mouse_event()->CopyFrom(event); message.mutable_mouse_event()->CopyFrom(event);
buffered_writer_->Write(SerializeAndFrameMessage(message), base::Closure()); writer_->Write(SerializeAndFrameMessage(message), base::Closure());
}
void InputSender::Close() {
buffered_writer_->Close();
} }
} // namespace protocol } // namespace protocol
......
...@@ -2,66 +2,45 @@ ...@@ -2,66 +2,45 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
// Implementation of InputStub using sockets created from jingle connection. #ifndef REMOTING_PROTOCOL_CLIENT_INPUT_DISPATCHER_H_
// It sends messages through the socket after serializing it. #define REMOTING_PROTOCOL_CLIENT_INPUT_DISPATCHER_H_
//
// Object of this class can only be created by ConnectionToHost.
//
// This class can be used on any thread. An object of socket is given to this
// class, its lifetime is strictly greater than this object.
#ifndef REMOTING_PROTOCOL_INPUT_SENDER_H_
#define REMOTING_PROTOCOL_INPUT_SENDER_H_
#include "base/basictypes.h" #include "base/basictypes.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "remoting/protocol/input_stub.h" #include "remoting/protocol/input_stub.h"
class Task;
namespace base { namespace base {
class MessageLoopProxy; class MessageLoopProxy;
} // namespace base } // namespace base
namespace net {
class Socket;
} // namespace net
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
class BufferedSocketWriter; class BufferedSocketWriter;
class Session;
// Implementation of InputStub that sends messages on a socket. Must // ClientEventDispatcher manages the event channel on the client
// be created and closed on the network thread, but can be used on any // side. It implements InputStub for outgoing input messages.
// other thread. class ClientEventDispatcher : public InputStub {
class InputSender : public InputStub {
public: public:
// Create a stub using a socket. ClientEventDispatcher();
explicit InputSender(base::MessageLoopProxy* message_loop, virtual ~ClientEventDispatcher();
net::Socket* socket);
virtual ~InputSender(); // Initialize the event channel and the dispatcher for the
// |session|.
void Init(Session* session);
// InputStub implementation. // InputStub implementation.
virtual void InjectKeyEvent(const KeyEvent& event); virtual void InjectKeyEvent(const KeyEvent& event);
virtual void InjectMouseEvent(const MouseEvent& event); virtual void InjectMouseEvent(const MouseEvent& event);
// Stop writing. Must be called on the network thread when the
// underlying socket is being destroyed.
void Close();
private: private:
// Helper method to run the task and delete it afterwards. scoped_refptr<BufferedSocketWriter> writer_;
void RunTask(Task* done);
// Buffered socket writer holds the serialized message and sends it on the
// right thread.
scoped_refptr<BufferedSocketWriter> buffered_writer_;
DISALLOW_COPY_AND_ASSIGN(InputSender); DISALLOW_COPY_AND_ASSIGN(ClientEventDispatcher);
}; };
} // namespace protocol } // namespace protocol
} // namespace remoting } // namespace remoting
#endif // REMOTING_PROTOCOL_INPUT_SENDER_H_ #endif // REMOTING_PROTOCOL_CLIENT_INPUT_DISPATCHER_H_
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_CLIENT_MESSAGE_DISPATCHER_H_
#define REMOTING_PROTOCOL_CLIENT_MESSAGE_DISPATCHER_H_
#include "base/basictypes.h"
#include "base/memory/scoped_ptr.h"
#include "remoting/protocol/message_reader.h"
namespace remoting {
namespace protocol {
class ClientStub;
class ControlMessage;
class Session;
// A message dispatcher used to listen for messages received in
// protocol::Session. It dispatches messages to the corresponding
// handler.
//
// Internally it contains an EventStreamReader that decodes data on
// communications channels into protocol buffer messages.
// EventStreamReader is registered with protocol::Session given to it.
//
// Object of this class is owned by ConnectionToHost.
class ClientMessageDispatcher {
public:
// Construct a message dispatcher.
ClientMessageDispatcher();
virtual ~ClientMessageDispatcher();
// Initialize the message dispatcher with the given connection and
// message handlers.
void Initialize(protocol::Session* session, ClientStub* client_stub);
private:
void OnControlMessageReceived(ControlMessage* message,
const base::Closure& done_task);
// MessageReader that runs on the control channel. It runs a loop
// that parses data on the channel and then calls the corresponding handler
// in this class.
scoped_ptr<ProtobufMessageReader<ControlMessage> > control_message_reader_;
// Stubs for client and input. These objects are not owned.
// They are called on the thread there data is received, i.e. jingle thread.
ClientStub* client_stub_;
DISALLOW_COPY_AND_ASSIGN(ClientMessageDispatcher);
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_CLIENT_MESSAGE_DISPATCHER_H_
...@@ -12,10 +12,9 @@ ...@@ -12,10 +12,9 @@
#include "remoting/jingle_glue/javascript_signal_strategy.h" #include "remoting/jingle_glue/javascript_signal_strategy.h"
#include "remoting/jingle_glue/xmpp_signal_strategy.h" #include "remoting/jingle_glue/xmpp_signal_strategy.h"
#include "remoting/protocol/auth_token_utils.h" #include "remoting/protocol/auth_token_utils.h"
#include "remoting/protocol/client_message_dispatcher.h" #include "remoting/protocol/client_control_dispatcher.h"
#include "remoting/protocol/client_event_dispatcher.h"
#include "remoting/protocol/client_stub.h" #include "remoting/protocol/client_stub.h"
#include "remoting/protocol/host_control_sender.h"
#include "remoting/protocol/input_sender.h"
#include "remoting/protocol/jingle_session_manager.h" #include "remoting/protocol/jingle_session_manager.h"
#include "remoting/protocol/pepper_session_manager.h" #include "remoting/protocol/pepper_session_manager.h"
#include "remoting/protocol/video_reader.h" #include "remoting/protocol/video_reader.h"
...@@ -46,11 +45,11 @@ ConnectionToHost::~ConnectionToHost() { ...@@ -46,11 +45,11 @@ ConnectionToHost::~ConnectionToHost() {
} }
InputStub* ConnectionToHost::input_stub() { InputStub* ConnectionToHost::input_stub() {
return input_sender_.get(); return input_dispatcher_.get();
} }
HostStub* ConnectionToHost::host_stub() { HostStub* ConnectionToHost::host_stub() {
return host_control_sender_.get(); return control_dispatcher_.get();
} }
void ConnectionToHost::Connect(scoped_refptr<XmppProxy> xmpp_proxy, void ConnectionToHost::Connect(scoped_refptr<XmppProxy> xmpp_proxy,
...@@ -200,12 +199,11 @@ void ConnectionToHost::OnSessionStateChange( ...@@ -200,12 +199,11 @@ void ConnectionToHost::OnSessionStateChange(
break; break;
case Session::CONNECTED_CHANNELS: case Session::CONNECTED_CHANNELS:
host_control_sender_.reset( control_dispatcher_.reset(new ClientControlDispatcher());
new HostControlSender(message_loop_, session_->control_channel())); control_dispatcher_->Init(session_.get());
input_sender_.reset( control_dispatcher_->set_client_stub(client_stub_);
new InputSender(message_loop_, session_->event_channel())); input_dispatcher_.reset(new ClientEventDispatcher());
dispatcher_.reset(new ClientMessageDispatcher()); input_dispatcher_->Init(session_.get());
dispatcher_->Initialize(session_.get(), client_stub_);
control_connected_ = true; control_connected_ = true;
input_connected_ = true; input_connected_ = true;
...@@ -243,12 +241,8 @@ void ConnectionToHost::CloseOnError(Error error) { ...@@ -243,12 +241,8 @@ void ConnectionToHost::CloseOnError(Error error) {
} }
void ConnectionToHost::CloseChannels() { void ConnectionToHost::CloseChannels() {
if (input_sender_.get()) control_dispatcher_.reset();
input_sender_->Close(); input_dispatcher_.reset();
if (host_control_sender_.get())
host_control_sender_->Close();
video_reader_.reset(); video_reader_.reset();
} }
......
...@@ -31,11 +31,10 @@ class VideoPacket; ...@@ -31,11 +31,10 @@ class VideoPacket;
namespace protocol { namespace protocol {
class ClientMessageDispatcher; class ClientControlDispatcher;
class ClientEventDispatcher;
class ClientStub; class ClientStub;
class HostControlSender;
class HostStub; class HostStub;
class InputSender;
class InputStub; class InputStub;
class SessionConfig; class SessionConfig;
class VideoReader; class VideoReader;
...@@ -152,13 +151,9 @@ class ConnectionToHost : public SignalStrategy::StatusObserver, ...@@ -152,13 +151,9 @@ class ConnectionToHost : public SignalStrategy::StatusObserver,
scoped_ptr<SessionManager> session_manager_; scoped_ptr<SessionManager> session_manager_;
scoped_ptr<Session> session_; scoped_ptr<Session> session_;
// Handlers for incoming messages.
scoped_ptr<VideoReader> video_reader_; scoped_ptr<VideoReader> video_reader_;
scoped_ptr<ClientMessageDispatcher> dispatcher_; scoped_ptr<ClientControlDispatcher> control_dispatcher_;
scoped_ptr<ClientEventDispatcher> input_dispatcher_;
// Senders for outgoing messages.
scoped_ptr<InputSender> input_sender_;
scoped_ptr<HostControlSender> host_control_sender_;
// Internal state of the connection. // Internal state of the connection.
State state_; State state_;
......
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// This stub is thread safe because of the use of BufferedSocketWriter.
// BufferedSocketWriter buffers messages and send them on them right thread.
#include "remoting/protocol/host_control_sender.h"
#include "base/task.h"
#include "remoting/protocol/buffered_socket_writer.h"
#include "remoting/proto/control.pb.h"
#include "remoting/proto/internal.pb.h"
#include "remoting/protocol/util.h"
namespace remoting {
namespace protocol {
HostControlSender::HostControlSender(base::MessageLoopProxy* message_loop,
net::Socket* socket)
: buffered_writer_(new BufferedSocketWriter(message_loop)) {
buffered_writer_->Init(socket, BufferedSocketWriter::WriteFailedCallback());
}
HostControlSender::~HostControlSender() {
}
void HostControlSender::Close() {
buffered_writer_->Close();
}
} // namespace protocol
} // namespace remoting
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Implementation of HostStub using sockets created from jingle connection.
// It sends messages through the socket after serializing it.
//
// Object of this class can only be created by ConnectionToHost.
//
// This class can be used on any thread.
#ifndef REMOTING_PROTOCOL_HOST_STUB_IMPL_H_
#define REMOTING_PROTOCOL_HOST_STUB_IMPL_H_
#include "base/basictypes.h"
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "remoting/protocol/host_stub.h"
namespace base {
class MessageLoopProxy;
} // namespace base
namespace net {
class Socket;
} // namespace net
namespace remoting {
namespace protocol {
class BufferedSocketWriter;
// Implementation of HostStub that sends commands on a socket. Must be
// created and closed on the network thread, but can be used on any
// other thread.
class HostControlSender : public HostStub {
public:
explicit HostControlSender(base::MessageLoopProxy* message_loop,
net::Socket* socket);
virtual ~HostControlSender();
// Stop writing. Must be called on the network thread when the
// underlying socket is being destroyed.
void Close();
private:
// Buffered socket writer holds the serialized message and send it on the
// right thread.
scoped_refptr<BufferedSocketWriter> buffered_writer_;
DISALLOW_COPY_AND_ASSIGN(HostControlSender);
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_HOST_STUB_IMPL_H_
...@@ -740,8 +740,10 @@ ...@@ -740,8 +740,10 @@
'protocol/buffered_socket_writer.h', 'protocol/buffered_socket_writer.h',
'protocol/channel_authenticator.cc', 'protocol/channel_authenticator.cc',
'protocol/channel_authenticator.h', 'protocol/channel_authenticator.h',
'protocol/client_message_dispatcher.cc', 'protocol/client_control_dispatcher.cc',
'protocol/client_message_dispatcher.h', 'protocol/client_control_dispatcher.h',
'protocol/client_event_dispatcher.cc',
'protocol/client_event_dispatcher.h',
'protocol/client_stub.h', 'protocol/client_stub.h',
'protocol/connection_to_client.cc', 'protocol/connection_to_client.cc',
'protocol/connection_to_client.h', 'protocol/connection_to_client.h',
...@@ -751,13 +753,9 @@ ...@@ -751,13 +753,9 @@
'protocol/content_description.h', 'protocol/content_description.h',
'protocol/host_control_dispatcher.cc', 'protocol/host_control_dispatcher.cc',
'protocol/host_control_dispatcher.h', 'protocol/host_control_dispatcher.h',
'protocol/host_control_sender.cc',
'protocol/host_control_sender.h',
'protocol/host_event_dispatcher.cc', 'protocol/host_event_dispatcher.cc',
'protocol/host_event_dispatcher.h', 'protocol/host_event_dispatcher.h',
'protocol/host_stub.h', 'protocol/host_stub.h',
'protocol/input_sender.cc',
'protocol/input_sender.h',
'protocol/input_stub.h', 'protocol/input_stub.h',
'protocol/jingle_channel_connector.h', 'protocol/jingle_channel_connector.h',
'protocol/jingle_datagram_connector.cc', 'protocol/jingle_datagram_connector.cc',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment