[Remoting][FTL] Extend GrpcAsyncDispatcher to support server streaming RPCs
This CL extends GrpcAsyncDispatcher so that it can handle server streaming
RPCs. It also changes the grpc::Status param in the unary call callback to
a const ref, because grpc::Status contains string fields and might be
expensive to copy.
Bug: 927962
Change-Id: Ide85df5fc25933f3e81fd8ed89d043a65d5eefa1
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1504834
Commit-Queue: Yuwei Huang <[email protected]>
Reviewed-by: Joe Downing <[email protected]>
Cr-Commit-Position: refs/heads/master@{#639154}
diff --git a/remoting/signaling/BUILD.gn b/remoting/signaling/BUILD.gn
index 82491176..aff211c 100644
--- a/remoting/signaling/BUILD.gn
+++ b/remoting/signaling/BUILD.gn
@@ -10,10 +10,6 @@
"delegating_signal_strategy.h",
"ftl_client.cc",
"ftl_client.h",
- "grpc_async_call_data.cc",
- "grpc_async_call_data.h",
- "grpc_async_dispatcher.cc",
- "grpc_async_dispatcher.h",
"iq_sender.cc",
"iq_sender.h",
"jid_util.cc",
@@ -49,6 +45,7 @@
]
deps = [
+ ":grpc_async_dispatcher",
"//base",
"//crypto",
"//google_apis",
@@ -62,15 +59,12 @@
sources -= [
"ftl_client.cc",
"ftl_client.h",
- "grpc_async_call_data.cc",
- "grpc_async_call_data.h",
- "grpc_async_dispatcher.cc",
- "grpc_async_dispatcher.h",
"log_to_server.cc",
"server_log_entry.cc",
"xmpp_signal_strategy.cc",
]
deps -= [
+ ":grpc_async_dispatcher",
"//google_apis",
"//third_party/grpc:grpcpp",
]
@@ -78,6 +72,25 @@
}
}
+static_library("grpc_async_dispatcher") {
+ sources = [
+ "grpc_async_call_data.cc",
+ "grpc_async_call_data.h",
+ "grpc_async_dispatcher.cc",
+ "grpc_async_dispatcher.h",
+ "grpc_async_server_streaming_call_data.cc",
+ "grpc_async_server_streaming_call_data.h",
+ "grpc_async_unary_call_data.h",
+ "scoped_grpc_server_stream.cc",
+ "scoped_grpc_server_stream.h",
+ ]
+
+ deps = [
+ "//base",
+ "//third_party/grpc:grpcpp",
+ ]
+}
+
cc_grpc_library("ftl_grpc_library") {
sources = [
"ftl.proto",
diff --git a/remoting/signaling/ftl_client.h b/remoting/signaling/ftl_client.h
index 3d0f02d..edfaf2eb 100644
--- a/remoting/signaling/ftl_client.h
+++ b/remoting/signaling/ftl_client.h
@@ -23,7 +23,7 @@
public:
template <typename ResponseType>
using RpcCallback =
- base::OnceCallback<void(grpc::Status, const ResponseType&)>;
+ base::OnceCallback<void(const grpc::Status&, const ResponseType&)>;
explicit FtlClient(OAuthTokenGetter* token_getter);
~FtlClient();
diff --git a/remoting/signaling/grpc_async_call_data.cc b/remoting/signaling/grpc_async_call_data.cc
index 819a986..c42f67e 100644
--- a/remoting/signaling/grpc_async_call_data.cc
+++ b/remoting/signaling/grpc_async_call_data.cc
@@ -4,30 +4,33 @@
#include "remoting/signaling/grpc_async_call_data.h"
-#include "base/bind.h"
-#include "base/callback.h"
#include "base/threading/thread_task_runner_handle.h"
#include "third_party/grpc/src/include/grpcpp/client_context.h"
namespace remoting {
+namespace internal {
-GrpcAsyncCallDataBase::GrpcAsyncCallDataBase(
+GrpcAsyncCallData::GrpcAsyncCallData(
std::unique_ptr<grpc::ClientContext> context) {
context_ = std::move(context);
caller_task_runner_ = base::ThreadTaskRunnerHandle::Get();
}
-GrpcAsyncCallDataBase::~GrpcAsyncCallDataBase() = default;
+GrpcAsyncCallData::~GrpcAsyncCallData() = default;
-void GrpcAsyncCallDataBase::RunCallbackAndSelfDestroyOnDone() {
- caller_task_runner_->PostTask(
- FROM_HERE,
- base::BindOnce(&GrpcAsyncCallDataBase::RunCallbackOnCallerThread,
- base::Owned(this)));
-}
-
-void GrpcAsyncCallDataBase::CancelRequest() {
+void GrpcAsyncCallData::CancelRequest() {
+ VLOG(0) << "Canceling request: " << this;
context_->TryCancel();
+ OnRequestCanceled();
}
+void GrpcAsyncCallData::DeleteOnCallerThread() {
+ if (caller_task_runner_->BelongsToCurrentThread()) {
+ delete this;
+ return;
+ }
+ caller_task_runner_->DeleteSoon(FROM_HERE, this);
+}
+
+} // namespace internal
} // namespace remoting
diff --git a/remoting/signaling/grpc_async_call_data.h b/remoting/signaling/grpc_async_call_data.h
index c838ba4d..9d3ae4c 100644
--- a/remoting/signaling/grpc_async_call_data.h
+++ b/remoting/signaling/grpc_async_call_data.h
@@ -8,11 +8,9 @@
#include <memory>
#include <utility>
-#include "base/callback_forward.h"
#include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/single_thread_task_runner.h"
-#include "third_party/grpc/src/include/grpcpp/support/async_unary_call.h"
#include "third_party/grpc/src/include/grpcpp/support/status.h"
namespace grpc {
@@ -20,64 +18,52 @@
} // namespace grpc
namespace remoting {
+namespace internal {
// The GrpcAsyncCallData base class that holds logic invariant to the response
// type.
-class GrpcAsyncCallDataBase {
+//
+// The lifetime of GrpcAsyncCallData is bound to the completion queue. A
+// subclass may enqueue itself multiple times into the completion queue, and
+// GrpcAsyncDispatcher will dequeue it and call OnDequeuedOnDispatcherThread()
+// on a background thread when the event is handled. If the subclass won't
+// re-enqueue itself, OnDequeuedOnDispatcherThread() should return false, which
+// will delete the call data by calling DeleteOnCallerThread().
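+//
+// An override therefore typically looks like this (illustrative sketch, not
+// a real subclass; |done_| and |reader_| are hypothetical members):
+//
+//   bool OnDequeuedOnDispatcherThread(bool operation_succeeded) override {
+//     if (done_) {
+//       return false;  // Not re-enqueuing; dispatcher will delete |this|.
+//     }
+//     reader_->Read(&response_, /* event_tag */ this);  // Re-enqueue.
+//     return true;
+//   }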
+//
+// The ctor, dtor, and all methods except OnDequeuedOnDispatcherThread() are
+// called from the same thread (caller_task_runner_).
+class GrpcAsyncCallData {
public:
- explicit GrpcAsyncCallDataBase(std::unique_ptr<grpc::ClientContext> context);
- virtual ~GrpcAsyncCallDataBase();
+ explicit GrpcAsyncCallData(std::unique_ptr<grpc::ClientContext> context);
+ virtual ~GrpcAsyncCallData();
- void RunCallbackAndSelfDestroyOnDone();
+  // Forcibly dequeues any pending request.
void CancelRequest();
- virtual void RegisterAndMoveOwnershipToCompletionQueue() = 0;
- virtual void RunCallbackOnCallerThread() = 0;
+ // Can be called from any thread.
+ void DeleteOnCallerThread();
+
+ // Returns true iff the task is not finished and the subclass will
+ // *immediately* enqueue itself back to the completion queue. If this method
+ // returns false, the object will be deleted by calling
+ // DeleteOnCallerThread().
+  // Note that this is called on the dispatcher's background thread.
+ virtual bool OnDequeuedOnDispatcherThread(bool operation_succeeded) = 0;
protected:
+ // Called after CancelRequest() is called.
+ virtual void OnRequestCanceled() {}
+
+ scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
grpc::Status status_{grpc::StatusCode::UNKNOWN, "Uninitialized"};
private:
std::unique_ptr<grpc::ClientContext> context_;
- scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
-
- DISALLOW_COPY_AND_ASSIGN(GrpcAsyncCallDataBase);
-};
-
-template <typename ResponseType>
-class GrpcAsyncCallData : public GrpcAsyncCallDataBase {
- public:
- using RpcCallback =
- base::OnceCallback<void(grpc::Status, const ResponseType&)>;
-
- GrpcAsyncCallData(
- std::unique_ptr<grpc::ClientContext> context,
- std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
- response_reader,
- RpcCallback callback)
- : GrpcAsyncCallDataBase(std::move(context)) {
- response_reader_ = std::move(response_reader);
- callback_ = std::move(callback);
- }
- ~GrpcAsyncCallData() override = default;
-
- void RegisterAndMoveOwnershipToCompletionQueue() override {
- response_reader_->Finish(&response_, &status_, /* event_tag */ this);
- }
-
- void RunCallbackOnCallerThread() override {
- std::move(callback_).Run(status_, response_);
- }
-
- private:
- std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
- response_reader_;
- ResponseType response_;
- RpcCallback callback_;
DISALLOW_COPY_AND_ASSIGN(GrpcAsyncCallData);
};
+} // namespace internal
} // namespace remoting
#endif // REMOTING_SIGNALING_GRPC_ASYNC_CALL_DATA_H_
diff --git a/remoting/signaling/grpc_async_dispatcher.cc b/remoting/signaling/grpc_async_dispatcher.cc
index c1c34716..e26f72f 100644
--- a/remoting/signaling/grpc_async_dispatcher.cc
+++ b/remoting/signaling/grpc_async_dispatcher.cc
@@ -19,7 +19,6 @@
}
GrpcAsyncDispatcher::~GrpcAsyncDispatcher() {
- completion_queue_.Shutdown();
{
base::AutoLock autolock(pending_rpcs_lock_);
VLOG(0) << "# of pending RPCs at destruction: " << pending_rpcs_.size();
@@ -27,6 +26,7 @@
pending_rpc->CancelRequest();
}
}
+ completion_queue_.Shutdown();
dispatcher_thread_.Stop();
DCHECK_EQ(0u, pending_rpcs_.size());
}
@@ -37,29 +37,30 @@
// completion_queue_.Next() blocks until a response is received.
while (completion_queue_.Next(&event_tag, &operation_succeeded)) {
- // |operation_succeeded| is always true for client-side finish event.
- DCHECK(operation_succeeded);
- VLOG(0) << "Dequeuing RPC: " << event_tag;
- GrpcAsyncCallDataBase* rpc_data =
- reinterpret_cast<GrpcAsyncCallDataBase*>(event_tag);
+ internal::GrpcAsyncCallData* rpc_data =
+ reinterpret_cast<internal::GrpcAsyncCallData*>(event_tag);
{
base::AutoLock autolock(pending_rpcs_lock_);
- DCHECK(pending_rpcs_.find(rpc_data) != pending_rpcs_.end());
- pending_rpcs_.erase(rpc_data);
+ if (!rpc_data->OnDequeuedOnDispatcherThread(operation_succeeded)) {
+ VLOG(0) << "Dequeuing RPC: " << event_tag;
+ DCHECK(pending_rpcs_.find(rpc_data) != pending_rpcs_.end());
+ pending_rpcs_.erase(rpc_data);
+ rpc_data->DeleteOnCallerThread();
+ } else {
+ VLOG(0) << "Re-enqueuing RPC: " << event_tag;
+ }
}
- rpc_data->RunCallbackAndSelfDestroyOnDone();
}
}
void GrpcAsyncDispatcher::RegisterRpcData(
- std::unique_ptr<GrpcAsyncCallDataBase> rpc_data) {
- {
- base::AutoLock autolock(pending_rpcs_lock_);
- DCHECK(pending_rpcs_.find(rpc_data.get()) == pending_rpcs_.end());
- pending_rpcs_.insert(rpc_data.get());
- }
+ std::unique_ptr<internal::GrpcAsyncCallData> rpc_data) {
VLOG(0) << "Enqueuing RPC: " << rpc_data.get();
- rpc_data.release()->RegisterAndMoveOwnershipToCompletionQueue();
+ base::AutoLock autolock(pending_rpcs_lock_);
+ DCHECK(pending_rpcs_.find(rpc_data.get()) == pending_rpcs_.end());
+ pending_rpcs_.insert(rpc_data.get());
+ // Ownership of |rpc_data| is transferred to the completion queue.
+ rpc_data.release();
}
} // namespace remoting
diff --git a/remoting/signaling/grpc_async_dispatcher.h b/remoting/signaling/grpc_async_dispatcher.h
index 49a48b03..d54a675 100644
--- a/remoting/signaling/grpc_async_dispatcher.h
+++ b/remoting/signaling/grpc_async_dispatcher.h
@@ -12,64 +12,27 @@
#include "base/containers/flat_set.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
+#include "base/thread_annotations.h"
#include "base/threading/thread.h"
#include "remoting/signaling/grpc_async_call_data.h"
+#include "remoting/signaling/grpc_async_server_streaming_call_data.h"
+#include "remoting/signaling/grpc_async_unary_call_data.h"
+#include "remoting/signaling/scoped_grpc_server_stream.h"
#include "third_party/grpc/src/include/grpcpp/completion_queue.h"
#include "third_party/grpc/src/include/grpcpp/support/async_unary_call.h"
namespace remoting {
-// This class helps adopting the gRPC async completion queue handling logic into
-// Chromium's callback paradigm.
-//
-// Basic usage looks like this:
-//
-// class MyClass {
-// public:
-// MyClass() : weak_factory_(this) {}
-// ~MyClass() {}
-//
-// void SayHello() {
-// HelloRequest request;
-// dispatcher_->ExecuteAsyncRpc(
-// // This is run immediately inside the call stack of
-// // |ExecuteAsyncRpc|.
-// base::BindOnce(&HelloService::Stub::AsyncSayHello,
-// base::Unretained(stub_.get())),
-// std::make_unique<grpc::ClientContext>(), request,
-// // Callback might be called after the dispatcher is destroyed.
-// base::BindOnce(&MyClass::OnHelloResult,
-// weak_factory_.GetWeakPtr()));
-// }
-//
-// private:
-// void OnHelloResult(grpc::Status status,
-// const HelloResponse& response) {
-// if (status.error_code() == grpc::StatusCode::CANCELLED) {
-// // The request has been canceled because |dispatcher_| is destroyed.
-// // If you need to access class members here, make sure to bind a weak
-// // pointer in the RpcCallback. Otherwise using base::Unretained() is
-// // fine.
-// return;
-// }
-//
-// if (!status.ok()) {
-// // Handle other error here.
-// return;
-// }
-//
-// // Response is received. Use the result here.
-// }
-//
-// std::unique_ptr<HelloService::Stub> stub_;
-// GrpcAsyncDispatcher dispatcher_;
-// base::WeakPtrFactory<MyClass> weak_factory_;
-// };
+// This class helps adapt the gRPC async completion queue handling logic to
+// Chromium's callback paradigm. See using_grpc_async_dispatcher.md for
+// detailed usage.
+// TODO(yuweih): Move GrpcAsyncDispatcher and other helper classes into a
+// subdirectory.
class GrpcAsyncDispatcher {
public:
template <typename ResponseType>
using RpcCallback =
- base::OnceCallback<void(grpc::Status, const ResponseType&)>;
+ base::OnceCallback<void(const grpc::Status&, const ResponseType&)>;
template <typename RequestType, typename ResponseType>
using AsyncRpcFunction = base::OnceCallback<std::unique_ptr<
@@ -77,6 +40,20 @@
const RequestType&,
grpc::CompletionQueue*)>;
+ template <typename ResponseType>
+ using RpcStreamCallback = base::RepeatingCallback<void(const ResponseType&)>;
+
+ using RpcChannelClosedCallback =
+ base::OnceCallback<void(const grpc::Status&)>;
+
+ template <typename RequestType, typename ResponseType>
+ using AsyncServerStreamingRpcFunction =
+ base::OnceCallback<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
+ grpc::ClientContext*,
+ const RequestType&,
+ grpc::CompletionQueue*,
+ void*)>;
+
GrpcAsyncDispatcher();
~GrpcAsyncDispatcher();
@@ -85,9 +62,8 @@
// destroyed before the server responds, |callback| will be called with
// a CANCELLED status *after* the dispatcher is destroyed.
//
- // It is safe to bind raw pointer into |rpc_function|, but you might want to
- // bind weak pointer in |callback| if you need to access your bound object in
- // the cancel case.
+  // It is safe to bind a raw pointer into |rpc_function|, but you should bind
+  // a weak pointer in |callback|.
template <typename RequestType, typename ResponseType>
void ExecuteAsyncRpc(AsyncRpcFunction<RequestType, ResponseType> rpc_function,
std::unique_ptr<grpc::ClientContext> context,
@@ -95,14 +71,49 @@
RpcCallback<ResponseType> callback) {
auto response_reader =
std::move(rpc_function).Run(context.get(), request, &completion_queue_);
- auto data = std::make_unique<GrpcAsyncCallData<ResponseType>>(
- std::move(context), std::move(response_reader), std::move(callback));
+ auto data =
+ std::make_unique<internal::GrpcAsyncUnaryCallData<ResponseType>>(
+ std::move(context), std::move(response_reader),
+ std::move(callback));
RegisterRpcData(std::move(data));
}
+ // Starts a server streaming RPC. Delete or reset the returned stream holder
+ // if you need to stop the stream.
+ //
+ // |on_incoming_msg| is called whenever a new message is received from the
+ // server.
+ // |on_channel_closed| will be called with CANCELLED if the stream is stopped
+ // by the client, either by deleting the stream holder or deleting the
+ // dispatcher.
+ //
+ // |rpc_function| is called immediately inside the call stack, while
+ // |on_incoming_msg| and |on_channel_closed| might be called after the
+ // dispatcher is deleted.
+ template <typename RequestType, typename ResponseType>
+ std::unique_ptr<ScopedGrpcServerStream> ExecuteAsyncServerStreamingRpc(
+ AsyncServerStreamingRpcFunction<RequestType, ResponseType> rpc_function,
+ std::unique_ptr<grpc::ClientContext> context,
+ const RequestType& request,
+ const RpcStreamCallback<ResponseType>& on_incoming_msg,
+ RpcChannelClosedCallback on_channel_closed) WARN_UNUSED_RESULT {
+ grpc::ClientContext* raw_context = context.get();
+ auto data = std::make_unique<
+ internal::GrpcAsyncServerStreamingCallData<ResponseType>>(
+ std::move(context), on_incoming_msg, std::move(on_channel_closed));
+ auto reader =
+ std::move(rpc_function)
+ .Run(raw_context, request, &completion_queue_, data.get());
+ data->Initialize(std::move(reader));
+ std::unique_ptr<ScopedGrpcServerStream> stream_holder =
+ data->CreateStreamHolder();
+ RegisterRpcData(std::move(data));
+ return stream_holder;
+ }
+
private:
void RunQueueOnDispatcherThread();
- void RegisterRpcData(std::unique_ptr<GrpcAsyncCallDataBase> rpc_data);
+ void RegisterRpcData(std::unique_ptr<internal::GrpcAsyncCallData> rpc_data);
// We need a dedicated thread because getting a response from the completion
// queue will block until any response is received. Note that the RPC call
@@ -114,7 +125,8 @@
grpc::CompletionQueue completion_queue_;
// Keep the list of pending RPCs so that we can cancel them at destruction.
- base::flat_set<GrpcAsyncCallDataBase*> pending_rpcs_;
+ base::flat_set<internal::GrpcAsyncCallData*> pending_rpcs_
+ GUARDED_BY(pending_rpcs_lock_);
base::Lock pending_rpcs_lock_;
DISALLOW_COPY_AND_ASSIGN(GrpcAsyncDispatcher);
diff --git a/remoting/signaling/grpc_async_dispatcher_test_services.proto b/remoting/signaling/grpc_async_dispatcher_test_services.proto
index de78309..5066aca 100644
--- a/remoting/signaling/grpc_async_dispatcher_test_services.proto
+++ b/remoting/signaling/grpc_async_dispatcher_test_services.proto
@@ -18,4 +18,5 @@
service GrpcAsyncDispatcherTestService {
rpc Echo(EchoRequest) returns (EchoResponse) {}
+ rpc StreamEcho(EchoRequest) returns (stream EchoResponse) {}
}
diff --git a/remoting/signaling/grpc_async_dispatcher_unittest.cc b/remoting/signaling/grpc_async_dispatcher_unittest.cc
index 96a0791..db175cd 100644
--- a/remoting/signaling/grpc_async_dispatcher_unittest.cc
+++ b/remoting/signaling/grpc_async_dispatcher_unittest.cc
@@ -9,9 +9,11 @@
#include <utility>
#include "base/bind.h"
+#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/test/bind_test_util.h"
+#include "base/threading/thread_task_runner_handle.h"
#include "remoting/signaling/grpc_async_dispatcher_test_services.grpc.pb.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/grpc/src/include/grpcpp/grpcpp.h"
@@ -20,6 +22,88 @@
namespace {
+void* TagForInt(int num) {
+ return reinterpret_cast<void*>(num);
+}
+
+base::RepeatingCallback<void(const EchoResponse&)>
+NotReachedStreamingCallback() {
+ return base::BindRepeating([](const EchoResponse&) { NOTREACHED(); });
+}
+
+GrpcAsyncDispatcher::RpcChannelClosedCallback
+CheckStatusThenQuitRunLoopCallback(grpc::StatusCode expected_status_code,
+ base::RunLoop* run_loop) {
+ return base::BindLambdaForTesting([=](const grpc::Status& status) {
+ ASSERT_EQ(expected_status_code, status.error_code());
+ run_loop->QuitWhenIdle();
+ });
+}
+
+class EchoStream {
+ public:
+ EchoStream(std::unique_ptr<grpc::ServerContext> context,
+ grpc::ServerCompletionQueue* completion_queue,
+ std::unique_ptr<grpc::ServerAsyncWriter<EchoResponse>> writer);
+ ~EchoStream();
+
+ // SendEcho() must be followed by a call to OnClientReceivedEcho().
+ void SendEcho(const std::string& text);
+ void OnClientReceivedEcho();
+ void Close(const grpc::Status& status);
+
+ private:
+ std::unique_ptr<grpc::ServerContext> context_;
+ grpc::ServerCompletionQueue* completion_queue_;
+ std::unique_ptr<grpc::ServerAsyncWriter<EchoResponse>> writer_;
+ bool closed_ = false;
+};
+
+EchoStream::EchoStream(
+ std::unique_ptr<grpc::ServerContext> context,
+ grpc::ServerCompletionQueue* completion_queue,
+ std::unique_ptr<grpc::ServerAsyncWriter<EchoResponse>> writer)
+ : context_(std::move(context)),
+ completion_queue_(completion_queue),
+ writer_(std::move(writer)) {}
+
+EchoStream::~EchoStream() {
+ Close(grpc::Status::OK);
+}
+
+void EchoStream::SendEcho(const std::string& text) {
+ EchoResponse response;
+ response.set_text(text);
+ writer_->Write(response, this);
+}
+
+void EchoStream::OnClientReceivedEcho() {
+ void* tag;
+ bool ok;
+ completion_queue_->Next(&tag, &ok);
+ ASSERT_TRUE(ok);
+ ASSERT_EQ(this, tag);
+}
+
+void EchoStream::Close(const grpc::Status& status) {
+ if (closed_) {
+ return;
+ }
+
+ writer_->Finish(status, this);
+
+ void* tag;
+ bool ok;
+ completion_queue_->Next(&tag, &ok);
+ if (!ok) {
+    LOG(WARNING) << "Failed to finish stream. Connection might have been dropped.";
+ }
+ ASSERT_EQ(this, tag);
+ closed_ = true;
+}
+
class EchoServerImpl {
public:
EchoServerImpl();
@@ -27,7 +111,9 @@
void Start();
std::shared_ptr<grpc::Channel> CreateInProcessChannel();
- void HandleOneRequest();
+ void HandleOneEchoRequest();
+ std::unique_ptr<EchoStream> AcceptEchoStream(
+ const std::string& expected_request_text);
private:
GrpcAsyncDispatcherTestService::AsyncService async_service_;
@@ -60,28 +146,49 @@
return server_->InProcessChannel(grpc::ChannelArguments());
}
-void EchoServerImpl::HandleOneRequest() {
+void EchoServerImpl::HandleOneEchoRequest() {
grpc::ServerContext context;
EchoRequest request;
grpc::ServerAsyncResponseWriter<EchoResponse> responder(&context);
async_service_.RequestEcho(&context, &request, &responder,
completion_queue_.get(), completion_queue_.get(),
- (void*)1);
+ TagForInt(1));
void* tag;
bool ok;
completion_queue_->Next(&tag, &ok);
ASSERT_TRUE(ok);
- ASSERT_EQ((void*)1, tag);
+ ASSERT_EQ(TagForInt(1), tag);
EchoResponse response;
response.set_text(request.text());
- responder.Finish(response, grpc::Status::OK, (void*)2);
+ responder.Finish(response, grpc::Status::OK, TagForInt(2));
completion_queue_->Next(&tag, &ok);
ASSERT_TRUE(ok);
- ASSERT_EQ((void*)2, tag);
+ ASSERT_EQ(TagForInt(2), tag);
+}
+
+std::unique_ptr<EchoStream> EchoServerImpl::AcceptEchoStream(
+ const std::string& expected_request_text) {
+ auto context = std::make_unique<grpc::ServerContext>();
+ EchoRequest request;
+ auto writer =
+ std::make_unique<grpc::ServerAsyncWriter<EchoResponse>>(context.get());
+ async_service_.RequestStreamEcho(context.get(), &request, writer.get(),
+ completion_queue_.get(),
+ completion_queue_.get(), TagForInt(3));
+
+ void* tag;
+ bool ok;
+ completion_queue_->Next(&tag, &ok);
+ EXPECT_TRUE(ok);
+ EXPECT_EQ(TagForInt(3), tag);
+ EXPECT_EQ(expected_request_text, request.text());
+
+ return std::make_unique<EchoStream>(
+ std::move(context), completion_queue_.get(), std::move(writer));
}
} // namespace
@@ -94,6 +201,11 @@
protected:
void AsyncSendText(const std::string& text,
GrpcAsyncDispatcher::RpcCallback<EchoResponse> callback);
+ std::unique_ptr<ScopedGrpcServerStream> StartEchoStream(
+ const std::string& request_text,
+ const GrpcAsyncDispatcher::RpcStreamCallback<EchoResponse>
+ on_incoming_msg,
+ GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed);
std::unique_ptr<EchoServerImpl> server_;
@@ -130,42 +242,56 @@
std::make_unique<grpc::ClientContext>(), request, std::move(callback));
}
+std::unique_ptr<ScopedGrpcServerStream>
+GrpcAsyncDispatcherTest::StartEchoStream(
+ const std::string& request_text,
+ const GrpcAsyncDispatcher::RpcStreamCallback<EchoResponse> on_incoming_msg,
+ GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed) {
+ EchoRequest request;
+ request.set_text(request_text);
+ return dispatcher_->ExecuteAsyncServerStreamingRpc(
+ base::BindOnce(&GrpcAsyncDispatcherTestService::Stub::AsyncStreamEcho,
+ base::Unretained(stub_.get())),
+ std::make_unique<grpc::ClientContext>(), request, on_incoming_msg,
+ std::move(on_channel_closed));
+}
+
TEST_F(GrpcAsyncDispatcherTest, DoNothing) {}
TEST_F(GrpcAsyncDispatcherTest, SendOneTextAndRespond) {
base::RunLoop run_loop;
AsyncSendText("Hello",
- base::BindLambdaForTesting(
- [&](grpc::Status status, const EchoResponse& response) {
- EXPECT_TRUE(status.ok());
- EXPECT_EQ("Hello", response.text());
- run_loop.Quit();
- }));
- server_->HandleOneRequest();
+ base::BindLambdaForTesting([&](const grpc::Status& status,
+ const EchoResponse& response) {
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ("Hello", response.text());
+ run_loop.QuitWhenIdle();
+ }));
+ server_->HandleOneEchoRequest();
run_loop.Run();
}
TEST_F(GrpcAsyncDispatcherTest, SendTwoTextsAndRespondOneByOne) {
base::RunLoop run_loop_1;
AsyncSendText("Hello 1",
- base::BindLambdaForTesting(
- [&](grpc::Status status, const EchoResponse& response) {
- EXPECT_TRUE(status.ok());
- EXPECT_EQ("Hello 1", response.text());
- run_loop_1.Quit();
- }));
- server_->HandleOneRequest();
+ base::BindLambdaForTesting([&](const grpc::Status& status,
+ const EchoResponse& response) {
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ("Hello 1", response.text());
+ run_loop_1.QuitWhenIdle();
+ }));
+ server_->HandleOneEchoRequest();
run_loop_1.Run();
base::RunLoop run_loop_2;
AsyncSendText("Hello 2",
- base::BindLambdaForTesting(
- [&](grpc::Status status, const EchoResponse& response) {
- EXPECT_TRUE(status.ok());
- EXPECT_EQ("Hello 2", response.text());
- run_loop_2.Quit();
- }));
- server_->HandleOneRequest();
+ base::BindLambdaForTesting([&](const grpc::Status& status,
+ const EchoResponse& response) {
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ("Hello 2", response.text());
+ run_loop_2.QuitWhenIdle();
+ }));
+ server_->HandleOneEchoRequest();
run_loop_2.Run();
}
@@ -175,38 +301,161 @@
auto on_received_one_response = [&]() {
response_count++;
if (response_count == 2) {
- run_loop.Quit();
+ run_loop.QuitWhenIdle();
}
};
AsyncSendText("Hello 1",
- base::BindLambdaForTesting(
- [&](grpc::Status status, const EchoResponse& response) {
- EXPECT_TRUE(status.ok());
- EXPECT_EQ("Hello 1", response.text());
- on_received_one_response();
- }));
+ base::BindLambdaForTesting([&](const grpc::Status& status,
+ const EchoResponse& response) {
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ("Hello 1", response.text());
+ on_received_one_response();
+ }));
AsyncSendText("Hello 2",
- base::BindLambdaForTesting(
- [&](grpc::Status status, const EchoResponse& response) {
- EXPECT_TRUE(status.ok());
- EXPECT_EQ("Hello 2", response.text());
- on_received_one_response();
- }));
- server_->HandleOneRequest();
- server_->HandleOneRequest();
+ base::BindLambdaForTesting([&](const grpc::Status& status,
+ const EchoResponse& response) {
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ("Hello 2", response.text());
+ on_received_one_response();
+ }));
+ server_->HandleOneEchoRequest();
+ server_->HandleOneEchoRequest();
run_loop.Run();
}
TEST_F(GrpcAsyncDispatcherTest, RpcCanceledOnDestruction) {
base::RunLoop run_loop;
AsyncSendText("Hello",
- base::BindLambdaForTesting([&](grpc::Status status,
+ base::BindLambdaForTesting([&](const grpc::Status& status,
const EchoResponse& response) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
- run_loop.Quit();
+ run_loop.QuitWhenIdle();
}));
dispatcher_.reset();
run_loop.Run();
}
+TEST_F(GrpcAsyncDispatcherTest, ServerStreamNotAcceptedByServer) {
+ base::RunLoop run_loop;
+ auto scoped_stream =
+ StartEchoStream("Hello", NotReachedStreamingCallback(),
+ CheckStatusThenQuitRunLoopCallback(
+ grpc::StatusCode::CANCELLED, &run_loop));
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::BindLambdaForTesting([&]() { dispatcher_.reset(); }));
+ run_loop.Run();
+}
+
+TEST_F(GrpcAsyncDispatcherTest, ServerStreamImmediatelyClosedByServer) {
+ base::RunLoop run_loop;
+ auto scoped_stream = StartEchoStream(
+ "Hello", NotReachedStreamingCallback(),
+ CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::OK, &run_loop));
+ std::unique_ptr<EchoStream> stream = server_->AcceptEchoStream("Hello");
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::BindLambdaForTesting([&]() { stream.reset(); }));
+ run_loop.Run();
+}
+
+TEST_F(GrpcAsyncDispatcherTest,
+ ServerStreamImmediatelyClosedByServerWithError) {
+ base::RunLoop run_loop;
+ auto scoped_stream =
+ StartEchoStream("Hello", NotReachedStreamingCallback(),
+ CheckStatusThenQuitRunLoopCallback(
+ grpc::StatusCode::UNAUTHENTICATED, &run_loop));
+ std::unique_ptr<EchoStream> stream = server_->AcceptEchoStream("Hello");
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::BindLambdaForTesting([&]() {
+ stream->Close(grpc::Status(grpc::StatusCode::UNAUTHENTICATED, ""));
+ }));
+ run_loop.Run();
+}
+
+TEST_F(GrpcAsyncDispatcherTest, ServerStreamsOneMessageThenClosedByServer) {
+ base::RunLoop run_loop;
+ std::unique_ptr<EchoStream> stream;
+ auto scoped_stream = StartEchoStream(
+ "Hello", base::BindLambdaForTesting([&](const EchoResponse& response) {
+ ASSERT_EQ("Echo 1", response.text());
+ stream->OnClientReceivedEcho();
+ stream.reset();
+ }),
+ CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::OK, &run_loop));
+ stream = server_->AcceptEchoStream("Hello");
+ stream->SendEcho("Echo 1");
+ run_loop.Run();
+}
+
+TEST_F(GrpcAsyncDispatcherTest, ServerStreamsTwoMessagesThenClosedByServer) {
+ base::RunLoop run_loop;
+ std::unique_ptr<EchoStream> stream;
+ int received_messages_count = 0;
+ auto scoped_stream = StartEchoStream(
+ "Hello", base::BindLambdaForTesting([&](const EchoResponse& response) {
+ if (received_messages_count == 0) {
+ ASSERT_EQ("Echo 1", response.text());
+ stream->OnClientReceivedEcho();
+ stream->SendEcho("Echo 2");
+ received_messages_count++;
+ return;
+ }
+ if (received_messages_count == 1) {
+ ASSERT_EQ("Echo 2", response.text());
+ stream->OnClientReceivedEcho();
+ stream.reset();
+ received_messages_count++;
+ return;
+ }
+ NOTREACHED();
+ }),
+ CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::OK, &run_loop));
+ stream = server_->AcceptEchoStream("Hello");
+ stream->SendEcho("Echo 1");
+ run_loop.Run();
+ ASSERT_EQ(2, received_messages_count);
+}
+
+TEST_F(GrpcAsyncDispatcherTest,
+ ServerStreamOpenThenClosedByClientAtDestruction) {
+ base::RunLoop run_loop;
+ auto scoped_stream =
+ StartEchoStream("Hello", NotReachedStreamingCallback(),
+ CheckStatusThenQuitRunLoopCallback(
+ grpc::StatusCode::CANCELLED, &run_loop));
+ std::unique_ptr<EchoStream> stream = server_->AcceptEchoStream("Hello");
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::BindLambdaForTesting([&]() { dispatcher_.reset(); }));
+ run_loop.Run();
+}
+
+TEST_F(GrpcAsyncDispatcherTest, ServerStreamClosedByStreamHolder) {
+ base::RunLoop run_loop;
+ std::unique_ptr<EchoStream> stream;
+ std::unique_ptr<ScopedGrpcServerStream> scoped_stream =
+ StartEchoStream("Hello", NotReachedStreamingCallback(),
+ CheckStatusThenQuitRunLoopCallback(
+ grpc::StatusCode::CANCELLED, &run_loop));
+ stream = server_->AcceptEchoStream("Hello");
+ scoped_stream.reset();
+ run_loop.Run();
+}
+
+TEST_F(GrpcAsyncDispatcherTest,
+ ServerStreamsOneMessageThenClosedByStreamHolder) {
+ base::RunLoop run_loop;
+ std::unique_ptr<EchoStream> stream;
+ std::unique_ptr<ScopedGrpcServerStream> scoped_stream = StartEchoStream(
+ "Hello", base::BindLambdaForTesting([&](const EchoResponse& response) {
+ ASSERT_EQ("Echo 1", response.text());
+ stream->OnClientReceivedEcho();
+ scoped_stream.reset();
+ }),
+ CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::CANCELLED,
+ &run_loop));
+ stream = server_->AcceptEchoStream("Hello");
+ stream->SendEcho("Echo 1");
+ run_loop.Run();
+}
+
} // namespace remoting
diff --git a/remoting/signaling/grpc_async_server_streaming_call_data.cc b/remoting/signaling/grpc_async_server_streaming_call_data.cc
new file mode 100644
index 0000000..8f07f5a
--- /dev/null
+++ b/remoting/signaling/grpc_async_server_streaming_call_data.cc
@@ -0,0 +1,77 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/signaling/grpc_async_server_streaming_call_data.h"
+
+#include "base/bind.h"
+#include "remoting/signaling/scoped_grpc_server_stream.h"
+
+namespace remoting {
+namespace internal {
+
+GrpcAsyncServerStreamingCallDataBase::GrpcAsyncServerStreamingCallDataBase(
+ std::unique_ptr<grpc::ClientContext> context,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed)
+ : GrpcAsyncCallData(std::move(context)), weak_factory_(this) {
+ DCHECK(on_channel_closed);
+ on_channel_closed_ = std::move(on_channel_closed);
+}
+
+GrpcAsyncServerStreamingCallDataBase::~GrpcAsyncServerStreamingCallDataBase() =
+ default;
+
+bool GrpcAsyncServerStreamingCallDataBase::OnDequeuedOnDispatcherThread(
+ bool operation_succeeded) {
+ base::AutoLock autolock(state_lock_);
+ if (state_ == State::CLOSED) {
+ return false;
+ }
+ if (state_ == State::FINISHING) {
+ DCHECK(operation_succeeded);
+ state_ = State::CLOSED;
+ ResolveChannelClosed();
+ return false;
+ }
+ if (!operation_succeeded) {
+ VLOG(0) << "Can't read any more data. Figuring out the reason..."
+ << " Streaming call: " << this;
+ state_ = State::FINISHING;
+ FinishStream();
+ return true;
+ }
+ if (state_ == State::STARTING) {
+ VLOG(0) << "Streaming call started: " << this;
+ state_ = State::STREAMING;
+ WaitForIncomingMessage();
+ return true;
+ }
+ DCHECK_EQ(State::STREAMING, state_);
+ VLOG(0) << "Streaming call received message: " << this;
+ ResolveIncomingMessage();
+ WaitForIncomingMessage();
+ return true;
+}
+
+std::unique_ptr<ScopedGrpcServerStream>
+GrpcAsyncServerStreamingCallDataBase::CreateStreamHolder() {
+ return std::make_unique<ScopedGrpcServerStream>(weak_factory_.GetWeakPtr());
+}
+
+void GrpcAsyncServerStreamingCallDataBase::OnRequestCanceled() {
+ base::AutoLock autolock(state_lock_);
+ if (state_ == State::CLOSED) {
+ return;
+ }
+ state_ = State::CLOSED;
+ status_ = grpc::Status::CANCELLED;
+ ResolveChannelClosed();
+}
+
+void GrpcAsyncServerStreamingCallDataBase::ResolveChannelClosed() {
+ caller_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(std::move(on_channel_closed_), status_));
+}
+
+} // namespace internal
+} // namespace remoting
diff --git a/remoting/signaling/grpc_async_server_streaming_call_data.h b/remoting/signaling/grpc_async_server_streaming_call_data.h
new file mode 100644
index 0000000..230224f
--- /dev/null
+++ b/remoting/signaling/grpc_async_server_streaming_call_data.h
@@ -0,0 +1,121 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_SIGNALING_GRPC_ASYNC_SERVER_STREAMING_CALL_DATA_H_
+#define REMOTING_SIGNALING_GRPC_ASYNC_SERVER_STREAMING_CALL_DATA_H_
+
+#include <memory>
+#include <utility>
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/macros.h"
+#include "base/memory/weak_ptr.h"
+#include "base/synchronization/lock.h"
+#include "base/thread_annotations.h"
+#include "remoting/signaling/grpc_async_call_data.h"
+#include "third_party/grpc/src/include/grpcpp/support/async_stream.h"
+
+namespace remoting {
+
+class ScopedGrpcServerStream;
+
+namespace internal {
+
+// GrpcAsyncCallData implementation for a server streaming call. The object is
+// first enqueued to start the stream, then repeatedly re-enqueued to receive
+// new messages, until it is canceled by calling CancelRequest().
+class GrpcAsyncServerStreamingCallDataBase : public GrpcAsyncCallData {
+ public:
+ GrpcAsyncServerStreamingCallDataBase(
+ std::unique_ptr<grpc::ClientContext> context,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed);
+ ~GrpcAsyncServerStreamingCallDataBase() override;
+
+ // GrpcAsyncCallData implementations.
+ bool OnDequeuedOnDispatcherThread(bool operation_succeeded) override;
+
+ std::unique_ptr<ScopedGrpcServerStream> CreateStreamHolder();
+
+ protected:
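+  // Typical flow: STARTING -> STREAMING -> FINISHING -> CLOSED. Any state
+  // can jump straight to CLOSED when CancelRequest() is called.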
+ enum class State {
+ STARTING,
+ STREAMING,
+
+    // Server has closed the stream; we are retrieving the final status.
+ FINISHING,
+
+ CLOSED,
+ };
+
+ virtual void ResolveIncomingMessage() = 0;
+ virtual void WaitForIncomingMessage() = 0;
+ virtual void FinishStream() = 0;
+
+ // GrpcAsyncCallData implementations.
+ void OnRequestCanceled() override;
+
+ base::Lock state_lock_;
+ State state_ GUARDED_BY(state_lock_) = State::STARTING;
+
+ private:
+ void ResolveChannelClosed();
+
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed_;
+
+ base::WeakPtrFactory<GrpcAsyncServerStreamingCallDataBase> weak_factory_;
+ DISALLOW_COPY_AND_ASSIGN(GrpcAsyncServerStreamingCallDataBase);
+};
+
+template <typename ResponseType>
+class GrpcAsyncServerStreamingCallData
+ : public GrpcAsyncServerStreamingCallDataBase {
+ public:
+ using OnIncomingMessageCallback =
+ base::RepeatingCallback<void(const ResponseType&)>;
+
+ GrpcAsyncServerStreamingCallData(
+ std::unique_ptr<grpc::ClientContext> context,
+ const OnIncomingMessageCallback& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed)
+ : GrpcAsyncServerStreamingCallDataBase(std::move(context),
+ std::move(on_channel_closed)) {
+ on_incoming_msg_ = on_incoming_msg;
+ }
+ ~GrpcAsyncServerStreamingCallData() override = default;
+
+ void Initialize(
+ std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> reader) {
+ reader_ = std::move(reader);
+ }
+
+ protected:
+ // GrpcAsyncServerStreamingCallDataBase implementations.
+ void ResolveIncomingMessage() override {
+ caller_task_runner_->PostTask(FROM_HERE,
+ base::BindOnce(on_incoming_msg_, response_));
+ }
+
+ void WaitForIncomingMessage() override {
+ DCHECK(reader_);
+ reader_->Read(&response_, /* event_tag */ this);
+ }
+
+ void FinishStream() override {
+ DCHECK(reader_);
+ reader_->Finish(&status_, /* event_tag */ this);
+ }
+
+ private:
+ ResponseType response_;
+ std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> reader_;
+ OnIncomingMessageCallback on_incoming_msg_;
+
+ DISALLOW_COPY_AND_ASSIGN(GrpcAsyncServerStreamingCallData);
+};
+
+} // namespace internal
+} // namespace remoting
+
+#endif // REMOTING_SIGNALING_GRPC_ASYNC_SERVER_STREAMING_CALL_DATA_H_
diff --git a/remoting/signaling/grpc_async_unary_call_data.h b/remoting/signaling/grpc_async_unary_call_data.h
new file mode 100644
index 0000000..09c58f0
--- /dev/null
+++ b/remoting/signaling/grpc_async_unary_call_data.h
@@ -0,0 +1,59 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_SIGNALING_GRPC_ASYNC_UNARY_CALL_DATA_H_
+#define REMOTING_SIGNALING_GRPC_ASYNC_UNARY_CALL_DATA_H_
+
+#include <memory>
+#include <utility>
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/macros.h"
+#include "remoting/signaling/grpc_async_call_data.h"
+#include "third_party/grpc/src/include/grpcpp/support/async_unary_call.h"
+
+namespace remoting {
+namespace internal {
+
+// GrpcAsyncCallData implementation for a unary call. The object is enqueued
+// while waiting for the response and dequeued once the response is received.
+template <typename ResponseType>
+class GrpcAsyncUnaryCallData : public GrpcAsyncCallData {
+ public:
+ using RpcCallback =
+ base::OnceCallback<void(const grpc::Status&, const ResponseType&)>;
+
+ GrpcAsyncUnaryCallData(
+ std::unique_ptr<grpc::ClientContext> context,
+ std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
+ response_reader,
+ RpcCallback callback)
+ : GrpcAsyncCallData(std::move(context)) {
+ response_reader_ = std::move(response_reader);
+ callback_ = std::move(callback);
+ response_reader_->Finish(&response_, &status_, /* event_tag */ this);
+ }
+ ~GrpcAsyncUnaryCallData() override = default;
+
+ bool OnDequeuedOnDispatcherThread(bool operation_succeeded) override {
+ DCHECK(operation_succeeded);
+ caller_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(std::move(callback_), status_, response_));
+ return false;
+ }
+
+ private:
+ std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
+ response_reader_;
+ ResponseType response_;
+ RpcCallback callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(GrpcAsyncUnaryCallData);
+};
+
+} // namespace internal
+} // namespace remoting
+
+#endif // REMOTING_SIGNALING_GRPC_ASYNC_UNARY_CALL_DATA_H_
diff --git a/remoting/signaling/scoped_grpc_server_stream.cc b/remoting/signaling/scoped_grpc_server_stream.cc
new file mode 100644
index 0000000..acfa360
--- /dev/null
+++ b/remoting/signaling/scoped_grpc_server_stream.cc
@@ -0,0 +1,21 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/signaling/scoped_grpc_server_stream.h"
+
+#include "remoting/signaling/grpc_async_server_streaming_call_data.h"
+
+namespace remoting {
+
+ScopedGrpcServerStream::ScopedGrpcServerStream(
+ base::WeakPtr<internal::GrpcAsyncServerStreamingCallDataBase> call_data)
+ : call_data_(call_data) {}
+
+ScopedGrpcServerStream::~ScopedGrpcServerStream() {
+ if (call_data_) {
+ call_data_->CancelRequest();
+ }
+}
+
+} // namespace remoting
diff --git a/remoting/signaling/scoped_grpc_server_stream.h b/remoting/signaling/scoped_grpc_server_stream.h
new file mode 100644
index 0000000..9b83311
--- /dev/null
+++ b/remoting/signaling/scoped_grpc_server_stream.h
@@ -0,0 +1,32 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_SIGNALING_SCOPED_GRPC_SERVER_STREAM_H_
+#define REMOTING_SIGNALING_SCOPED_GRPC_SERVER_STREAM_H_
+
+#include "base/macros.h"
+#include "base/memory/weak_ptr.h"
+
+namespace remoting {
+
+namespace internal {
+class GrpcAsyncServerStreamingCallDataBase;
+} // namespace internal
+
+// A class that holds a gRPC server stream. The streaming channel will be closed
+// once the holder object is deleted.
+class ScopedGrpcServerStream {
+ public:
+ explicit ScopedGrpcServerStream(
+ base::WeakPtr<internal::GrpcAsyncServerStreamingCallDataBase> call_data);
+ ~ScopedGrpcServerStream();
+
+ private:
+ base::WeakPtr<internal::GrpcAsyncServerStreamingCallDataBase> call_data_;
+ DISALLOW_COPY_AND_ASSIGN(ScopedGrpcServerStream);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_SIGNALING_SCOPED_GRPC_SERVER_STREAM_H_
diff --git a/remoting/signaling/using_grpc_async_dispatcher.md b/remoting/signaling/using_grpc_async_dispatcher.md
new file mode 100644
index 0000000..ae6a9b4
--- /dev/null
+++ b/remoting/signaling/using_grpc_async_dispatcher.md
@@ -0,0 +1,91 @@
+# What does GrpcAsyncDispatcher do?
+
+gRPC++ uses a
+[completion queue](https://grpc.io/docs/tutorials/async/helloasync-cpp.html)
+to handle async operations. It doesn't fit well with Chromium's callback-based
+async handling paradigm, and it is technically still a blocking API unless you
+create a new thread to run the queue. gRPC is working on adding callback-based
+APIs, but they won't be ready to use in the near future, so we created the
+GrpcAsyncDispatcher class to help adapt gRPC's completion queue logic to
+Chromium's callback paradigm.
+
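+For reference, this is roughly the loop that GrpcAsyncDispatcher runs for you
+on its dedicated thread (an illustrative sketch; tag dispatching is elided):
+
+```cpp
+void DrainCompletionQueue(grpc::CompletionQueue* queue) {
+  void* event_tag;
+  bool operation_succeeded;
+  // Next() blocks until an event is available, so this loop would block the
+  // calling thread if it weren't run on a dedicated one.
+  while (queue->Next(&event_tag, &operation_succeeded)) {
+    // |event_tag| identifies the pending operation that completed.
+  }
+}
+```
+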
+# Basic usage
+
+```cpp
+class MyClass {
+ public:
+ MyClass() : weak_factory_(this) {}
+ ~MyClass() {}
+
+ void SayHello() {
+ HelloRequest request;
+ dispatcher_->ExecuteAsyncRpc(
+ // This is run immediately inside the call stack of |ExecuteAsyncRpc|.
+ base::BindOnce(&HelloService::Stub::AsyncSayHello,
+ base::Unretained(stub_.get())),
+ std::make_unique<grpc::ClientContext>(), request,
+        // The callback might be called after the dispatcher is destroyed.
+        // Make sure to bind a WeakPtr here.
+ base::BindOnce(&MyClass::OnHelloResult,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ void StartHelloStream() {
+ StreamHelloRequest request;
+ scoped_hello_stream_ = dispatcher_->ExecuteAsyncServerStreamingRpc(
+ base::BindOnce(&HelloService::Stub::AsyncStreamHello,
+ base::Unretained(stub_.get())),
+ std::make_unique<grpc::ClientContext>(), request,
+ base::BindRepeating(&MyClass::OnHelloStreamMessage,
+ weak_factory_.GetWeakPtr()),
+ base::BindOnce(&MyClass::OnHelloStreamClosed,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ void CloseHelloStream() {
+ scoped_hello_stream_.reset();
+ }
+
+ private:
+ void OnHelloResult(const grpc::Status& status,
+ const HelloResponse& response) {
+ if (status.error_code() == grpc::StatusCode::CANCELLED) {
+ // The request has been canceled because |dispatcher_| is destroyed.
+ return;
+ }
+
+ if (!status.ok()) {
+ // Handle other error here.
+ return;
+ }
+
+ // Response is received. Use the result here.
+ }
+
+ void OnHelloStreamMessage(const HelloStreamResponse& response) {
+ // This will be called every time the server sends back messages
+ // through the stream.
+ }
+
+ void OnHelloStreamClosed(const grpc::Status& status) {
+ switch (status.error_code()) {
+ case grpc::StatusCode::CANCELLED:
+ // The stream is closed by the client, either because the scoped stream
+ // is deleted or the dispatcher is deleted.
+ break;
+ case grpc::StatusCode::UNAVAILABLE:
+ // The stream is either closed by the server or dropped due to
+ // network issues.
+ break;
+ default:
+ NOTREACHED();
+ break;
+ }
+ }
+
+ std::unique_ptr<HelloService::Stub> stub_;
+ GrpcAsyncDispatcher dispatcher_;
+ std::unique_ptr<ScopedGrpcServerStream> scoped_hello_stream_;
+ base::WeakPtrFactory<MyClass> weak_factory_;
+};
+```
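+
+All callbacks are posted back to the task runner of the thread that started
+the RPC, so none of the methods of MyClass above need to handle being called
+on a background thread.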
diff --git a/remoting/test/ftl_signaling_playground.cc b/remoting/test/ftl_signaling_playground.cc
index 6ed29616..fb119f0 100644
--- a/remoting/test/ftl_signaling_playground.cc
+++ b/remoting/test/ftl_signaling_playground.cc
@@ -215,7 +215,7 @@
void FtlSignalingPlayground::OnGetIceServerResponse(
base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::GetICEServerResponse& response) {
if (status.ok()) {
printf("Ice transport policy: %s\n",
@@ -261,7 +261,7 @@
void FtlSignalingPlayground::OnSignInGaiaResponse(
base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::SignInGaiaResponse& response) {
if (status.ok()) {
// TODO(yuweih): Allow loading auth token directly from command line.
@@ -293,7 +293,7 @@
void FtlSignalingPlayground::OnPullMessagesResponse(
base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::PullMessagesResponse& response) {
if (!status.ok()) {
if (status.error_code() == grpc::StatusCode::UNAUTHENTICATED) {
@@ -352,7 +352,7 @@
void FtlSignalingPlayground::OnAckMessagesResponse(
base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::AckMessagesResponse& response) {
if (status.ok()) {
VLOG(0) << "Messages acked";
diff --git a/remoting/test/ftl_signaling_playground.h b/remoting/test/ftl_signaling_playground.h
index d790b36..b92a804 100644
--- a/remoting/test/ftl_signaling_playground.h
+++ b/remoting/test/ftl_signaling_playground.h
@@ -42,20 +42,20 @@
void GetIceServer(base::OnceClosure on_done);
void OnGetIceServerResponse(base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::GetICEServerResponse& response);
void SignInGaia(base::OnceClosure on_done);
void OnSignInGaiaResponse(base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::SignInGaiaResponse& response);
void PullMessages(base::OnceClosure on_done);
void OnPullMessagesResponse(base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::PullMessagesResponse& response);
void OnAckMessagesResponse(base::OnceClosure on_done,
- grpc::Status status,
+ const grpc::Status& status,
const ftl::AckMessagesResponse& response);
std::unique_ptr<test::TestTokenStorage> storage_;