ChannelMojo: Introduce thread-safe Send()

With the thread-safe Send(), ChannelProxy() calls Channel::Send()
from the UI thread instead of PostTask()-ing the Channel to the IO thread.
This eliminates one extra thread hop, improves the IPC latency.

Times taken to ping-pong 14kb messages 500k times using ChannelProxy:

 * ChannelPosix:       25,306ms +/- 2,012ms
 * ChannelMojo before: 27,870ms +/- 4,205ms
 * ChannelMojo now:    19,758ms +/- 2,074ms

BUG=272113,473367
[email protected], [email protected]

Review URL: https://codereview.chromium.org/1054943005

Cr-Commit-Position: refs/heads/master@{#325911}
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
index 4689e5f6..b7192da 100644
--- a/ipc/mojo/ipc_channel_mojo.cc
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -251,6 +251,7 @@
     : mode_(mode),
       listener_(listener),
       peer_pid_(base::kNullProcessId),
+      io_runner_(io_runner),
       weak_factory_(this) {
   // Create MojoBootstrap after all members are set as it touches
   // ChannelMojo from a different thread.
@@ -293,31 +294,63 @@
 }
 
 void ChannelMojo::Close() {
-  message_reader_.reset();
+  scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_deleted;
+
+  {
+    // |message_reader_| has to be cleared inside the lock,
+    // but the instance has to be deleted outside.
+    base::AutoLock l(lock_);
+    to_be_deleted = message_reader_.Pass();
+  }
+
   channel_info_.reset();
   ipc_support_.reset();
+  to_be_deleted.reset();
 }
 
 void ChannelMojo::OnBootstrapError() {
   listener_->OnChannelError();
 }
 
+namespace {
+
+// ClosingDeleter calls |CloseWithErrorIfPending| before deleting the
+// |MessagePipeReader|.
+struct ClosingDeleter {
+  typedef base::DefaultDeleter<internal::MessagePipeReader> DefaultType;
+
+  void operator()(internal::MessagePipeReader* ptr) const {
+    ptr->CloseWithErrorIfPending();
+    delete ptr;
+  }
+};
+
+} // namespace
+
 void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe,
                                     int32_t peer_pid) {
-  message_reader_ =
-      make_scoped_ptr(new internal::MessagePipeReader(pipe.Pass(), this));
+  scoped_ptr<internal::MessagePipeReader, ClosingDeleter> reader(
+      new internal::MessagePipeReader(pipe.Pass(), this));
 
-  for (size_t i = 0; i < pending_messages_.size(); ++i) {
-    bool sent = message_reader_->Send(make_scoped_ptr(pending_messages_[i]));
-    pending_messages_[i] = nullptr;
-    if (!sent) {
-      pending_messages_.clear();
-      listener_->OnChannelError();
-      return;
+  {
+    base::AutoLock l(lock_);
+    for (size_t i = 0; i < pending_messages_.size(); ++i) {
+      bool sent = reader->Send(make_scoped_ptr(pending_messages_[i]));
+      pending_messages_[i] = nullptr;
+      if (!sent) {
+        // OnChannelError() is notified through ClosingDeleter.
+        pending_messages_.clear();
+        LOG(ERROR)  << "Failed to flush pending messages";
+        return;
+      }
     }
-  }
 
-  pending_messages_.clear();
+    // We set |message_reader_| here and won't get any |pending_messages_|
+    // hereafter. Although we might have some if there is an error, we don't
+    // care. They cannot be sent anyway.
+    message_reader_.reset(reader.release());
+    pending_messages_.clear();
+  }
 
   set_peer_pid(peer_pid);
   listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
@@ -334,7 +367,9 @@
 }
 
 
+// Warning: Keep the implementation thread-safe.
 bool ChannelMojo::Send(Message* message) {
+  base::AutoLock l(lock_);
   if (!message_reader_) {
     pending_messages_.push_back(message);
     return true;
@@ -343,6 +378,10 @@
   return message_reader_->Send(make_scoped_ptr(message));
 }
 
+bool ChannelMojo::IsSendThreadSafe() const {
+  return false;
+}
+
 base::ProcessId ChannelMojo::GetPeerPID() const {
   return peer_pid_;
 }
diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h
index 40bc256..2f98b17 100644
--- a/ipc/mojo/ipc_channel_mojo.h
+++ b/ipc/mojo/ipc_channel_mojo.h
@@ -10,6 +10,7 @@
 #include "base/memory/scoped_ptr.h"
 #include "base/memory/scoped_vector.h"
 #include "base/memory/weak_ptr.h"
+#include "base/synchronization/lock.h"
 #include "ipc/ipc_channel.h"
 #include "ipc/ipc_channel_factory.h"
 #include "ipc/ipc_export.h"
@@ -91,6 +92,7 @@
   bool Connect() override;
   void Close() override;
   bool Send(Message* message) override;
+  bool IsSendThreadSafe() const override;
   base::ProcessId GetPeerPID() const override;
   base::ProcessId GetSelfPID() const override;
 
@@ -147,9 +149,16 @@
   Mode mode_;
   Listener* listener_;
   base::ProcessId peer_pid_;
+  scoped_refptr<base::TaskRunner> io_runner_;
   scoped_ptr<mojo::embedder::ChannelInfo,
              ChannelInfoDeleter> channel_info_;
 
+  // Guards |message_reader_| and |pending_messages_|
+  //
+  // * The contents of |pending_messages_| can be modified from any thread.
+  // * |message_reader_| is modified only from the IO thread,
+  //   but they can be referenced from other threads.
+  base::Lock lock_;
   scoped_ptr<internal::MessagePipeReader, ReaderDeleter> message_reader_;
   ScopedVector<Message> pending_messages_;
 
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
index 13bfbed..9dd26a6 100644
--- a/ipc/mojo/ipc_message_pipe_reader.cc
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -21,14 +21,19 @@
       delegate_(delegate),
       async_waiter_(
           new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
-                                           base::Unretained(this)))) {
+                                           base::Unretained(this)))),
+      pending_send_error_(MOJO_RESULT_OK) {
 }
 
 MessagePipeReader::~MessagePipeReader() {
+  // The pipe should be closed before deletion.
   CHECK(!IsValid());
+  DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
 }
 
 void MessagePipeReader::Close() {
+  // All pending errors should be signaled before Close().
+  DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
   async_waiter_.reset();
   pipe_.reset();
   OnPipeClosed();
@@ -39,6 +44,19 @@
   Close();
 }
 
+void MessagePipeReader::CloseWithErrorIfPending() {
+  if (pending_send_error_ == MOJO_RESULT_OK)
+    return;
+  MojoResult error = pending_send_error_;
+  pending_send_error_ = MOJO_RESULT_OK;
+  CloseWithError(error);
+  return;
+}
+
+void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
+  pending_send_error_ = error;
+}
+
 bool MessagePipeReader::Send(scoped_ptr<Message> message) {
   DCHECK(IsValid());
 
@@ -57,7 +75,11 @@
 
   if (result != MOJO_RESULT_OK) {
     std::for_each(handles.begin(), handles.end(), &MojoClose);
-    CloseWithError(result);
+    // We cannot call CloseWithError() here as Send() is protected by
+    // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
+    // cannot call CloseWithError() also because Send() can be called from
+    // non-UI thread while OnPipeError() expects to be called on IO thread.
+    CloseWithErrorLater(result);
     return false;
   }
 
@@ -173,6 +195,13 @@
 }
 
 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
+  CloseWithErrorIfPending();
+  if (!IsValid()) {
+    // There was a pending error and it closed the pipe.
+    // We cannot do the work anymore.
+    return;
+  }
+
   if (wait_result != MOJO_RESULT_OK) {
     if (wait_result != MOJO_RESULT_ABORTED) {
       // FAILED_PRECONDITION happens every time the peer is dead so
diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h
index 74de71b..937930ee 100644
--- a/ipc/mojo/ipc_message_pipe_reader.h
+++ b/ipc/mojo/ipc_message_pipe_reader.h
@@ -79,6 +79,9 @@
   void Close();
   // Close the mesage pipe with notifying the client with the error.
   void CloseWithError(MojoResult error);
+  void CloseWithErrorLater(MojoResult error);
+  void CloseWithErrorIfPending();
+
   // Return true if the MessagePipe is alive.
   bool IsValid() { return pipe_.is_valid(); }
 
@@ -100,6 +103,7 @@
   // |delegate_| and |async_waiter_| are null once the message pipe is closed.
   Delegate* delegate_;
   scoped_ptr<AsyncHandleWaiter> async_waiter_;
+  MojoResult pending_send_error_;
 
   DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
 };