Reland "Separate client and server pushed streams limits."

https://codereview.chromium.org/367963003/

Fix use-after-free error

BUG=170544, 377538
R=jgraettinger@chromium.org

TEST=SpdySessionTest.PushedStreamShouldNotCountToClientConcurrencyLimit
SpdySessionTest.RejectPushedStreamExceedingConcurrencyLimit
SpdySessionTest.IgnoreReservedRemoteStreamsCount

Review URL: https://codereview.chromium.org/371273003

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@282889 0039d316-1c4b-4281-b951-d872f2087c98
parent ede9e125
...@@ -549,6 +549,8 @@ SpdySession::SpdySession( ...@@ -549,6 +549,8 @@ SpdySession::SpdySession(
http_server_properties_(http_server_properties), http_server_properties_(http_server_properties),
read_buffer_(new IOBuffer(kReadBufferSize)), read_buffer_(new IOBuffer(kReadBufferSize)),
stream_hi_water_mark_(kFirstStreamId), stream_hi_water_mark_(kFirstStreamId),
num_pushed_streams_(0u),
num_active_pushed_streams_(0u),
in_flight_write_frame_type_(DATA), in_flight_write_frame_type_(DATA),
in_flight_write_frame_size_(0), in_flight_write_frame_size_(0),
is_secure_(false), is_secure_(false),
...@@ -563,6 +565,7 @@ SpdySession::SpdySession( ...@@ -563,6 +565,7 @@ SpdySession::SpdySession(
max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
? kMaxConcurrentStreamLimit ? kMaxConcurrentStreamLimit
: max_concurrent_streams_limit), : max_concurrent_streams_limit),
max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
streams_initiated_count_(0), streams_initiated_count_(0),
streams_pushed_count_(0), streams_pushed_count_(0),
streams_pushed_and_claimed_count_(0), streams_pushed_and_claimed_count_(0),
...@@ -777,7 +780,7 @@ int SpdySession::TryCreateStream( ...@@ -777,7 +780,7 @@ int SpdySession::TryCreateStream(
return err; return err;
if (!max_concurrent_streams_ || if (!max_concurrent_streams_ ||
(active_streams_.size() + created_streams_.size() < (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
max_concurrent_streams_)) { max_concurrent_streams_)) {
return CreateStream(*request, stream); return CreateStream(*request, stream);
} }
...@@ -1218,8 +1221,12 @@ void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, ...@@ -1218,8 +1221,12 @@ void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
// probably something that we still want to support, although server // probably something that we still want to support, although server
// push is hardly used. Write tests for this and fix this. (See // push is hardly used. Write tests for this and fix this. (See
// http://crbug.com/261712 .) // http://crbug.com/261712 .)
if (owned_stream->type() == SPDY_PUSH_STREAM) if (owned_stream->type() == SPDY_PUSH_STREAM) {
unclaimed_pushed_streams_.erase(owned_stream->url()); unclaimed_pushed_streams_.erase(owned_stream->url());
num_pushed_streams_--;
if (!owned_stream->IsReservedRemote())
num_active_pushed_streams_--;
}
DeleteStream(owned_stream.Pass(), status); DeleteStream(owned_stream.Pass(), status);
MaybeFinishGoingAway(); MaybeFinishGoingAway();
...@@ -2095,6 +2102,23 @@ int SpdySession::OnInitialResponseHeadersReceived( ...@@ -2095,6 +2102,23 @@ int SpdySession::OnInitialResponseHeadersReceived(
SpdyStream* stream) { SpdyStream* stream) {
CHECK(in_io_loop_); CHECK(in_io_loop_);
SpdyStreamId stream_id = stream->stream_id(); SpdyStreamId stream_id = stream->stream_id();
if (stream->type() == SPDY_PUSH_STREAM) {
DCHECK(stream->IsReservedRemote());
if (max_concurrent_pushed_streams_ &&
num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
ResetStream(stream_id,
RST_STREAM_REFUSED_STREAM,
"Stream concurrency limit reached.");
return STATUS_CODE_REFUSED_STREAM;
}
}
if (stream->type() == SPDY_PUSH_STREAM) {
// Will be balanced in DeleteStream.
num_active_pushed_streams_++;
}
// May invalidate |stream|. // May invalidate |stream|.
int rv = stream->OnInitialResponseHeadersReceived( int rv = stream->OnInitialResponseHeadersReceived(
response_headers, response_time, recv_first_byte_time); response_headers, response_time, recv_first_byte_time);
...@@ -2102,6 +2126,7 @@ int SpdySession::OnInitialResponseHeadersReceived( ...@@ -2102,6 +2126,7 @@ int SpdySession::OnInitialResponseHeadersReceived(
DCHECK_NE(rv, ERR_IO_PENDING); DCHECK_NE(rv, ERR_IO_PENDING);
DCHECK(active_streams_.find(stream_id) == active_streams_.end()); DCHECK(active_streams_.find(stream_id) == active_streams_.end());
} }
return rv; return rv;
} }
...@@ -2584,6 +2609,7 @@ bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id, ...@@ -2584,6 +2609,7 @@ bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
active_it->second.stream->OnPushPromiseHeadersReceived(headers); active_it->second.stream->OnPushPromiseHeadersReceived(headers);
DCHECK(active_it->second.stream->IsReservedRemote()); DCHECK(active_it->second.stream->IsReservedRemote());
num_pushed_streams_++;
return true; return true;
} }
......
...@@ -427,10 +427,15 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, ...@@ -427,10 +427,15 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface,
// available for testing and diagnostics. // available for testing and diagnostics.
size_t num_active_streams() const { return active_streams_.size(); } size_t num_active_streams() const { return active_streams_.size(); }
size_t num_unclaimed_pushed_streams() const { size_t num_unclaimed_pushed_streams() const {
return unclaimed_pushed_streams_.size(); return unclaimed_pushed_streams_.size();
} }
size_t num_created_streams() const { return created_streams_.size(); } size_t num_created_streams() const { return created_streams_.size(); }
size_t num_pushed_streams() const { return num_pushed_streams_; }
size_t num_active_pushed_streams() const {
return num_active_pushed_streams_;
}
size_t pending_create_stream_queue_size(RequestPriority priority) const { size_t pending_create_stream_queue_size(RequestPriority priority) const {
DCHECK_GE(priority, MINIMUM_PRIORITY); DCHECK_GE(priority, MINIMUM_PRIORITY);
DCHECK_LE(priority, MAXIMUM_PRIORITY); DCHECK_LE(priority, MAXIMUM_PRIORITY);
...@@ -526,6 +531,11 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, ...@@ -526,6 +531,11 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface,
FRIEND_TEST_ALL_PREFIXES(SpdySessionTest, StreamIdSpaceExhausted); FRIEND_TEST_ALL_PREFIXES(SpdySessionTest, StreamIdSpaceExhausted);
FRIEND_TEST_ALL_PREFIXES(SpdySessionTest, UnstallRacesWithStreamCreation); FRIEND_TEST_ALL_PREFIXES(SpdySessionTest, UnstallRacesWithStreamCreation);
FRIEND_TEST_ALL_PREFIXES(SpdySessionTest, GoAwayOnSessionFlowControlError); FRIEND_TEST_ALL_PREFIXES(SpdySessionTest, GoAwayOnSessionFlowControlError);
FRIEND_TEST_ALL_PREFIXES(SpdySessionTest,
RejectPushedStreamExceedingConcurrencyLimit);
FRIEND_TEST_ALL_PREFIXES(SpdySessionTest, IgnoreReservedRemoteStreamsCount);
FRIEND_TEST_ALL_PREFIXES(SpdySessionTest,
CancelReservedStreamOnHeadersReceived);
typedef std::deque<base::WeakPtr<SpdyStreamRequest> > typedef std::deque<base::WeakPtr<SpdyStreamRequest> >
PendingStreamRequestQueue; PendingStreamRequestQueue;
...@@ -919,6 +929,10 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, ...@@ -919,6 +929,10 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface,
hung_interval_ = duration; hung_interval_ = duration;
} }
void set_max_concurrent_pushed_streams(size_t value) {
max_concurrent_pushed_streams_ = value;
}
int64 pings_in_flight() const { return pings_in_flight_; } int64 pings_in_flight() const { return pings_in_flight_; }
uint32 next_ping_id() const { return next_ping_id_; } uint32 next_ping_id() const { return next_ping_id_; }
...@@ -984,6 +998,16 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, ...@@ -984,6 +998,16 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface,
// |created_streams_| owns all its SpdyStream objects. // |created_streams_| owns all its SpdyStream objects.
CreatedStreamSet created_streams_; CreatedStreamSet created_streams_;
// Number of pushed streams. All active streams are stored in
// |active_streams_|, but it's better to know the number of push streams
// without traversing the whole collection.
size_t num_pushed_streams_;
// Number of active pushed streams in |active_streams_|, i.e. not in reserved
// remote state. Streams in reserved state are not counted towards any
// concurrency limits.
size_t num_active_pushed_streams_;
// The write queue. // The write queue.
SpdyWriteQueue write_queue_; SpdyWriteQueue write_queue_;
...@@ -1022,6 +1046,7 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, ...@@ -1022,6 +1046,7 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface,
// Limits // Limits
size_t max_concurrent_streams_; // 0 if no limit size_t max_concurrent_streams_; // 0 if no limit
size_t max_concurrent_streams_limit_; size_t max_concurrent_streams_limit_;
size_t max_concurrent_pushed_streams_;
// Some statistics counters for the session. // Some statistics counters for the session.
int streams_initiated_count_; int streams_initiated_count_;
......
...@@ -4609,6 +4609,348 @@ TEST_P(SpdySessionTest, SplitHeaders) { ...@@ -4609,6 +4609,348 @@ TEST_P(SpdySessionTest, SplitHeaders) {
EXPECT_EQ(kStreamUrl, request_url); EXPECT_EQ(kStreamUrl, request_url);
} }
// Regression. Sorta. Push streams and client streams were sharing a single
// limit for a long time.
TEST_P(SpdySessionTest, PushedStreamShouldNotCountToClientConcurrencyLimit) {
SettingsMap new_settings;
new_settings[SETTINGS_MAX_CONCURRENT_STREAMS] =
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, 2);
scoped_ptr<SpdyFrame> settings_frame(
spdy_util_.ConstructSpdySettings(new_settings));
scoped_ptr<SpdyFrame> pushed(spdy_util_.ConstructSpdyPush(
NULL, 0, 2, 1, "http://www.google.com/a.dat"));
MockRead reads[] = {
CreateMockRead(*settings_frame), CreateMockRead(*pushed, 3),
MockRead(ASYNC, 0, 4),
};
scoped_ptr<SpdyFrame> settings_ack(spdy_util_.ConstructSpdySettingsAck());
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true));
MockWrite writes[] = {
CreateMockWrite(*settings_ack, 1), CreateMockWrite(*req, 2),
};
DeterministicSocketData data(
reads, arraysize(reads), writes, arraysize(writes));
MockConnect connect_data(SYNCHRONOUS, OK);
data.set_connect_data(connect_data);
session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data);
CreateDeterministicNetworkSession();
base::WeakPtr<SpdySession> session =
CreateInsecureSpdySession(http_session_, key_, BoundNetLog());
// Read the settings frame.
data.RunFor(1);
GURL url1(kDefaultURL);
base::WeakPtr<SpdyStream> spdy_stream1 = CreateStreamSynchronously(
SPDY_REQUEST_RESPONSE_STREAM, session, url1, LOWEST, BoundNetLog());
ASSERT_TRUE(spdy_stream1.get() != NULL);
EXPECT_EQ(0u, spdy_stream1->stream_id());
test::StreamDelegateDoNothing delegate1(spdy_stream1);
spdy_stream1->SetDelegate(&delegate1);
EXPECT_EQ(0u, session->num_active_streams());
EXPECT_EQ(1u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
scoped_ptr<SpdyHeaderBlock> headers(
spdy_util_.ConstructGetHeaderBlock(url1.spec()));
spdy_stream1->SendRequestHeaders(headers.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrlFromHeaders());
// Run until 1st stream is activated.
EXPECT_EQ(0u, delegate1.stream_id());
data.RunFor(2);
EXPECT_EQ(1u, delegate1.stream_id());
EXPECT_EQ(1u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
// Run until pushed stream is created.
data.RunFor(1);
EXPECT_EQ(2u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(1u, session->num_pushed_streams());
EXPECT_EQ(1u, session->num_active_pushed_streams());
// Second stream should not be stalled, although we have 2 active streams, but
// one of them is push stream and should not be taken into account when we
// create streams on the client.
base::WeakPtr<SpdyStream> spdy_stream2 = CreateStreamSynchronously(
SPDY_REQUEST_RESPONSE_STREAM, session, url1, LOWEST, BoundNetLog());
EXPECT_TRUE(spdy_stream2.get() != NULL);
EXPECT_EQ(2u, session->num_active_streams());
EXPECT_EQ(1u, session->num_created_streams());
EXPECT_EQ(1u, session->num_pushed_streams());
EXPECT_EQ(1u, session->num_active_pushed_streams());
// Read EOF.
data.RunFor(1);
}
TEST_P(SpdySessionTest, RejectPushedStreamExceedingConcurrencyLimit) {
scoped_ptr<SpdyFrame> push_a(spdy_util_.ConstructSpdyPush(
NULL, 0, 2, 1, "http://www.google.com/a.dat"));
scoped_ptr<SpdyFrame> push_b(spdy_util_.ConstructSpdyPush(
NULL, 0, 4, 1, "http://www.google.com/b.dat"));
MockRead reads[] = {
CreateMockRead(*push_a, 1), CreateMockRead(*push_b, 2),
MockRead(ASYNC, 0, 4),
};
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true));
scoped_ptr<SpdyFrame> rst(
spdy_util_.ConstructSpdyRstStream(4, RST_STREAM_REFUSED_STREAM));
MockWrite writes[] = {
CreateMockWrite(*req, 0), CreateMockWrite(*rst, 3),
};
DeterministicSocketData data(
reads, arraysize(reads), writes, arraysize(writes));
MockConnect connect_data(SYNCHRONOUS, OK);
data.set_connect_data(connect_data);
session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data);
CreateDeterministicNetworkSession();
base::WeakPtr<SpdySession> session =
CreateInsecureSpdySession(http_session_, key_, BoundNetLog());
session->set_max_concurrent_pushed_streams(1);
GURL url1(kDefaultURL);
base::WeakPtr<SpdyStream> spdy_stream1 = CreateStreamSynchronously(
SPDY_REQUEST_RESPONSE_STREAM, session, url1, LOWEST, BoundNetLog());
ASSERT_TRUE(spdy_stream1.get() != NULL);
EXPECT_EQ(0u, spdy_stream1->stream_id());
test::StreamDelegateDoNothing delegate1(spdy_stream1);
spdy_stream1->SetDelegate(&delegate1);
EXPECT_EQ(0u, session->num_active_streams());
EXPECT_EQ(1u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
scoped_ptr<SpdyHeaderBlock> headers(
spdy_util_.ConstructGetHeaderBlock(url1.spec()));
spdy_stream1->SendRequestHeaders(headers.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrlFromHeaders());
// Run until 1st stream is activated.
EXPECT_EQ(0u, delegate1.stream_id());
data.RunFor(1);
EXPECT_EQ(1u, delegate1.stream_id());
EXPECT_EQ(1u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
// Run until pushed stream is created.
data.RunFor(1);
EXPECT_EQ(2u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(1u, session->num_pushed_streams());
EXPECT_EQ(1u, session->num_active_pushed_streams());
// Reset incoming pushed stream.
data.RunFor(2);
EXPECT_EQ(2u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(1u, session->num_pushed_streams());
EXPECT_EQ(1u, session->num_active_pushed_streams());
// Read EOF.
data.RunFor(1);
}
TEST_P(SpdySessionTest, IgnoreReservedRemoteStreamsCount) {
// Streams in reserved remote state exist only in SPDY4.
if (spdy_util_.spdy_version() < SPDY4)
return;
scoped_ptr<SpdyFrame> push_a(spdy_util_.ConstructSpdyPush(
NULL, 0, 2, 1, "http://www.google.com/a.dat"));
scoped_ptr<SpdyHeaderBlock> push_headers(new SpdyHeaderBlock);
spdy_util_.AddUrlToHeaderBlock("http://www.google.com/b.dat",
push_headers.get());
scoped_ptr<SpdyFrame> push_b(
spdy_util_.ConstructInitialSpdyPushFrame(push_headers.Pass(), 4, 1));
scoped_ptr<SpdyFrame> headers_b(
spdy_util_.ConstructSpdyPushHeaders(4, NULL, 0));
MockRead reads[] = {
CreateMockRead(*push_a, 1), CreateMockRead(*push_b, 2),
CreateMockRead(*headers_b, 3), MockRead(ASYNC, 0, 5),
};
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true));
scoped_ptr<SpdyFrame> rst(
spdy_util_.ConstructSpdyRstStream(4, RST_STREAM_REFUSED_STREAM));
MockWrite writes[] = {
CreateMockWrite(*req, 0), CreateMockWrite(*rst, 4),
};
DeterministicSocketData data(
reads, arraysize(reads), writes, arraysize(writes));
MockConnect connect_data(SYNCHRONOUS, OK);
data.set_connect_data(connect_data);
session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data);
CreateDeterministicNetworkSession();
base::WeakPtr<SpdySession> session =
CreateInsecureSpdySession(http_session_, key_, BoundNetLog());
session->set_max_concurrent_pushed_streams(1);
GURL url1(kDefaultURL);
base::WeakPtr<SpdyStream> spdy_stream1 = CreateStreamSynchronously(
SPDY_REQUEST_RESPONSE_STREAM, session, url1, LOWEST, BoundNetLog());
ASSERT_TRUE(spdy_stream1.get() != NULL);
EXPECT_EQ(0u, spdy_stream1->stream_id());
test::StreamDelegateDoNothing delegate1(spdy_stream1);
spdy_stream1->SetDelegate(&delegate1);
EXPECT_EQ(0u, session->num_active_streams());
EXPECT_EQ(1u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
scoped_ptr<SpdyHeaderBlock> headers(
spdy_util_.ConstructGetHeaderBlock(url1.spec()));
spdy_stream1->SendRequestHeaders(headers.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrlFromHeaders());
// Run until 1st stream is activated.
EXPECT_EQ(0u, delegate1.stream_id());
data.RunFor(1);
EXPECT_EQ(1u, delegate1.stream_id());
EXPECT_EQ(1u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
// Run until pushed stream is created.
data.RunFor(1);
EXPECT_EQ(2u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(1u, session->num_pushed_streams());
EXPECT_EQ(1u, session->num_active_pushed_streams());
// Accept promised stream. It should not count towards pushed stream limit.
data.RunFor(1);
EXPECT_EQ(3u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(2u, session->num_pushed_streams());
EXPECT_EQ(1u, session->num_active_pushed_streams());
// Reset last pushed stream upon headers reception as it is going to be 2nd,
// while we accept only one.
data.RunFor(2);
EXPECT_EQ(2u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(1u, session->num_pushed_streams());
EXPECT_EQ(1u, session->num_active_pushed_streams());
// Read EOF.
data.RunFor(1);
}
TEST_P(SpdySessionTest, CancelReservedStreamOnHeadersReceived) {
// Streams in reserved remote state exist only in SPDY4.
if (spdy_util_.spdy_version() < SPDY4)
return;
const char kPushedUrl[] = "http://www.google.com/a.dat";
scoped_ptr<SpdyHeaderBlock> push_headers(new SpdyHeaderBlock);
spdy_util_.AddUrlToHeaderBlock(kPushedUrl, push_headers.get());
scoped_ptr<SpdyFrame> push_promise(
spdy_util_.ConstructInitialSpdyPushFrame(push_headers.Pass(), 2, 1));
scoped_ptr<SpdyFrame> headers_frame(
spdy_util_.ConstructSpdyPushHeaders(2, NULL, 0));
MockRead reads[] = {
CreateMockRead(*push_promise, 1), CreateMockRead(*headers_frame, 2),
MockRead(ASYNC, 0, 4),
};
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true));
scoped_ptr<SpdyFrame> rst(
spdy_util_.ConstructSpdyRstStream(2, RST_STREAM_CANCEL));
MockWrite writes[] = {
CreateMockWrite(*req, 0), CreateMockWrite(*rst, 3),
};
DeterministicSocketData data(
reads, arraysize(reads), writes, arraysize(writes));
MockConnect connect_data(SYNCHRONOUS, OK);
data.set_connect_data(connect_data);
session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data);
CreateDeterministicNetworkSession();
base::WeakPtr<SpdySession> session =
CreateInsecureSpdySession(http_session_, key_, BoundNetLog());
GURL url1(kDefaultURL);
base::WeakPtr<SpdyStream> spdy_stream1 = CreateStreamSynchronously(
SPDY_REQUEST_RESPONSE_STREAM, session, url1, LOWEST, BoundNetLog());
ASSERT_TRUE(spdy_stream1.get() != NULL);
EXPECT_EQ(0u, spdy_stream1->stream_id());
test::StreamDelegateDoNothing delegate1(spdy_stream1);
spdy_stream1->SetDelegate(&delegate1);
EXPECT_EQ(0u, session->num_active_streams());
EXPECT_EQ(1u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
scoped_ptr<SpdyHeaderBlock> headers(
spdy_util_.ConstructGetHeaderBlock(url1.spec()));
spdy_stream1->SendRequestHeaders(headers.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrlFromHeaders());
// Run until 1st stream is activated.
EXPECT_EQ(0u, delegate1.stream_id());
data.RunFor(1);
EXPECT_EQ(1u, delegate1.stream_id());
EXPECT_EQ(1u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
// Run until pushed stream is created.
data.RunFor(1);
EXPECT_EQ(2u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(1u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
base::WeakPtr<SpdyStream> pushed_stream;
int rv =
session->GetPushStream(GURL(kPushedUrl), &pushed_stream, BoundNetLog());
ASSERT_EQ(OK, rv);
ASSERT_TRUE(pushed_stream.get() != NULL);
test::StreamDelegateCloseOnHeaders delegate2(pushed_stream);
pushed_stream->SetDelegate(&delegate2);
// Receive headers for pushed stream. Delegate will cancel the stream, ensure
// that all our counters are in consistent state.
data.RunFor(1);
EXPECT_EQ(1u, session->num_active_streams());
EXPECT_EQ(0u, session->num_created_streams());
EXPECT_EQ(0u, session->num_pushed_streams());
EXPECT_EQ(0u, session->num_active_pushed_streams());
// Read EOF.
data.RunFor(2);
}
TEST(MapFramerErrorToProtocolError, MapsValues) { TEST(MapFramerErrorToProtocolError, MapsValues) {
CHECK_EQ( CHECK_EQ(
SPDY_ERROR_INVALID_CONTROL_FRAME, SPDY_ERROR_INVALID_CONTROL_FRAME,
......
...@@ -141,6 +141,21 @@ void StreamDelegateWithBody::OnRequestHeadersSent() { ...@@ -141,6 +141,21 @@ void StreamDelegateWithBody::OnRequestHeadersSent() {
stream()->SendData(buf_.get(), buf_->size(), NO_MORE_DATA_TO_SEND); stream()->SendData(buf_.get(), buf_->size(), NO_MORE_DATA_TO_SEND);
} }
StreamDelegateCloseOnHeaders::StreamDelegateCloseOnHeaders(
const base::WeakPtr<SpdyStream>& stream)
: StreamDelegateBase(stream) {
}
StreamDelegateCloseOnHeaders::~StreamDelegateCloseOnHeaders() {
}
SpdyResponseHeadersStatus
StreamDelegateCloseOnHeaders::OnResponseHeadersUpdated(
const SpdyHeaderBlock& response_headers) {
stream()->Cancel();
return RESPONSE_HEADERS_ARE_COMPLETE;
}
} // namespace test } // namespace test
} // namespace net } // namespace net
...@@ -120,6 +120,16 @@ class StreamDelegateWithBody : public StreamDelegateBase { ...@@ -120,6 +120,16 @@ class StreamDelegateWithBody : public StreamDelegateBase {
scoped_refptr<StringIOBuffer> buf_; scoped_refptr<StringIOBuffer> buf_;
}; };
// Test delegate that closes stream in OnResponseHeadersUpdated().
class StreamDelegateCloseOnHeaders : public StreamDelegateBase {
public:
StreamDelegateCloseOnHeaders(const base::WeakPtr<SpdyStream>& stream);
virtual ~StreamDelegateCloseOnHeaders();
virtual SpdyResponseHeadersStatus OnResponseHeadersUpdated(
const SpdyHeaderBlock& response_headers) OVERRIDE;
};
} // namespace test } // namespace test
} // namespace net } // namespace net
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment