ChannelMojo: Introduce thread-safe Send()

With the thread-safe Send(), ChannelProxy calls Channel::Send() directly
from the UI thread instead of PostTask()-ing to the Channel on the IO thread.
This eliminates one extra thread hop and improves the IPC latency.

Time taken to ping-pong 14KB messages 500k times using ChannelProxy:

 * ChannelPosix:       25,306ms +/- 2,012ms
 * ChannelMojo before: 27,870ms +/- 4,205ms
 * ChannelMojo now:    19,758ms +/- 2,074ms

BUG=272113,473367
[email protected], [email protected]

Review URL: https://codereview.chromium.org/1054943005

Cr-Commit-Position: refs/heads/master@{#325911}
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
index 4689e5f6..b7192da 100644
--- a/ipc/mojo/ipc_channel_mojo.cc
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -251,6 +251,7 @@
     : mode_(mode),
       listener_(listener),
       peer_pid_(base::kNullProcessId),
+      io_runner_(io_runner),
       weak_factory_(this) {
   // Create MojoBootstrap after all members are set as it touches
   // ChannelMojo from a different thread.
@@ -293,31 +294,63 @@
 }
 
 void ChannelMojo::Close() {
-  message_reader_.reset();
+  scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_deleted;
+
+  {
+    // Detach |message_reader_| while holding |lock_| so it is cleared under
+    // the lock; the instance itself must be deleted after the lock is released.
+    base::AutoLock l(lock_);
+    to_be_deleted = message_reader_.Pass();
+  }
+
   channel_info_.reset();
   ipc_support_.reset();
+  to_be_deleted.reset();
 }
 
 void ChannelMojo::OnBootstrapError() {
   listener_->OnChannelError();
 }
 
+namespace {
+
+// Deleter for a scoped_ptr holding an internal::MessagePipeReader: it calls
+// |CloseWithErrorIfPending| on the reader first and then deletes it.
+struct ClosingDeleter {
+  typedef base::DefaultDeleter<internal::MessagePipeReader> DefaultType;
+
+  void operator()(internal::MessagePipeReader* ptr) const {
+    ptr->CloseWithErrorIfPending();
+    delete ptr;
+  }
+};
+
+} // namespace
+
 void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe,
                                     int32_t peer_pid) {
-  message_reader_ =
-      make_scoped_ptr(new internal::MessagePipeReader(pipe.Pass(), this));
+  scoped_ptr<internal::MessagePipeReader, ClosingDeleter> reader(
+      new internal::MessagePipeReader(pipe.Pass(), this));
 
-  for (size_t i = 0; i < pending_messages_.size(); ++i) {
-    bool sent = message_reader_->Send(make_scoped_ptr(pending_messages_[i]));
-    pending_messages_[i] = nullptr;
-    if (!sent) {
-      pending_messages_.clear();
-      listener_->OnChannelError();
-      return;
+  {
+    base::AutoLock l(lock_);
+    for (size_t i = 0; i < pending_messages_.size(); ++i) {
+      bool sent = reader->Send(make_scoped_ptr(pending_messages_[i]));
+      pending_messages_[i] = nullptr;
+      if (!sent) {
+        // OnChannelError() is notified through ClosingDeleter.
+        pending_messages_.clear();
+        LOG(ERROR)  << "Failed to flush pending messages";
+        return;
+      }
     }
-  }
 
-  pending_messages_.clear();
+    // We set |message_reader_| here and won't get any |pending_messages_|
+    // hereafter. Although we might have some if there is an error, we don't
+    // care. They cannot be sent anyway.
+    message_reader_.reset(reader.release());
+    pending_messages_.clear();
+  }
 
   set_peer_pid(peer_pid);
   listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
@@ -334,7 +367,9 @@
 }
 
 
+// Warning: Keep the implementation thread-safe.
 bool ChannelMojo::Send(Message* message) {
+  base::AutoLock l(lock_);
   if (!message_reader_) {
     pending_messages_.push_back(message);
     return true;
@@ -343,6 +378,10 @@
   return message_reader_->Send(make_scoped_ptr(message));
 }
 
+bool ChannelMojo::IsSendThreadSafe() const {
+  return true;  // Send() serializes its work by holding |lock_|.
+}
+
 base::ProcessId ChannelMojo::GetPeerPID() const {
   return peer_pid_;
 }