Commit fd2efc13 authored by Seth Hampson's avatar Seth Hampson Committed by Commit Bot

P2PQuicStream read functionality.

This adds the P2PQuicStream::Delegate::OnDataReceived and adds tests. It
also adds the concept of marking data as consumed by the application and
keeping track of the application's read buffered size and amount. This
ensures that the P2PQuicStream does not give the application more data
than it can buffer, and if its buffer is full it will apply backpressure
to the send side.

Bug: 874296
Change-Id: I85f677778631a04a5eb1ccd71457485c9444824e
Reviewed-on: https://chromium-review.googlesource.com/c/1316836
Commit-Queue: Seth Hampson <shampson@chromium.org>
Reviewed-by: default avatarHenrik Boström <hbos@chromium.org>
Cr-Commit-Position: refs/heads/master@{#605783}
parent 0b37ebfc
......@@ -33,13 +33,13 @@ class P2PQuicStream {
// deleted by the quic::QuicSession.
virtual void OnRemoteReset() {}
// Called when the P2PQuicStream has consumed all incoming data from the
// remote side up to the FIN bit. Consuming means that the data is marked
// as consumed by quic::QuicStreamSequencer, but the data has not
// necessarily been read by the application. If the stream has already
// finished writing, then upon consuming the FIN bit the stream can no
// longer read or write and is deleted by the quic::QuicSession.
virtual void OnRemoteFinish() {}
// Called when the P2PQuicStream has received data from the remote side.
// If |fin| is set to true that means that the FIN bit has been received
// and the Delegate will no longer receive data with OnDataReceived.
// If the stream has already finished writing, then upon receiving the FIN
// bit the stream can no longer read or write and is deleted by the
// quic::QuicSession.
virtual void OnDataReceived(std::vector<uint8_t> data, bool fin) {}
// Called when data written with WriteData() has been consumed by QUIC.
//
......@@ -60,6 +60,13 @@ class P2PQuicStream {
// received from the remote side, because the local stream is already closed.
virtual void Reset() = 0;
// Marks received data of size |amount| as being consumed by the Delegate.
// This is used in conjuction with Delegate::OnDataReceived, to let the
// P2PQuicStream know that received data has been consumed. This allows the
// P2PQuicStream to send back pressure to the send side, if the Delegate
// cannot receive more data.
virtual void MarkReceivedDataConsumed(uint32_t amount) = 0;
// Writes |data| to a STREAM frame and gives it to QUIC to be buffered or sent
// to the remote endpoint. Once that data has been sent Delegate::OnDataSent()
// will be fired. Specifying |fin| to true will mark the STREAM frame with the
......
......@@ -10,21 +10,72 @@ namespace blink {
P2PQuicStreamImpl::P2PQuicStreamImpl(quic::QuicStreamId id,
quic::QuicSession* session,
uint32_t delegate_read_buffer_size,
uint32_t write_buffer_size)
: quic::QuicStream(id, session, /*is_static=*/false, quic::BIDIRECTIONAL),
delegate_read_buffer_size_(delegate_read_buffer_size),
write_buffer_size_(write_buffer_size) {
DCHECK_GT(delegate_read_buffer_size_, 0u);
DCHECK_GT(write_buffer_size_, 0u);
}
P2PQuicStreamImpl::~P2PQuicStreamImpl() {}
void P2PQuicStreamImpl::OnDataAvailable() {
// We just drop the data by marking all data as immediately consumed.
sequencer()->MarkConsumed(sequencer()->ReadableBytes());
if (sequencer()->IsClosed()) {
// This means all data has been consumed up to the FIN bit.
OnFinRead();
DCHECK(delegate_);
if (!sequencer()->HasBytesToRead() && sequencer()->IsClosed()) {
// We have consumed all data from the sequencer up to the FIN bit. This can
// only occur by receiving an empty STREAM frame with the FIN bit set.
quic::QuicStream::OnFinRead();
delegate_->OnDataReceived(std::vector<uint8_t>(), /*fin=*/true);
consumed_fin_ = true;
}
uint32_t delegate_read_buffer_available =
delegate_read_buffer_size_ - delegate_read_buffered_amount_;
uint32_t total_read_amount =
std::min(static_cast<uint32_t>(sequencer()->ReadableBytes()),
delegate_read_buffer_available);
// Nothing to do if the delegate's read buffer can't fit anymore data,
// or the sequencer doesn't have any data available to be read.
if (total_read_amount == 0 || consumed_fin_) {
return;
}
std::vector<uint8_t> data(total_read_amount);
uint32_t current_data_offset = 0;
struct iovec iov;
// Read data from the quic::QuicStreamSequencer until we have exhausted the
// data, or have read at least the amount of the delegate's read buffer size.
while (sequencer()->GetReadableRegion(&iov)) {
uint32_t read_amount = static_cast<uint32_t>(iov.iov_len);
if (read_amount == 0) {
// Read everything available from the quic::QuicStreamSequencer.
DCHECK_EQ(current_data_offset, total_read_amount);
break;
}
// Limit the |consume_amount| by the amount available in the delegate's read
// buffer.
uint32_t consume_amount = std::min(
read_amount, delegate_read_buffer_available - current_data_offset);
memcpy(data.data() + current_data_offset, iov.iov_base, consume_amount);
sequencer()->MarkConsumed(consume_amount);
current_data_offset += consume_amount;
if (read_amount > consume_amount) {
// The delegate cannot store more data in its read buffer.
DCHECK_EQ(current_data_offset, total_read_amount);
break;
}
}
bool fin = !sequencer()->HasBytesToRead() && sequencer()->IsClosed();
delegate_read_buffered_amount_ += data.size();
DCHECK(delegate_read_buffer_size_ >= delegate_read_buffered_amount_);
if (fin) {
quic::QuicStream::OnFinRead();
consumed_fin_ = fin;
}
delegate_->OnDataReceived(std::move(data), fin);
}
void P2PQuicStreamImpl::OnStreamDataConsumed(size_t bytes_consumed) {
......@@ -47,6 +98,14 @@ void P2PQuicStreamImpl::Reset() {
quic::QuicStream::Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
}
void P2PQuicStreamImpl::MarkReceivedDataConsumed(uint32_t amount) {
DCHECK_GE(delegate_read_buffered_amount_, amount);
delegate_read_buffered_amount_ -= amount;
if (sequencer()->HasBytesToRead() || !consumed_fin_) {
OnDataAvailable();
}
}
void P2PQuicStreamImpl::WriteData(std::vector<uint8_t> data, bool fin) {
// It is up to the delegate to not write more data than the
// |write_buffer_size_|.
......@@ -71,16 +130,6 @@ void P2PQuicStreamImpl::OnStreamReset(const quic::QuicRstStreamFrame& frame) {
delegate_->OnRemoteReset();
}
void P2PQuicStreamImpl::OnFinRead() {
// TODO(https://crbug.com/874296): If we get an incoming stream we need to
// make sure that the delegate is set before we have incoming data.
DCHECK(delegate_);
// Calling this on the QuicStream ensures that the stream is closed
// for reading.
quic::QuicStream::OnFinRead();
delegate_->OnRemoteFinish();
}
void P2PQuicStreamImpl::OnClose() {
closed_ = true;
quic::QuicStream::OnClose();
......@@ -90,4 +139,8 @@ bool P2PQuicStreamImpl::IsClosedForTesting() {
return closed_;
}
uint32_t P2PQuicStreamImpl::DelegateReadBufferedAmountForTesting() {
return delegate_read_buffered_amount_;
}
} // namespace blink
......@@ -16,10 +16,9 @@ class MODULES_EXPORT P2PQuicStreamImpl final : public P2PQuicStream,
public:
P2PQuicStreamImpl(quic::QuicStreamId id,
quic::QuicSession* session,
uint32_t delegate_read_buffer_size,
uint32_t write_buffer_size);
~P2PQuicStreamImpl() override;
// P2PQuicStream overrides
void SetDelegate(P2PQuicStream::Delegate* delegate) override;
......@@ -27,9 +26,15 @@ class MODULES_EXPORT P2PQuicStreamImpl final : public P2PQuicStream,
void WriteData(std::vector<uint8_t> data, bool fin) override;
void MarkReceivedDataConsumed(uint32_t amount) override;
// For testing purposes. This is returns true after quic::QuicStream::OnClose
bool IsClosedForTesting();
// For testing purposes. This exposes the amount of received data that the
// P2PQuicStream is aware is buffered by the delegate.
uint32_t DelegateReadBufferedAmountForTesting();
// quic::QuicStream overrides.
//
// Right now this marks the data as consumed and drops it.
......@@ -46,12 +51,6 @@ class MODULES_EXPORT P2PQuicStreamImpl final : public P2PQuicStream,
// reading
// and writing, and can now be deleted by the quic::QuicSession.
void OnClose() override;
// Called when the stream has finished consumed data up to the FIN bit from
// the quic::QuicStreamSequencer. This will close the underlying QuicStream
// for reading. This can be called either by the P2PQuicStreamImpl when
// reading data, or by the quic::QuicStreamSequencer if we're done reading &
// receive a stream frame with the FIN bit.
void OnFinRead() override;
protected:
// quic::QuicStream overrides.
......@@ -67,17 +66,31 @@ class MODULES_EXPORT P2PQuicStreamImpl final : public P2PQuicStream,
// Outlives the P2PQuicStreamImpl.
Delegate* delegate_;
// The maximum size allowed to be from due to writing data. The
// The read buffer size of the delegate. The |delegate_read_buffered_amount_|
// must never exceed this value (enforced by the P2PQuicStreamImpl).
const uint32_t delegate_read_buffer_size_;
// The maximum size allowed to be buffered write side. The
// |write_buffered_amount_| must never exceed this value, and it is up
// to the application to enforce this.
// to the delegate to enforce this.
const uint32_t write_buffer_size_;
// How much total data has been received and given to the delegate,
// but not yet consumed by the delegate. This value gets increased when data
// is received from the QUIC library in OnDataAvailable() and and decreased
// when the delegate updates that data has been read with
// MarkReceivedDataConsumed().
uint32_t delegate_read_buffered_amount_ = 0;
// How much data is buffered by the QUIC library, but has not yet
// been consumed. This value gets increased when WriteData() is called
// and decreased when OnDataConsumed() gets fired.
// been sent. This value gets increased when WriteData() is called
// and decreased when OnDataConsumed() gets called by the QUIC library,
// due to the data being sent.
uint32_t write_buffered_amount_ = 0;
// Set after OnClose gets called. Used for testing purposes.
// Set after OnClose gets called.
bool closed_ = false;
// This is set after the sequencer is closed due to the P2PQuicStream
// consuming all of the sequencer's data up to the FIN bit.
bool consumed_fin_ = false;
};
} // namespace blink
......
......@@ -23,14 +23,17 @@ struct P2PQuicTransportConfig final {
P2PQuicPacketTransport* const packet_transport_in,
const std::vector<rtc::scoped_refptr<rtc::RTCCertificate>>
certificates_in,
uint32_t stream_delegate_read_buffer_size_in,
uint32_t stream_write_buffer_size_in)
: packet_transport(packet_transport_in),
certificates(certificates_in),
delegate(delegate_in),
stream_delegate_read_buffer_size(stream_delegate_read_buffer_size_in),
stream_write_buffer_size(stream_write_buffer_size_in) {
DCHECK_GT(certificates.size(), 0u);
DCHECK(packet_transport);
DCHECK(delegate);
DCHECK_GT(stream_delegate_read_buffer_size, 0u);
DCHECK_GT(stream_write_buffer_size, 0u);
}
P2PQuicTransportConfig(const P2PQuicTransportConfig&) = delete;
......@@ -54,6 +57,11 @@ struct P2PQuicTransportConfig final {
// to listen and respond to a crypto handshake upon construction.
// This will NOT start a handshake.
bool can_respond_to_crypto_handshake = true;
// The amount that the delegate can store in its read buffer. This is a
// mandatory field that must be set to ensure that the
// P2PQuicStream::Delegate will not give the delegate more data than it can
// store.
const uint32_t stream_delegate_read_buffer_size;
// The amount that the P2PQuicStream will allow to buffer. This is a mandatory
// field that must be set to ensure that the client of the P2PQuicStream does
// not write more data than can be buffered.
......
......@@ -152,11 +152,14 @@ P2PQuicTransportImpl::P2PQuicTransportImpl(
packet_transport_(p2p_transport_config.packet_transport),
delegate_(p2p_transport_config.delegate),
clock_(clock),
stream_delegate_read_buffer_size_(
p2p_transport_config.stream_delegate_read_buffer_size),
stream_write_buffer_size_(p2p_transport_config.stream_write_buffer_size) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(delegate_);
DCHECK(clock_);
DCHECK(packet_transport_);
DCHECK_GT(stream_delegate_read_buffer_size_, 0u);
DCHECK_GT(stream_write_buffer_size_, 0u);
DCHECK_GT(p2p_transport_config.certificates.size(), 0u);
if (p2p_transport_config.can_respond_to_crypto_handshake) {
......@@ -252,7 +255,8 @@ P2PQuicStreamImpl* P2PQuicTransportImpl::CreateStreamInternal(
DCHECK(crypto_stream_);
DCHECK(IsEncryptionEstablished());
DCHECK(!IsClosed());
return new P2PQuicStreamImpl(id, this, stream_write_buffer_size_);
return new P2PQuicStreamImpl(id, this, stream_delegate_read_buffer_size_,
stream_write_buffer_size_);
}
void P2PQuicTransportImpl::InitializeCryptoStream() {
......
......@@ -170,6 +170,9 @@ class MODULES_EXPORT P2PQuicTransportImpl final
// Owned by whatever creates the P2PQuicTransportImpl. The |clock_| needs to
// outlive the P2PQuicTransportImpl.
quic::QuicClock* clock_ = nullptr;
// The size of the stream delegate's read buffer, used when creating
// P2PQuicStreams.
uint32_t stream_delegate_read_buffer_size_;
// Determines the size of the write buffer when P2PQuicStreams.
uint32_t stream_write_buffer_size_;
......
......@@ -28,6 +28,7 @@ using ::testing::PolymorphicAction;
const std::string kTriggerRemoteStreamPhrase = "open sesame";
const uint32_t kWriteBufferSize = 100 * 1024;
const uint32_t kDelegateReadBufferSize = 100 * 1024;
// A custom gmock Action that fires the given callback. This is used in
// conjuction with the CallbackRunLoop in order to drive the TestTaskRunner
......@@ -388,9 +389,9 @@ class P2PQuicTransportTest : public testing::Test {
std::make_unique<MockP2PQuicTransportDelegate>();
std::vector<rtc::scoped_refptr<rtc::RTCCertificate>> client_certificates;
client_certificates.push_back(client_cert);
P2PQuicTransportConfig client_config(client_quic_transport_delegate.get(),
client_packet_transport.get(),
client_certificates, kWriteBufferSize);
P2PQuicTransportConfig client_config(
client_quic_transport_delegate.get(), client_packet_transport.get(),
client_certificates, kDelegateReadBufferSize, kWriteBufferSize);
client_config.is_server = false;
client_config.can_respond_to_crypto_handshake =
can_respond_to_crypto_handshake;
......@@ -415,9 +416,9 @@ class P2PQuicTransportTest : public testing::Test {
CreateTestCertificate();
std::vector<rtc::scoped_refptr<rtc::RTCCertificate>> server_certificates;
server_certificates.push_back(server_cert);
P2PQuicTransportConfig server_config(server_quic_transport_delegate.get(),
server_packet_transport.get(),
server_certificates, kWriteBufferSize);
P2PQuicTransportConfig server_config(
server_quic_transport_delegate.get(), server_packet_transport.get(),
server_certificates, kDelegateReadBufferSize, kWriteBufferSize);
server_config.is_server = true;
server_config.can_respond_to_crypto_handshake =
can_respond_to_crypto_handshake;
......@@ -981,7 +982,8 @@ TEST_F(P2PQuicTransportTest, StreamClosedAfterSendingAndReceivingFin) {
SetupConnectedStreams();
CallbackRunLoop run_loop(runner());
EXPECT_CALL(*server_peer()->stream_delegate(), OnRemoteFinish())
EXPECT_CALL(*server_peer()->stream_delegate(),
OnDataReceived(_, /*fin=*/true))
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->WriteData({}, /*fin=*/true);
......@@ -998,7 +1000,8 @@ TEST_F(P2PQuicTransportTest, StreamClosedAfterSendingAndReceivingFin) {
EXPECT_FALSE(client_peer()->quic_transport()->IsClosedStream(
client_peer()->stream_id()));
EXPECT_CALL(*client_peer()->stream_delegate(), OnRemoteFinish())
EXPECT_CALL(*client_peer()->stream_delegate(),
OnDataReceived(_, /*fin=*/true))
.WillOnce(FireCallback(run_loop.CreateCallback()));
server_peer()->stream()->WriteData({}, /*fin=*/true);
......@@ -1024,7 +1027,8 @@ TEST_F(P2PQuicTransportTest, StreamResetAfterSendingFin) {
SetupConnectedStreams();
CallbackRunLoop run_loop(runner());
EXPECT_CALL(*server_peer()->stream_delegate(), OnRemoteFinish())
EXPECT_CALL(*server_peer()->stream_delegate(),
OnDataReceived(_, /*fin=*/true))
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->WriteData({}, /*fin=*/true);
......@@ -1048,7 +1052,8 @@ TEST_F(P2PQuicTransportTest, StreamResetAfterReceivingFin) {
SetupConnectedStreams();
CallbackRunLoop run_loop(runner());
EXPECT_CALL(*server_peer()->stream_delegate(), OnRemoteFinish())
EXPECT_CALL(*server_peer()->stream_delegate(),
OnDataReceived(_, /*fin=*/true))
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->WriteData({}, /*fin=*/true);
......@@ -1066,4 +1071,64 @@ TEST_F(P2PQuicTransportTest, StreamResetAfterReceivingFin) {
ExpectStreamsClosed();
}
// Tests that when data is sent on a stream it is received on the other end.
TEST_F(P2PQuicTransportTest, StreamDataSentThenReceivedOnRemoteSide) {
Initialize();
Connect();
SetupConnectedStreams();
CallbackRunLoop run_loop(runner());
std::string message = "howdy partner";
EXPECT_CALL(*server_peer()->stream_delegate(),
OnDataReceived(
std::vector<uint8_t>(message.begin(), message.end()), false))
.WillOnce(FireCallback(run_loop.CreateCallback()));
EXPECT_CALL(*client_peer()->stream_delegate(),
OnWriteDataConsumed(message.size()))
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->WriteData(
std::vector<uint8_t>(message.begin(), message.end()), /* fin= */ false);
run_loop.RunUntilCallbacksFired();
}
// Tests that if both sides have a stream that sends data and FIN bit
// they both close down for reading and writing properly.
TEST_F(P2PQuicTransportTest, StreamDataSentWithFinClosesStreams) {
Initialize();
Connect();
SetupConnectedStreams();
CallbackRunLoop run_loop(runner());
std::string server_message = "some server data";
std::string client_message = "client data";
EXPECT_CALL(*server_peer()->stream_delegate(),
OnDataReceived(std::vector<uint8_t>(client_message.begin(),
client_message.end()),
true))
.WillOnce(FireCallback(run_loop.CreateCallback()));
EXPECT_CALL(*server_peer()->stream_delegate(),
OnWriteDataConsumed(server_message.size()))
.WillOnce(FireCallback(run_loop.CreateCallback()));
EXPECT_CALL(*client_peer()->stream_delegate(),
OnDataReceived(std::vector<uint8_t>(server_message.begin(),
server_message.end()),
true))
.WillOnce(FireCallback(run_loop.CreateCallback()));
EXPECT_CALL(*client_peer()->stream_delegate(),
OnWriteDataConsumed(client_message.size()))
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->WriteData(
std::vector<uint8_t>(client_message.begin(), client_message.end()),
/*fin=*/true);
server_peer()->stream()->WriteData(
std::vector<uint8_t>(server_message.begin(), server_message.end()),
/*fin=*/true);
run_loop.RunUntilCallbacksFired();
ExpectStreamsClosed();
}
} // namespace blink
......@@ -70,8 +70,14 @@ void QuicStreamHost::OnRemoteReset() {
Delete();
}
void QuicStreamHost::OnRemoteFinish() {
// TODO(https://crbug.com/874296): When the blink binding (RTCQuicStream) is
// updated to support reading/writing, update this function to do more than just
// call OnRemoteFinish.
void QuicStreamHost::OnDataReceived(std::vector<uint8_t> data, bool fin) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!fin) {
return;
}
PostCrossThreadTask(
*proxy_thread(), FROM_HERE,
CrossThreadBind(&QuicStreamProxy::OnRemoteFinish, stream_proxy_));
......
......@@ -64,7 +64,7 @@ class QuicStreamHost final : public base::SupportsWeakPtr<QuicStreamHost>,
// P2PQuicStream::Delegate overrides.
void OnRemoteReset() override;
void OnRemoteFinish() override;
void OnDataReceived(std::vector<uint8_t> data, bool fin) override;
// Up reference. Owned by QuicTransportProxy.
QuicTransportHost* transport_host_ = nullptr;
......
......@@ -50,7 +50,8 @@ void QuicTransportHost::Initialize(
uint32_t stream_buffer_size = 24 * 1024 * 1024;
P2PQuicTransportConfig config(
this, ice_transport_host->ConnectConsumer(this)->packet_transport(),
certificates, stream_buffer_size);
certificates, /*stream_delegate_read_buffer_size_in=*/stream_buffer_size,
/*stream_write_buffer_size_in=*/stream_buffer_size);
config.is_server = (perspective == quic::Perspective::IS_SERVER);
quic_transport_ =
quic_transport_factory_->CreateQuicTransport(std::move(config));
......
......@@ -14,10 +14,9 @@ class MockP2PQuicStream : public testing::NiceMock<P2PQuicStream> {
public:
// P2PQuicStream overrides.
MOCK_METHOD0(Reset, void());
MOCK_METHOD0(Finish, void());
MOCK_METHOD1(SetDelegate, void(Delegate*));
MOCK_METHOD1(MarkDataConsumed, void(uint32_t));
MOCK_METHOD2(WriteData, void(std::vector<uint8_t>, bool));
MOCK_METHOD1(MarkReceivedDataConsumed, void(uint32_t));
};
} // namespace blink
......
......@@ -16,7 +16,7 @@ class MockP2PQuicStreamDelegate
// P2PQuicStream::Delegate overrides.
MOCK_METHOD1(OnWriteDataConsumed, void(uint32_t));
MOCK_METHOD0(OnRemoteReset, void());
MOCK_METHOD0(OnRemoteFinish, void());
MOCK_METHOD2(OnDataReceived, void(std::vector<uint8_t>, bool));
};
} // namespace blink
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment