Commit d0b777b7 authored by Erik Jensen's avatar Erik Jensen Committed by Commit Bot

remoting: Wait to call done callback in WebrtcDataStreamAdapter.

When sending a message with WebRtcDataStreamAdapter, the adapter would
immediately call Send on the underlying webrtc::DataChannelInterface and
invoke the done callback. As a result, any code that tried to use the
done callback to determine when to queue the next message would end up
overrunning the WebRTC send buffer, causing the connection to drop.

This commit changes WebRtcDataStreamAdapter so it monitors the data
channel buffer and waits for the previous message to makes its way down
to the SCTP layer before sending the next message and calling its done
callback.

Bug: 679313
Change-Id: Ibb1cf8f7a0806e80d97e0dfd0632707711228e14
Reviewed-on: https://chromium-review.googlesource.com/c/1483932
Commit-Queue: Erik Jensen <rkjnsn@chromium.org>
Reviewed-by: default avatarJoe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#635347}
parent 7d0ce26d
...@@ -8,12 +8,13 @@ ...@@ -8,12 +8,13 @@
#include <utility> #include <utility>
#include "base/bind.h" #include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h" #include "base/callback.h"
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/location.h" #include "base/location.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/threading/thread_task_runner_handle.h" #include "base/threading/sequenced_task_runner_handle.h"
#include "net/base/net_errors.h" #include "net/base/net_errors.h"
#include "remoting/base/compound_buffer.h" #include "remoting/base/compound_buffer.h"
#include "remoting/protocol/message_serialization.h" #include "remoting/protocol/message_serialization.h"
...@@ -23,7 +24,7 @@ namespace protocol { ...@@ -23,7 +24,7 @@ namespace protocol {
WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( WebrtcDataStreamAdapter::WebrtcDataStreamAdapter(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) rtc::scoped_refptr<webrtc::DataChannelInterface> channel)
: channel_(channel.get()) { : channel_(channel.get()), weak_ptr_factory_(this) {
channel_->RegisterObserver(this); channel_->RegisterObserver(this);
DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting);
} }
...@@ -34,8 +35,11 @@ WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { ...@@ -34,8 +35,11 @@ WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() {
channel_->Close(); channel_->Close();
// Destroy |channel_| asynchronously as it may be on stack. // Destroy |channel_| asynchronously as it may be on stack.
base::ThreadTaskRunnerHandle::Get()->ReleaseSoon( base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::WrapRefCounted(channel_.release())); FROM_HERE,
base::BindOnce(base::DoNothing::Once<
rtc::scoped_refptr<webrtc::DataChannelInterface>>(),
std::move(channel_)));
} }
} }
...@@ -54,15 +58,38 @@ void WebrtcDataStreamAdapter::Send(google::protobuf::MessageLite* message, ...@@ -54,15 +58,38 @@ void WebrtcDataStreamAdapter::Send(google::protobuf::MessageLite* message,
buffer.SetSize(message->ByteSize()); buffer.SetSize(message->ByteSize());
message->SerializeWithCachedSizesToArray( message->SerializeWithCachedSizesToArray(
reinterpret_cast<uint8_t*>(buffer.data())); reinterpret_cast<uint8_t*>(buffer.data()));
webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); pending_messages_.emplace(
if (!channel_->Send(data_buffer)) { webrtc::DataBuffer(std::move(buffer), true /* binary */),
LOG(ERROR) << "Send failed on data channel " << channel_->label(); std::move(done));
channel_->Close();
return; // Send asynchronously to avoid nested calls to Send.
} base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&WebrtcDataStreamAdapter::SendMessagesIfReady,
weak_ptr_factory_.GetWeakPtr()));
}
if (!done.is_null()) void WebrtcDataStreamAdapter::SendMessagesIfReady() {
std::move(done).Run(); // We use our own send queue instead of queuing multiple messages in the
// data-channel queue so we can invoke the done callback as close to the
// message actually being sent as possible and avoid overrunning the data-
// channel queue. There is also lower-level buffering beneath the data-channel
// queue, which we do want to keep full to ensure the link is fully utilized.
// Send messages to the data channel until it has to add one to its own queue.
// This ensures that the lower-level buffers remain full.
while (channel_->buffered_amount() == 0 && !pending_messages_.empty()) {
PendingMessage message = std::move(pending_messages_.front());
pending_messages_.pop();
if (!channel_->Send(std::move(message.buffer))) {
LOG(ERROR) << "Send failed on data channel " << channel_->label();
channel_->Close();
return;
}
if (message.done_callback) {
std::move(message.done_callback).Run();
}
}
} }
void WebrtcDataStreamAdapter::OnStateChange() { void WebrtcDataStreamAdapter::OnStateChange() {
...@@ -99,5 +126,26 @@ void WebrtcDataStreamAdapter::OnMessage(const webrtc::DataBuffer& rtc_buffer) { ...@@ -99,5 +126,26 @@ void WebrtcDataStreamAdapter::OnMessage(const webrtc::DataBuffer& rtc_buffer) {
event_handler_->OnMessageReceived(std::move(buffer)); event_handler_->OnMessageReceived(std::move(buffer));
} }
void WebrtcDataStreamAdapter::OnBufferedAmountChange(uint64_t previous_amount) {
// WebRTC explicitly doesn't support sending from observer callbacks, so post
// a task to let the stack unwind.
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&WebrtcDataStreamAdapter::SendMessagesIfReady,
weak_ptr_factory_.GetWeakPtr()));
}
WebrtcDataStreamAdapter::PendingMessage::PendingMessage(
webrtc::DataBuffer buffer,
base::OnceClosure done_callback)
: buffer(std::move(buffer)), done_callback(std::move(done_callback)) {}
WebrtcDataStreamAdapter::PendingMessage&
WebrtcDataStreamAdapter::PendingMessage::operator=(PendingMessage&&) = default;
WebrtcDataStreamAdapter::PendingMessage::PendingMessage(PendingMessage&&) =
default;
WebrtcDataStreamAdapter::PendingMessage::~PendingMessage() = default;
} // namespace protocol } // namespace protocol
} // namespace remoting } // namespace remoting
...@@ -9,7 +9,9 @@ ...@@ -9,7 +9,9 @@
#include <string> #include <string>
#include "base/callback.h" #include "base/callback.h"
#include "base/containers/queue.h"
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "remoting/protocol/message_pipe.h" #include "remoting/protocol/message_pipe.h"
#include "third_party/webrtc/api/peer_connection_interface.h" #include "third_party/webrtc/api/peer_connection_interface.h"
#include "third_party/webrtc/rtc_base/ref_count.h" #include "third_party/webrtc/rtc_base/ref_count.h"
...@@ -35,9 +37,22 @@ class WebrtcDataStreamAdapter : public MessagePipe, ...@@ -35,9 +37,22 @@ class WebrtcDataStreamAdapter : public MessagePipe,
private: private:
enum class State { CONNECTING, OPEN, CLOSED }; enum class State { CONNECTING, OPEN, CLOSED };
struct PendingMessage {
PendingMessage(webrtc::DataBuffer buffer, base::OnceClosure done_callback);
PendingMessage(PendingMessage&&);
~PendingMessage();
PendingMessage& operator=(PendingMessage&&);
webrtc::DataBuffer buffer;
base::OnceClosure done_callback;
};
void SendMessagesIfReady();
// webrtc::DataChannelObserver interface. // webrtc::DataChannelObserver interface.
void OnStateChange() override; void OnStateChange() override;
void OnMessage(const webrtc::DataBuffer& buffer) override; void OnMessage(const webrtc::DataBuffer& buffer) override;
void OnBufferedAmountChange(uint64_t previous_amount) override;
rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; rtc::scoped_refptr<webrtc::DataChannelInterface> channel_;
...@@ -45,6 +60,11 @@ class WebrtcDataStreamAdapter : public MessagePipe, ...@@ -45,6 +60,11 @@ class WebrtcDataStreamAdapter : public MessagePipe,
State state_ = State::CONNECTING; State state_ = State::CONNECTING;
// The data and done callbacks for queued but not yet sent messages.
base::queue<PendingMessage> pending_messages_;
base::WeakPtrFactory<WebrtcDataStreamAdapter> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(WebrtcDataStreamAdapter); DISALLOW_COPY_AND_ASSIGN(WebrtcDataStreamAdapter);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment