Commit 350f7d4b authored by Seth Hampson's avatar Seth Hampson Committed by Commit Bot

P2PQuicStream write functionality.

This adds the P2PQuicStream::WriteData function and adds tests. It also
adds the concept of a write buffered amount, enforcing this at the
P2PQuicStreamImpl.

Bug: 874296
Change-Id: Id02c8aa8d5368a87bb24a2e50dab5ef94bcae131
Reviewed-on: https://chromium-review.googlesource.com/c/1315534
Commit-Queue: Seth Hampson <shampson@chromium.org>
Reviewed-by: default avatarHenrik Boström <hbos@chromium.org>
Cr-Commit-Position: refs/heads/master@{#605766}
parent 1ac713b7
......@@ -5,6 +5,9 @@
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_PEERCONNECTION_ADAPTERS_P2P_QUIC_STREAM_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_PEERCONNECTION_ADAPTERS_P2P_QUIC_STREAM_H_
#include <stdint.h>
#include <vector>
namespace blink {
// The bidirectional QUIC stream object to be used by the RTCQuicStream Web
......@@ -13,7 +16,8 @@ namespace blink {
// Lifetime: The P2PQuicStream is owned by the P2PQuicTransport, and can be
// deleted after the stream is closed for reading and writing. This can happen
// in 3 ways: 1) OnRemoteReset has been fired. 2) Calling Reset(). 3) Both
// Finish() has been called and OnRemoteFinish has been fired.
// a FIN bit has been sent with WriteData(_, true) and OnRemoteFinish has been
// fired.
class P2PQuicStream {
public:
// Receives callbacks for receiving RST_STREAM frames or a STREAM_FRAME with
......@@ -36,6 +40,14 @@ class P2PQuicStream {
// finished writing, then upon consuming the FIN bit the stream can no
// longer read or write and is deleted by the quic::QuicSession.
virtual void OnRemoteFinish() {}
// Called when data written with WriteData() has been consumed by QUIC.
//
// This will happen immediately after calling WriteData(), unless QUIC has
// buffered the data in which case it will be fired when the stream is no
// longer write blocked and the data is consumed. |amount| specifies how
// much data was consumed in bytes.
virtual void OnWriteDataConsumed(uint32_t amount) {}
};
virtual ~P2PQuicStream() = default;
......@@ -48,18 +60,17 @@ class P2PQuicStream {
// received from the remote side, because the local stream is already closed.
virtual void Reset() = 0;
// Sends a stream frame with the FIN bit set, which notifies the remote side
// that this stream is done writing. The stream can no longer write after
// calling this function. If the stream has already received a FIN bit, this
// will close the stream for reading & writing and it will be deleted by the
// quic::QuicSession.
virtual void Finish() = 0;
// Writes |data| to a STREAM frame and gives it to QUIC to be buffered or sent
// to the remote endpoint. Once that data has been sent Delegate::OnDataSent()
// will be fired. Specifying |fin| to true will mark the STREAM frame with the
// FIN bit set, which notifies the remote side that this stream is done
// writing. After sending the FIN bit, the P2PQuicStream can no longer write.
// Once the P2PQuicStream has sent AND received the FIN bit it will be closed
// for reading and writing and deleted by the quic::QuicSession.
virtual void WriteData(std::vector<uint8_t> data, bool fin) = 0;
// Sets the delegate object, which must outlive the P2PQuicStream.
virtual void SetDelegate(Delegate* delegate) = 0;
// TODO:(https://crbug.com/874296): Create functions for reading and writing,
// specifically for waitForReadable/waitForWriteable.
};
} // namespace blink
......
......@@ -9,8 +9,12 @@
namespace blink {
P2PQuicStreamImpl::P2PQuicStreamImpl(quic::QuicStreamId id,
quic::QuicSession* session)
: quic::QuicStream(id, session, /*is_static=*/false, quic::BIDIRECTIONAL) {}
quic::QuicSession* session,
uint32_t write_buffer_size)
: quic::QuicStream(id, session, /*is_static=*/false, quic::BIDIRECTIONAL),
write_buffer_size_(write_buffer_size) {
DCHECK_GT(write_buffer_size_, 0u);
}
P2PQuicStreamImpl::~P2PQuicStreamImpl() {}
......@@ -23,6 +27,17 @@ void P2PQuicStreamImpl::OnDataAvailable() {
}
}
void P2PQuicStreamImpl::OnStreamDataConsumed(size_t bytes_consumed) {
DCHECK(delegate_);
// We should never consume more than has been written.
DCHECK_GE(write_buffered_amount_, bytes_consumed);
QuicStream::OnStreamDataConsumed(bytes_consumed);
if (bytes_consumed > 0) {
write_buffered_amount_ -= bytes_consumed;
delegate_->OnWriteDataConsumed(bytes_consumed);
}
}
void P2PQuicStreamImpl::Reset() {
if (rst_sent()) {
// No need to reset twice. This could have already been sent as consequence
......@@ -32,10 +47,15 @@ void P2PQuicStreamImpl::Reset() {
quic::QuicStream::Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
}
void P2PQuicStreamImpl::Finish() {
// Should never call Finish twice.
DCHECK(!fin_sent());
quic::QuicStream::WriteOrBufferData("", /*fin=*/true, nullptr);
void P2PQuicStreamImpl::WriteData(std::vector<uint8_t> data, bool fin) {
// It is up to the delegate to not write more data than the
// |write_buffer_size_|.
DCHECK_GE(write_buffer_size_, data.size() + write_buffered_amount_);
write_buffered_amount_ += data.size();
QuicStream::WriteOrBufferData(
quic::QuicStringPiece(reinterpret_cast<const char*>(data.data()),
data.size()),
fin, nullptr);
}
void P2PQuicStreamImpl::SetDelegate(P2PQuicStream::Delegate* delegate) {
......@@ -43,8 +63,6 @@ void P2PQuicStreamImpl::SetDelegate(P2PQuicStream::Delegate* delegate) {
}
void P2PQuicStreamImpl::OnStreamReset(const quic::QuicRstStreamFrame& frame) {
// TODO(https://crbug.com/874296): If we get an incoming stream we need to
// make sure that the delegate is set before we have incoming data.
DCHECK(delegate_);
// Calling this on the QuicStream will ensure that the stream is closed
// for reading and writing and we send a RST frame to the remote side if
......
......@@ -14,32 +14,38 @@ namespace blink {
class MODULES_EXPORT P2PQuicStreamImpl final : public P2PQuicStream,
public quic::QuicStream {
public:
P2PQuicStreamImpl(quic::QuicStreamId id, quic::QuicSession* session);
P2PQuicStreamImpl(quic::QuicStreamId id,
quic::QuicSession* session,
uint32_t write_buffer_size);
~P2PQuicStreamImpl() override;
// QuicStream overrides.
//
// Right now this marks the data as consumed and drops it.
// TODO(https://crbug.com/874296): We need to update this function for
// reading and consuming data properly while the main JavaScript thread is
// busy. See:
// https://w3c.github.io/webrtc-quic/#dom-rtcquicstream-waitforreadable
void OnDataAvailable() override;
// P2PQuicStream overrides
void SetDelegate(P2PQuicStream::Delegate* delegate) override;
void Reset() override;
void Finish() override;
void WriteData(std::vector<uint8_t> data, bool fin) override;
// quic::QuicStream overrides
// For testing purposes. This is returns true after quic::QuicStream::OnClose
bool IsClosedForTesting();
// quic::QuicStream overrides.
//
// Right now this marks the data as consumed and drops it.
// TODO(https://crbug.com/874296): We need to update this function for
// reading and consuming data properly while the main JavaScript thread is
// busy. See:
// https://w3c.github.io/webrtc-quic/#dom-rtcquicstream-waitforreadable
void OnDataAvailable() override;
// Called by the quic::QuicSession when receiving a RST_STREAM frame from the
// remote side. This closes the stream for reading & writing (if not already
// closed), and sends a RST_STREAM frame if one has not been sent yet.
void OnStreamReset(const quic::QuicRstStreamFrame& frame) override;
// Called by the quic::QuicSession. This means the stream is closed for
// reading
// and writing, and can now be deleted by the quic::QuicSession.
void OnClose() override;
// Called when the stream has finished consumed data up to the FIN bit from
// the quic::QuicStreamSequencer. This will close the underlying QuicStream
// for reading. This can be called either by the P2PQuicStreamImpl when
......@@ -47,19 +53,30 @@ class MODULES_EXPORT P2PQuicStreamImpl final : public P2PQuicStream,
// receive a stream frame with the FIN bit.
void OnFinRead() override;
// Called by the quic::QuicSession. This means the stream is closed for
// reading and writing, and can now be deleted by the quic::QuicSession.
void OnClose() override;
// For testing purposes. This is returns true after quic::QuicStream::OnClose
bool IsClosedForTesting();
protected:
// quic::QuicStream overrides.
//
// Called when written data (from WriteData()) is consumed by QUIC. This means
// the data has either been sent across the wire, or it has been turned into a
// packet and queued if the socket is unexpectedly blocked.
void OnStreamDataConsumed(size_t bytes_consumed) override;
private:
using quic::QuicStream::Reset;
// Outlives the P2PQuicStreamImpl.
Delegate* delegate_;
// Set after OnClose gets called.
// The maximum size allowed to be from due to writing data. The
// |write_buffered_amount_| must never exceed this value, and it is up
// to the application to enforce this.
const uint32_t write_buffer_size_;
// How much data is buffered by the QUIC library, but has not yet
// been consumed. This value gets increased when WriteData() is called
// and decreased when OnDataConsumed() gets fired.
uint32_t write_buffered_amount_ = 0;
// Set after OnClose gets called. Used for testing purposes.
bool closed_ = false;
};
......
......@@ -4,6 +4,7 @@
#include "third_party/blink/renderer/modules/peerconnection/adapters/p2p_quic_stream.h"
#include "net/test/gtest_util.h"
#include "net/third_party/quic/core/quic_data_writer.h"
#include "net/third_party/quic/test_tools/quic_test_utils.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "third_party/blink/renderer/modules/peerconnection/adapters/p2p_quic_stream_impl.h"
......@@ -15,13 +16,16 @@ namespace {
using testing::_;
using testing::Invoke;
using testing::InvokeWithoutArgs;
const uint32_t kWriteBufferSize = 1024;
const quic::QuicStreamId kStreamId = 5;
const std::string kSomeData = "howdy";
} // namespace
// Unit tests for the P2PQuicStream, using a mock QuicSession, which allows
// us to isolate testing the behaviors of reading and writing.
// us to isolate testing the behaviors of reading a writing.
class P2PQuicStreamTest : public testing::Test {
public:
P2PQuicStreamTest()
......@@ -31,7 +35,7 @@ class P2PQuicStreamTest : public testing::Test {
quic::Perspective::IS_CLIENT)),
session_(connection_) {
session_.Initialize();
stream_ = new P2PQuicStreamImpl(kStreamId, &session_);
stream_ = new P2PQuicStreamImpl(kStreamId, &session_, kWriteBufferSize);
stream_->SetDelegate(&delegate_);
// The session takes the ownership of the stream.
session_.ActivateStream(std::unique_ptr<P2PQuicStreamImpl>(stream_));
......@@ -53,11 +57,11 @@ class P2PQuicStreamTest : public testing::Test {
P2PQuicStreamImpl* stream_;
};
TEST_F(P2PQuicStreamTest, StreamFinishSendsFinAndCanNoLongerWrite) {
TEST_F(P2PQuicStreamTest, StreamSendsFinAndCanNoLongerWrite) {
EXPECT_CALL(session_, WritevData(stream_, kStreamId, _, _, _))
.WillOnce(Invoke(quic::test::MockQuicSession::ConsumeData));
stream_->Finish();
stream_->WriteData({}, /*fin=*/true);
EXPECT_TRUE(stream_->fin_sent());
EXPECT_TRUE(stream_->write_side_closed());
......@@ -83,11 +87,11 @@ TEST_F(P2PQuicStreamTest, StreamOnStreamFrameWithFin) {
}
// Tests that when a stream receives a stream frame with the FIN bit set after
// it has called Finish(), then the stream will close.
TEST_F(P2PQuicStreamTest, StreamClosedAfterReceivesFin) {
// it has written the FIN bit, then the stream will close.
TEST_F(P2PQuicStreamTest, StreamClosedAfterSendingThenReceivingFin) {
EXPECT_CALL(session_, WritevData(stream_, kStreamId, _, _, _))
.WillOnce(Invoke(quic::test::MockQuicSession::ConsumeData));
stream_->Finish();
stream_->WriteData({}, /*fin=*/true);
EXPECT_FALSE(stream_->IsClosedForTesting());
quic::QuicStreamFrame fin_frame(stream_->id(), /*fin=*/true, 0, 0);
......@@ -98,17 +102,37 @@ TEST_F(P2PQuicStreamTest, StreamClosedAfterReceivesFin) {
EXPECT_TRUE(stream_->IsClosedForTesting());
}
// Tests that when a stream calls Finish() after receiving a stream frame with
// Tests that when a stream writes a FIN bit after receiving a stream frame with
// the FIN bit then the stream will close.
TEST_F(P2PQuicStreamTest, StreamClosedAfterFinish) {
TEST_F(P2PQuicStreamTest, StreamClosedAfterReceivingThenSendingFin) {
quic::QuicStreamFrame fin_frame(stream_->id(), /*fin=*/true, 0, 0);
stream_->OnStreamFrame(fin_frame);
EXPECT_FALSE(stream_->IsClosedForTesting());
EXPECT_CALL(session_, WritevData(stream_, kStreamId, _, _, _))
.WillOnce(Invoke(quic::test::MockQuicSession::ConsumeData));
stream_->Finish();
stream_->WriteData({}, /*fin=*/true);
EXPECT_TRUE(stream_->IsClosedForTesting());
}
// Tests that when a stream writes some data with the FIN bit set, and receives
// data with the FIN bit set it will become closed.
TEST_F(P2PQuicStreamTest, StreamClosedAfterWritingAndReceivingDataWithFin) {
EXPECT_CALL(session_, WritevData(stream_, kStreamId,
/*write_length=*/kSomeData.size(), _, _))
.WillOnce(Invoke(quic::test::MockQuicSession::ConsumeData));
stream_->WriteData(std::vector<uint8_t>(kSomeData.begin(), kSomeData.end()),
/*fin=*/true);
EXPECT_FALSE(stream_->IsClosedForTesting());
quic::QuicStreamFrame fin_frame_with_data(stream_->id(), /*fin=*/true, 0,
kSomeData);
stream_->OnStreamFrame(fin_frame_with_data);
EXPECT_TRUE(stream_->reading_stopped());
EXPECT_TRUE(stream_->write_side_closed());
EXPECT_TRUE(stream_->IsClosedForTesting());
}
......@@ -124,4 +148,103 @@ TEST_F(P2PQuicStreamTest, StreamClosedAfterReceivingReset) {
EXPECT_TRUE(stream_->IsClosedForTesting());
}
// Tests that data written to the P2PQuicStream will appropriately get written
// to the underlying QUIC library.
TEST_F(P2PQuicStreamTest, StreamWritesData) {
EXPECT_CALL(session_, WritevData(stream_, kStreamId,
/*write_length=*/kSomeData.size(), _, _))
.WillOnce(Invoke([](quic::QuicStream* stream, quic::QuicStreamId id,
size_t write_length, quic::QuicStreamOffset offset,
quic::StreamSendingState state) {
// quic::QuicSession::WritevData does not pass the data. The data is
// saved to the stream, so we must grab it before it's consumed, in
// order to check that it's what was written.
std::string data_consumed_by_quic(write_length, 'a');
quic::QuicDataWriter writer(write_length, &data_consumed_by_quic[0],
quic::NETWORK_BYTE_ORDER);
stream->WriteStreamData(offset, write_length, &writer);
EXPECT_EQ(kSomeData, data_consumed_by_quic);
EXPECT_EQ(quic::StreamSendingState::NO_FIN, state);
return quic::QuicConsumedData(
write_length, state != quic::StreamSendingState::NO_FIN);
}));
EXPECT_CALL(delegate_, OnWriteDataConsumed(kSomeData.size()));
stream_->WriteData(std::vector<uint8_t>(kSomeData.begin(), kSomeData.end()),
/*fin=*/false);
}
// Tests that data written to the P2PQuicStream will appropriately get written
// to the underlying QUIC library with the FIN bit set.
TEST_F(P2PQuicStreamTest, StreamWritesDataWithFin) {
EXPECT_CALL(session_, WritevData(stream_, kStreamId,
/*write_length=*/kSomeData.size(), _, _))
.WillOnce(Invoke([](quic::QuicStream* stream, quic::QuicStreamId id,
size_t write_length, quic::QuicStreamOffset offset,
quic::StreamSendingState state) {
// WritevData does not pass the data. The data is saved to the stream,
// so we must grab it before it's consumed, in order to check that it's
// what was written.
std::string data_consumed_by_quic(write_length, 'a');
quic::QuicDataWriter writer(write_length, &data_consumed_by_quic[0],
quic::NETWORK_BYTE_ORDER);
stream->WriteStreamData(offset, write_length, &writer);
EXPECT_EQ(kSomeData, data_consumed_by_quic);
EXPECT_EQ(quic::StreamSendingState::FIN, state);
return quic::QuicConsumedData(
write_length, state != quic::StreamSendingState::NO_FIN);
}));
EXPECT_CALL(delegate_, OnWriteDataConsumed(kSomeData.size()));
stream_->WriteData(std::vector<uint8_t>(kSomeData.begin(), kSomeData.end()),
/*fin=*/true);
}
// Tests that when written data is not consumed by QUIC (due to buffering),
// the OnWriteDataConsumed will not get fired.
TEST_F(P2PQuicStreamTest, StreamWritesDataAndNotConsumedByQuic) {
EXPECT_CALL(delegate_, OnWriteDataConsumed(_)).Times(0);
EXPECT_CALL(session_, WritevData(stream_, kStreamId,
/*write_length=*/kSomeData.size(), _, _))
.WillOnce(Invoke([](quic::QuicStream* stream, quic::QuicStreamId id,
size_t write_length, quic::QuicStreamOffset offset,
quic::StreamSendingState state) {
// We mock that the QUIC library is not consuming the data, meaning it's
// being buffered. In this case, the OnWriteDataConsumed() callback
// should not be called.
return quic::QuicConsumedData(/*bytes_consumed=*/0,
quic::StreamSendingState::NO_FIN);
}));
stream_->WriteData(std::vector<uint8_t>(kSomeData.begin(), kSomeData.end()),
/*fin=*/true);
}
// Tests that OnWriteDataConsumed() is fired with the amount consumed by QUIC.
// This tests the case when amount consumed by QUIC is less than what is written
// with P2PQuicStream::WriteData. This can happen when QUIC is receiving back
// pressure from the receive side, and its "send window" is smaller than the
// amount attempted to be written.
TEST_F(P2PQuicStreamTest, StreamWritesDataAndPartiallyConsumedByQuic) {
size_t amount_consumed_by_quic = 2;
EXPECT_CALL(delegate_, OnWriteDataConsumed(amount_consumed_by_quic));
EXPECT_CALL(session_, WritevData(stream_, kStreamId,
/*write_length=*/kSomeData.size(), _, _))
.WillOnce(Invoke([&amount_consumed_by_quic](
quic::QuicStream* stream, quic::QuicStreamId id,
size_t write_length, quic::QuicStreamOffset offset,
quic::StreamSendingState state) {
// We mock that the QUIC library is only consuming some of the data,
// meaning the rest is being buffered.
return quic::QuicConsumedData(
/*bytes_consumed=*/amount_consumed_by_quic,
quic::StreamSendingState::NO_FIN);
}));
stream_->WriteData(std::vector<uint8_t>(kSomeData.begin(), kSomeData.end()),
/*fin=*/true);
}
} // namespace blink
......@@ -22,13 +22,16 @@ struct P2PQuicTransportConfig final {
P2PQuicTransport::Delegate* const delegate_in,
P2PQuicPacketTransport* const packet_transport_in,
const std::vector<rtc::scoped_refptr<rtc::RTCCertificate>>
certificates_in)
certificates_in,
uint32_t stream_write_buffer_size_in)
: packet_transport(packet_transport_in),
certificates(certificates_in),
delegate(delegate_in) {
delegate(delegate_in),
stream_write_buffer_size(stream_write_buffer_size_in) {
DCHECK_GT(certificates.size(), 0u);
DCHECK(packet_transport);
DCHECK(delegate);
DCHECK_GT(stream_write_buffer_size, 0u);
}
P2PQuicTransportConfig(const P2PQuicTransportConfig&) = delete;
P2PQuicTransportConfig& operator=(const P2PQuicTransportConfig&) = delete;
......@@ -51,6 +54,10 @@ struct P2PQuicTransportConfig final {
// to listen and respond to a crypto handshake upon construction.
// This will NOT start a handshake.
bool can_respond_to_crypto_handshake = true;
// The amount that the P2PQuicStream will allow to buffer. This is a mandatory
// field that must be set to ensure that the client of the P2PQuicStream does
// not write more data than can be buffered.
const uint32_t stream_write_buffer_size;
};
// For creating a P2PQuicTransport. This factory should be injected into
......
......@@ -151,11 +151,13 @@ P2PQuicTransportImpl::P2PQuicTransportImpl(
: quic::Perspective::IS_CLIENT),
packet_transport_(p2p_transport_config.packet_transport),
delegate_(p2p_transport_config.delegate),
clock_(clock) {
clock_(clock),
stream_write_buffer_size_(p2p_transport_config.stream_write_buffer_size) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(delegate_);
DCHECK(clock_);
DCHECK(packet_transport_);
DCHECK_GT(stream_write_buffer_size_, 0u);
DCHECK_GT(p2p_transport_config.certificates.size(), 0u);
if (p2p_transport_config.can_respond_to_crypto_handshake) {
InitializeCryptoStream();
......@@ -250,7 +252,7 @@ P2PQuicStreamImpl* P2PQuicTransportImpl::CreateStreamInternal(
DCHECK(crypto_stream_);
DCHECK(IsEncryptionEstablished());
DCHECK(!IsClosed());
return new P2PQuicStreamImpl(id, this);
return new P2PQuicStreamImpl(id, this, stream_write_buffer_size_);
}
void P2PQuicTransportImpl::InitializeCryptoStream() {
......
......@@ -170,6 +170,8 @@ class MODULES_EXPORT P2PQuicTransportImpl final
// Owned by whatever creates the P2PQuicTransportImpl. The |clock_| needs to
// outlive the P2PQuicTransportImpl.
quic::QuicClock* clock_ = nullptr;
// Determines the size of the write buffer when P2PQuicStreams.
uint32_t stream_write_buffer_size_;
THREAD_CHECKER(thread_checker_);
};
......
......@@ -27,6 +27,7 @@ using ::testing::MakePolymorphicAction;
using ::testing::PolymorphicAction;
const std::string kTriggerRemoteStreamPhrase = "open sesame";
const uint32_t kWriteBufferSize = 100 * 1024;
// A custom gmock Action that fires the given callback. This is used in
// conjuction with the CallbackRunLoop in order to drive the TestTaskRunner
......@@ -389,7 +390,7 @@ class P2PQuicTransportTest : public testing::Test {
client_certificates.push_back(client_cert);
P2PQuicTransportConfig client_config(client_quic_transport_delegate.get(),
client_packet_transport.get(),
client_certificates);
client_certificates, kWriteBufferSize);
client_config.is_server = false;
client_config.can_respond_to_crypto_handshake =
can_respond_to_crypto_handshake;
......@@ -416,7 +417,7 @@ class P2PQuicTransportTest : public testing::Test {
server_certificates.push_back(server_cert);
P2PQuicTransportConfig server_config(server_quic_transport_delegate.get(),
server_packet_transport.get(),
server_certificates);
server_certificates, kWriteBufferSize);
server_config.is_server = true;
server_config.can_respond_to_crypto_handshake =
can_respond_to_crypto_handshake;
......@@ -523,8 +524,10 @@ class P2PQuicTransportTest : public testing::Test {
callback.Run();
}));
client_peer_->stream()->WriteOrBufferData(kTriggerRemoteStreamPhrase,
/*fin=*/false, nullptr);
client_peer_->stream()->WriteData(
std::vector<uint8_t>(kTriggerRemoteStreamPhrase.begin(),
kTriggerRemoteStreamPhrase.end()),
/*fin=*/false);
run_loop.RunUntilCallbacksFired();
// Set the stream and delegate to the |server_peer_|, so that it can be
// accessed by tests later.
......@@ -862,8 +865,10 @@ TEST_F(P2PQuicTransportTest, ClientCreatesStream) {
callback.Run();
}));
client_peer()->stream()->WriteOrBufferData(kTriggerRemoteStreamPhrase,
/*fin=*/false, nullptr);
client_peer()->stream()->WriteData(
std::vector<uint8_t>(kTriggerRemoteStreamPhrase.begin(),
kTriggerRemoteStreamPhrase.end()),
/*fin=*/false);
run_loop.RunUntilCallbacksFired();
EXPECT_TRUE(server_peer()->quic_transport()->HasOpenDynamicStreams());
......@@ -897,8 +902,10 @@ TEST_F(P2PQuicTransportTest, ServerCreatesStream) {
callback.Run();
}));
server_peer()->stream()->WriteOrBufferData(kTriggerRemoteStreamPhrase,
/*fin=*/false, nullptr);
server_peer()->stream()->WriteData(
std::vector<uint8_t>(kTriggerRemoteStreamPhrase.begin(),
kTriggerRemoteStreamPhrase.end()),
/*fin=*/false);
run_loop.RunUntilCallbacksFired();
EXPECT_TRUE(client_peer()->quic_transport()->HasOpenDynamicStreams());
......@@ -967,8 +974,8 @@ TEST_F(P2PQuicTransportTest, ServerStreamReset) {
ExpectStreamsClosed();
}
// Tests the basic case for calling Finish() on both sides.
TEST_F(P2PQuicTransportTest, StreamFinishHandshake) {
// Tests the basic case for sending a FIN bit on both sides.
TEST_F(P2PQuicTransportTest, StreamClosedAfterSendingAndReceivingFin) {
Initialize();
Connect();
SetupConnectedStreams();
......@@ -977,7 +984,7 @@ TEST_F(P2PQuicTransportTest, StreamFinishHandshake) {
EXPECT_CALL(*server_peer()->stream_delegate(), OnRemoteFinish())
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->Finish();
client_peer()->stream()->WriteData({}, /*fin=*/true);
run_loop.RunUntilCallbacksFired();
ASSERT_EQ(1u, server_peer()->quic_transport()->GetNumActiveStreams());
......@@ -994,7 +1001,7 @@ TEST_F(P2PQuicTransportTest, StreamFinishHandshake) {
EXPECT_CALL(*client_peer()->stream_delegate(), OnRemoteFinish())
.WillOnce(FireCallback(run_loop.CreateCallback()));
server_peer()->stream()->Finish();
server_peer()->stream()->WriteData({}, /*fin=*/true);
run_loop.RunUntilCallbacksFired();
// This is required so that the client acks the FIN back to the server side
......@@ -1009,9 +1016,9 @@ TEST_F(P2PQuicTransportTest, StreamFinishHandshake) {
client_peer()->stream_id()));
}
// Tests that if a Reset() is called after Finish(), both sides close down
// properly.
TEST_F(P2PQuicTransportTest, StreamResetAfterFinish) {
// Tests that if a Reset() is called after sending a FIN bit, both sides close
// down properly.
TEST_F(P2PQuicTransportTest, StreamResetAfterSendingFin) {
Initialize();
Connect();
SetupConnectedStreams();
......@@ -1020,7 +1027,7 @@ TEST_F(P2PQuicTransportTest, StreamResetAfterFinish) {
EXPECT_CALL(*server_peer()->stream_delegate(), OnRemoteFinish())
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->Finish();
client_peer()->stream()->WriteData({}, /*fin=*/true);
run_loop.RunUntilCallbacksFired();
EXPECT_CALL(*server_peer()->stream_delegate(), OnRemoteReset())
......@@ -1035,7 +1042,7 @@ TEST_F(P2PQuicTransportTest, StreamResetAfterFinish) {
// Tests that if a Reset() is called after receiving a stream frame with the FIN
// bit set from the remote side, both sides close down properly.
TEST_F(P2PQuicTransportTest, StreamResetAfterRemoteFinish) {
TEST_F(P2PQuicTransportTest, StreamResetAfterReceivingFin) {
Initialize();
Connect();
SetupConnectedStreams();
......@@ -1044,7 +1051,7 @@ TEST_F(P2PQuicTransportTest, StreamResetAfterRemoteFinish) {
EXPECT_CALL(*server_peer()->stream_delegate(), OnRemoteFinish())
.WillOnce(FireCallback(run_loop.CreateCallback()));
client_peer()->stream()->Finish();
client_peer()->stream()->WriteData({}, /*fin=*/true);
run_loop.RunUntilCallbacksFired();
EXPECT_CALL(*client_peer()->stream_delegate(), OnRemoteReset())
......
......@@ -49,10 +49,13 @@ void QuicStreamHost::Reset() {
Delete();
}
// TODO(https://crbug.com/874296): When the blink binding (RTCQuicStream) is
// updated to support reading/writing, remove this function.
void QuicStreamHost::Finish() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(p2p_stream_);
p2p_stream_->Finish();
std::vector<uint8_t> data;
p2p_stream_->WriteData(data, true);
writeable_ = false;
if (!readable_ && !writeable_) {
Delete();
......
......@@ -44,9 +44,13 @@ void QuicTransportHost::Initialize(
DCHECK(ice_transport_host);
DCHECK(!ice_transport_host_);
ice_transport_host_ = ice_transport_host;
// TODO(https://crbug.com/874296): Pass through values for read and write
// stream buffer sizes in the P2PQuicTransportConfig. Currently this is just
// set to the same size as the QUIC receive window size (24 MB).
uint32_t stream_buffer_size = 24 * 1024 * 1024;
P2PQuicTransportConfig config(
this, ice_transport_host->ConnectConsumer(this)->packet_transport(),
certificates);
certificates, stream_buffer_size);
config.is_server = (perspective == quic::Perspective::IS_SERVER);
quic_transport_ =
quic_transport_factory_->CreateQuicTransport(std::move(config));
......
......@@ -16,6 +16,8 @@ class MockP2PQuicStream : public testing::NiceMock<P2PQuicStream> {
MOCK_METHOD0(Reset, void());
MOCK_METHOD0(Finish, void());
MOCK_METHOD1(SetDelegate, void(Delegate*));
MOCK_METHOD1(MarkDataConsumed, void(uint32_t));
MOCK_METHOD2(WriteData, void(std::vector<uint8_t>, bool));
};
} // namespace blink
......
......@@ -14,6 +14,7 @@ class MockP2PQuicStreamDelegate
: public testing::NiceMock<P2PQuicStream::Delegate> {
public:
// P2PQuicStream::Delegate overrides.
MOCK_METHOD1(OnWriteDataConsumed, void(uint32_t));
MOCK_METHOD0(OnRemoteReset, void());
MOCK_METHOD0(OnRemoteFinish, void());
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment