[fuchsia] Flush LegacyMetricsClient buffer for OnCloseSoon FIDL event.

Handles the fuchsia.legacymetrics.MetricsRecorder OnCloseSoon() event.
It is sent by the service to request that its clients deliver any
buffered metrics data prior to an impending orderly shutdown.

Bug: 1087971
Change-Id: I5eb76bb2362dc372890cdea32e4a56a77265951e
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2225578
Commit-Queue: Kevin Marshall <[email protected]>
Reviewed-by: Sergey Ulanov <[email protected]>
Cr-Commit-Position: refs/heads/master@{#775342}
diff --git a/fuchsia/base/legacymetrics_client.cc b/fuchsia/base/legacymetrics_client.cc
index a2d45a83..c8ad92e 100644
--- a/fuchsia/base/legacymetrics_client.cc
+++ b/fuchsia/base/legacymetrics_client.cc
@@ -39,6 +39,8 @@
                           ->Connect<fuchsia::legacymetrics::MetricsRecorder>();
   metrics_recorder_.set_error_handler(fit::bind_member(
       this, &LegacyMetricsClient::OnMetricsRecorderDisconnected));
+  metrics_recorder_.events().OnCloseSoon =
+      fit::bind_member(this, &LegacyMetricsClient::OnCloseSoon);
   user_events_recorder_ = std::make_unique<LegacyMetricsUserActionRecorder>();
   ScheduleNextReport();
 }
@@ -54,7 +56,18 @@
   report_additional_callback_ = std::move(callback);
 }
 
+void LegacyMetricsClient::SetNotifyFlushCallback(NotifyFlushCallback callback) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+  DCHECK(callback);
+  DCHECK(!metrics_recorder_)
+      << "SetNotifyFlushCallback() must be called before Start().";
+
+  notify_flush_callback_ = std::move(callback);
+}
+
 void LegacyMetricsClient::ScheduleNextReport() {
+  DCHECK(!is_flushing_);
+
   DVLOG(1) << "Scheduling next report in " << report_interval_.InSeconds()
            << "seconds.";
   timer_.Start(FROM_HERE, report_interval_, this,
@@ -91,35 +104,46 @@
     }
   }
 
-  if (events.empty()) {
-    ScheduleNextReport();
-    return;
-  }
+  std::move(events.begin(), events.end(), std::back_inserter(to_send_));
 
-  DrainBuffer(std::move(events));
+  DrainBuffer();
 }
 
-void LegacyMetricsClient::DrainBuffer(
-    std::vector<fuchsia::legacymetrics::Event> buffer) {
-  if (buffer.empty()) {
-    DVLOG(1) << "Buffer drained.";
-    ScheduleNextReport();
+void LegacyMetricsClient::DrainBuffer() {
+  DVLOG(1) << __func__ << " called.";
+
+  if (record_ack_pending_) {
+    // There is a Record() call already inflight. When it is acknowledged,
+    // buffer draining will continue.
     return;
   }
 
-  // Since ordering doesn't matter, we can efficiently drain |buffer| by
+  if (to_send_.empty()) {
+    DVLOG(1) << "Buffer drained.";
+
+    if (is_flushing_) {
+      metrics_recorder_.Unbind();
+    } else {
+      ScheduleNextReport();
+    }
+
+    return;
+  }
+
+  // Since ordering doesn't matter, we can efficiently drain |to_send_| by
   // repeatedly sending and truncating its tail.
-  const size_t batch_size = std::min(buffer.size(), kMaxBatchSize);
-  const size_t batch_start_idx = buffer.size() - batch_size;
+  const size_t batch_size = std::min(to_send_.size(), kMaxBatchSize);
+  const size_t batch_start_idx = to_send_.size() - batch_size;
   std::vector<fuchsia::legacymetrics::Event> batch;
   batch.resize(batch_size);
-  std::move(buffer.begin() + batch_start_idx, buffer.end(), batch.begin());
-  buffer.resize(buffer.size() - batch_size);
+  std::move(to_send_.begin() + batch_start_idx, to_send_.end(), batch.begin());
+  to_send_.resize(to_send_.size() - batch_size);
 
-  metrics_recorder_->Record(std::move(batch),
-                            [this, buffer = std::move(buffer)]() mutable {
-                              DrainBuffer(std::move(buffer));
-                            });
+  record_ack_pending_ = true;
+  metrics_recorder_->Record(std::move(batch), [this]() {
+    record_ack_pending_ = false;
+    DrainBuffer();
+  });
 }
 
 void LegacyMetricsClient::OnMetricsRecorderDisconnected(zx_status_t status) {
@@ -131,4 +155,20 @@
   timer_.AbandonAndStop();
 }
 
+void LegacyMetricsClient::OnCloseSoon() {
+  DVLOG(1) << __func__ << " called.";
+
+  timer_.AbandonAndStop();
+
+  is_flushing_ = true;
+  if (notify_flush_callback_) {
+    // Defer reporting until the flush operation has finished.
+    std::move(notify_flush_callback_)
+        .Run(base::BindOnce(&LegacyMetricsClient::StartReport,
+                            weak_factory_.GetWeakPtr()));
+  } else {
+    StartReport();
+  }
+}
+
 }  // namespace cr_fuchsia
diff --git a/fuchsia/base/legacymetrics_client.h b/fuchsia/base/legacymetrics_client.h
index 98576a1..b9ec76a2 100644
--- a/fuchsia/base/legacymetrics_client.h
+++ b/fuchsia/base/legacymetrics_client.h
@@ -31,6 +31,8 @@
 
   using ReportAdditionalMetricsCallback = base::RepeatingCallback<void(
       base::OnceCallback<void(std::vector<fuchsia::legacymetrics::Event>)>)>;
+  using NotifyFlushCallback =
+      base::OnceCallback<void(base::OnceClosure completion_cb)>;
 
   LegacyMetricsClient();
   ~LegacyMetricsClient();
@@ -50,18 +52,27 @@
   void SetReportAdditionalMetricsCallback(
       ReportAdditionalMetricsCallback callback);
 
+  // Sets a |callback| which is invoked to warn that the connection to the
+  // remote MetricsRecorder will be terminated. The completion closure passed to
+  // |callback| should be invoked to signal flush completion.
+  void SetNotifyFlushCallback(NotifyFlushCallback callback);
+
  private:
   void ScheduleNextReport();
   void StartReport();
   void Report(std::vector<fuchsia::legacymetrics::Event> additional_metrics);
   void OnMetricsRecorderDisconnected(zx_status_t status);
+  void OnCloseSoon();
 
-  // Incrementally sends the contents of |buffer| to |metrics_recorder|, and
-  // invokes |done_cb| when finished.
-  void DrainBuffer(std::vector<fuchsia::legacymetrics::Event> buffer);
+  // Incrementally sends the contents of |to_send_| to |metrics_recorder_|.
+  void DrainBuffer();
 
   base::TimeDelta report_interval_;
   ReportAdditionalMetricsCallback report_additional_callback_;
+  NotifyFlushCallback notify_flush_callback_;
+  bool is_flushing_ = false;
+  bool record_ack_pending_ = false;
+  std::vector<fuchsia::legacymetrics::Event> to_send_;
   std::unique_ptr<LegacyMetricsUserActionRecorder> user_events_recorder_;
 
   fuchsia::legacymetrics::MetricsRecorderPtr metrics_recorder_;
diff --git a/fuchsia/base/legacymetrics_client_unittest.cc b/fuchsia/base/legacymetrics_client_unittest.cc
index 8c735bbb..1b48827 100644
--- a/fuchsia/base/legacymetrics_client_unittest.cc
+++ b/fuchsia/base/legacymetrics_client_unittest.cc
@@ -14,6 +14,7 @@
 #include "base/threading/thread_task_runner_handle.h"
 #include "fuchsia/base/legacymetrics_client.h"
 #include "fuchsia/base/legacymetrics_histogram_flattener.h"
+#include "fuchsia/base/result_receiver.h"
 #include "testing/gtest/include/gtest/gtest.h"
 
 namespace cr_fuchsia {
@@ -45,10 +46,21 @@
     ack_callback_ = base::nullopt;
   }
 
+  void set_expect_ack_dropped(bool expect_dropped) {
+    expect_ack_dropped_ = expect_dropped;
+  }
+
   // fuchsia::legacymetrics::MetricsRecorder implementation.
   void Record(std::vector<fuchsia::legacymetrics::Event> events,
               RecordCallback callback) override {
-    recorded_events_ = std::move(events);
+    std::move(events.begin(), events.end(),
+              std::back_inserter(recorded_events_));
+
+    // Received a call to Record() before the previous one was acknowledged,
+    // which can happen in some cases (e.g. flushing).
+    if (ack_callback_)
+      EXPECT_TRUE(expect_ack_dropped_);
+
     ack_callback_ = std::move(callback);
 
     if (on_record_cb_)
@@ -61,6 +73,7 @@
   std::vector<fuchsia::legacymetrics::Event> recorded_events_;
   base::OnceClosure on_record_cb_;
   base::Optional<RecordCallback> ack_callback_;
+  bool expect_ack_dropped_ = false;
 };
 
 class LegacyMetricsClientTest : public testing::Test {
@@ -71,9 +84,10 @@
   ~LegacyMetricsClientTest() override = default;
 
   void SetUp() override {
-    service_binding_ = std::make_unique<base::fuchsia::ScopedServiceBinding<
-        fuchsia::legacymetrics::MetricsRecorder>>(
-        test_context_.additional_services(), &test_recorder_);
+    service_binding_ =
+        std::make_unique<base::fuchsia::ScopedSingleClientServiceBinding<
+            fuchsia::legacymetrics::MetricsRecorder>>(
+            test_context_.additional_services(), &test_recorder_);
     base::SetRecordActionTaskRunner(base::ThreadTaskRunnerHandle::Get());
 
     // Flush any dirty histograms from previous test runs in this process.
@@ -84,7 +98,7 @@
   base::test::TaskEnvironment task_environment_;
   base::TestComponentContextForProcess test_context_;
   TestMetricsRecorder test_recorder_;
-  std::unique_ptr<base::fuchsia::ScopedServiceBinding<
+  std::unique_ptr<base::fuchsia::ScopedSingleClientServiceBinding<
       fuchsia::legacymetrics::MetricsRecorder>>
       service_binding_;
   LegacyMetricsClient client_;
@@ -216,5 +230,92 @@
   test_recorder_.SendAck();
 }
 
+TEST_F(LegacyMetricsClientTest, FlushWithPending) {
+  client_.Start(kReportInterval);
+  base::RunLoop().RunUntilIdle();
+
+  UMA_HISTOGRAM_COUNTS_1M("foo", 20);
+
+  EXPECT_FALSE(test_recorder_.IsRecordInFlight());
+  service_binding_->events().OnCloseSoon();
+  base::RunLoop().RunUntilIdle();
+  EXPECT_TRUE(test_recorder_.IsRecordInFlight());
+
+  // The service should be unbound once all data is drained.
+  EXPECT_TRUE(service_binding_->has_clients());
+  auto events = test_recorder_.WaitForEvents();
+  test_recorder_.SendAck();
+  base::RunLoop().RunUntilIdle();
+  EXPECT_EQ(1u, events.size());
+  EXPECT_EQ("foo", events[0].histogram().name());
+  EXPECT_FALSE(service_binding_->has_clients());
+}
+
+TEST_F(LegacyMetricsClientTest, FlushNoData) {
+  client_.Start(kReportInterval);
+  base::RunLoop().RunUntilIdle();
+
+  EXPECT_TRUE(service_binding_->has_clients());
+  EXPECT_FALSE(test_recorder_.IsRecordInFlight());
+  service_binding_->events().OnCloseSoon();
+  base::RunLoop().RunUntilIdle();
+  EXPECT_FALSE(service_binding_->has_clients());
+}
+
+TEST_F(LegacyMetricsClientTest, FlushWithOutstandingAck) {
+  client_.Start(kReportInterval);
+  base::RunLoop().RunUntilIdle();
+
+  // Send "foo", but don't ack.
+  UMA_HISTOGRAM_COUNTS_1M("foo", 20);
+  task_environment_.FastForwardBy(kReportInterval);
+  EXPECT_TRUE(test_recorder_.IsRecordInFlight());
+
+  // Allow the flush operation to call Record() without waiting for a prior ack.
+  test_recorder_.set_expect_ack_dropped(true);
+
+  // Buffer another event and trigger a flush.
+  UMA_HISTOGRAM_COUNTS_1M("bar", 20);
+  EXPECT_TRUE(service_binding_->has_clients());
+  service_binding_->events().OnCloseSoon();
+
+  // Simulate an asynchronous ack from the recorder, which be delivered around
+  // the same time as the flush's Record() call. The ack should be gracefully
+  // ignored by the client.
+  test_recorder_.SendAck();
+
+  base::RunLoop().RunUntilIdle();
+
+  auto events = test_recorder_.WaitForEvents();
+  test_recorder_.SendAck();
+  base::RunLoop().RunUntilIdle();
+  EXPECT_EQ(2u, events.size());
+  EXPECT_EQ("foo", events[0].histogram().name());
+  EXPECT_EQ("bar", events[1].histogram().name());
+  EXPECT_FALSE(service_binding_->has_clients());
+}
+
+TEST_F(LegacyMetricsClientTest, ExternalFlushSignal) {
+  ResultReceiver<base::OnceClosure> flush_receiver;
+  client_.SetNotifyFlushCallback(flush_receiver.GetReceiveCallback());
+  client_.Start(kReportInterval);
+  base::RunLoop().RunUntilIdle();
+
+  UMA_HISTOGRAM_COUNTS_1M("foo", 20);
+
+  // Verify that reporting does not start until the flush completion callback is
+  // run.
+  EXPECT_FALSE(test_recorder_.IsRecordInFlight());
+  service_binding_->events().OnCloseSoon();
+  base::RunLoop().RunUntilIdle();
+  EXPECT_FALSE(test_recorder_.IsRecordInFlight());
+
+  // Verify that invoking the completion callback unblocks reporting.
+  EXPECT_TRUE(flush_receiver.has_value());
+  std::move(*flush_receiver).Run();
+  base::RunLoop().RunUntilIdle();
+  EXPECT_TRUE(test_recorder_.IsRecordInFlight());
+}
+
 }  // namespace
 }  // namespace cr_fuchsia