blob: db175cd51b7b5c1d4e8088b36ed461e854f4fa8a [file] [log] [blame]
Yuwei Huang54a4f732019-02-27 02:01:321// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "remoting/signaling/grpc_async_dispatcher.h"
6
7#include <memory>
8#include <string>
9#include <utility>
10
11#include "base/bind.h"
Yuwei Huang81bbc1a2019-03-08 20:46:0612#include "base/logging.h"
Yuwei Huang54a4f732019-02-27 02:01:3213#include "base/message_loop/message_loop.h"
14#include "base/run_loop.h"
15#include "base/test/bind_test_util.h"
Yuwei Huang81bbc1a2019-03-08 20:46:0616#include "base/threading/thread_task_runner_handle.h"
Yuwei Huang54a4f732019-02-27 02:01:3217#include "remoting/signaling/grpc_async_dispatcher_test_services.grpc.pb.h"
18#include "testing/gtest/include/gtest/gtest.h"
19#include "third_party/grpc/src/include/grpcpp/grpcpp.h"
20
21namespace remoting {
22
23namespace {
24
Yuwei Huang81bbc1a2019-03-08 20:46:0625void* TagForInt(int num) {
26 return reinterpret_cast<void*>(num);
27}
28
29base::RepeatingCallback<void(const EchoResponse&)>
30NotReachedStreamingCallback() {
31 return base::BindRepeating([](const EchoResponse&) { NOTREACHED(); });
32}
33
34GrpcAsyncDispatcher::RpcChannelClosedCallback
35CheckStatusThenQuitRunLoopCallback(grpc::StatusCode expected_status_code,
36 base::RunLoop* run_loop) {
37 return base::BindLambdaForTesting([=](const grpc::Status& status) {
38 ASSERT_EQ(expected_status_code, status.error_code());
39 run_loop->QuitWhenIdle();
40 });
41}
42
43class EchoStream {
44 public:
45 EchoStream(std::unique_ptr<grpc::ServerContext> context,
46 grpc::ServerCompletionQueue* completion_queue,
47 std::unique_ptr<grpc::ServerAsyncWriter<EchoResponse>> writer);
48 ~EchoStream();
49
50 // SendEcho() must be followed by a call to OnClientReceivedEcho().
51 void SendEcho(const std::string& text);
52 void OnClientReceivedEcho();
53 void Close(const grpc::Status& status);
54
55 private:
56 std::unique_ptr<grpc::ServerContext> context_;
57 grpc::ServerCompletionQueue* completion_queue_;
58 std::unique_ptr<grpc::ServerAsyncWriter<EchoResponse>> writer_;
59 bool closed_ = false;
60};
61
62EchoStream::EchoStream(
63 std::unique_ptr<grpc::ServerContext> context,
64 grpc::ServerCompletionQueue* completion_queue,
65 std::unique_ptr<grpc::ServerAsyncWriter<EchoResponse>> writer)
66 : context_(std::move(context)),
67 completion_queue_(completion_queue),
68 writer_(std::move(writer)) {}
69
70EchoStream::~EchoStream() {
71 Close(grpc::Status::OK);
72}
73
74void EchoStream::SendEcho(const std::string& text) {
75 EchoResponse response;
76 response.set_text(text);
77 writer_->Write(response, this);
78}
79
80void EchoStream::OnClientReceivedEcho() {
81 void* tag;
82 bool ok;
83 completion_queue_->Next(&tag, &ok);
84 ASSERT_TRUE(ok);
85 ASSERT_EQ(this, tag);
86}
87
88void EchoStream::Close(const grpc::Status& status) {
89 if (closed_) {
90 return;
91 }
92
93 writer_->Finish(status, this);
94
95 void* tag;
96 bool ok;
97 completion_queue_->Next(&tag, &ok);
98 if (!ok) {
99 LOG(WARNING) << "Failed to finish stream. Connection might be dropped.";
100 }
101 ASSERT_EQ(this, tag);
102 closed_ = true;
103}
104
105// EchoStream
106
Yuwei Huang54a4f732019-02-27 02:01:32107class EchoServerImpl {
108 public:
109 EchoServerImpl();
110 ~EchoServerImpl();
111
112 void Start();
113 std::shared_ptr<grpc::Channel> CreateInProcessChannel();
Yuwei Huang81bbc1a2019-03-08 20:46:06114 void HandleOneEchoRequest();
115 std::unique_ptr<EchoStream> AcceptEchoStream(
116 const std::string& expected_request_text);
Yuwei Huang54a4f732019-02-27 02:01:32117
118 private:
119 GrpcAsyncDispatcherTestService::AsyncService async_service_;
120 std::unique_ptr<grpc::Server> server_;
121 std::unique_ptr<grpc::ServerCompletionQueue> completion_queue_;
122};
123
124EchoServerImpl::EchoServerImpl() = default;
125
126EchoServerImpl::~EchoServerImpl() {
127 server_->Shutdown();
128 completion_queue_->Shutdown();
129
130 // gRPC requires draining the completion queue before destroying it.
131 void* tag;
132 bool ok;
133 while (completion_queue_->Next(&tag, &ok)) {
134 }
135}
136
137void EchoServerImpl::Start() {
138 DCHECK(!server_);
139 grpc::ServerBuilder builder;
140 builder.RegisterService(&async_service_);
141 completion_queue_ = builder.AddCompletionQueue();
142 server_ = builder.BuildAndStart();
143}
144
145std::shared_ptr<grpc::Channel> EchoServerImpl::CreateInProcessChannel() {
146 return server_->InProcessChannel(grpc::ChannelArguments());
147}
148
Yuwei Huang81bbc1a2019-03-08 20:46:06149void EchoServerImpl::HandleOneEchoRequest() {
Yuwei Huang54a4f732019-02-27 02:01:32150 grpc::ServerContext context;
151 EchoRequest request;
152 grpc::ServerAsyncResponseWriter<EchoResponse> responder(&context);
153 async_service_.RequestEcho(&context, &request, &responder,
154 completion_queue_.get(), completion_queue_.get(),
Yuwei Huang81bbc1a2019-03-08 20:46:06155 TagForInt(1));
Yuwei Huang54a4f732019-02-27 02:01:32156
157 void* tag;
158 bool ok;
159
160 completion_queue_->Next(&tag, &ok);
161 ASSERT_TRUE(ok);
Yuwei Huang81bbc1a2019-03-08 20:46:06162 ASSERT_EQ(TagForInt(1), tag);
Yuwei Huang54a4f732019-02-27 02:01:32163
164 EchoResponse response;
165 response.set_text(request.text());
Yuwei Huang81bbc1a2019-03-08 20:46:06166 responder.Finish(response, grpc::Status::OK, TagForInt(2));
Yuwei Huang54a4f732019-02-27 02:01:32167
168 completion_queue_->Next(&tag, &ok);
169 ASSERT_TRUE(ok);
Yuwei Huang81bbc1a2019-03-08 20:46:06170 ASSERT_EQ(TagForInt(2), tag);
171}
172
173std::unique_ptr<EchoStream> EchoServerImpl::AcceptEchoStream(
174 const std::string& expected_request_text) {
175 auto context = std::make_unique<grpc::ServerContext>();
176 EchoRequest request;
177 auto writer =
178 std::make_unique<grpc::ServerAsyncWriter<EchoResponse>>(context.get());
179 async_service_.RequestStreamEcho(context.get(), &request, writer.get(),
180 completion_queue_.get(),
181 completion_queue_.get(), TagForInt(3));
182
183 void* tag;
184 bool ok;
185 completion_queue_->Next(&tag, &ok);
186 EXPECT_TRUE(ok);
187 EXPECT_EQ(TagForInt(3), tag);
188 EXPECT_EQ(expected_request_text, request.text());
189
190 return std::make_unique<EchoStream>(
191 std::move(context), completion_queue_.get(), std::move(writer));
Yuwei Huang54a4f732019-02-27 02:01:32192}
193
194} // namespace
195
196class GrpcAsyncDispatcherTest : public testing::Test {
197 public:
198 void SetUp() override;
199 void TearDown() override;
200
201 protected:
202 void AsyncSendText(const std::string& text,
203 GrpcAsyncDispatcher::RpcCallback<EchoResponse> callback);
Yuwei Huang81bbc1a2019-03-08 20:46:06204 std::unique_ptr<ScopedGrpcServerStream> StartEchoStream(
205 const std::string& request_text,
206 const GrpcAsyncDispatcher::RpcStreamCallback<EchoResponse>
207 on_incoming_msg,
208 GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed);
Yuwei Huang54a4f732019-02-27 02:01:32209
210 std::unique_ptr<EchoServerImpl> server_;
211
212 protected:
213 std::unique_ptr<GrpcAsyncDispatcher> dispatcher_;
214
215 private:
216 base::MessageLoop message_loop_;
217 std::unique_ptr<GrpcAsyncDispatcherTestService::Stub> stub_;
218};
219
220void GrpcAsyncDispatcherTest::SetUp() {
221 dispatcher_ = std::make_unique<GrpcAsyncDispatcher>();
222 server_ = std::make_unique<EchoServerImpl>();
223 server_->Start();
224 stub_ = GrpcAsyncDispatcherTestService::NewStub(
225 server_->CreateInProcessChannel());
226}
227
228void GrpcAsyncDispatcherTest::TearDown() {
229 server_.reset();
230 dispatcher_.reset();
231 stub_.reset();
232}
233
234void GrpcAsyncDispatcherTest::AsyncSendText(
235 const std::string& text,
236 GrpcAsyncDispatcher::RpcCallback<EchoResponse> callback) {
237 EchoRequest request;
238 request.set_text(text);
239 dispatcher_->ExecuteAsyncRpc(
240 base::BindOnce(&GrpcAsyncDispatcherTestService::Stub::AsyncEcho,
241 base::Unretained(stub_.get())),
242 std::make_unique<grpc::ClientContext>(), request, std::move(callback));
243}
244
Yuwei Huang81bbc1a2019-03-08 20:46:06245std::unique_ptr<ScopedGrpcServerStream>
246GrpcAsyncDispatcherTest::StartEchoStream(
247 const std::string& request_text,
248 const GrpcAsyncDispatcher::RpcStreamCallback<EchoResponse> on_incoming_msg,
249 GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed) {
250 EchoRequest request;
251 request.set_text(request_text);
252 return dispatcher_->ExecuteAsyncServerStreamingRpc(
253 base::BindOnce(&GrpcAsyncDispatcherTestService::Stub::AsyncStreamEcho,
254 base::Unretained(stub_.get())),
255 std::make_unique<grpc::ClientContext>(), request, on_incoming_msg,
256 std::move(on_channel_closed));
257}
258
Yuwei Huang54a4f732019-02-27 02:01:32259TEST_F(GrpcAsyncDispatcherTest, DoNothing) {}
260
261TEST_F(GrpcAsyncDispatcherTest, SendOneTextAndRespond) {
262 base::RunLoop run_loop;
263 AsyncSendText("Hello",
Yuwei Huang81bbc1a2019-03-08 20:46:06264 base::BindLambdaForTesting([&](const grpc::Status& status,
265 const EchoResponse& response) {
266 EXPECT_TRUE(status.ok());
267 EXPECT_EQ("Hello", response.text());
268 run_loop.QuitWhenIdle();
269 }));
270 server_->HandleOneEchoRequest();
Yuwei Huang54a4f732019-02-27 02:01:32271 run_loop.Run();
272}
273
274TEST_F(GrpcAsyncDispatcherTest, SendTwoTextsAndRespondOneByOne) {
275 base::RunLoop run_loop_1;
276 AsyncSendText("Hello 1",
Yuwei Huang81bbc1a2019-03-08 20:46:06277 base::BindLambdaForTesting([&](const grpc::Status& status,
278 const EchoResponse& response) {
279 EXPECT_TRUE(status.ok());
280 EXPECT_EQ("Hello 1", response.text());
281 run_loop_1.QuitWhenIdle();
282 }));
283 server_->HandleOneEchoRequest();
Yuwei Huang54a4f732019-02-27 02:01:32284 run_loop_1.Run();
285
286 base::RunLoop run_loop_2;
287 AsyncSendText("Hello 2",
Yuwei Huang81bbc1a2019-03-08 20:46:06288 base::BindLambdaForTesting([&](const grpc::Status& status,
289 const EchoResponse& response) {
290 EXPECT_TRUE(status.ok());
291 EXPECT_EQ("Hello 2", response.text());
292 run_loop_2.QuitWhenIdle();
293 }));
294 server_->HandleOneEchoRequest();
Yuwei Huang54a4f732019-02-27 02:01:32295 run_loop_2.Run();
296}
297
298TEST_F(GrpcAsyncDispatcherTest, SendTwoTextsAndRespondTogether) {
299 base::RunLoop run_loop;
300 size_t response_count = 0;
301 auto on_received_one_response = [&]() {
302 response_count++;
303 if (response_count == 2) {
Yuwei Huang81bbc1a2019-03-08 20:46:06304 run_loop.QuitWhenIdle();
Yuwei Huang54a4f732019-02-27 02:01:32305 }
306 };
307 AsyncSendText("Hello 1",
Yuwei Huang81bbc1a2019-03-08 20:46:06308 base::BindLambdaForTesting([&](const grpc::Status& status,
309 const EchoResponse& response) {
310 EXPECT_TRUE(status.ok());
311 EXPECT_EQ("Hello 1", response.text());
312 on_received_one_response();
313 }));
Yuwei Huang54a4f732019-02-27 02:01:32314 AsyncSendText("Hello 2",
Yuwei Huang81bbc1a2019-03-08 20:46:06315 base::BindLambdaForTesting([&](const grpc::Status& status,
316 const EchoResponse& response) {
317 EXPECT_TRUE(status.ok());
318 EXPECT_EQ("Hello 2", response.text());
319 on_received_one_response();
320 }));
321 server_->HandleOneEchoRequest();
322 server_->HandleOneEchoRequest();
Yuwei Huang54a4f732019-02-27 02:01:32323 run_loop.Run();
324}
325
326TEST_F(GrpcAsyncDispatcherTest, RpcCanceledOnDestruction) {
327 base::RunLoop run_loop;
328 AsyncSendText("Hello",
Yuwei Huang81bbc1a2019-03-08 20:46:06329 base::BindLambdaForTesting([&](const grpc::Status& status,
Yuwei Huang54a4f732019-02-27 02:01:32330 const EchoResponse& response) {
331 EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
Yuwei Huang81bbc1a2019-03-08 20:46:06332 run_loop.QuitWhenIdle();
Yuwei Huang54a4f732019-02-27 02:01:32333 }));
334 dispatcher_.reset();
335 run_loop.Run();
336}
337
Yuwei Huang81bbc1a2019-03-08 20:46:06338TEST_F(GrpcAsyncDispatcherTest, ServerStreamNotAcceptedByServer) {
339 base::RunLoop run_loop;
340 auto scoped_stream =
341 StartEchoStream("Hello", NotReachedStreamingCallback(),
342 CheckStatusThenQuitRunLoopCallback(
343 grpc::StatusCode::CANCELLED, &run_loop));
344 base::ThreadTaskRunnerHandle::Get()->PostTask(
345 FROM_HERE, base::BindLambdaForTesting([&]() { dispatcher_.reset(); }));
346 run_loop.Run();
347}
348
349TEST_F(GrpcAsyncDispatcherTest, ServerStreamImmediatelyClosedByServer) {
350 base::RunLoop run_loop;
351 auto scoped_stream = StartEchoStream(
352 "Hello", NotReachedStreamingCallback(),
353 CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::OK, &run_loop));
354 std::unique_ptr<EchoStream> stream = server_->AcceptEchoStream("Hello");
355 base::ThreadTaskRunnerHandle::Get()->PostTask(
356 FROM_HERE, base::BindLambdaForTesting([&]() { stream.reset(); }));
357 run_loop.Run();
358}
359
360TEST_F(GrpcAsyncDispatcherTest,
361 ServerStreamImmediatelyClosedByServerWithError) {
362 base::RunLoop run_loop;
363 auto scoped_stream =
364 StartEchoStream("Hello", NotReachedStreamingCallback(),
365 CheckStatusThenQuitRunLoopCallback(
366 grpc::StatusCode::UNAUTHENTICATED, &run_loop));
367 std::unique_ptr<EchoStream> stream = server_->AcceptEchoStream("Hello");
368 base::ThreadTaskRunnerHandle::Get()->PostTask(
369 FROM_HERE, base::BindLambdaForTesting([&]() {
370 stream->Close(grpc::Status(grpc::StatusCode::UNAUTHENTICATED, ""));
371 }));
372 run_loop.Run();
373}
374
375TEST_F(GrpcAsyncDispatcherTest, ServerStreamsOneMessageThenClosedByServer) {
376 base::RunLoop run_loop;
377 std::unique_ptr<EchoStream> stream;
378 auto scoped_stream = StartEchoStream(
379 "Hello", base::BindLambdaForTesting([&](const EchoResponse& response) {
380 ASSERT_EQ("Echo 1", response.text());
381 stream->OnClientReceivedEcho();
382 stream.reset();
383 }),
384 CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::OK, &run_loop));
385 stream = server_->AcceptEchoStream("Hello");
386 stream->SendEcho("Echo 1");
387 run_loop.Run();
388}
389
390TEST_F(GrpcAsyncDispatcherTest, ServerStreamsTwoMessagesThenClosedByServer) {
391 base::RunLoop run_loop;
392 std::unique_ptr<EchoStream> stream;
393 int received_messages_count = 0;
394 auto scoped_stream = StartEchoStream(
395 "Hello", base::BindLambdaForTesting([&](const EchoResponse& response) {
396 if (received_messages_count == 0) {
397 ASSERT_EQ("Echo 1", response.text());
398 stream->OnClientReceivedEcho();
399 stream->SendEcho("Echo 2");
400 received_messages_count++;
401 return;
402 }
403 if (received_messages_count == 1) {
404 ASSERT_EQ("Echo 2", response.text());
405 stream->OnClientReceivedEcho();
406 stream.reset();
407 received_messages_count++;
408 return;
409 }
410 NOTREACHED();
411 }),
412 CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::OK, &run_loop));
413 stream = server_->AcceptEchoStream("Hello");
414 stream->SendEcho("Echo 1");
415 run_loop.Run();
416 ASSERT_EQ(2, received_messages_count);
417}
418
419TEST_F(GrpcAsyncDispatcherTest,
420 ServerStreamOpenThenClosedByClientAtDestruction) {
421 base::RunLoop run_loop;
422 auto scoped_stream =
423 StartEchoStream("Hello", NotReachedStreamingCallback(),
424 CheckStatusThenQuitRunLoopCallback(
425 grpc::StatusCode::CANCELLED, &run_loop));
426 std::unique_ptr<EchoStream> stream = server_->AcceptEchoStream("Hello");
427 base::ThreadTaskRunnerHandle::Get()->PostTask(
428 FROM_HERE, base::BindLambdaForTesting([&]() { dispatcher_.reset(); }));
429 run_loop.Run();
430}
431
432TEST_F(GrpcAsyncDispatcherTest, ServerStreamClosedByStreamHolder) {
433 base::RunLoop run_loop;
434 std::unique_ptr<EchoStream> stream;
435 std::unique_ptr<ScopedGrpcServerStream> scoped_stream =
436 StartEchoStream("Hello", NotReachedStreamingCallback(),
437 CheckStatusThenQuitRunLoopCallback(
438 grpc::StatusCode::CANCELLED, &run_loop));
439 stream = server_->AcceptEchoStream("Hello");
440 scoped_stream.reset();
441 run_loop.Run();
442}
443
444TEST_F(GrpcAsyncDispatcherTest,
445 ServerStreamsOneMessageThenClosedByStreamHolder) {
446 base::RunLoop run_loop;
447 std::unique_ptr<EchoStream> stream;
448 std::unique_ptr<ScopedGrpcServerStream> scoped_stream = StartEchoStream(
449 "Hello", base::BindLambdaForTesting([&](const EchoResponse& response) {
450 ASSERT_EQ("Echo 1", response.text());
451 stream->OnClientReceivedEcho();
452 scoped_stream.reset();
453 }),
454 CheckStatusThenQuitRunLoopCallback(grpc::StatusCode::CANCELLED,
455 &run_loop));
456 stream = server_->AcceptEchoStream("Hello");
457 stream->SendEcho("Echo 1");
458 run_loop.Run();
459}
460
Yuwei Huang54a4f732019-02-27 02:01:32461} // namespace remoting