Commit 54a4f733 authored by Yuwei Huang's avatar Yuwei Huang Committed by Commit Bot

[Remoting] Implement GrpcAsyncDispatcher

This CL implements a helper class that adapts gRPC's async unary call
paradigm (i.e. the completion queue) into Chromium's callback-based
paradigm.

Bug: 927962
Change-Id: I15c58748a0d024dc5f6953b0312e629fcf8c24b5
Reviewed-on: https://chromium-review.googlesource.com/c/1484736
Commit-Queue: Yuwei Huang <yuweih@chromium.org>
Reviewed-by: default avatarJoe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#635816}
parent 632b3109
...@@ -10,6 +10,10 @@ static_library("signaling") { ...@@ -10,6 +10,10 @@ static_library("signaling") {
"delegating_signal_strategy.h", "delegating_signal_strategy.h",
"ftl_client.cc", "ftl_client.cc",
"ftl_client.h", "ftl_client.h",
"grpc_async_call_data.cc",
"grpc_async_call_data.h",
"grpc_async_dispatcher.cc",
"grpc_async_dispatcher.h",
"iq_sender.cc", "iq_sender.cc",
"iq_sender.h", "iq_sender.h",
"jid_util.cc", "jid_util.cc",
...@@ -51,17 +55,25 @@ static_library("signaling") { ...@@ -51,17 +55,25 @@ static_library("signaling") {
"//jingle:jingle_glue", "//jingle:jingle_glue",
"//net", "//net",
"//remoting/base", "//remoting/base",
"//third_party/grpc:grpcpp",
] ]
if (is_nacl) { if (is_nacl) {
sources -= [ sources -= [
"ftl_client.cc", "ftl_client.cc",
"ftl_client.h", "ftl_client.h",
"grpc_async_call_data.cc",
"grpc_async_call_data.h",
"grpc_async_dispatcher.cc",
"grpc_async_dispatcher.h",
"log_to_server.cc", "log_to_server.cc",
"server_log_entry.cc", "server_log_entry.cc",
"xmpp_signal_strategy.cc", "xmpp_signal_strategy.cc",
] ]
deps -= [ "//google_apis" ] deps -= [
"//google_apis",
"//third_party/grpc:grpcpp",
]
public_deps -= [ ":ftl_grpc_library" ] public_deps -= [ ":ftl_grpc_library" ]
} }
} }
...@@ -93,6 +105,7 @@ source_set("unit_tests") { ...@@ -93,6 +105,7 @@ source_set("unit_tests") {
testonly = true testonly = true
sources = [ sources = [
"grpc_async_dispatcher_unittest.cc",
"iq_sender_unittest.cc", "iq_sender_unittest.cc",
"jid_util_unittest.cc", "jid_util_unittest.cc",
"log_to_server_unittest.cc", "log_to_server_unittest.cc",
...@@ -107,8 +120,15 @@ source_set("unit_tests") { ...@@ -107,8 +120,15 @@ source_set("unit_tests") {
deps = [ deps = [
":test_support", ":test_support",
":unit_tests_grpc_library",
"//net:test_support", "//net:test_support",
"//testing/gmock", "//testing/gmock",
"//testing/gtest", "//testing/gtest",
] ]
} }
cc_grpc_library("unit_tests_grpc_library") {
sources = [
"grpc_async_dispatcher_test_services.proto",
]
}
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/signaling/grpc_async_call_data.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/threading/thread_task_runner_handle.h"
#include "third_party/grpc/src/include/grpcpp/client_context.h"
namespace remoting {
GrpcAsyncCallDataBase::GrpcAsyncCallDataBase(
std::unique_ptr<grpc::ClientContext> context) {
context_ = std::move(context);
caller_task_runner_ = base::ThreadTaskRunnerHandle::Get();
}
GrpcAsyncCallDataBase::~GrpcAsyncCallDataBase() = default;
void GrpcAsyncCallDataBase::RunCallbackAndSelfDestroyOnDone() {
caller_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&GrpcAsyncCallDataBase::RunCallbackOnCallerThread,
base::Owned(this)));
}
void GrpcAsyncCallDataBase::CancelRequest() {
context_->TryCancel();
}
} // namespace remoting
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_SIGNALING_GRPC_ASYNC_CALL_DATA_H_
#define REMOTING_SIGNALING_GRPC_ASYNC_CALL_DATA_H_
#include <memory>
#include <utility>
#include "base/callback_forward.h"
#include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/single_thread_task_runner.h"
#include "third_party/grpc/src/include/grpcpp/support/async_unary_call.h"
#include "third_party/grpc/src/include/grpcpp/support/status.h"
namespace grpc {
class ClientContext;
} // namespace grpc
namespace remoting {
// The GrpcAsyncCallData base class that holds logic invariant to the response
// type.
class GrpcAsyncCallDataBase {
public:
explicit GrpcAsyncCallDataBase(std::unique_ptr<grpc::ClientContext> context);
virtual ~GrpcAsyncCallDataBase();
void RunCallbackAndSelfDestroyOnDone();
void CancelRequest();
virtual void RegisterAndMoveOwnershipToCompletionQueue() = 0;
virtual void RunCallbackOnCallerThread() = 0;
protected:
grpc::Status status_{grpc::StatusCode::UNKNOWN, "Uninitialized"};
private:
std::unique_ptr<grpc::ClientContext> context_;
scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
DISALLOW_COPY_AND_ASSIGN(GrpcAsyncCallDataBase);
};
template <typename ResponseType>
class GrpcAsyncCallData : public GrpcAsyncCallDataBase {
public:
using RpcCallback =
base::OnceCallback<void(grpc::Status, const ResponseType&)>;
GrpcAsyncCallData(
std::unique_ptr<grpc::ClientContext> context,
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
response_reader,
RpcCallback callback)
: GrpcAsyncCallDataBase(std::move(context)) {
response_reader_ = std::move(response_reader);
callback_ = std::move(callback);
}
~GrpcAsyncCallData() override = default;
void RegisterAndMoveOwnershipToCompletionQueue() override {
response_reader_->Finish(&response_, &status_, /* event_tag */ this);
}
void RunCallbackOnCallerThread() override {
std::move(callback_).Run(status_, response_);
}
private:
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
response_reader_;
ResponseType response_;
RpcCallback callback_;
DISALLOW_COPY_AND_ASSIGN(GrpcAsyncCallData);
};
} // namespace remoting
#endif // REMOTING_SIGNALING_GRPC_ASYNC_CALL_DATA_H_
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/signaling/grpc_async_dispatcher.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/threading/thread_task_runner_handle.h"
namespace remoting {
GrpcAsyncDispatcher::GrpcAsyncDispatcher() {
dispatcher_thread_.Start();
dispatcher_thread_.task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&GrpcAsyncDispatcher::RunQueueOnDispatcherThread,
base::Unretained(this)));
}
GrpcAsyncDispatcher::~GrpcAsyncDispatcher() {
completion_queue_.Shutdown();
{
base::AutoLock autolock(pending_rpcs_lock_);
VLOG(0) << "# of pending RPCs at destruction: " << pending_rpcs_.size();
for (auto* pending_rpc : pending_rpcs_) {
pending_rpc->CancelRequest();
}
}
dispatcher_thread_.Stop();
DCHECK_EQ(0u, pending_rpcs_.size());
}
void GrpcAsyncDispatcher::RunQueueOnDispatcherThread() {
void* event_tag;
bool operation_succeeded = false;
// completion_queue_.Next() blocks until a response is received.
while (completion_queue_.Next(&event_tag, &operation_succeeded)) {
// |operation_succeeded| is always true for client-side finish event.
DCHECK(operation_succeeded);
VLOG(0) << "Dequeuing RPC: " << event_tag;
GrpcAsyncCallDataBase* rpc_data =
reinterpret_cast<GrpcAsyncCallDataBase*>(event_tag);
{
base::AutoLock autolock(pending_rpcs_lock_);
DCHECK(pending_rpcs_.find(rpc_data) != pending_rpcs_.end());
pending_rpcs_.erase(rpc_data);
}
rpc_data->RunCallbackAndSelfDestroyOnDone();
}
}
void GrpcAsyncDispatcher::RegisterRpcData(
std::unique_ptr<GrpcAsyncCallDataBase> rpc_data) {
{
base::AutoLock autolock(pending_rpcs_lock_);
DCHECK(pending_rpcs_.find(rpc_data.get()) == pending_rpcs_.end());
pending_rpcs_.insert(rpc_data.get());
}
VLOG(0) << "Enqueuing RPC: " << rpc_data.get();
rpc_data.release()->RegisterAndMoveOwnershipToCompletionQueue();
}
} // namespace remoting
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_SIGNALING_GRPC_ASYNC_DISPATCHER_H_
#define REMOTING_SIGNALING_GRPC_ASYNC_DISPATCHER_H_
#include <memory>
#include <utility>
#include "base/callback_forward.h"
#include "base/containers/flat_set.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread.h"
#include "remoting/signaling/grpc_async_call_data.h"
#include "third_party/grpc/src/include/grpcpp/completion_queue.h"
#include "third_party/grpc/src/include/grpcpp/support/async_unary_call.h"
namespace remoting {
// This class helps adopting the gRPC async completion queue handling logic into
// Chromium's callback paradigm.
//
// Basic usage looks like this:
//
// class MyClass {
// public:
// MyClass() : weak_factory_(this) {}
// ~MyClass() {}
//
// void SayHello() {
// HelloRequest request;
// dispatcher_->ExecuteAsyncRpc(
// // This is run immediately inside the call stack of
// // |ExecuteAsyncRpc|.
// base::BindOnce(&HelloService::Stub::AsyncSayHello,
// base::Unretained(stub_.get())),
// std::make_unique<grpc::ClientContext>(), request,
// // Callback might be called after the dispatcher is destroyed.
// base::BindOnce(&MyClass::OnHelloResult,
// weak_factory_.GetWeakPtr()));
// }
//
// private:
// void OnHelloResult(grpc::Status status,
// const HelloResponse& response) {
// if (status.error_code() == grpc::StatusCode::CANCELLED) {
// // The request has been canceled because |dispatcher_| is destroyed.
// // If you need to access class members here, make sure to bind a weak
// // pointer in the RpcCallback. Otherwise using base::Unretained() is
// // fine.
// return;
// }
//
// if (!status.ok()) {
// // Handle other error here.
// return;
// }
//
// // Response is received. Use the result here.
// }
//
// std::unique_ptr<HelloService::Stub> stub_;
// GrpcAsyncDispatcher dispatcher_;
// base::WeakPtrFactory<MyClass> weak_factory_;
// };
class GrpcAsyncDispatcher {
public:
template <typename ResponseType>
using RpcCallback =
base::OnceCallback<void(grpc::Status, const ResponseType&)>;
template <typename RequestType, typename ResponseType>
using AsyncRpcFunction = base::OnceCallback<std::unique_ptr<
grpc::ClientAsyncResponseReader<ResponseType>>(grpc::ClientContext*,
const RequestType&,
grpc::CompletionQueue*)>;
GrpcAsyncDispatcher();
~GrpcAsyncDispatcher();
// Immediately executes |rpc_function| inside the call stack of this function,
// and runs |callback| once the server responses. If the dispatcher is
// destroyed before the server responses, |callback| will be called with
// a CANCELLED status *after* the dispatcher is destroyed.
//
// It is safe to bind raw pointer into |rpc_function|, but you might want to
// bind weak pointer in |callback| if you need to access your bound object in
// the cancel case.
template <typename RequestType, typename ResponseType>
void ExecuteAsyncRpc(AsyncRpcFunction<RequestType, ResponseType> rpc_function,
std::unique_ptr<grpc::ClientContext> context,
const RequestType& request,
RpcCallback<ResponseType> callback) {
auto response_reader =
std::move(rpc_function).Run(context.get(), request, &completion_queue_);
auto data = std::make_unique<GrpcAsyncCallData<ResponseType>>(
std::move(context), std::move(response_reader), std::move(callback));
RegisterRpcData(std::move(data));
}
private:
void RunQueueOnDispatcherThread();
void RegisterRpcData(std::unique_ptr<GrpcAsyncCallDataBase> rpc_data);
// We need a dedicated thread because getting response from the completion
// queue will block until any response is received. Note that the RPC call
// itself is still async, meaning any new RPC call when the queue is blocked
// can still be made, and can unblock the queue once the response is ready.
base::Thread dispatcher_thread_{"grpc_async_dispatcher"};
// Note that the gRPC library is thread-safe.
grpc::CompletionQueue completion_queue_;
// Keep the list of pending RPCs so that we can cancel them at destruction.
base::flat_set<GrpcAsyncCallDataBase*> pending_rpcs_;
base::Lock pending_rpcs_lock_;
DISALLOW_COPY_AND_ASSIGN(GrpcAsyncDispatcher);
};
} // namespace remoting
#endif // REMOTING_SIGNALING_GRPC_ASYNC_DISPATCHER_H_
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
package remoting;
message EchoRequest {
string text = 1;
}
message EchoResponse {
string text = 1;
}
service GrpcAsyncDispatcherTestService {
rpc Echo(EchoRequest) returns (EchoResponse) {}
}
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/signaling/grpc_async_dispatcher.h"
#include <memory>
#include <string>
#include <utility>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/test/bind_test_util.h"
#include "remoting/signaling/grpc_async_dispatcher_test_services.grpc.pb.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/grpc/src/include/grpcpp/grpcpp.h"
namespace remoting {
namespace {
class EchoServerImpl {
public:
EchoServerImpl();
~EchoServerImpl();
void Start();
std::shared_ptr<grpc::Channel> CreateInProcessChannel();
void HandleOneRequest();
private:
GrpcAsyncDispatcherTestService::AsyncService async_service_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> completion_queue_;
};
EchoServerImpl::EchoServerImpl() = default;
EchoServerImpl::~EchoServerImpl() {
server_->Shutdown();
completion_queue_->Shutdown();
// gRPC requires draining the completion queue before destroying it.
void* tag;
bool ok;
while (completion_queue_->Next(&tag, &ok)) {
}
}
void EchoServerImpl::Start() {
DCHECK(!server_);
grpc::ServerBuilder builder;
builder.RegisterService(&async_service_);
completion_queue_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
}
std::shared_ptr<grpc::Channel> EchoServerImpl::CreateInProcessChannel() {
return server_->InProcessChannel(grpc::ChannelArguments());
}
void EchoServerImpl::HandleOneRequest() {
grpc::ServerContext context;
EchoRequest request;
grpc::ServerAsyncResponseWriter<EchoResponse> responder(&context);
async_service_.RequestEcho(&context, &request, &responder,
completion_queue_.get(), completion_queue_.get(),
(void*)1);
void* tag;
bool ok;
completion_queue_->Next(&tag, &ok);
ASSERT_TRUE(ok);
ASSERT_EQ((void*)1, tag);
EchoResponse response;
response.set_text(request.text());
responder.Finish(response, grpc::Status::OK, (void*)2);
completion_queue_->Next(&tag, &ok);
ASSERT_TRUE(ok);
ASSERT_EQ((void*)2, tag);
}
} // namespace
class GrpcAsyncDispatcherTest : public testing::Test {
public:
void SetUp() override;
void TearDown() override;
protected:
void AsyncSendText(const std::string& text,
GrpcAsyncDispatcher::RpcCallback<EchoResponse> callback);
std::unique_ptr<EchoServerImpl> server_;
protected:
std::unique_ptr<GrpcAsyncDispatcher> dispatcher_;
private:
base::MessageLoop message_loop_;
std::unique_ptr<GrpcAsyncDispatcherTestService::Stub> stub_;
};
void GrpcAsyncDispatcherTest::SetUp() {
dispatcher_ = std::make_unique<GrpcAsyncDispatcher>();
server_ = std::make_unique<EchoServerImpl>();
server_->Start();
stub_ = GrpcAsyncDispatcherTestService::NewStub(
server_->CreateInProcessChannel());
}
void GrpcAsyncDispatcherTest::TearDown() {
server_.reset();
dispatcher_.reset();
stub_.reset();
}
void GrpcAsyncDispatcherTest::AsyncSendText(
const std::string& text,
GrpcAsyncDispatcher::RpcCallback<EchoResponse> callback) {
EchoRequest request;
request.set_text(text);
dispatcher_->ExecuteAsyncRpc(
base::BindOnce(&GrpcAsyncDispatcherTestService::Stub::AsyncEcho,
base::Unretained(stub_.get())),
std::make_unique<grpc::ClientContext>(), request, std::move(callback));
}
TEST_F(GrpcAsyncDispatcherTest, DoNothing) {}
TEST_F(GrpcAsyncDispatcherTest, SendOneTextAndRespond) {
base::RunLoop run_loop;
AsyncSendText("Hello",
base::BindLambdaForTesting(
[&](grpc::Status status, const EchoResponse& response) {
EXPECT_TRUE(status.ok());
EXPECT_EQ("Hello", response.text());
run_loop.Quit();
}));
server_->HandleOneRequest();
run_loop.Run();
}
TEST_F(GrpcAsyncDispatcherTest, SendTwoTextsAndRespondOneByOne) {
base::RunLoop run_loop_1;
AsyncSendText("Hello 1",
base::BindLambdaForTesting(
[&](grpc::Status status, const EchoResponse& response) {
EXPECT_TRUE(status.ok());
EXPECT_EQ("Hello 1", response.text());
run_loop_1.Quit();
}));
server_->HandleOneRequest();
run_loop_1.Run();
base::RunLoop run_loop_2;
AsyncSendText("Hello 2",
base::BindLambdaForTesting(
[&](grpc::Status status, const EchoResponse& response) {
EXPECT_TRUE(status.ok());
EXPECT_EQ("Hello 2", response.text());
run_loop_2.Quit();
}));
server_->HandleOneRequest();
run_loop_2.Run();
}
TEST_F(GrpcAsyncDispatcherTest, SendTwoTextsAndRespondTogether) {
base::RunLoop run_loop;
size_t response_count = 0;
auto on_received_one_response = [&]() {
response_count++;
if (response_count == 2) {
run_loop.Quit();
}
};
AsyncSendText("Hello 1",
base::BindLambdaForTesting(
[&](grpc::Status status, const EchoResponse& response) {
EXPECT_TRUE(status.ok());
EXPECT_EQ("Hello 1", response.text());
on_received_one_response();
}));
AsyncSendText("Hello 2",
base::BindLambdaForTesting(
[&](grpc::Status status, const EchoResponse& response) {
EXPECT_TRUE(status.ok());
EXPECT_EQ("Hello 2", response.text());
on_received_one_response();
}));
server_->HandleOneRequest();
server_->HandleOneRequest();
run_loop.Run();
}
TEST_F(GrpcAsyncDispatcherTest, RpcCanceledOnDestruction) {
base::RunLoop run_loop;
AsyncSendText("Hello",
base::BindLambdaForTesting([&](grpc::Status status,
const EchoResponse& response) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
run_loop.Quit();
}));
dispatcher_.reset();
run_loop.Run();
}
} // namespace remoting
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment