[Remoting][FTL] Implement PullMessages

This CL implements a new FtlSignalingPlayground option to pull messages
from the messaging backend and ack them.

Bug: 927962
Change-Id: I65296ad80c251a48cec2513cec144beed7ea6d47
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1497800
Commit-Queue: Yuwei Huang <[email protected]>
Reviewed-by: Joe Downing <[email protected]>
Cr-Commit-Position: refs/heads/master@{#637481}
diff --git a/remoting/test/ftl_signaling_playground.cc b/remoting/test/ftl_signaling_playground.cc
index e6acc05..6ed29616 100644
--- a/remoting/test/ftl_signaling_playground.cc
+++ b/remoting/test/ftl_signaling_playground.cc
@@ -7,6 +7,7 @@
 #include <inttypes.h>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "base/base64.h"
 #include "base/bind.h"
@@ -113,7 +114,8 @@
         "\nOptions:\n"
         "  1. GetIceServer\n"
         "  2. SignInGaia\n"
-        "  3. Quit\n\n"
+        "  3. PullMessages\n"
+        "  4. Quit\n\n"
         "Your choice [number]: ");
     int choice = 0;
     base::StringToInt(ReadString(), &choice);
@@ -126,6 +128,9 @@
         SignInGaia(run_loop.QuitClosure());
         break;
       case 3:
+        PullMessages(run_loop.QuitClosure());
+        break;
+      case 4:
         return;
       default:
         fprintf(stderr, "Unknown option\n");
@@ -270,6 +275,87 @@
         "auth_token.expires_in=%" PRId64 "\n",
         registration_id_base64.c_str(), auth_token_base64.c_str(),
         response.auth_token().expires_in());
+    client_->SetAuthToken(response.auth_token().payload());
+    VLOG(0) << "Auth token set on FtlClient";
+  } else {
+    HandleGrpcStatusError(status);
+  }
+  std::move(on_done).Run();
+}
+
+void FtlSignalingPlayground::PullMessages(base::OnceClosure on_done) {
+  DCHECK(client_);  // The FTL client must exist before a pull is issued.
+  VLOG(0) << "Running PullMessages...";
+  auto reply = base::BindOnce(&FtlSignalingPlayground::OnPullMessagesResponse,
+                              weak_factory_.GetWeakPtr(), std::move(on_done));
+  client_->PullMessages(std::move(reply));  // |reply| carries |on_done| along.
+}
+
+// Prints each pulled message to stdout, then acks them all in one request.
+void FtlSignalingPlayground::OnPullMessagesResponse(
+    base::OnceClosure on_done,
+    grpc::Status status,
+    const ftl::PullMessagesResponse& response) {
+  if (!status.ok()) {
+    if (status.error_code() == grpc::StatusCode::UNAUTHENTICATED) {
+      fprintf(stderr, "Please run SignInGaia first\n");
+    } else {
+      HandleGrpcStatusError(status);
+    }
+    std::move(on_done).Run();  // Pull failed: finish without acking.
+    return;
+  }
+
+  std::vector<ftl::ReceiverMessage> receiver_messages;
+  printf("pulled_all=%d\n", response.pulled_all());
+  for (const auto& message : response.messages()) {
+    printf(
+        "Message:\n"
+        "  message_type=%d\n"
+        "  message_id=%s\n"
+        "  sender_id.id=%s\n"
+        "  receiver_id.id=%s\n",
+        message.message_type(), message.message_id().c_str(),
+        message.sender_id().id().c_str(), message.receiver_id().id().c_str());
+    // Chromoting payloads are decoded (bad ones flagged); others go base64.
+    if (message.message_type() ==
+        ftl::InboxMessage_MessageType_CHROMOTING_MESSAGE) {
+      ftl::ChromotingMessage chromoting_message;
+      const bool parsed = chromoting_message.ParseFromString(message.message());
+      printf("  message(ChromotingMessage deserialized)=%s\n",
+             parsed ? chromoting_message.message().c_str() : "<parse error>");
+    } else {
+      std::string message_base64;
+      base::Base64Encode(message.message(), &message_base64);
+      printf("  message(base64)=%s\n", message_base64.c_str());
+    }
+    // Record the (message id, receiver id) pair needed to ack this message.
+    ftl::ReceiverMessage receiver_message;
+    receiver_message.set_message_id(message.message_id());
+    *receiver_message.mutable_receiver_id() = message.receiver_id();
+    receiver_messages.push_back(std::move(receiver_message));
+  }
+
+  if (receiver_messages.empty()) {
+    VLOG(0) << "No message has been received";
+    std::move(on_done).Run();
+    return;
+  }
+
+  // TODO(yuweih): Might need retry logic.
+  VLOG(0) << "Acking " << receiver_messages.size() << " messages";
+  client_->AckMessages(
+      receiver_messages,
+      base::BindOnce(&FtlSignalingPlayground::OnAckMessagesResponse,
+                     weak_factory_.GetWeakPtr(), std::move(on_done)));
+}
+
+void FtlSignalingPlayground::OnAckMessagesResponse(
+    base::OnceClosure on_done,
+    grpc::Status status,
+    const ftl::AckMessagesResponse& response) {
+  if (status.ok()) {
+    VLOG(0) << "Messages acked";
   } else {
     HandleGrpcStatusError(status);
   }