Create a thread-safe observer list.  Will be used
by SystemMonitor.

Right now the class requires that Observers be RefCounted<>.  This is because we invoke tasks via NewRunnableMethod for them.  However, because we manually track lifecycle via AddObserver/RemoveObserver, we could override the RunnableMethodTraits to not require RefCounted<>.   This would have the advantage that callers do not need to make all Observers be RefCounted, but makes it more critical that observers not forget to call RemoveObserver().
Review URL: http://codereview.chromium.org/7353

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3787 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/base/observer_list_unittest.cc b/base/observer_list_unittest.cc
index 1fea315..529e8a94 100644
--- a/base/observer_list_unittest.cc
+++ b/base/observer_list_unittest.cc
@@ -2,7 +2,11 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include "base/message_loop.h"
 #include "base/observer_list.h"
+#include "base/observer_list_threadsafe.h"
+#include "base/platform_thread.h"
+#include "base/ref_counted.h"
 #include "testing/gtest/include/gtest/gtest.h"
 
 namespace {
@@ -18,7 +22,7 @@
 
 class Adder : public Foo {
  public:
-  Adder(int scaler) : total(0), scaler_(scaler) {}
+  explicit Adder(int scaler) : total(0), scaler_(scaler) {}
   virtual void Observe(int x) {
     total += x * scaler_;
   }
@@ -30,23 +34,126 @@
 
 class Disrupter : public Foo {
  public:
-  Disrupter(ObserverList<Foo>& list, Foo* doomed) : list_(list), doomed_(doomed) {
-  }
+  Disrupter(ObserverList<Foo>* list, Foo* doomed)
+      : list_(list), doomed_(doomed) { }
   virtual ~Disrupter() { }
   virtual void Observe(int x) {
-    list_.RemoveObserver(doomed_);
+    list_->RemoveObserver(doomed_);
   }
  private:
-  ObserverList<Foo>& list_;
+  ObserverList<Foo>* list_;
   Foo* doomed_;
 };
 
+class ThreadSafeDisrupter : public Foo {
+ public:
+  ThreadSafeDisrupter(ObserverListThreadSafe<Foo>* list, Foo* doomed)
+      : list_(list), doomed_(doomed) { }
+  virtual ~ThreadSafeDisrupter() { }
+  virtual void Observe(int x) {
+    list_->RemoveObserver(doomed_);
+  }
+ private:
+  ObserverListThreadSafe<Foo>* list_;
+  Foo* doomed_;
+};
+
+class ObserverListThreadSafeTest : public testing::Test {
+};
+
+static const int kThreadRunTime = 10000;  // ms to run the multi-threaded test.
+
+// A thread for use in the ThreadSafeObserver test
+// which will add and remove itself from the notification
+// list repeatedly.
+class AddRemoveThread : public PlatformThread::Delegate,
+                        public Foo {
+ public:
+  AddRemoveThread(ObserverListThreadSafe<Foo>* list, bool notify)
+      : list_(list),
+        in_list_(false),
+        start_(Time::Now()),
+        count_observes_(0),
+        count_addtask_(0),
+        do_notifies_(notify) {
+    factory_ = new ScopedRunnableMethodFactory<AddRemoveThread>(this);
+  }
+
+  ~AddRemoveThread() {
+    delete factory_;
+  }
+
+  void ThreadMain() {
+    loop_ = new MessageLoop();  // Fire up a message loop.
+    loop_->PostTask(FROM_HERE,
+      factory_->NewRunnableMethod(&AddRemoveThread::AddTask));
+    loop_->Run();
+    //LOG(ERROR) << "Loop 0x" << std::hex << loop_ << " done. " << count_observes_ << ", " << count_addtask_;
+    delete loop_;
+    loop_ = reinterpret_cast<MessageLoop*>(0xdeadbeef);
+  }
+
+  // This task just keeps posting to itself in an attempt
+  // to race with the notifier.
+  void AddTask() {
+    count_addtask_++;
+
+    if ((Time::Now() - start_).InMilliseconds() > kThreadRunTime) {
+      LOG(INFO) << "DONE!";
+      return;
+    }
+
+    if (!in_list_) {
+      list_->AddObserver(this);
+      in_list_ = true;
+    }
+
+    if (do_notifies_) {
+      list_->Notify(&Foo::Observe, 10);
+    }
+
+    loop_->PostDelayedTask(FROM_HERE,
+      factory_->NewRunnableMethod(&AddRemoveThread::AddTask), 0);
+  }
+
+  void Quit() {
+    loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask());
+  }
+
+  virtual void Observe(int x) {
+    count_observes_++;
+
+    // If we're getting called after we removed ourselves from
+    // the list, that is very bad!
+    DCHECK(in_list_);
+
+    // This callback should fire on the appropriate thread
+    EXPECT_EQ(loop_, MessageLoop::current());
+
+    list_->RemoveObserver(this);
+    in_list_ = false;
+  }
+
+ private:
+  ObserverListThreadSafe<Foo>* list_;
+  MessageLoop* loop_;
+  bool in_list_;        // Are we currently registered for notifications.
+                        // in_list_ is only used on |this| thread.
+  Time start_;          // The time we started the test.
+
+  int count_observes_;  // Number of times we observed.
+  int count_addtask_;   // Number of times thread AddTask was called
+  bool do_notifies_;    // Whether these threads should do notifications.
+
+  ScopedRunnableMethodFactory<AddRemoveThread>* factory_;
+};
+
 }  // namespace
 
 TEST(ObserverListTest, BasicTest) {
   ObserverList<Foo> observer_list;
   Adder a(1), b(-1), c(1), d(-1);
-  Disrupter evil(observer_list, &c);
+  Disrupter evil(&observer_list, &c);
 
   observer_list.AddObserver(&a);
   observer_list.AddObserver(&b);
@@ -65,3 +172,91 @@
   EXPECT_EQ(d.total, -10);
 }
 
+TEST(ObserverListThreadSafeTest, BasicTest) {
+  MessageLoop loop;
+
+  scoped_refptr<ObserverListThreadSafe<Foo> > observer_list(
+      new ObserverListThreadSafe<Foo>);
+  Adder a(1);
+  Adder b(-1);
+  Adder c(1);
+  Adder d(-1);
+  ThreadSafeDisrupter evil(observer_list.get(), &c);
+
+  observer_list->AddObserver(&a);
+  observer_list->AddObserver(&b);
+
+  observer_list->Notify(&Foo::Observe, 10);
+  loop.RunAllPending();
+
+  observer_list->AddObserver(&evil);
+  observer_list->AddObserver(&c);
+  observer_list->AddObserver(&d);
+
+  observer_list->Notify(&Foo::Observe, 10);
+  loop.RunAllPending();
+
+  EXPECT_EQ(a.total, 20);
+  EXPECT_EQ(b.total, -20);
+  EXPECT_EQ(c.total, 0);
+  EXPECT_EQ(d.total, -10);
+}
+
+
+// A test driver for a multi-threaded notification loop.  Runs a number
+// of observer threads, each of which constantly adds/removes itself
+// from the observer list.  Optionally, if cross_thread_notifies is set
+// to true, the observer threads will also trigger notifications to
+// all observers.
+static void ThreadSafeObserverHarness(int num_threads, 
+                                      bool cross_thread_notifies) {
+  MessageLoop loop;
+
+  const int kMaxThreads = 15;
+  num_threads = num_threads > kMaxThreads ? kMaxThreads : num_threads;
+
+  scoped_refptr<ObserverListThreadSafe<Foo> > observer_list(
+      new ObserverListThreadSafe<Foo>);
+  Adder a(1);
+  Adder b(-1);
+  Adder c(1);
+  Adder d(-1);
+
+  observer_list->AddObserver(&a);
+  observer_list->AddObserver(&b);
+
+  AddRemoveThread* threaded_observer[kMaxThreads];
+  PlatformThreadHandle threads[kMaxThreads];
+  for (int index = 0; index < num_threads; index++) {
+    threaded_observer[index] = new AddRemoveThread(observer_list.get(), false);
+    EXPECT_TRUE(PlatformThread::Create(0,
+                threaded_observer[index], &threads[index]));
+  }
+
+  Time start = Time::Now();
+  while (true) {
+    if ((Time::Now() - start).InMilliseconds() > kThreadRunTime)
+      break;
+
+    observer_list->Notify(&Foo::Observe, 10);
+
+    loop.RunAllPending();
+  }
+
+  for (int index = 0; index < num_threads; index++) {
+    threaded_observer[index]->Quit();
+    PlatformThread::Join(threads[index]);
+  }
+}
+
+TEST(ObserverListThreadSafeTest, CrossThreadObserver) {
+  // Use 7 observer threads.  Notifications only come from
+  // the main thread.
+  ThreadSafeObserverHarness(7, false);
+}
+
+TEST(ObserverListThreadSafeTest, CrossThreadNotifications) {
+  // Use 3 observer threads.  Notifications will fire from
+  // the main thread and all 3 observer threads.
+  ThreadSafeObserverHarness(3, true);
+}