Commit 182ec8f2 authored by sergeyu@chromium.org's avatar sergeyu@chromium.org

Remove video_channel() from Session interface

BUG=None
TEST=Unittests.

Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=96089

Review URL: http://codereview.chromium.org/7508044

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@96301 0039d316-1c4b-4281-b951-d872f2087c98
parent de719a40
...@@ -12,4 +12,8 @@ const char kChromotingTokenDefaultServiceName[] = "chromiumsync"; ...@@ -12,4 +12,8 @@ const char kChromotingTokenDefaultServiceName[] = "chromiumsync";
const char kChromotingXmlNamespace[] = "google:remoting"; const char kChromotingXmlNamespace[] = "google:remoting";
const char kVideoChannelName[] = "video";
const char kVideoRtpChannelName[] = "videortp";
const char kVideoRtcpChannelName[] = "videortpc";
} // namespace remoting } // namespace remoting
...@@ -17,6 +17,11 @@ extern const char kChromotingTokenDefaultServiceName[]; ...@@ -17,6 +17,11 @@ extern const char kChromotingTokenDefaultServiceName[];
// Namespace used for chromoting XMPP stanzas. // Namespace used for chromoting XMPP stanzas.
extern const char kChromotingXmlNamespace[]; extern const char kChromotingXmlNamespace[];
// Channel names.
extern const char kVideoChannelName[];
extern const char kVideoRtpChannelName[];
extern const char kVideoRtcpChannelName[];
} // namespace remoting } // namespace remoting
#endif // REMOTING_BASE_CONSTANTS_H_ #endif // REMOTING_BASE_CONSTANTS_H_
...@@ -41,7 +41,8 @@ class BufferedSocketWriterBase ...@@ -41,7 +41,8 @@ class BufferedSocketWriterBase
// Initializes the writer. Must be called on the thread that will be used // Initializes the writer. Must be called on the thread that will be used
// to access the socket in the future. |callback| will be called after each // to access the socket in the future. |callback| will be called after each
// failed write. // failed write. Caller retains ownership of |socket|.
// TODO(sergeyu): Change it so that it take ownership of |socket|.
void Init(net::Socket* socket, WriteFailedCallback* callback); void Init(net::Socket* socket, WriteFailedCallback* callback);
// Puts a new data chunk in the buffer. Returns false and doesn't enqueue // Puts a new data chunk in the buffer. Returns false and doesn't enqueue
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "remoting/protocol/connection_to_client.h" #include "remoting/protocol/connection_to_client.h"
#include "base/bind.h"
#include "google/protobuf/message.h" #include "google/protobuf/message.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
#include "remoting/protocol/client_control_sender.h" #include "remoting/protocol/client_control_sender.h"
...@@ -26,7 +27,10 @@ ConnectionToClient::ConnectionToClient(MessageLoop* message_loop, ...@@ -26,7 +27,10 @@ ConnectionToClient::ConnectionToClient(MessageLoop* message_loop,
: loop_(message_loop), : loop_(message_loop),
handler_(handler), handler_(handler),
host_stub_(NULL), host_stub_(NULL),
input_stub_(NULL) { input_stub_(NULL),
control_connected_(false),
input_connected_(false),
video_connected_(false) {
DCHECK(loop_); DCHECK(loop_);
DCHECK(handler_); DCHECK(handler_);
} }
...@@ -90,33 +94,63 @@ void ConnectionToClient::OnSessionStateChange(protocol::Session::State state) { ...@@ -90,33 +94,63 @@ void ConnectionToClient::OnSessionStateChange(protocol::Session::State state) {
DCHECK(handler_); DCHECK(handler_);
switch(state) { switch(state) {
case protocol::Session::CONNECTING: case protocol::Session::CONNECTING:
// Don't care about this message.
break; break;
// Don't care about this message.
case protocol::Session::CONNECTED: case protocol::Session::CONNECTED:
client_control_sender_.reset(
new ClientControlSender(session_->control_channel()));
video_writer_.reset(VideoWriter::Create(session_->config())); video_writer_.reset(VideoWriter::Create(session_->config()));
video_writer_->Init(session_.get()); video_writer_->Init(
session_.get(), base::Bind(&ConnectionToClient::OnVideoInitialized,
base::Unretained(this)));
break;
case protocol::Session::CONNECTED_CHANNELS:
client_control_sender_.reset(
new ClientControlSender(session_->control_channel()));
dispatcher_.reset(new HostMessageDispatcher()); dispatcher_.reset(new HostMessageDispatcher());
dispatcher_->Initialize(this, host_stub_, input_stub_); dispatcher_->Initialize(this, host_stub_, input_stub_);
handler_->OnConnectionOpened(this); control_connected_ = true;
input_connected_ = true;
NotifyIfChannelsReady();
break; break;
case protocol::Session::CLOSED: case protocol::Session::CLOSED:
CloseChannels(); CloseChannels();
handler_->OnConnectionClosed(this); handler_->OnConnectionClosed(this);
break; break;
case protocol::Session::FAILED: case protocol::Session::FAILED:
CloseChannels(); CloseOnError();
handler_->OnConnectionFailed(this);
break; break;
default: default:
// We shouldn't receive other states. // We shouldn't receive other states.
NOTREACHED(); NOTREACHED();
} }
} }
void ConnectionToClient::OnVideoInitialized(bool successful) {
if (!successful) {
LOG(ERROR) << "Failed to connect video channel";
CloseOnError();
return;
}
video_connected_ = true;
NotifyIfChannelsReady();
}
void ConnectionToClient::NotifyIfChannelsReady() {
if (control_connected_ && input_connected_ && video_connected_)
handler_->OnConnectionOpened(this);
}
void ConnectionToClient::CloseOnError() {
CloseChannels();
handler_->OnConnectionFailed(this);
}
void ConnectionToClient::CloseChannels() { void ConnectionToClient::CloseChannels() {
if (video_writer_.get()) if (video_writer_.get())
video_writer_->Close(); video_writer_->Close();
......
...@@ -90,16 +90,15 @@ class ConnectionToClient : ...@@ -90,16 +90,15 @@ class ConnectionToClient :
// Callback for protocol Session. // Callback for protocol Session.
void OnSessionStateChange(Session::State state); void OnSessionStateChange(Session::State state);
// Stops writing in the channels. // Callback for VideoReader::Init().
void CloseChannels(); void OnVideoInitialized(bool successful);
// The libjingle channel used to send and receive data from the remote client. void NotifyIfChannelsReady();
scoped_ptr<Session> session_;
scoped_ptr<VideoWriter> video_writer_; void CloseOnError();
// ClientStub for sending messages to the client. // Stops writing in the channels.
scoped_ptr<ClientControlSender> client_control_sender_; void CloseChannels();
// The message loop that this object runs on. // The message loop that this object runs on.
MessageLoop* loop_; MessageLoop* loop_;
...@@ -107,15 +106,25 @@ class ConnectionToClient : ...@@ -107,15 +106,25 @@ class ConnectionToClient :
// Event handler for handling events sent from this object. // Event handler for handling events sent from this object.
EventHandler* handler_; EventHandler* handler_;
// HostStub for receiving control events from the client. // Stubs that are called for incoming messages.
HostStub* host_stub_; HostStub* host_stub_;
// InputStub for receiving input events from the client.
InputStub* input_stub_; InputStub* input_stub_;
// Dispatcher for submitting messages to stubs. // The libjingle channel used to send and receive data from the remote client.
scoped_ptr<Session> session_;
// Writers for outgoing channels.
scoped_ptr<VideoWriter> video_writer_;
scoped_ptr<ClientControlSender> client_control_sender_;
// Dispatcher for incoming messages.
scoped_ptr<HostMessageDispatcher> dispatcher_; scoped_ptr<HostMessageDispatcher> dispatcher_;
// State of the channels.
bool control_connected_;
bool input_connected_;
bool video_connected_;
DISALLOW_COPY_AND_ASSIGN(ConnectionToClient); DISALLOW_COPY_AND_ASSIGN(ConnectionToClient);
}; };
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "base/message_loop.h" #include "base/message_loop.h"
#include "remoting/base/base_mock_objects.h" #include "remoting/base/base_mock_objects.h"
#include "remoting/base/constants.h"
#include "remoting/protocol/fake_session.h" #include "remoting/protocol/fake_session.h"
#include "remoting/protocol/connection_to_client.h" #include "remoting/protocol/connection_to_client.h"
#include "remoting/protocol/protocol_mock_objects.h" #include "remoting/protocol/protocol_mock_objects.h"
...@@ -34,6 +35,8 @@ class ConnectionToClientTest : public testing::Test { ...@@ -34,6 +35,8 @@ class ConnectionToClientTest : public testing::Test {
EXPECT_CALL(handler_, OnConnectionOpened(viewer_.get())); EXPECT_CALL(handler_, OnConnectionOpened(viewer_.get()));
session_->state_change_callback()->Run( session_->state_change_callback()->Run(
protocol::Session::CONNECTED); protocol::Session::CONNECTED);
session_->state_change_callback()->Run(
protocol::Session::CONNECTED_CHANNELS);
message_loop_.RunAllPending(); message_loop_.RunAllPending();
} }
...@@ -60,7 +63,9 @@ TEST_F(ConnectionToClientTest, SendUpdateStream) { ...@@ -60,7 +63,9 @@ TEST_F(ConnectionToClientTest, SendUpdateStream) {
// Verify that something has been written. // Verify that something has been written.
// TODO(sergeyu): Verify that the correct data has been written. // TODO(sergeyu): Verify that the correct data has been written.
EXPECT_GT(session_->video_channel()->written_data().size(), 0u); ASSERT_TRUE(session_->GetStreamChannel(kVideoChannelName));
EXPECT_GT(session_->GetStreamChannel(kVideoChannelName)->
written_data().size(), 0u);
// And then close the connection to ConnectionToClient. // And then close the connection to ConnectionToClient.
viewer_->Disconnect(); viewer_->Disconnect();
......
...@@ -38,11 +38,13 @@ ConnectionToHost::ConnectionToHost( ...@@ -38,11 +38,13 @@ ConnectionToHost::ConnectionToHost(
host_resolver_factory_(host_resolver_factory), host_resolver_factory_(host_resolver_factory),
port_allocator_session_factory_(session_factory), port_allocator_session_factory_(session_factory),
allow_nat_traversal_(allow_nat_traversal), allow_nat_traversal_(allow_nat_traversal),
state_(STATE_EMPTY),
event_callback_(NULL), event_callback_(NULL),
dispatcher_(new ClientMessageDispatcher()),
client_stub_(NULL), client_stub_(NULL),
video_stub_(NULL) { video_stub_(NULL),
state_(STATE_EMPTY),
control_connected_(false),
input_connected_(false),
video_connected_(false) {
} }
ConnectionToHost::~ConnectionToHost() { ConnectionToHost::~ConnectionToHost() {
...@@ -176,8 +178,7 @@ void ConnectionToHost::OnSessionStateChange( ...@@ -176,8 +178,7 @@ void ConnectionToHost::OnSessionStateChange(
switch (state) { switch (state) {
case Session::FAILED: case Session::FAILED:
state_ = STATE_FAILED; state_ = STATE_FAILED;
CloseChannels(); CloseOnError();
event_callback_->OnConnectionFailed(this);
break; break;
case Session::CLOSED: case Session::CLOSED:
...@@ -187,14 +188,24 @@ void ConnectionToHost::OnSessionStateChange( ...@@ -187,14 +188,24 @@ void ConnectionToHost::OnSessionStateChange(
break; break;
case Session::CONNECTED: case Session::CONNECTED:
state_ = STATE_CONNECTED;
// Initialize reader and writer. // Initialize reader and writer.
video_reader_.reset(VideoReader::Create(session_->config())); video_reader_.reset(VideoReader::Create(session_->config()));
video_reader_->Init(session_.get(), video_stub_); video_reader_->Init(
session_.get(), video_stub_,
base::Bind(&ConnectionToHost::OnVideoChannelInitialized,
base::Unretained(this)));
break;
case Session::CONNECTED_CHANNELS:
state_ = STATE_CONNECTED;
host_control_sender_.reset( host_control_sender_.reset(
new HostControlSender(session_->control_channel())); new HostControlSender(session_->control_channel()));
dispatcher_.reset(new ClientMessageDispatcher());
dispatcher_->Initialize(session_.get(), client_stub_); dispatcher_->Initialize(session_.get(), client_stub_);
event_callback_->OnConnectionOpened(this);
control_connected_ = true;
input_connected_ = true;
NotifyIfChannelsReady();
break; break;
default: default:
...@@ -203,12 +214,36 @@ void ConnectionToHost::OnSessionStateChange( ...@@ -203,12 +214,36 @@ void ConnectionToHost::OnSessionStateChange(
} }
} }
void ConnectionToHost::OnVideoChannelInitialized(bool successful) {
if (!successful) {
LOG(ERROR) << "Failed to connect video channel";
CloseOnError();
return;
}
video_connected_ = true;
NotifyIfChannelsReady();
}
void ConnectionToHost::NotifyIfChannelsReady() {
if (control_connected_ && input_connected_ && video_connected_)
event_callback_->OnConnectionOpened(this);
}
void ConnectionToHost::CloseOnError() {
state_ = STATE_FAILED;
CloseChannels();
event_callback_->OnConnectionFailed(this);
}
void ConnectionToHost::CloseChannels() { void ConnectionToHost::CloseChannels() {
if (input_sender_.get()) if (input_sender_.get())
input_sender_->Close(); input_sender_->Close();
if (host_control_sender_.get()) if (host_control_sender_.get())
host_control_sender_->Close(); host_control_sender_->Close();
video_reader_.reset();
} }
void ConnectionToHost::OnClientAuthenticated() { void ConnectionToHost::OnClientAuthenticated() {
......
...@@ -124,9 +124,16 @@ class ConnectionToHost : public SignalStrategy::StatusObserver, ...@@ -124,9 +124,16 @@ class ConnectionToHost : public SignalStrategy::StatusObserver,
// Callback for |session_|. // Callback for |session_|.
void OnSessionStateChange(Session::State state); void OnSessionStateChange(Session::State state);
// Callback for VideoReader::Init().
void OnVideoChannelInitialized(bool successful);
void NotifyIfChannelsReady();
// Callback for |video_reader_|. // Callback for |video_reader_|.
void OnVideoPacket(VideoPacket* packet); void OnVideoPacket(VideoPacket* packet);
void CloseOnError();
// Stops writing in the channels. // Stops writing in the channels.
void CloseChannels(); void CloseChannels();
...@@ -137,44 +144,36 @@ class ConnectionToHost : public SignalStrategy::StatusObserver, ...@@ -137,44 +144,36 @@ class ConnectionToHost : public SignalStrategy::StatusObserver,
scoped_ptr<PortAllocatorSessionFactory> port_allocator_session_factory_; scoped_ptr<PortAllocatorSessionFactory> port_allocator_session_factory_;
bool allow_nat_traversal_; bool allow_nat_traversal_;
// Internal state of the connection. std::string host_jid_;
State state_; std::string host_public_key_;
std::string access_code_;
HostEventCallback* event_callback_;
// Stub for incoming messages.
ClientStub* client_stub_;
VideoStub* video_stub_;
scoped_ptr<SignalStrategy> signal_strategy_; scoped_ptr<SignalStrategy> signal_strategy_;
std::string local_jid_; std::string local_jid_;
scoped_ptr<SessionManager> session_manager_; scoped_ptr<SessionManager> session_manager_;
scoped_ptr<Session> session_; scoped_ptr<Session> session_;
// Handlers for incoming messages.
scoped_ptr<VideoReader> video_reader_; scoped_ptr<VideoReader> video_reader_;
HostEventCallback* event_callback_;
std::string host_jid_;
std::string host_public_key_;
std::string access_code_;
scoped_ptr<ClientMessageDispatcher> dispatcher_; scoped_ptr<ClientMessageDispatcher> dispatcher_;
//////////////////////////////////////////////////////////////////////////// // Senders for outgoing messages.
// User input event channel interface
// Stub for sending input event messages to the host.
scoped_ptr<InputSender> input_sender_; scoped_ptr<InputSender> input_sender_;
////////////////////////////////////////////////////////////////////////////
// Protocol control channel interface
// Stub for sending control messages to the host.
scoped_ptr<HostControlSender> host_control_sender_; scoped_ptr<HostControlSender> host_control_sender_;
// Stub for receiving control messages from the host. // Internal state of the connection.
ClientStub* client_stub_; State state_;
////////////////////////////////////////////////////////////////////////////
// Video channel interface
// Stub for receiving video packets from the host. // State of the channels.
VideoStub* video_stub_; bool control_connected_;
bool input_connected_;
bool video_connected_;
private: private:
DISALLOW_COPY_AND_ASSIGN(ConnectionToHost); DISALLOW_COPY_AND_ASSIGN(ConnectionToHost);
......
...@@ -70,6 +70,67 @@ bool FakeSocket::SetSendBufferSize(int32 size) { ...@@ -70,6 +70,67 @@ bool FakeSocket::SetSendBufferSize(int32 size) {
return false; return false;
} }
int FakeSocket::Connect(net::CompletionCallback* callback) {
return net::OK;
}
void FakeSocket::Disconnect() {
NOTIMPLEMENTED();
}
bool FakeSocket::IsConnected() const {
return true;
}
bool FakeSocket::IsConnectedAndIdle() const {
NOTIMPLEMENTED();
return false;
}
int FakeSocket::GetPeerAddress(
net::AddressList* address) const {
NOTIMPLEMENTED();
return net::ERR_FAILED;
}
int FakeSocket::GetLocalAddress(
net::IPEndPoint* address) const {
NOTIMPLEMENTED();
return net::ERR_FAILED;
}
const net::BoundNetLog& FakeSocket::NetLog() const {
return net_log_;
}
void FakeSocket::SetSubresourceSpeculation() {
NOTIMPLEMENTED();
}
void FakeSocket::SetOmniboxSpeculation() {
NOTIMPLEMENTED();
}
bool FakeSocket::WasEverUsed() const {
NOTIMPLEMENTED();
return true;
}
bool FakeSocket::UsingTCPFastOpen() const {
NOTIMPLEMENTED();
return true;
}
int64 FakeSocket::NumBytesRead() const {
NOTIMPLEMENTED();
return 0;
}
base::TimeDelta FakeSocket::GetConnectTimeMicros() const {
NOTIMPLEMENTED();
return base::TimeDelta();
}
FakeUdpSocket::FakeUdpSocket() FakeUdpSocket::FakeUdpSocket()
: read_pending_(false), : read_pending_(false),
input_pos_(0) { input_pos_(0) {
...@@ -135,21 +196,31 @@ FakeSession::FakeSession() ...@@ -135,21 +196,31 @@ FakeSession::FakeSession()
FakeSession::~FakeSession() { } FakeSession::~FakeSession() { }
void FakeSession::SetStateChangeCallback( FakeSocket* FakeSession::GetStreamChannel(const std::string& name) {
StateChangeCallback* callback) { return stream_channels_[name];
}
FakeUdpSocket* FakeSession::GetDatagramChannel(const std::string& name) {
return datagram_channels_[name];
}
void FakeSession::SetStateChangeCallback(StateChangeCallback* callback) {
callback_.reset(callback); callback_.reset(callback);
} }
void FakeSession::CreateStreamChannel( void FakeSession::CreateStreamChannel(
const std::string& name, const StreamChannelCallback& callback) { const std::string& name, const StreamChannelCallback& callback) {
NOTIMPLEMENTED(); LOG(ERROR) << " creating channel " << name;
callback.Run(name, NULL); FakeSocket* channel = new FakeSocket();
stream_channels_[name] = channel;
callback.Run(name, channel);
} }
void FakeSession::CreateDatagramChannel( void FakeSession::CreateDatagramChannel(
const std::string& name, const DatagramChannelCallback& callback) { const std::string& name, const DatagramChannelCallback& callback) {
NOTIMPLEMENTED(); FakeUdpSocket* channel = new FakeUdpSocket();
callback.Run(name, NULL); datagram_channels_[name] = channel;
callback.Run(name, channel);
} }
FakeSocket* FakeSession::control_channel() { FakeSocket* FakeSession::control_channel() {
...@@ -160,18 +231,6 @@ FakeSocket* FakeSession::event_channel() { ...@@ -160,18 +231,6 @@ FakeSocket* FakeSession::event_channel() {
return &event_channel_; return &event_channel_;
} }
FakeSocket* FakeSession::video_channel() {
return &video_channel_;
}
FakeUdpSocket* FakeSession::video_rtp_channel() {
return &video_rtp_channel_;
}
FakeUdpSocket* FakeSession::video_rtcp_channel() {
return &video_rtcp_channel_;
}
const std::string& FakeSession::jid() { const std::string& FakeSession::jid() {
return jid_; return jid_;
} }
......
...@@ -5,11 +5,13 @@ ...@@ -5,11 +5,13 @@
#ifndef REMOTING_PROTOCOL_FAKE_SESSION_H_ #ifndef REMOTING_PROTOCOL_FAKE_SESSION_H_
#define REMOTING_PROTOCOL_FAKE_SESSION_H_ #define REMOTING_PROTOCOL_FAKE_SESSION_H_
#include <map>
#include <string> #include <string>
#include <vector> #include <vector>
#include "base/memory/scoped_ptr.h" #include "base/memory/scoped_ptr.h"
#include "net/socket/socket.h" #include "net/socket/socket.h"
#include "net/socket/stream_socket.h"
#include "remoting/protocol/session.h" #include "remoting/protocol/session.h"
namespace remoting { namespace remoting {
...@@ -22,7 +24,7 @@ extern const char kTestJid[]; ...@@ -22,7 +24,7 @@ extern const char kTestJid[];
// Read() reads data from another buffer that can be set with AppendInputData(). // Read() reads data from another buffer that can be set with AppendInputData().
// Pending reads are supported, so if there is a pending read AppendInputData() // Pending reads are supported, so if there is a pending read AppendInputData()
// calls the read callback. // calls the read callback.
class FakeSocket : public net::Socket { class FakeSocket : public net::StreamSocket {
public: public:
FakeSocket(); FakeSocket();
virtual ~FakeSocket(); virtual ~FakeSocket();
...@@ -42,6 +44,21 @@ class FakeSocket : public net::Socket { ...@@ -42,6 +44,21 @@ class FakeSocket : public net::Socket {
virtual bool SetReceiveBufferSize(int32 size); virtual bool SetReceiveBufferSize(int32 size);
virtual bool SetSendBufferSize(int32 size); virtual bool SetSendBufferSize(int32 size);
// net::StreamSocket interface.
virtual int Connect(net::CompletionCallback* callback) OVERRIDE;
virtual void Disconnect() OVERRIDE;
virtual bool IsConnected() const OVERRIDE;
virtual bool IsConnectedAndIdle() const OVERRIDE;
virtual int GetPeerAddress(net::AddressList* address) const OVERRIDE;
virtual int GetLocalAddress(net::IPEndPoint* address) const OVERRIDE;
virtual const net::BoundNetLog& NetLog() const OVERRIDE;
virtual void SetSubresourceSpeculation() OVERRIDE;
virtual void SetOmniboxSpeculation() OVERRIDE;
virtual bool WasEverUsed() const OVERRIDE;
virtual bool UsingTCPFastOpen() const OVERRIDE;
virtual int64 NumBytesRead() const OVERRIDE;
virtual base::TimeDelta GetConnectTimeMicros() const OVERRIDE;
private: private:
bool read_pending_; bool read_pending_;
scoped_refptr<net::IOBuffer> read_buffer_; scoped_refptr<net::IOBuffer> read_buffer_;
...@@ -51,6 +68,10 @@ class FakeSocket : public net::Socket { ...@@ -51,6 +68,10 @@ class FakeSocket : public net::Socket {
std::string written_data_; std::string written_data_;
std::string input_data_; std::string input_data_;
int input_pos_; int input_pos_;
net::BoundNetLog net_log_;
DISALLOW_COPY_AND_ASSIGN(FakeSocket);
}; };
// FakeUdpSocket is similar to FakeSocket but behaves as UDP socket. All written // FakeUdpSocket is similar to FakeSocket but behaves as UDP socket. All written
...@@ -86,6 +107,8 @@ class FakeUdpSocket : public net::Socket { ...@@ -86,6 +107,8 @@ class FakeUdpSocket : public net::Socket {
std::vector<std::string> written_packets_; std::vector<std::string> written_packets_;
std::vector<std::string> input_packets_; std::vector<std::string> input_packets_;
int input_pos_; int input_pos_;
DISALLOW_COPY_AND_ASSIGN(FakeUdpSocket);
}; };
// FakeSession is a dummy protocol::Session that uses FakeSocket for all // FakeSession is a dummy protocol::Session that uses FakeSocket for all
...@@ -103,6 +126,10 @@ class FakeSession : public Session { ...@@ -103,6 +126,10 @@ class FakeSession : public Session {
bool is_closed() const { return closed_; } bool is_closed() const { return closed_; }
FakeSocket* GetStreamChannel(const std::string& name);
FakeUdpSocket* GetDatagramChannel(const std::string& name);
// Session interface.
virtual void SetStateChangeCallback(StateChangeCallback* callback); virtual void SetStateChangeCallback(StateChangeCallback* callback);
virtual void CreateStreamChannel( virtual void CreateStreamChannel(
...@@ -112,10 +139,6 @@ class FakeSession : public Session { ...@@ -112,10 +139,6 @@ class FakeSession : public Session {
virtual FakeSocket* control_channel(); virtual FakeSocket* control_channel();
virtual FakeSocket* event_channel(); virtual FakeSocket* event_channel();
virtual FakeSocket* video_channel();
virtual FakeUdpSocket* video_rtp_channel();
virtual FakeUdpSocket* video_rtcp_channel();
virtual const std::string& jid(); virtual const std::string& jid();
...@@ -140,9 +163,9 @@ class FakeSession : public Session { ...@@ -140,9 +163,9 @@ class FakeSession : public Session {
MessageLoop* message_loop_; MessageLoop* message_loop_;
FakeSocket control_channel_; FakeSocket control_channel_;
FakeSocket event_channel_; FakeSocket event_channel_;
FakeSocket video_channel_;
FakeUdpSocket video_rtp_channel_; std::map<std::string, FakeSocket*> stream_channels_;
FakeUdpSocket video_rtcp_channel_; std::map<std::string, FakeUdpSocket*> datagram_channels_;
std::string initiator_token_; std::string initiator_token_;
std::string receiver_token_; std::string receiver_token_;
...@@ -151,6 +174,8 @@ class FakeSession : public Session { ...@@ -151,6 +174,8 @@ class FakeSession : public Session {
std::string jid_; std::string jid_;
bool closed_; bool closed_;
DISALLOW_COPY_AND_ASSIGN(FakeSession);
}; };
} // namespace protocol } // namespace protocol
......
...@@ -33,7 +33,6 @@ namespace { ...@@ -33,7 +33,6 @@ namespace {
const char kControlChannelName[] = "control"; const char kControlChannelName[] = "control";
const char kEventChannelName[] = "event"; const char kEventChannelName[] = "event";
const char kVideoChannelName[] = "video";
const int kMasterKeyLength = 16; const int kMasterKeyLength = 16;
const int kChannelKeyLength = 16; const int kChannelKeyLength = 16;
...@@ -160,7 +159,6 @@ void JingleSession::CloseInternal(int result, bool failed) { ...@@ -160,7 +159,6 @@ void JingleSession::CloseInternal(int result, bool failed) {
control_channel_socket_.reset(); control_channel_socket_.reset();
event_channel_socket_.reset(); event_channel_socket_.reset();
video_channel_socket_.reset();
STLDeleteContainerPairSecondPointers(channel_connectors_.begin(), STLDeleteContainerPairSecondPointers(channel_connectors_.begin(),
channel_connectors_.end()); channel_connectors_.end());
...@@ -224,23 +222,6 @@ net::Socket* JingleSession::event_channel() { ...@@ -224,23 +222,6 @@ net::Socket* JingleSession::event_channel() {
return event_channel_socket_.get(); return event_channel_socket_.get();
} }
net::Socket* JingleSession::video_channel() {
DCHECK(CalledOnValidThread());
return video_channel_socket_.get();
}
net::Socket* JingleSession::video_rtp_channel() {
DCHECK(CalledOnValidThread());
NOTREACHED();
return NULL;
}
net::Socket* JingleSession::video_rtcp_channel() {
DCHECK(CalledOnValidThread());
NOTREACHED();
return NULL;
}
const std::string& JingleSession::jid() { const std::string& JingleSession::jid() {
// TODO(sergeyu): Fix ChromotingHost so that it doesn't call this // TODO(sergeyu): Fix ChromotingHost so that it doesn't call this
// method on invalid thread and uncomment this DCHECK. // method on invalid thread and uncomment this DCHECK.
...@@ -453,6 +434,8 @@ void JingleSession::OnAccept() { ...@@ -453,6 +434,8 @@ void JingleSession::OnAccept() {
} }
CreateChannels(); CreateChannels();
SetState(CONNECTED);
} }
void JingleSession::OnTerminate() { void JingleSession::OnTerminate() {
...@@ -461,6 +444,8 @@ void JingleSession::OnTerminate() { ...@@ -461,6 +444,8 @@ void JingleSession::OnTerminate() {
} }
void JingleSession::AcceptConnection() { void JingleSession::AcceptConnection() {
SetState(CONNECTING);
if (!jingle_session_manager_->AcceptConnection(this, cricket_session_)) { if (!jingle_session_manager_->AcceptConnection(this, cricket_session_)) {
Close(); Close();
// Release session so that JingleSessionManager::SessionDestroyed() // Release session so that JingleSessionManager::SessionDestroyed()
...@@ -469,9 +454,6 @@ void JingleSession::AcceptConnection() { ...@@ -469,9 +454,6 @@ void JingleSession::AcceptConnection() {
delete this; delete this;
return; return;
} }
// Set state to CONNECTING if the session is being accepted.
SetState(CONNECTING);
} }
void JingleSession::AddChannelConnector( void JingleSession::AddChannelConnector(
...@@ -512,7 +494,6 @@ void JingleSession::CreateChannels() { ...@@ -512,7 +494,6 @@ void JingleSession::CreateChannels() {
base::Unretained(this))); base::Unretained(this)));
CreateStreamChannel(kControlChannelName, stream_callback); CreateStreamChannel(kControlChannelName, stream_callback);
CreateStreamChannel(kEventChannelName, stream_callback); CreateStreamChannel(kEventChannelName, stream_callback);
CreateStreamChannel(kVideoChannelName, stream_callback);
} }
void JingleSession::OnStreamChannelConnected(const std::string& name, void JingleSession::OnStreamChannelConnected(const std::string& name,
...@@ -533,18 +514,12 @@ void JingleSession::OnChannelConnected(const std::string& name, ...@@ -533,18 +514,12 @@ void JingleSession::OnChannelConnected(const std::string& name,
control_channel_socket_.reset(socket); control_channel_socket_.reset(socket);
} else if (name == kEventChannelName) { } else if (name == kEventChannelName) {
event_channel_socket_.reset(socket); event_channel_socket_.reset(socket);
} else if (name == kVideoChannelName) {
video_channel_socket_.reset(socket);
} else { } else {
NOTREACHED(); NOTREACHED();
} }
if (control_channel_socket_.get() && event_channel_socket_.get() && if (control_channel_socket_.get() && event_channel_socket_.get())
video_channel_socket_.get()) { SetState(CONNECTED_CHANNELS);
// TODO(sergeyu): State should be set to CONNECTED in OnAccept
// independent of the channels state.
SetState(CONNECTED);
}
} }
const cricket::ContentInfo* JingleSession::GetContentInfo() const { const cricket::ContentInfo* JingleSession::GetContentInfo() const {
......
...@@ -38,9 +38,6 @@ class JingleSession : public protocol::Session, ...@@ -38,9 +38,6 @@ class JingleSession : public protocol::Session,
const DatagramChannelCallback& callback) OVERRIDE; const DatagramChannelCallback& callback) OVERRIDE;
virtual net::Socket* control_channel() OVERRIDE; virtual net::Socket* control_channel() OVERRIDE;
virtual net::Socket* event_channel() OVERRIDE; virtual net::Socket* event_channel() OVERRIDE;
virtual net::Socket* video_channel() OVERRIDE;
virtual net::Socket* video_rtp_channel() OVERRIDE;
virtual net::Socket* video_rtcp_channel() OVERRIDE;
virtual const std::string& jid() OVERRIDE; virtual const std::string& jid() OVERRIDE;
virtual const CandidateSessionConfig* candidate_config() OVERRIDE; virtual const CandidateSessionConfig* candidate_config() OVERRIDE;
virtual const SessionConfig* config() OVERRIDE; virtual const SessionConfig* config() OVERRIDE;
...@@ -185,7 +182,6 @@ class JingleSession : public protocol::Session, ...@@ -185,7 +182,6 @@ class JingleSession : public protocol::Session,
scoped_ptr<net::Socket> control_channel_socket_; scoped_ptr<net::Socket> control_channel_socket_;
scoped_ptr<net::Socket> event_channel_socket_; scoped_ptr<net::Socket> event_channel_socket_;
scoped_ptr<net::Socket> video_channel_socket_;
ScopedRunnableMethodFactory<JingleSession> task_factory_; ScopedRunnableMethodFactory<JingleSession> task_factory_;
......
...@@ -4,7 +4,10 @@ ...@@ -4,7 +4,10 @@
#include "remoting/protocol/protobuf_video_reader.h" #include "remoting/protocol/protobuf_video_reader.h"
#include "base/bind.h"
#include "base/task.h" #include "base/task.h"
#include "net/socket/stream_socket.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h" #include "remoting/proto/video.pb.h"
#include "remoting/protocol/session.h" #include "remoting/protocol/session.h"
...@@ -19,11 +22,28 @@ ProtobufVideoReader::ProtobufVideoReader(VideoPacketFormat::Encoding encoding) ...@@ -19,11 +22,28 @@ ProtobufVideoReader::ProtobufVideoReader(VideoPacketFormat::Encoding encoding)
ProtobufVideoReader::~ProtobufVideoReader() { } ProtobufVideoReader::~ProtobufVideoReader() { }
void ProtobufVideoReader::Init(protocol::Session* session, void ProtobufVideoReader::Init(protocol::Session* session,
VideoStub* video_stub) { VideoStub* video_stub,
reader_.Init( const InitializedCallback& callback) {
session->video_channel(), initialized_callback_ = callback;
NewCallback(this, &ProtobufVideoReader::OnNewData));
video_stub_ = video_stub; video_stub_ = video_stub;
session->CreateStreamChannel(
kVideoChannelName,
base::Bind(&ProtobufVideoReader::OnChannelReady, base::Unretained(this)));
}
void ProtobufVideoReader::OnChannelReady(const std::string& name,
net::StreamSocket* socket) {
DCHECK_EQ(name, std::string(kVideoChannelName));
if (!socket) {
initialized_callback_.Run(false);
return;
}
DCHECK(!channel_.get());
channel_.reset(socket);
reader_.Init(socket, NewCallback(this, &ProtobufVideoReader::OnNewData));
initialized_callback_.Run(true);
} }
void ProtobufVideoReader::OnNewData(VideoPacket* packet, Task* done_task) { void ProtobufVideoReader::OnNewData(VideoPacket* packet, Task* done_task) {
......
// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_PROTOBUF_VIDEO_READER_H_ #ifndef REMOTING_PROTOCOL_PROTOBUF_VIDEO_READER_H_
#define REMOTING_PROTOCOL_PROTOBUF_VIDEO_READER_H_ #define REMOTING_PROTOCOL_PROTOBUF_VIDEO_READER_H_
#include "base/compiler_specific.h"
#include "remoting/proto/video.pb.h" #include "remoting/proto/video.pb.h"
#include "remoting/protocol/message_reader.h" #include "remoting/protocol/message_reader.h"
#include "remoting/protocol/video_reader.h" #include "remoting/protocol/video_reader.h"
namespace net {
class StreamSocket;
} // namespace net
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
...@@ -20,13 +25,21 @@ class ProtobufVideoReader : public VideoReader { ...@@ -20,13 +25,21 @@ class ProtobufVideoReader : public VideoReader {
virtual ~ProtobufVideoReader(); virtual ~ProtobufVideoReader();
// VideoReader interface. // VideoReader interface.
virtual void Init(protocol::Session* session, VideoStub* video_stub); virtual void Init(protocol::Session* session,
VideoStub* video_stub,
const InitializedCallback& callback) OVERRIDE;
private: private:
void OnChannelReady(const std::string& name, net::StreamSocket* socket);
void OnNewData(VideoPacket* packet, Task* done_task); void OnNewData(VideoPacket* packet, Task* done_task);
InitializedCallback initialized_callback_;
VideoPacketFormat::Encoding encoding_; VideoPacketFormat::Encoding encoding_;
// TODO(sergeyu): Remove |channel_| and let |reader_| own it.
scoped_ptr<net::StreamSocket> channel_;
ProtobufMessageReader<VideoPacket> reader_; ProtobufMessageReader<VideoPacket> reader_;
// The stub that processes all received packets. // The stub that processes all received packets.
......
// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "remoting/protocol/protobuf_video_writer.h" #include "remoting/protocol/protobuf_video_writer.h"
#include "base/bind.h"
#include "base/task.h" #include "base/task.h"
#include "net/socket/stream_socket.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h" #include "remoting/proto/video.pb.h"
#include "remoting/protocol/rtp_writer.h" #include "remoting/protocol/rtp_writer.h"
#include "remoting/protocol/session.h" #include "remoting/protocol/session.h"
...@@ -17,14 +20,35 @@ ProtobufVideoWriter::ProtobufVideoWriter() { } ...@@ -17,14 +20,35 @@ ProtobufVideoWriter::ProtobufVideoWriter() { }
ProtobufVideoWriter::~ProtobufVideoWriter() { } ProtobufVideoWriter::~ProtobufVideoWriter() { }
void ProtobufVideoWriter::Init(protocol::Session* session) { void ProtobufVideoWriter::Init(protocol::Session* session,
const InitializedCallback& callback) {
initialized_callback_ = callback;
session->CreateStreamChannel(
kVideoChannelName,
base::Bind(&ProtobufVideoWriter::OnChannelReady, base::Unretained(this)));
}
void ProtobufVideoWriter::OnChannelReady(const std::string& name,
net::StreamSocket* socket) {
DCHECK_EQ(name, std::string(kVideoChannelName));
if (!socket) {
initialized_callback_.Run(false);
return;
}
DCHECK(!channel_.get());
channel_.reset(socket);
buffered_writer_ = new BufferedSocketWriter(); buffered_writer_ = new BufferedSocketWriter();
// TODO(sergeyu): Provide WriteFailedCallback for the buffered writer. // TODO(sergeyu): Provide WriteFailedCallback for the buffered writer.
buffered_writer_->Init(session->video_channel(), NULL); buffered_writer_->Init(socket, NULL);
initialized_callback_.Run(true);
} }
void ProtobufVideoWriter::Close() { void ProtobufVideoWriter::Close() {
buffered_writer_->Close(); buffered_writer_->Close();
channel_.reset();
} }
void ProtobufVideoWriter::ProcessVideoPacket(const VideoPacket* packet, void ProtobufVideoWriter::ProcessVideoPacket(const VideoPacket* packet,
......
...@@ -5,10 +5,17 @@ ...@@ -5,10 +5,17 @@
#ifndef REMOTING_PROTOCOL_PROTOBUF_VIDEO_WRITER_H_ #ifndef REMOTING_PROTOCOL_PROTOBUF_VIDEO_WRITER_H_
#define REMOTING_PROTOCOL_PROTOBUF_VIDEO_WRITER_H_ #define REMOTING_PROTOCOL_PROTOBUF_VIDEO_WRITER_H_
#include <string>
#include "base/compiler_specific.h" #include "base/compiler_specific.h"
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "remoting/protocol/video_writer.h" #include "remoting/protocol/video_writer.h"
namespace net {
class StreamSocket;
} // namespace net
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
...@@ -21,7 +28,8 @@ class ProtobufVideoWriter : public VideoWriter { ...@@ -21,7 +28,8 @@ class ProtobufVideoWriter : public VideoWriter {
virtual ~ProtobufVideoWriter(); virtual ~ProtobufVideoWriter();
// VideoWriter interface. // VideoWriter interface.
virtual void Init(protocol::Session* session) OVERRIDE; virtual void Init(protocol::Session* session,
const InitializedCallback& callback) OVERRIDE;
virtual void Close() OVERRIDE; virtual void Close() OVERRIDE;
// VideoStub interface. // VideoStub interface.
...@@ -30,6 +38,13 @@ class ProtobufVideoWriter : public VideoWriter { ...@@ -30,6 +38,13 @@ class ProtobufVideoWriter : public VideoWriter {
virtual int GetPendingPackets() OVERRIDE; virtual int GetPendingPackets() OVERRIDE;
private: private:
void OnChannelReady(const std::string& name, net::StreamSocket* socket);
InitializedCallback initialized_callback_;
// TODO(sergeyu): Remove |channel_| and let |buffered_writer_| own it.
scoped_ptr<net::StreamSocket> channel_;
scoped_refptr<BufferedSocketWriter> buffered_writer_; scoped_refptr<BufferedSocketWriter> buffered_writer_;
DISALLOW_COPY_AND_ASSIGN(ProtobufVideoWriter); DISALLOW_COPY_AND_ASSIGN(ProtobufVideoWriter);
......
...@@ -4,7 +4,9 @@ ...@@ -4,7 +4,9 @@
#include "remoting/protocol/rtp_video_reader.h" #include "remoting/protocol/rtp_video_reader.h"
#include "base/bind.h"
#include "base/task.h" #include "base/task.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h" #include "remoting/proto/video.pb.h"
#include "remoting/protocol/session.h" #include "remoting/protocol/session.h"
...@@ -22,7 +24,8 @@ RtpVideoReader::PacketsQueueEntry::PacketsQueueEntry() ...@@ -22,7 +24,8 @@ RtpVideoReader::PacketsQueueEntry::PacketsQueueEntry()
} }
RtpVideoReader::RtpVideoReader() RtpVideoReader::RtpVideoReader()
: last_sequence_number_(0), : initialized_(false),
last_sequence_number_(0),
video_stub_(NULL) { video_stub_(NULL) {
} }
...@@ -30,11 +33,47 @@ RtpVideoReader::~RtpVideoReader() { ...@@ -30,11 +33,47 @@ RtpVideoReader::~RtpVideoReader() {
ResetQueue(); ResetQueue();
} }
void RtpVideoReader::Init(protocol::Session* session, VideoStub* video_stub) { void RtpVideoReader::Init(protocol::Session* session,
rtp_reader_.Init(session->video_rtp_channel(), VideoStub* video_stub,
NewCallback(this, &RtpVideoReader::OnRtpPacket)); const InitializedCallback& callback) {
rtcp_writer_.Init(session->video_rtcp_channel()); initialized_callback_ = callback;
video_stub_ = video_stub; video_stub_ = video_stub;
session->CreateDatagramChannel(
kVideoRtpChannelName,
base::Bind(&RtpVideoReader::OnChannelReady, base::Unretained(this)));
session->CreateDatagramChannel(
kVideoRtcpChannelName,
base::Bind(&RtpVideoReader::OnChannelReady, base::Unretained(this)));
}
void RtpVideoReader::OnChannelReady(const std::string& name,
net::Socket* socket) {
if (!socket) {
if (!initialized_) {
initialized_ = true;
initialized_callback_.Run(false);
}
return;
}
if (name == kVideoRtpChannelName) {
DCHECK(!rtp_channel_.get());
rtp_channel_.reset(socket);
rtp_reader_.Init(socket, NewCallback(this, &RtpVideoReader::OnRtpPacket));
} else if (name == kVideoRtcpChannelName) {
DCHECK(!rtcp_channel_.get());
rtcp_channel_.reset(socket);
rtcp_writer_.Init(socket);
} else {
NOTREACHED();
}
if (rtp_channel_.get() && rtcp_channel_.get()) {
DCHECK(!initialized_);
initialized_ = true;
initialized_callback_.Run(true);
}
} }
void RtpVideoReader::ResetQueue() { void RtpVideoReader::ResetQueue() {
......
...@@ -5,7 +5,9 @@ ...@@ -5,7 +5,9 @@
#ifndef REMOTING_PROTOCOL_RTP_VIDEO_READER_H_ #ifndef REMOTING_PROTOCOL_RTP_VIDEO_READER_H_
#define REMOTING_PROTOCOL_RTP_VIDEO_READER_H_ #define REMOTING_PROTOCOL_RTP_VIDEO_READER_H_
#include "base/compiler_specific.h"
#include "base/time.h" #include "base/time.h"
#include "base/memory/scoped_ptr.h"
#include "remoting/protocol/rtcp_writer.h" #include "remoting/protocol/rtcp_writer.h"
#include "remoting/protocol/rtp_reader.h" #include "remoting/protocol/rtp_reader.h"
#include "remoting/protocol/video_reader.h" #include "remoting/protocol/video_reader.h"
...@@ -13,6 +15,8 @@ ...@@ -13,6 +15,8 @@
namespace remoting { namespace remoting {
namespace protocol { namespace protocol {
class RtcpWriter;
class RtpReader;
class Session; class Session;
class RtpVideoReader : public VideoReader { class RtpVideoReader : public VideoReader {
...@@ -21,7 +25,9 @@ class RtpVideoReader : public VideoReader { ...@@ -21,7 +25,9 @@ class RtpVideoReader : public VideoReader {
virtual ~RtpVideoReader(); virtual ~RtpVideoReader();
// VideoReader interface. // VideoReader interface.
virtual void Init(protocol::Session* session, VideoStub* video_stub); virtual void Init(protocol::Session* session,
VideoStub* video_stub,
const InitializedCallback& callback) OVERRIDE;
private: private:
friend class RtpVideoReaderTest; friend class RtpVideoReaderTest;
...@@ -44,6 +50,8 @@ class RtpVideoReader : public VideoReader { ...@@ -44,6 +50,8 @@ class RtpVideoReader : public VideoReader {
typedef std::deque<PacketsQueueEntry> PacketsQueue; typedef std::deque<PacketsQueueEntry> PacketsQueue;
void OnChannelReady(const std::string& name, net::Socket* socket);
void OnRtpPacket(const RtpPacket* rtp_packet); void OnRtpPacket(const RtpPacket* rtp_packet);
void CheckFullPacket(const PacketsQueue::iterator& pos); void CheckFullPacket(const PacketsQueue::iterator& pos);
void RebuildVideoPacket(const PacketsQueue::iterator& from, void RebuildVideoPacket(const PacketsQueue::iterator& from,
...@@ -56,7 +64,12 @@ class RtpVideoReader : public VideoReader { ...@@ -56,7 +64,12 @@ class RtpVideoReader : public VideoReader {
// |kReceiverReportsIntervalMs|. // |kReceiverReportsIntervalMs|.
void SendReceiverReportIf(); void SendReceiverReportIf();
bool initialized_;
InitializedCallback initialized_callback_;
scoped_ptr<net::Socket> rtp_channel_;
RtpReader rtp_reader_; RtpReader rtp_reader_;
scoped_ptr<net::Socket> rtcp_channel_;
RtcpWriter rtcp_writer_; RtcpWriter rtcp_writer_;
PacketsQueue packets_queue_; PacketsQueue packets_queue_;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <vector> #include <vector>
#include "base/bind.h"
#include "base/message_loop.h" #include "base/message_loop.h"
#include "base/string_number_conversions.h" #include "base/string_number_conversions.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
...@@ -65,10 +66,16 @@ class RtpVideoReaderTest : public testing::Test, ...@@ -65,10 +66,16 @@ class RtpVideoReaderTest : public testing::Test,
void Reset() { void Reset() {
session_.reset(new FakeSession()); session_.reset(new FakeSession());
reader_.reset(new RtpVideoReader()); reader_.reset(new RtpVideoReader());
reader_->Init(session_.get(), this); reader_->Init(session_.get(), this,
base::Bind(&RtpVideoReaderTest::OnReaderInitialized,
base::Unretained(this)));
received_packets_.clear(); received_packets_.clear();
} }
void OnReaderInitialized(bool success) {
ASSERT_TRUE(success);
}
void InitData(int size) { void InitData(int size) {
data_.resize(size); data_.resize(size);
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
......
...@@ -4,9 +4,11 @@ ...@@ -4,9 +4,11 @@
#include "remoting/protocol/rtp_video_writer.h" #include "remoting/protocol/rtp_video_writer.h"
#include "base/bind.h"
#include "base/task.h" #include "base/task.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
#include "remoting/base/compound_buffer.h" #include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h" #include "remoting/proto/video.pb.h"
#include "remoting/protocol/rtp_writer.h" #include "remoting/protocol/rtp_writer.h"
#include "remoting/protocol/session.h" #include "remoting/protocol/session.h"
...@@ -18,18 +20,56 @@ namespace { ...@@ -18,18 +20,56 @@ namespace {
const int kMtu = 1200; const int kMtu = 1200;
} // namespace } // namespace
RtpVideoWriter::RtpVideoWriter() { } RtpVideoWriter::RtpVideoWriter()
: initialized_(false) {
}
RtpVideoWriter::~RtpVideoWriter() { RtpVideoWriter::~RtpVideoWriter() {
Close(); Close();
} }
void RtpVideoWriter::Init(protocol::Session* session) { void RtpVideoWriter::Init(protocol::Session* session,
rtp_writer_.Init(session->video_rtp_channel()); const InitializedCallback& callback) {
initialized_callback_ = callback;
session->CreateDatagramChannel(
kVideoRtpChannelName,
base::Bind(&RtpVideoWriter::OnChannelReady, base::Unretained(this)));
session->CreateDatagramChannel(
kVideoRtcpChannelName,
base::Bind(&RtpVideoWriter::OnChannelReady, base::Unretained(this)));
}
void RtpVideoWriter::OnChannelReady(const std::string& name,
net::Socket* socket) {
if (!socket) {
if (!initialized_) {
initialized_ = true;
initialized_callback_.Run(false);
}
return;
}
if (name == kVideoRtpChannelName) {
DCHECK(!rtp_channel_.get());
rtp_channel_.reset(socket);
rtp_writer_.Init(socket);
} else if (name == kVideoRtcpChannelName) {
DCHECK(!rtcp_channel_.get());
rtcp_channel_.reset(socket);
// TODO(sergeyu): Use RTCP channel somehow.
}
if (rtp_channel_.get() && rtcp_channel_.get()) {
DCHECK(!initialized_);
initialized_ = true;
initialized_callback_.Run(true);
}
} }
void RtpVideoWriter::Close() { void RtpVideoWriter::Close() {
rtp_writer_.Close(); rtp_writer_.Close();
rtp_channel_.reset();
rtcp_channel_.reset();
} }
void RtpVideoWriter::ProcessVideoPacket(const VideoPacket* packet, Task* done) { void RtpVideoWriter::ProcessVideoPacket(const VideoPacket* packet, Task* done) {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#ifndef REMOTING_PROTOCOL_RTP_VIDEO_WRITER_H_ #ifndef REMOTING_PROTOCOL_RTP_VIDEO_WRITER_H_
#define REMOTING_PROTOCOL_RTP_VIDEO_WRITER_H_ #define REMOTING_PROTOCOL_RTP_VIDEO_WRITER_H_
#include "base/memory/scoped_ptr.h"
#include "remoting/protocol/rtp_writer.h" #include "remoting/protocol/rtp_writer.h"
#include "remoting/protocol/video_writer.h" #include "remoting/protocol/video_writer.h"
...@@ -19,7 +20,8 @@ class RtpVideoWriter : public VideoWriter { ...@@ -19,7 +20,8 @@ class RtpVideoWriter : public VideoWriter {
virtual ~RtpVideoWriter(); virtual ~RtpVideoWriter();
// VideoWriter interface. // VideoWriter interface.
virtual void Init(protocol::Session* session) OVERRIDE; virtual void Init(protocol::Session* session,
const InitializedCallback& callback) OVERRIDE;
virtual void Close() OVERRIDE; virtual void Close() OVERRIDE;
// VideoStub interface. // VideoStub interface.
...@@ -28,7 +30,14 @@ class RtpVideoWriter : public VideoWriter { ...@@ -28,7 +30,14 @@ class RtpVideoWriter : public VideoWriter {
virtual int GetPendingPackets() OVERRIDE; virtual int GetPendingPackets() OVERRIDE;
private: private:
void OnChannelReady(const std::string& name, net::Socket* socket);
bool initialized_;
InitializedCallback initialized_callback_;
scoped_ptr<net::Socket> rtp_channel_;
RtpWriter rtp_writer_; RtpWriter rtp_writer_;
scoped_ptr<net::Socket> rtcp_channel_;
DISALLOW_COPY_AND_ASSIGN(RtpVideoWriter); DISALLOW_COPY_AND_ASSIGN(RtpVideoWriter);
}; };
......
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "base/bind.h"
#include "base/message_loop.h" #include "base/message_loop.h"
#include "base/string_number_conversions.h" #include "base/string_number_conversions.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h" #include "remoting/proto/video.pb.h"
#include "remoting/protocol/fake_session.h" #include "remoting/protocol/fake_session.h"
#include "remoting/protocol/rtp_reader.h" #include "remoting/protocol/rtp_reader.h"
...@@ -57,7 +59,13 @@ class RtpVideoWriterTest : public testing::Test { ...@@ -57,7 +59,13 @@ class RtpVideoWriterTest : public testing::Test {
virtual void SetUp() { virtual void SetUp() {
session_.reset(new FakeSession()); session_.reset(new FakeSession());
writer_.Init(session_.get()); writer_.Init(session_.get(),
base::Bind(&RtpVideoWriterTest::OnWriterInitialized,
base::Unretained(this)));
}
void OnWriterInitialized(bool success) {
ASSERT_TRUE(success);
} }
void InitData(int size) { void InitData(int size) {
...@@ -88,7 +96,7 @@ class RtpVideoWriterTest : public testing::Test { ...@@ -88,7 +96,7 @@ class RtpVideoWriterTest : public testing::Test {
void VerifyResult(const ExpectedPacket expected[], void VerifyResult(const ExpectedPacket expected[],
int count) { int count) {
const vector<string>& rtp_packets = const vector<string>& rtp_packets =
session_->video_rtp_channel()->written_packets(); session_->GetDatagramChannel(kVideoRtpChannelName)->written_packets();
ASSERT_EQ(count, static_cast<int>(rtp_packets.size())); ASSERT_EQ(count, static_cast<int>(rtp_packets.size()));
int pos = 0; int pos = 0;
for (int i = 0; i < count; ++i) { for (int i = 0; i < count; ++i) {
......
...@@ -28,10 +28,24 @@ namespace protocol { ...@@ -28,10 +28,24 @@ namespace protocol {
class Session : public base::NonThreadSafe { class Session : public base::NonThreadSafe {
public: public:
enum State { enum State {
// Created, but not connecting yet.
INITIALIZING, INITIALIZING,
// Sent or received session-initiate, but haven't sent or received
// session-accept.
CONNECTING, CONNECTING,
// Session has been accepted, but channels are connected yet.
CONNECTED, CONNECTED,
// Video and control channels are connected.
// TODO(sergeyu): Remove this state.
CONNECTED_CHANNELS,
// Session has been closed.
CLOSED, CLOSED,
// Connection has failed.
FAILED, FAILED,
}; };
...@@ -64,9 +78,6 @@ class Session : public base::NonThreadSafe { ...@@ -64,9 +78,6 @@ class Session : public base::NonThreadSafe {
// instead. // instead.
virtual net::Socket* control_channel() = 0; virtual net::Socket* control_channel() = 0;
virtual net::Socket* event_channel() = 0; virtual net::Socket* event_channel() = 0;
virtual net::Socket* video_channel() = 0;
virtual net::Socket* video_rtp_channel() = 0;
virtual net::Socket* video_rtcp_channel() = 0;
// JID of the other side. // JID of the other side.
virtual const std::string& jid() = 0; virtual const std::string& jid() = 0;
......
// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
...@@ -25,12 +25,17 @@ class VideoReader { ...@@ -25,12 +25,17 @@ class VideoReader {
public: public:
static VideoReader* Create(const SessionConfig* config); static VideoReader* Create(const SessionConfig* config);
// The callback is called when initialization is finished. The
// parameter is set to true on success.
typedef base::Callback<void(bool)> InitializedCallback;
virtual ~VideoReader(); virtual ~VideoReader();
// Initializies the reader. Doesn't take ownership of either |connection| // Initializies the reader. Doesn't take ownership of either |connection|
// or |video_stub|. // or |video_stub|.
virtual void Init(Session* session, virtual void Init(Session* session,
VideoStub* video_stub) = 0; VideoStub* video_stub,
const InitializedCallback& callback) = 0;
protected: protected:
VideoReader() { } VideoReader() { }
......
// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#define REMOTING_PROTOCOL_VIDEO_WRITER_H_ #define REMOTING_PROTOCOL_VIDEO_WRITER_H_
#include "base/basictypes.h" #include "base/basictypes.h"
#include "base/callback.h"
#include "remoting/protocol/video_stub.h" #include "remoting/protocol/video_stub.h"
namespace remoting { namespace remoting {
...@@ -23,10 +24,14 @@ class VideoWriter : public VideoStub { ...@@ -23,10 +24,14 @@ class VideoWriter : public VideoStub {
public: public:
virtual ~VideoWriter(); virtual ~VideoWriter();
// The callback is called when initialization is finished. The
// parameter is set to true on success.
typedef base::Callback<void(bool)> InitializedCallback;
static VideoWriter* Create(const SessionConfig* config); static VideoWriter* Create(const SessionConfig* config);
// Initializes the writer. // Initializes the writer.
virtual void Init(Session* session) = 0; virtual void Init(Session* session, const InitializedCallback& callback) = 0;
// Stops writing. Must be called on the network thread before this // Stops writing. Must be called on the network thread before this
// object is destroyed. // object is destroyed.
......
# Following tests create real libjingle connections, and libjingle has # Following tests create real libjingle connections, and libjingle has
# hardcoded timeouts, so these tests fail under TSan. # hardcoded timeouts, so these tests fail under TSan.
JingleSessionTest.Connect JingleSessionTest.Connect
JingleSessionTest.TestControlChannel JingleSessionTest.TestUdpChannel
JingleSessionTest.TestEventChannel JingleSessionTest.TestTcpChannel
JingleSessionTest.TestVideoChannel
JingleSessionTest.TestVideoRtpChannel
JingleSessionTest.TestSpeed JingleSessionTest.TestSpeed
# This test fails on an assertion, see http://crbug.com/57266 # This test fails on an assertion, see http://crbug.com/57266
......
...@@ -2,8 +2,6 @@ ...@@ -2,8 +2,6 @@
# hardcoded timeouts, so these tests fail under TSan. # hardcoded timeouts, so these tests fail under TSan.
JingleSessionTest.Connect JingleSessionTest.Connect
JingleSessionTest.ConnectBadChannelAuth JingleSessionTest.ConnectBadChannelAuth
JingleSessionTest.TestControlChannel JingleSessionTest.TestUdpChannel
JingleSessionTest.TestEventChannel JingleSessionTest.TestTcpChannel
JingleSessionTest.TestSpeed JingleSessionTest.TestSpeed
JingleSessionTest.TestVideoChannel
JingleSessionTest.TestVideoRtpChannel
# Following tests create real libjingle connections, and libjingle has # Following tests create real libjingle connections, and libjingle has
# hardcoded timeouts, so these tests fail under TSan. # hardcoded timeouts, so these tests fail under TSan.
JingleSessionTest.Connect JingleSessionTest.Connect
JingleSessionTest.TestControlChannel JingleSessionTest.TestUdpChannel
JingleSessionTest.TestEventChannel JingleSessionTest.TestTcpChannel
JingleSessionTest.TestVideoChannel
JingleSessionTest.TestVideoRtpChannel
JingleSessionTest.TestSpeed JingleSessionTest.TestSpeed
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment