Commit 34e0cd9a authored by Yuwei Huang's avatar Yuwei Huang Committed by Commit Bot

[remoting] Add stream request parser

This CL adds a stream request parser to parse StreamBody messages sent by
a server that handles protobuf over HTTP. It will later be used by
ProtobufHttpClient to handle stream request.

It also makes ProtobufHttpStatus something similar to grpc::Status, that
has a unified code and message, as the error code can now come from three
different places.

Bug: 1103416
Change-Id: I62c4d9c74cc17a9b10244fa1013372308ec9d4a0
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2305113
Commit-Queue: Yuwei Huang <yuweih@chromium.org>
Reviewed-by: default avatarJoe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#790063}
parent a64933b5
...@@ -4,6 +4,10 @@ ...@@ -4,6 +4,10 @@
import("//third_party/protobuf/proto_library.gni") import("//third_party/protobuf/proto_library.gni")
proto_library("protobuf_http_client_messages_proto") {
sources = [ "protobuf_http_client_messages.proto" ]
}
source_set("base") { source_set("base") {
sources = [ sources = [
"auto_thread.cc", "auto_thread.cc",
...@@ -31,6 +35,8 @@ source_set("base") { ...@@ -31,6 +35,8 @@ source_set("base") {
"protobuf_http_request.h", "protobuf_http_request.h",
"protobuf_http_status.cc", "protobuf_http_status.cc",
"protobuf_http_status.h", "protobuf_http_status.h",
"protobuf_http_stream_parser.cc",
"protobuf_http_stream_parser.h",
"rate_counter.cc", "rate_counter.cc",
"rate_counter.h", "rate_counter.h",
"result.h", "result.h",
...@@ -72,6 +78,7 @@ source_set("base") { ...@@ -72,6 +78,7 @@ source_set("base") {
"//third_party/protobuf:protobuf_lite", "//third_party/protobuf:protobuf_lite",
] ]
deps = [ deps = [
":protobuf_http_client_messages_proto",
"//base/third_party/dynamic_annotations", "//base/third_party/dynamic_annotations",
"//google_apis", "//google_apis",
"//remoting/base/grpc_support", "//remoting/base/grpc_support",
...@@ -170,6 +177,7 @@ source_set("unit_tests") { ...@@ -170,6 +177,7 @@ source_set("unit_tests") {
"leaky_bucket_unittest.cc", "leaky_bucket_unittest.cc",
"oauth_token_getter_proxy_unittest.cc", "oauth_token_getter_proxy_unittest.cc",
"protobuf_http_client_unittest.cc", "protobuf_http_client_unittest.cc",
"protobuf_http_stream_parser_unittest.cc",
"rate_counter_unittest.cc", "rate_counter_unittest.cc",
"result_unittest.cc", "result_unittest.cc",
"rsa_key_pair_unittest.cc", "rsa_key_pair_unittest.cc",
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
package remoting.protobufhttpclient;
message Status {
int32 code = 1;
string message = 2;
}
message StreamBody {
repeated bytes messages = 1;
Status status = 2;
repeated bytes noop = 15;
}
...@@ -44,12 +44,8 @@ constexpr char kAuthorizationHeaderKey[] = "Authorization"; ...@@ -44,12 +44,8 @@ constexpr char kAuthorizationHeaderKey[] = "Authorization";
constexpr char kFakeAccessToken[] = "fake_access_token"; constexpr char kFakeAccessToken[] = "fake_access_token";
constexpr char kFakeAccessTokenHeaderValue[] = "Bearer fake_access_token"; constexpr char kFakeAccessTokenHeaderValue[] = "Bearer fake_access_token";
MATCHER_P(HasStatusCode, status_code, "") { MATCHER_P(HasErrorCode, error_code, "") {
return arg.http_status_code() == status_code; return arg.error_code() == error_code;
}
MATCHER_P(HasNetError, net_error, "") {
return arg.net_error() == net_error;
} }
MATCHER(IsResponseText, "") { MATCHER(IsResponseText, "") {
...@@ -108,7 +104,7 @@ TEST_F(ProtobufHttpClientTest, SendRequestAndDecodeResponse) { ...@@ -108,7 +104,7 @@ TEST_F(ProtobufHttpClientTest, SendRequestAndDecodeResponse) {
MockEchoResponseCallback response_callback; MockEchoResponseCallback response_callback;
EXPECT_CALL(response_callback, EXPECT_CALL(response_callback,
Run(HasStatusCode(net::HTTP_OK), IsResponseText())) Run(HasErrorCode(ProtobufHttpStatus::Code::OK), IsResponseText()))
.WillOnce([&]() { run_loop.Quit(); }); .WillOnce([&]() { run_loop.Quit(); });
auto request = CreateDefaultTestRequest(); auto request = CreateDefaultTestRequest();
...@@ -163,7 +159,8 @@ TEST_F(ProtobufHttpClientTest, ...@@ -163,7 +159,8 @@ TEST_F(ProtobufHttpClientTest,
MockEchoResponseCallback response_callback; MockEchoResponseCallback response_callback;
EXPECT_CALL(response_callback, EXPECT_CALL(response_callback,
Run(HasStatusCode(net::HTTP_UNAUTHORIZED), IsNullResponse())) Run(HasErrorCode(ProtobufHttpStatus::Code::UNAUTHENTICATED),
IsNullResponse()))
.WillOnce([&]() { run_loop.Quit(); }); .WillOnce([&]() { run_loop.Quit(); });
auto request = CreateDefaultTestRequest(); auto request = CreateDefaultTestRequest();
...@@ -183,7 +180,7 @@ TEST_F(ProtobufHttpClientTest, FailedToParseResponse_GetsInvalidResponseError) { ...@@ -183,7 +180,7 @@ TEST_F(ProtobufHttpClientTest, FailedToParseResponse_GetsInvalidResponseError) {
MockEchoResponseCallback response_callback; MockEchoResponseCallback response_callback;
EXPECT_CALL( EXPECT_CALL(
response_callback, response_callback,
Run(HasNetError(net::Error::ERR_INVALID_RESPONSE), IsNullResponse())) Run(HasErrorCode(ProtobufHttpStatus::Code::INTERNAL), IsNullResponse()))
.WillOnce([&]() { run_loop.Quit(); }); .WillOnce([&]() { run_loop.Quit(); });
auto request = CreateDefaultTestRequest(); auto request = CreateDefaultTestRequest();
...@@ -203,7 +200,8 @@ TEST_F(ProtobufHttpClientTest, ServerRespondsWithError) { ...@@ -203,7 +200,8 @@ TEST_F(ProtobufHttpClientTest, ServerRespondsWithError) {
MockEchoResponseCallback response_callback; MockEchoResponseCallback response_callback;
EXPECT_CALL(response_callback, EXPECT_CALL(response_callback,
Run(HasStatusCode(net::HTTP_UNAUTHORIZED), IsNullResponse())) Run(HasErrorCode(ProtobufHttpStatus::Code::UNAUTHENTICATED),
IsNullResponse()))
.WillOnce([&]() { run_loop.Quit(); }); .WillOnce([&]() { run_loop.Quit(); });
auto request = CreateDefaultTestRequest(); auto request = CreateDefaultTestRequest();
......
...@@ -8,29 +8,88 @@ ...@@ -8,29 +8,88 @@
namespace remoting { namespace remoting {
const ProtobufHttpStatus& ProtobufHttpStatus::OK = namespace {
ProtobufHttpStatus(net::HttpStatusCode::HTTP_OK);
ProtobufHttpStatus::ProtobufHttpStatus(net::HttpStatusCode http_status_code) ProtobufHttpStatus::Code HttpStatusCodeToClientCode(
: http_status_code_(http_status_code), net::HttpStatusCode http_status_code) {
net_error_(net::Error::ERR_HTTP_RESPONSE_CODE_FAILURE), DCHECK_LT(0, http_status_code);
error_message_(net::GetHttpReasonPhrase(http_status_code)) { switch (http_status_code) {
DCHECK_LE(0, http_status_code) << "Invalid http status code"; case net::HttpStatusCode::HTTP_OK:
return ProtobufHttpStatus::Code::OK;
case net::HttpStatusCode::HTTP_BAD_REQUEST:
return ProtobufHttpStatus::Code::INVALID_ARGUMENT;
case net::HttpStatusCode::HTTP_GATEWAY_TIMEOUT:
case net::HttpStatusCode::HTTP_REQUEST_TIMEOUT:
return ProtobufHttpStatus::Code::DEADLINE_EXCEEDED;
case net::HttpStatusCode::HTTP_NOT_FOUND:
return ProtobufHttpStatus::Code::NOT_FOUND;
case net::HttpStatusCode::HTTP_CONFLICT:
return ProtobufHttpStatus::Code::ALREADY_EXISTS;
case net::HttpStatusCode::HTTP_FORBIDDEN:
return ProtobufHttpStatus::Code::PERMISSION_DENIED;
case net::HttpStatusCode::HTTP_UNAUTHORIZED:
return ProtobufHttpStatus::Code::UNAUTHENTICATED;
case net::HttpStatusCode::HTTP_TOO_MANY_REQUESTS:
return ProtobufHttpStatus::Code::RESOURCE_EXHAUSTED;
case net::HttpStatusCode::HTTP_PRECONDITION_FAILED:
return ProtobufHttpStatus::Code::FAILED_PRECONDITION;
case net::HttpStatusCode::HTTP_NOT_IMPLEMENTED:
return ProtobufHttpStatus::Code::UNIMPLEMENTED;
case net::HttpStatusCode::HTTP_INTERNAL_SERVER_ERROR:
return ProtobufHttpStatus::Code::INTERNAL;
case net::HttpStatusCode::HTTP_SERVICE_UNAVAILABLE:
return ProtobufHttpStatus::Code::UNAVAILABLE;
default:
return ProtobufHttpStatus::Code::UNKNOWN;
}
} }
ProtobufHttpStatus::ProtobufHttpStatus(net::Error net_error) ProtobufHttpStatus::Code NetErrorToClientCode(net::Error net_error) {
: http_status_code_(-1), DCHECK_GT(0, net_error);
net_error_(net_error),
error_message_(net::ErrorToString(net_error)) {
DCHECK_NE(net::Error::OK, net_error) << "Use the HttpStatusCode overload";
DCHECK_NE(net::Error::ERR_HTTP_RESPONSE_CODE_FAILURE, net_error) DCHECK_NE(net::Error::ERR_HTTP_RESPONSE_CODE_FAILURE, net_error)
<< "Use the HttpStatusCode overload"; << "Use the HttpStatusCode overload";
switch (net_error) {
case net::Error::OK:
return ProtobufHttpStatus::Code::OK;
case net::Error::ERR_INVALID_ARGUMENT:
return ProtobufHttpStatus::Code::INVALID_ARGUMENT;
case net::Error::ERR_CONNECTION_TIMED_OUT:
case net::Error::ERR_TIMED_OUT:
return ProtobufHttpStatus::Code::DEADLINE_EXCEEDED;
case net::Error::ERR_INVALID_AUTH_CREDENTIALS:
return ProtobufHttpStatus::Code::PERMISSION_DENIED;
case net::Error::ERR_MISSING_AUTH_CREDENTIALS:
return ProtobufHttpStatus::Code::UNAUTHENTICATED;
case net::Error::ERR_NOT_IMPLEMENTED:
return ProtobufHttpStatus::Code::UNIMPLEMENTED;
case net::Error::ERR_INVALID_RESPONSE:
return ProtobufHttpStatus::Code::INTERNAL;
default:
return ProtobufHttpStatus::Code::UNKNOWN;
}
} }
} // namespace
const ProtobufHttpStatus& ProtobufHttpStatus::OK =
ProtobufHttpStatus(Code::OK, "OK");
ProtobufHttpStatus::ProtobufHttpStatus(net::HttpStatusCode http_status_code)
: error_code_(HttpStatusCodeToClientCode(http_status_code)),
error_message_(net::GetHttpReasonPhrase(http_status_code)) {}
ProtobufHttpStatus::ProtobufHttpStatus(net::Error net_error)
: error_code_(NetErrorToClientCode(net_error)),
error_message_(net::ErrorToString(net_error)) {}
ProtobufHttpStatus::ProtobufHttpStatus(Code code,
const std::string& error_message)
: error_code_(code), error_message_(error_message) {}
ProtobufHttpStatus::~ProtobufHttpStatus() = default; ProtobufHttpStatus::~ProtobufHttpStatus() = default;
bool ProtobufHttpStatus::ok() const { bool ProtobufHttpStatus::ok() const {
return http_status_code_ == net::HttpStatusCode::HTTP_OK; return error_code_ == Code::OK;
} }
} // namespace remoting } // namespace remoting
...@@ -14,30 +14,46 @@ namespace remoting { ...@@ -14,30 +14,46 @@ namespace remoting {
class ProtobufHttpStatus { class ProtobufHttpStatus {
public: public:
// This is the same as the gRPC status code.
enum class Code : int {
OK = 0,
CANCELLED = 1,
UNKNOWN = 2,
INVALID_ARGUMENT = 3,
DEADLINE_EXCEEDED = 4,
NOT_FOUND = 5,
ALREADY_EXISTS = 6,
PERMISSION_DENIED = 7,
UNAUTHENTICATED = 16,
RESOURCE_EXHAUSTED = 8,
FAILED_PRECONDITION = 9,
ABORTED = 10,
OUT_OF_RANGE = 11,
UNIMPLEMENTED = 12,
INTERNAL = 13,
UNAVAILABLE = 14,
DATA_LOSS = 15,
};
// An OK pre-defined instance. // An OK pre-defined instance.
static const ProtobufHttpStatus& OK; static const ProtobufHttpStatus& OK;
explicit ProtobufHttpStatus(net::HttpStatusCode http_status_code); explicit ProtobufHttpStatus(net::HttpStatusCode http_status_code);
explicit ProtobufHttpStatus(net::Error net_error); explicit ProtobufHttpStatus(net::Error net_error);
explicit ProtobufHttpStatus(Code code, const std::string& error_message);
~ProtobufHttpStatus(); ~ProtobufHttpStatus();
// Indicates whether the http request was successful based on the status code. // Indicates whether the http request was successful based on the status code.
bool ok() const; bool ok() const;
// The http status code, or -1 if the request fails to make, in which case // The instance's error code.
// the underlying error can be found by calling net_error(). Code error_code() const { return error_code_; }
int http_status_code() const { return http_status_code_; }
// The net error. If the error is ERR_HTTP_RESPONSE_CODE_FAILURE, the status
// code can be retrieved by calling http_status_code().
net::Error net_error() const { return net_error_; }
// The message that describes the error. // The message that describes the error.
const std::string& error_message() const { return error_message_; } const std::string& error_message() const { return error_message_; }
private: private:
int http_status_code_; Code error_code_;
net::Error net_error_;
std::string error_message_; std::string error_message_;
}; };
......
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/base/protobuf_http_stream_parser.h"
#include <string.h>
#include "base/logging.h"
#include "net/base/io_buffer.h"
#include "remoting/base/protobuf_http_client_messages.pb.h"
#include "remoting/base/protobuf_http_status.h"
#include "third_party/protobuf/src/google/protobuf/io/coded_stream.h"
#include "third_party/protobuf/src/google/protobuf/wire_format_lite.h"
namespace remoting {
namespace {
using ::google::protobuf::internal::WireFormatLite;
constexpr int kReadBufferSpareCapacity = 512;
} // namespace
ProtobufHttpStreamParser::ProtobufHttpStreamParser(
const MessageCallback& message_callback,
StreamClosedCallback stream_closed_callback)
: message_callback_(message_callback),
stream_closed_callback_(std::move(stream_closed_callback)) {
DCHECK(message_callback_);
DCHECK(stream_closed_callback_);
}
ProtobufHttpStreamParser::~ProtobufHttpStreamParser() = default;
void ProtobufHttpStreamParser::Append(base::StringPiece data) {
int required_remaining_capacity = data.size() + kReadBufferSpareCapacity;
if (!read_buffer_) {
read_buffer_ = base::MakeRefCounted<net::GrowableIOBuffer>();
read_buffer_->SetCapacity(required_remaining_capacity);
} else if (read_buffer_->RemainingCapacity() < required_remaining_capacity) {
read_buffer_->SetCapacity(read_buffer_->offset() +
required_remaining_capacity);
}
DCHECK_GE(read_buffer_->RemainingCapacity(), static_cast<int>(data.size()));
memcpy(read_buffer_->data(), data.data(), data.size());
read_buffer_->set_offset(read_buffer_->offset() + data.size());
ParseStreamIfAvailable();
}
bool ProtobufHttpStreamParser::HasPendingData() const {
return read_buffer_ && read_buffer_->offset() > 0;
}
void ProtobufHttpStreamParser::ParseStreamIfAvailable() {
DCHECK(read_buffer_);
google::protobuf::io::CodedInputStream input_stream(
reinterpret_cast<const uint8_t*>(read_buffer_->StartOfBuffer()),
read_buffer_->offset());
int bytes_consumed = 0;
auto weak_this = weak_factory_.GetWeakPtr();
// We can't use StreamBody::ParseFromString() here, as it can't do partial
// parsing, nor can it tell how many bytes are consumed.
while (bytes_consumed < read_buffer_->offset()) {
bool is_successful = ParseOneField(&input_stream);
if (!weak_this) {
// The callback might have deleted |this|, in which case we need to
// carefully return without touching any member of |this|.
return;
}
if (is_successful) {
// Only update |bytes_consumed| if the whole field is decoded.
// |input_stream| can still advance when the field is not decodable.
bytes_consumed = input_stream.CurrentPosition();
} else {
// The stream data can't be fully decoded yet.
break;
}
}
if (bytes_consumed == 0) {
return;
}
CHECK_LE(bytes_consumed, read_buffer_->offset());
int bytes_not_consumed = read_buffer_->offset() - bytes_consumed;
memmove(read_buffer_->StartOfBuffer(),
read_buffer_->StartOfBuffer() + bytes_consumed, bytes_not_consumed);
read_buffer_->set_offset(bytes_not_consumed);
}
bool ProtobufHttpStreamParser::ParseOneField(
google::protobuf::io::CodedInputStream* input_stream) {
// Note that the StreamBody definition is only significant in its tag ID
// allocations for "messages" and "status". There isn't any clear boundary
// between two StreamBody instances.
//
// A typical stream looks like:
//
// [message tag] <length> <message> [message tag] <length> <message> ...
// [status tag] <status> EOF
//
// Stream data failing to comply with this format usually means more data is
// needed.
uint32_t message_tag = input_stream->ReadTag();
if (message_tag == 0) {
VLOG(1) << "Can't read message tag yet.";
return false;
}
WireFormatLite::WireType wire_type =
WireFormatLite::GetTagWireType(message_tag);
int field_number = WireFormatLite::GetTagFieldNumber(message_tag);
switch (field_number) {
case protobufhttpclient::StreamBody::kMessagesFieldNumber: {
DCHECK_EQ(WireFormatLite::WireType::WIRETYPE_LENGTH_DELIMITED, wire_type);
std::string message;
if (!WireFormatLite::ReadBytes(input_stream, &message)) {
VLOG(1) << "Can't read stream message yet.";
return false;
}
VLOG(1) << "Stream message decoded.";
message_callback_.Run(message);
break;
}
case protobufhttpclient::StreamBody::kStatusFieldNumber: {
DCHECK_EQ(WireFormatLite::WireType::WIRETYPE_LENGTH_DELIMITED, wire_type);
protobufhttpclient::Status status;
if (!WireFormatLite::ReadMessage(input_stream, &status)) {
VLOG(1) << "Can't read status yet.";
return false;
}
VLOG(1) << "Client status decoded.";
ProtobufHttpStatus client_status(
static_cast<ProtobufHttpStatus::Code>(status.code()),
status.message());
std::move(stream_closed_callback_).Run(client_status);
break;
}
default:
if (field_number == protobufhttpclient::StreamBody::kNoopFieldNumber) {
VLOG(1) << "Found noop field.";
} else {
LOG(WARNING) << "Skipping unrecognized StreamBody field: "
<< field_number
<< ", wire type: " << static_cast<int>(wire_type);
}
if (!WireFormatLite::SkipField(input_stream, message_tag)) {
VLOG(1) << "Can't skip the field yet.";
return false;
}
break;
}
return true;
}
} // namespace remoting
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_BASE_PROTOBUF_HTTP_STREAM_PARSER_H_
#define REMOTING_BASE_PROTOBUF_HTTP_STREAM_PARSER_H_
#include "base/callback.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/strings/string_piece_forward.h"
namespace google {
namespace protobuf {
namespace io {
class CodedInputStream;
} // namespace io
} // namespace protobuf
} // namespace google
namespace net {
class GrowableIOBuffer;
} // namespace net
namespace remoting {
class ProtobufHttpStatus;
// Class to parse incoming stream data wrapped with a StreamBody protobuf
// message.
class ProtobufHttpStreamParser final {
public:
using MessageCallback = base::RepeatingCallback<void(const std::string&)>;
using StreamClosedCallback =
base::OnceCallback<void(const ProtobufHttpStatus&)>;
ProtobufHttpStreamParser(const MessageCallback& message_callback,
StreamClosedCallback stream_closed_callback);
~ProtobufHttpStreamParser();
ProtobufHttpStreamParser(const ProtobufHttpStreamParser&) = delete;
ProtobufHttpStreamParser& operator=(const ProtobufHttpStreamParser&) = delete;
// Appends the stream data (which should be the partial or full serialized
// StreamBody) and runs callbacks if there is something decodable.
void Append(base::StringPiece data);
// Indicates whether the parser has pending data that needs more input to
// complete a StreamBody message.
bool HasPendingData() const;
private:
void ParseStreamIfAvailable();
bool ParseOneField(google::protobuf::io::CodedInputStream* input_stream);
MessageCallback message_callback_;
StreamClosedCallback stream_closed_callback_;
scoped_refptr<net::GrowableIOBuffer> read_buffer_;
base::WeakPtrFactory<ProtobufHttpStreamParser> weak_factory_{this};
};
} // namespace remoting
#endif // REMOTING_BASE_PROTOBUF_HTTP_STREAM_PARSER_H_
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/base/protobuf_http_stream_parser.h"
#include <memory>
#include <string>
#include "base/test/mock_callback.h"
#include "remoting/base/protobuf_http_client_messages.pb.h"
#include "remoting/base/protobuf_http_status.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace remoting {
namespace {
using ::testing::_;
constexpr char kFirstTestMessage[] = "This is the first message.";
constexpr char kSecondTestMessage[] = "This is the second message.";
constexpr char kThirdTestMessage[] = "This is the third message.";
MATCHER(IsCancelStatus, "") {
return arg.error_code() == ProtobufHttpStatus::Code::CANCELLED &&
arg.error_message() == "Cancelled";
}
protobufhttpclient::StreamBody CreateDefaultStream() {
protobufhttpclient::StreamBody stream_body;
stream_body.add_messages(kFirstTestMessage);
stream_body.add_messages(kSecondTestMessage);
stream_body.add_messages(kThirdTestMessage);
return stream_body;
}
std::string CreateDefaultStreamData() {
return CreateDefaultStream().SerializeAsString();
}
void SplitStringInMiddle(const std::string& input,
std::string* str_1,
std::string* str_2) {
size_t message_split_pos = input.size() / 2;
*str_1 = input.substr(0, message_split_pos);
*str_2 = input.substr(message_split_pos);
}
} // namespace
class ProtobufHttpStreamParserTest : public testing::Test {
protected:
void ExpectReceiveDefaultStreamData();
base::MockCallback<ProtobufHttpStreamParser::MessageCallback>
message_callback_;
base::MockCallback<ProtobufHttpStreamParser::StreamClosedCallback>
stream_closed_callback_;
ProtobufHttpStreamParser stream_parser_{message_callback_.Get(),
stream_closed_callback_.Get()};
};
void ProtobufHttpStreamParserTest::ExpectReceiveDefaultStreamData() {
EXPECT_CALL(message_callback_, Run(kFirstTestMessage));
EXPECT_CALL(message_callback_, Run(kSecondTestMessage));
EXPECT_CALL(message_callback_, Run(kThirdTestMessage));
}
TEST_F(ProtobufHttpStreamParserTest, ParseStreamBodyInOneShot) {
ExpectReceiveDefaultStreamData();
ASSERT_FALSE(stream_parser_.HasPendingData());
stream_parser_.Append(CreateDefaultStreamData());
ASSERT_FALSE(stream_parser_.HasPendingData());
}
TEST_F(ProtobufHttpStreamParserTest, ParseSplitStreamBody) {
ExpectReceiveDefaultStreamData();
std::string stream_data = CreateDefaultStreamData();
std::string data_1, data_2;
SplitStringInMiddle(stream_data, &data_1, &data_2);
stream_parser_.Append(data_1);
ASSERT_TRUE(stream_parser_.HasPendingData());
stream_parser_.Append(data_2);
ASSERT_FALSE(stream_parser_.HasPendingData());
}
TEST_F(ProtobufHttpStreamParserTest, CloseStreamWithCancelled) {
EXPECT_CALL(stream_closed_callback_, Run(IsCancelStatus()));
protobufhttpclient::StreamBody stream_body;
stream_body.mutable_status()->set_code(
static_cast<int>(ProtobufHttpStatus::Code::CANCELLED));
stream_body.mutable_status()->set_message("Cancelled");
stream_parser_.Append(stream_body.SerializeAsString());
}
TEST_F(ProtobufHttpStreamParserTest, ParseStreamBodyWithNoops_NoopsIgnored) {
ExpectReceiveDefaultStreamData();
protobufhttpclient::StreamBody stream_body = CreateDefaultStream();
stream_body.add_noop("111111111111111");
stream_body.add_noop("111111111111111");
stream_body.add_noop("111111111111111");
stream_parser_.Append(stream_body.SerializeAsString());
ASSERT_FALSE(stream_parser_.HasPendingData());
}
TEST_F(ProtobufHttpStreamParserTest,
ParseSplitStreamBodyWithOnlyNoops_NoopsIgnored) {
protobufhttpclient::StreamBody stream_body;
stream_body.add_noop("111111111111111");
stream_body.add_noop("111111111111111");
stream_body.add_noop("111111111111111");
std::string stream_data = stream_body.SerializeAsString();
std::string data_1, data_2;
SplitStringInMiddle(stream_data, &data_1, &data_2);
stream_parser_.Append(data_1);
ASSERT_TRUE(stream_parser_.HasPendingData());
stream_parser_.Append(data_2);
ASSERT_FALSE(stream_parser_.HasPendingData());
}
} // namespace remoting
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment