Commit d69c427c authored by James Vecore's avatar James Vecore Committed by Commit Bot

[Nearby] Wait for FastPathReady before sending msgs

We now wait for the FastPathReady message before returning the result
for the StartReceivingMessages mojo call.

Fixed: 1134212
Change-Id: Ic1b64421544a90a7ae82b0d11f69edf64c2d1021
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2506391Reviewed-by: default avatarAlex Gough <ajgo@chromium.org>
Reviewed-by: default avatarRyan Hansberry <hansberry@chromium.org>
Commit-Queue: James Vecore <vecore@google.com>
Cr-Commit-Position: refs/heads/master@{#822061}
parent 88f545d1
...@@ -78,7 +78,9 @@ void ReceiveMessagesExpress::StartReceivingMessages( ...@@ -78,7 +78,9 @@ void ReceiveMessagesExpress::StartReceivingMessages(
} }
success_callback_ = std::move(callback); success_callback_ = std::move(callback);
url_loader_.reset(); url_loader_.reset();
stream_parser_ = std::make_unique<StreamParser>(listener); stream_parser_ = std::make_unique<StreamParser>(
listener, base::BindOnce(&ReceiveMessagesExpress::OnFastPathReady,
base::Unretained(this)));
token_fetcher_->GetAccessToken( token_fetcher_->GetAccessToken(
base::BindOnce(&ReceiveMessagesExpress::DoStartReceivingMessages, base::BindOnce(&ReceiveMessagesExpress::DoStartReceivingMessages,
...@@ -124,9 +126,6 @@ void ReceiveMessagesExpress::StopReceivingMessages() { ...@@ -124,9 +126,6 @@ void ReceiveMessagesExpress::StopReceivingMessages() {
void ReceiveMessagesExpress::OnDataReceived(base::StringPiece data, void ReceiveMessagesExpress::OnDataReceived(base::StringPiece data,
base::OnceClosure resume) { base::OnceClosure resume) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!success_callback_.is_null())
std::move(success_callback_).Run(true);
stream_parser_->Append(data); stream_parser_->Append(data);
std::move(resume).Run(); std::move(resume).Run();
} }
...@@ -143,3 +142,9 @@ void ReceiveMessagesExpress::OnComplete(bool success) { ...@@ -143,3 +142,9 @@ void ReceiveMessagesExpress::OnComplete(bool success) {
void ReceiveMessagesExpress::OnRetry(base::OnceClosure start_retry) { void ReceiveMessagesExpress::OnRetry(base::OnceClosure start_retry) {
NOTIMPLEMENTED(); NOTIMPLEMENTED();
} }
void ReceiveMessagesExpress::OnFastPathReady() {
if (!success_callback_.is_null()) {
std::move(success_callback_).Run(true);
}
}
...@@ -63,6 +63,8 @@ class ReceiveMessagesExpress : public network::SimpleURLLoaderStreamConsumer { ...@@ -63,6 +63,8 @@ class ReceiveMessagesExpress : public network::SimpleURLLoaderStreamConsumer {
void OnComplete(bool success) override; void OnComplete(bool success) override;
void OnRetry(base::OnceClosure start_retry) override; void OnRetry(base::OnceClosure start_retry) override;
void OnFastPathReady();
TokenFetcher* token_fetcher_; TokenFetcher* token_fetcher_;
scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory_; scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory_;
std::unique_ptr<network::SimpleURLLoader> url_loader_; std::unique_ptr<network::SimpleURLLoader> url_loader_;
......
...@@ -30,9 +30,21 @@ CreateReceiveMessagesResponse(const std::string& msg) { ...@@ -30,9 +30,21 @@ CreateReceiveMessagesResponse(const std::string& msg) {
return response; return response;
} }
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
CreateFastPathReadyResponse() {
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
response;
response.mutable_fast_path_ready();
return response;
}
chrome_browser_nearby_sharing_instantmessaging::StreamBody BuildResponseProto( chrome_browser_nearby_sharing_instantmessaging::StreamBody BuildResponseProto(
const std::vector<std::string>& messages) { const std::vector<std::string>& messages,
bool include_fast_path_ready = true) {
chrome_browser_nearby_sharing_instantmessaging::StreamBody stream_body; chrome_browser_nearby_sharing_instantmessaging::StreamBody stream_body;
if (include_fast_path_ready) {
stream_body.add_messages(CreateFastPathReadyResponse().SerializeAsString());
}
for (const auto& msg : messages) { for (const auto& msg : messages) {
stream_body.add_messages( stream_body.add_messages(
CreateReceiveMessagesResponse(msg).SerializeAsString()); CreateReceiveMessagesResponse(msg).SerializeAsString());
......
...@@ -5,10 +5,13 @@ ...@@ -5,10 +5,13 @@
#include "chrome/browser/nearby_sharing/instantmessaging/stream_parser.h" #include "chrome/browser/nearby_sharing/instantmessaging/stream_parser.h"
#include "chrome/browser/nearby_sharing/instantmessaging/proto/instantmessaging.pb.h" #include "chrome/browser/nearby_sharing/instantmessaging/proto/instantmessaging.pb.h"
#include "chrome/browser/nearby_sharing/logging/logging.h"
StreamParser::StreamParser( StreamParser::StreamParser(
base::RepeatingCallback<void(const std::string& message)> listener) base::RepeatingCallback<void(const std::string& message)> listener,
: listener_(listener) {} base::OnceClosure fastpath_ready_callback)
: listener_(listener),
fastpath_ready_callback_(std::move(fastpath_ready_callback)) {}
StreamParser::~StreamParser() = default; StreamParser::~StreamParser() = default;
void StreamParser::Append(base::StringPiece data) { void StreamParser::Append(base::StringPiece data) {
...@@ -73,9 +76,22 @@ void StreamParser::DelegateMessage( ...@@ -73,9 +76,22 @@ void StreamParser::DelegateMessage(
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
response; response;
response.ParseFromString(stream_body.messages(i)); response.ParseFromString(stream_body.messages(i));
if (response.body_case() != chrome_browser_nearby_sharing_instantmessaging:: switch (response.body_case()) {
ReceiveMessagesResponse::kInboxMessage) case chrome_browser_nearby_sharing_instantmessaging::
continue; ReceiveMessagesResponse::kFastPathReady:
listener_.Run(response.inbox_message().message()); NS_LOG(INFO) << __func__ << ": received kFastPathReady";
if (fastpath_ready_callback_) {
std::move(fastpath_ready_callback_).Run();
}
break;
case chrome_browser_nearby_sharing_instantmessaging::
ReceiveMessagesResponse::kInboxMessage:
listener_.Run(response.inbox_message().message());
break;
default:
NS_LOG(ERROR) << __func__ << ": message body case was unexpected: "
<< response.body_case();
NOTREACHED();
}
} }
} }
...@@ -20,7 +20,8 @@ class StreamBody; ...@@ -20,7 +20,8 @@ class StreamBody;
class StreamParser { class StreamParser {
public: public:
explicit StreamParser( explicit StreamParser(
base::RepeatingCallback<void(const std::string& message)> listener); base::RepeatingCallback<void(const std::string& message)> listener,
base::OnceClosure fastpath_ready);
~StreamParser(); ~StreamParser();
void Append(base::StringPiece data); void Append(base::StringPiece data);
...@@ -33,6 +34,7 @@ class StreamParser { ...@@ -33,6 +34,7 @@ class StreamParser {
stream_body); stream_body);
base::RepeatingCallback<void(const std::string& message)> listener_; base::RepeatingCallback<void(const std::string& message)> listener_;
base::OnceClosure fastpath_ready_callback_;
std::string data_; std::string data_;
}; };
......
...@@ -21,9 +21,21 @@ CreateReceiveMessagesResponse(const std::string& msg) { ...@@ -21,9 +21,21 @@ CreateReceiveMessagesResponse(const std::string& msg) {
return response; return response;
} }
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
CreateFastPathReadyResponse() {
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
response;
response.mutable_fast_path_ready();
return response;
}
chrome_browser_nearby_sharing_instantmessaging::StreamBody BuildProto( chrome_browser_nearby_sharing_instantmessaging::StreamBody BuildProto(
const std::vector<std::string>& messages) { const std::vector<std::string>& messages,
bool include_fast_path = false) {
chrome_browser_nearby_sharing_instantmessaging::StreamBody stream_body; chrome_browser_nearby_sharing_instantmessaging::StreamBody stream_body;
if (include_fast_path) {
stream_body.add_messages(CreateFastPathReadyResponse().SerializeAsString());
}
for (const auto& msg : messages) { for (const auto& msg : messages) {
stream_body.add_messages( stream_body.add_messages(
CreateReceiveMessagesResponse(msg).SerializeAsString()); CreateReceiveMessagesResponse(msg).SerializeAsString());
...@@ -37,6 +49,8 @@ class StreamParserTest : public testing::Test { ...@@ -37,6 +49,8 @@ class StreamParserTest : public testing::Test {
public: public:
StreamParserTest() StreamParserTest()
: stream_parser_(base::BindRepeating(&StreamParserTest::OnMessageReceived, : stream_parser_(base::BindRepeating(&StreamParserTest::OnMessageReceived,
base::Unretained(this)),
base::BindRepeating(&StreamParserTest::OnFastPathReady,
base::Unretained(this))) {} base::Unretained(this))) {}
~StreamParserTest() override = default; ~StreamParserTest() override = default;
...@@ -46,11 +60,15 @@ class StreamParserTest : public testing::Test { ...@@ -46,11 +60,15 @@ class StreamParserTest : public testing::Test {
const std::vector<std::string> GetMessages() { return messages_received_; } const std::vector<std::string> GetMessages() { return messages_received_; }
bool fast_path_ready() { return fast_path_ready_; }
private: private:
void OnMessageReceived(const std::string& message) { void OnMessageReceived(const std::string& message) {
messages_received_.push_back(message); messages_received_.push_back(message);
} }
void OnFastPathReady() { fast_path_ready_ = true; }
bool fast_path_ready_ = false;
StreamParser stream_parser_; StreamParser stream_parser_;
std::vector<std::string> messages_received_; std::vector<std::string> messages_received_;
}; };
...@@ -121,3 +139,15 @@ TEST_F(StreamParserTest, MultipleMessagesSplit) { ...@@ -121,3 +139,15 @@ TEST_F(StreamParserTest, MultipleMessagesSplit) {
EXPECT_EQ(5, MessagesReceived()); EXPECT_EQ(5, MessagesReceived());
EXPECT_EQ(messages_1, GetMessages()); EXPECT_EQ(messages_1, GetMessages());
} }
// The fast path message triggers callback.
TEST_F(StreamParserTest, FastPathReadyTriggersCallback) {
std::vector<std::string> messages = {"random 42"};
chrome_browser_nearby_sharing_instantmessaging::StreamBody stream_body =
BuildProto(messages, true);
GetStreamParser().Append(stream_body.SerializeAsString());
EXPECT_TRUE(fast_path_ready());
EXPECT_EQ(1, MessagesReceived());
EXPECT_EQ(messages, GetMessages());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment